Top-level package for Netatmo GeoPy.
auth
Authentication.
NetatmoConnect
NetatmoConnect.
Source code in netatmo_geopy/auth.py
class NetatmoConnect(object):
    """Authenticate against the Netatmo API via OAuth2.

    Lazily builds both the access token and the auto-refreshing session
    from the provided resource-owner (password flow) credentials, caching
    each on the instance.
    """

    # lazily-populated caches for the OAuth2 session and token
    _session = None
    _token = None

    def __init__(self, client_id, client_secret, username, password, *args, **kwargs):
        """Store the Netatmo OAuth2 credentials for later use."""
        super(NetatmoConnect, self).__init__(*args, **kwargs)
        self.client_id = client_id
        self.client_secret = client_secret
        self.username = username
        self.password = password
        # only read access to station data is needed
        self.scope = "read_station"

    def token_updater(self, token):
        """Cache a freshly issued OAuth2 token on the instance."""
        self._token = token

    @property
    def session(self):
        """Lazily built OAuth2 session that refreshes its token on expiry."""
        if self._session is None:
            # full password-flow credentials, re-used by the session when it
            # needs to fetch a fresh token after expiry
            refresh_credentials = {
                "token_url": settings.OAUTH2_TOKEN_URL,
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "username": self.username,
                "password": self.password,
            }
            oauth_client = oauth2.LegacyApplicationClient(
                client_id=self.client_id, scope=self.scope
            )
            self._session = RefreshOAuth2Session(
                client=oauth_client,
                token=self.token,
                token_updater=self.token_updater,
                auto_refresh_kwargs=refresh_credentials,
            )
        return self._session

    @property
    def token(self):
        """Lazily fetched OAuth2 token (resource-owner password flow)."""
        if self._token is None:
            oauth_client = oauth2.LegacyApplicationClient(
                client_id=self.client_id, scope=self.scope
            )
            self._token = RefreshOAuth2Session(client=oauth_client).fetch_token(
                token_url=settings.OAUTH2_TOKEN_URL,
                client_id=self.client_id,
                client_secret=self.client_secret,
                username=self.username,
                password=self.password,
            )
        return self._token
session
property
readonly
Session.
token
property
readonly
Token.
token_updater(self, token)
Token updater.
Source code in netatmo_geopy/auth.py
def token_updater(self, token):
    """Cache the given OAuth2 token on the instance.

    Called back by the session whenever the token is refreshed, so the
    instance always holds the current token.
    """
    self._token = token
RefreshOAuth2Session (OAuth2Session)
RefreshOAuth2Session.
Source code in netatmo_geopy/auth.py
class RefreshOAuth2Session(requests_oauthlib.OAuth2Session):
    """OAuth2 session that re-fetches the token when it has expired."""

    # see https://github.com/requests/requests-oauthlib/issues/260
    def request(self, *args, **kwargs):
        """Issue the request, fetching a fresh token once on expiry.

        All arguments are forwarded verbatim to `OAuth2Session.request`.
        """
        try:
            return super().request(*args, **kwargs)
        except oauth2.TokenExpiredError:
            # `auto_refresh_kwargs` is expected to hold the full
            # password-flow credentials (token_url, client id/secret,
            # username, password) — see how the session is constructed
            self.token = self.fetch_token(**self.auto_refresh_kwargs)
            # notify the configured callback so callers can cache the token
            self.token_updater(self.token)
            return super().request(*args, **kwargs)
request(self, *args, **kwargs)
Intercept all requests and add the OAuth 2 token if present.
Source code in netatmo_geopy/auth.py
def request(self, *args, **kwargs):
    """Issue the request, fetching a fresh token once on expiry.

    All arguments are forwarded verbatim to `OAuth2Session.request`.
    """
    try:
        return super().request(*args, **kwargs)
    except oauth2.TokenExpiredError:
        # refresh using the full credentials kept in `auto_refresh_kwargs`
        self.token = self.fetch_token(**self.auto_refresh_kwargs)
        # propagate the new token to the configured updater callback
        self.token_updater(self.token)
        return super().request(*args, **kwargs)
cli
Console script for netatmo_geopy.
help()
Show CLI help.
Source code in netatmo_geopy/cli.py
def help():  # noqa: A001 - name is part of the CLI interface
    """Show CLI help."""
    # underline the package name to mimic an RST-style heading
    name = "netatmo_geopy"
    print(name)
    print("=" * len(name))
    print("Pythonic package to access Netatmo CWS data")
main()
Main.
Source code in netatmo_geopy/cli.py
def main():
    """Entry point: expose the CLI commands through python-fire."""
    commands = {"help": help}
    fire.Fire(commands)
core
Main module.
CWSDataset
CWSDataset.
Source code in netatmo_geopy/core.py
class CWSDataset(object):
    """CWSDataset.

    Time series of citizen weather station (CWS) measurements, with
    quality-control helpers to flag suspicious stations.
    """

    def __init__(
        self,
        *,
        ts_gdf_filepath=None,
        snapshot_filepaths=None,
        snapshot_data_dir=None,
        snapshot_file_ext=None,
    ):
        """
        Initialize a CWS dataset from recorded snapshot files.

        Parameters
        ----------
        ts_gdf_filepath : str, path or file-like object, optional
            Path to the input time series geo-data frame file, passed to
            `geopandas.read_file`.
        snapshot_filepaths : list-like of str, path or file-like objects, optional
            List of paths to the input snapshot recording files, passed to
            `geopandas.read_file`. Ignored if `ts_gdf_filepath` is provided.
        snapshot_data_dir : str or pathlib.Path object
            Path to the directory where the snapshot recording files are located.
            Ignored if `snapshot_filepaths` is provided.
        snapshot_file_ext : str, optional
            File extension of the snapshot recording, used to obtain the list of input
            files in `snapshot_data_dir`. If None, the value from
            `settings.SNAPSHOT_FILE_EXT` is used. Ignored if `snapshot_filepaths` is
            provided.
        """
        super(CWSDataset, self).__init__()
        if ts_gdf_filepath is None:
            if snapshot_filepaths is None:
                if snapshot_file_ext is None:
                    snapshot_file_ext = settings.SNAPSHOT_FILE_EXT
                # collect every snapshot dump found in the data directory
                snapshot_filepaths = glob.glob(
                    path.join(snapshot_data_dir, f"*.{snapshot_file_ext}")
                )
            # self.snapshot_filepaths = snapshot_filepaths
            ts_gdf = _join_snapshot_gdfs(snapshot_filepaths)
        else:
            ts_gdf = gpd.read_file(ts_gdf_filepath).set_index("station_id")
        # geo-data frame indexed by station id, with a geometry column plus
        # one measurement column per recorded snapshot
        self.ts_gdf = ts_gdf

    def get_mislocated_stations(self):
        """
        Get mislocated stations.

        When multiple stations share the same location, it is likely due to an incorrect
        set up that led to automatic location assignment based on the IP address of the
        wireless network.

        Returns
        -------
        mislocated_stations : `pandas.Series`
            Boolean series indicating whether a station (index) is mislocated (indicated
            by a value of `True`).
        """
        # ACHTUNG: this approach to dropping geometry duplicates only works for point
        # geometries - see https://github.com/geopandas/geopandas/issues/521
        return (
            self.ts_gdf["geometry"].apply(lambda geom: geom.wkb).duplicated(keep=False)
        )

    def get_outlier_stations(
        self, *, low_alpha=None, high_alpha=None, station_outlier_threshold=None
    ):
        """
        Get outlier stations.

        Measurements can show suspicious deviations from a normal distribution (based on
        a modified z-score using robust Qn variance estimators). Stations with a high
        proportion of such measurements can be related to radiative errors in non-shaded
        areas or other measurement errors.

        Parameters
        ----------
        low_alpha, high_alpha : numeric, optional
            Values for the lower and upper tail respectively (in proportion from 0 to 1)
            that lead to the rejection of the null hypothesis (i.e., the corresponding
            measurement does not follow a normal distribution and can be considered an
            outlier). If None, the respective values from `settings.OUTLIER_LOW_ALPHA`
            and `settings.OUTLIER_HIGH_ALPHA` are used.
        station_outlier_threshold : numeric, optional
            Maximum proportion (from 0 to 1) of outlier measurements after which the
            respective station may be flagged as faulty. If None, the value from
            `settings.STATION_OUTLIER_THRESHOLD` is used.

        Returns
        -------
        outlier_stations : `pandas.Series`
            Boolean series indicating whether a station (index) is considered an outlier
            (indicated by a value of `True`).
        """

        def z_score(x):
            # modified z-score: centered on the median, scaled by the robust
            # Qn estimator (NaNs dropped for the scale computation)
            return (x - x.median()) / scale.qn_scale(x.dropna())

        if low_alpha is None:
            low_alpha = settings.OUTLIER_LOW_ALPHA
        if high_alpha is None:
            high_alpha = settings.OUTLIER_HIGH_ALPHA
        if station_outlier_threshold is None:
            station_outlier_threshold = settings.STATION_OUTLIER_THRESHOLD
        # transpose so that each row is a snapshot and each column a station
        ts_df = pd.DataFrame(self.ts_gdf.drop("geometry", axis=1).T)
        nonnan_df = ~ts_df.isna()
        low_z = norm.ppf(low_alpha)
        high_z = norm.ppf(high_alpha)
        # a measurement is an outlier when its z-score falls outside
        # (low_z, high_z); NaN measurements are never counted as outliers
        outlier_df = (
            ~ts_df.apply(z_score, axis=1).apply(
                lambda z: z.between(low_z, high_z, inclusive="neither"), axis=1
            )
            & nonnan_df
        )
        # proportion of outlier measurements per station (non-NaN only)
        prop_outlier_ser = outlier_df.sum() / nonnan_df.sum()
        return prop_outlier_ser > station_outlier_threshold

    def get_indoor_stations(self, *, station_indoor_corr_threshold=None):
        """
        Get indoor stations.

        Stations whose time series of measurements show low correlations with the
        spatial median time series are likely set up indoors.

        Parameters
        ----------
        station_indoor_corr_threshold : numeric, optional
            Stations showing Pearson correlations (with the overall station median
            distribution) lower than this threshold are likely set up indoors. If None,
            the value from `settings.STATION_INDOOR_CORR_THRESHOLD` is used.

        Returns
        -------
        indoor_stations : `pandas.Series`
            Boolean series indicating whether a station (index) is likely set up indoors
            (indicated by a value of `True`).
        """
        if station_indoor_corr_threshold is None:
            station_indoor_corr_threshold = settings.STATION_INDOOR_CORR_THRESHOLD
        ts_df = self.ts_gdf.drop("geometry", axis=1)
        # spatial median time series across all stations
        median_ser = ts_df.median()
        corr_ser = ts_df.apply(
            lambda station_ser: station_ser.corr(median_ser),
            axis=1,
        )
        return corr_ser < station_indoor_corr_threshold
__init__(self, *, ts_gdf_filepath=None, snapshot_filepaths=None, snapshot_data_dir=None, snapshot_file_ext=None)
special
Initialize a CWS dataset from recorded snapshot files.
Parameters
ts_gdf_filepath : str, path or file-like object, optional
Path to the input time series geo-data frame file, passed to
geopandas.read_file
.
snapshot_filepaths : list-like of str, path or file-like objects, optional
List of paths to the input snapshot recording files, passed to
geopandas.read_file
. Ignored if ts_gdf_filepath
is provided.
snapshot_data_dir : str or pathlib.Path object
Path to the directory where the snapshot recording files are located.
Ignored if snapshot_filepaths
is provided.
snapshot_file_ext : str, optional
File extension of the snapshot recording, used to obtain the list of input
files in snapshot_data_dir
. If None, the value from
settings.SNAPSHOT_FILE_EXT
is used. Ignored if snapshot_filepaths
is
provided.
Source code in netatmo_geopy/core.py
def __init__(
    self,
    *,
    ts_gdf_filepath=None,
    snapshot_filepaths=None,
    snapshot_data_dir=None,
    snapshot_file_ext=None,
):
    """
    Initialize a CWS dataset from recorded snapshot files.

    Parameters
    ----------
    ts_gdf_filepath : str, path or file-like object, optional
        Path to a time series geo-data frame file, read with
        `geopandas.read_file`. Takes precedence over the other arguments.
    snapshot_filepaths : list-like of str, path or file-like objects, optional
        Paths to the snapshot recording files, read with
        `geopandas.read_file`. Ignored if `ts_gdf_filepath` is provided.
    snapshot_data_dir : str or pathlib.Path object
        Directory containing the snapshot recording files. Ignored if
        `snapshot_filepaths` is provided.
    snapshot_file_ext : str, optional
        File extension used to glob snapshot files in `snapshot_data_dir`.
        If None, the value from `settings.SNAPSHOT_FILE_EXT` is used.
        Ignored if `snapshot_filepaths` is provided.
    """
    super(CWSDataset, self).__init__()
    if ts_gdf_filepath is not None:
        # a pre-assembled time series file trumps individual snapshots
        self.ts_gdf = gpd.read_file(ts_gdf_filepath).set_index("station_id")
        return
    if snapshot_filepaths is None:
        ext = snapshot_file_ext
        if ext is None:
            ext = settings.SNAPSHOT_FILE_EXT
        # collect every snapshot dump found in the data directory
        snapshot_filepaths = glob.glob(path.join(snapshot_data_dir, f"*.{ext}"))
    self.ts_gdf = _join_snapshot_gdfs(snapshot_filepaths)
get_indoor_stations(self, *, station_indoor_corr_threshold=None)
Get indoor stations.
Stations whose time series of measurements show low correlations with the spatial median time series are likely set up indoors.
Parameters
station_indoor_corr_threshold : numeric, optional
Stations showing Pearson correlations (with the overall station median
distribution) lower than this threshold are likely set up indoors. If None,
the value from settings.STATION_INDOOR_CORR_THRESHOLD
is used.
Returns
indoor_stations : pandas.Series
Boolean series indicating whether a station (index) is likely set up indoors
(indicated by a value of True
).
Source code in netatmo_geopy/core.py
def get_indoor_stations(self, *, station_indoor_corr_threshold=None):
    """
    Get indoor stations.

    Flags stations whose measurement time series correlates poorly with the
    spatial median time series, which suggests an indoor set up.

    Parameters
    ----------
    station_indoor_corr_threshold : numeric, optional
        Stations whose Pearson correlation with the overall station median
        distribution falls below this threshold are flagged. If None, the
        value from `settings.STATION_INDOOR_CORR_THRESHOLD` is used.

    Returns
    -------
    indoor_stations : `pandas.Series`
        Boolean series indicating whether a station (index) is likely set up
        indoors (indicated by a value of `True`).
    """
    threshold = station_indoor_corr_threshold
    if threshold is None:
        threshold = settings.STATION_INDOOR_CORR_THRESHOLD
    measurement_df = self.ts_gdf.drop("geometry", axis=1)
    # spatial median time series across all stations
    spatial_median_ser = measurement_df.median()
    station_corr_ser = measurement_df.apply(
        lambda station_ser: station_ser.corr(spatial_median_ser),
        axis=1,
    )
    return station_corr_ser < threshold
get_mislocated_stations(self)
Get mislocated stations.
When multiple stations share the same location, it is likely due to an incorrect set up that led to automatic location assignment based on the IP address of the wireless network.
Returns
mislocated_stations : pandas.Series
Boolean series indicating whether a station (index) is mislocated (indicated
by a value of True
).
Source code in netatmo_geopy/core.py
def get_mislocated_stations(self):
    """
    Get mislocated stations.

    Stations sharing the exact same location are likely victims of an
    incorrect set up, where the location was assigned automatically from the
    IP address of the wireless network.

    Returns
    -------
    mislocated_stations : `pandas.Series`
        Boolean series indicating whether a station (index) is mislocated
        (indicated by a value of `True`).
    """
    # ACHTUNG: comparing WKB only works to deduplicate *point* geometries -
    # see https://github.com/geopandas/geopandas/issues/521
    wkb_ser = self.ts_gdf["geometry"].apply(lambda geom: geom.wkb)
    return wkb_ser.duplicated(keep=False)
get_outlier_stations(self, *, low_alpha=None, high_alpha=None, station_outlier_threshold=None)
Get outlier stations.
Measurements can show suspicious deviations from a normal distribution (based on a modified z-score using robust Qn variance estimators). Stations with high proportion of such measurements can be related to radiative errors in non-shaded areas or other measurement errors.
Parameters
low_alpha, high_alpha : numeric, optional
Values for the lower and upper tail respectively (in proportion from 0 to 1)
that lead to the rejection of the null hypothesis (i.e., the corresponding
measurement does not follow a normal distribution can be considered an
outlier). If None, the respective values from settings.OUTLIER_LOW_ALPHA
and settings.OUTLIER_HIGH_ALPHA
are used.
station_outlier_threshold : numeric, optional
Maximum proportion (from 0 to 1) of outlier measurements after which the
respective station may be flagged as faulty. If None, the value from
settings.STATION_OUTLIER_THRESHOLD
is used.
Returns
outlier_stations : pandas.Series
Boolean series indicating whether a station (index) is considered an outlier
(indicated by a value of True
).
Source code in netatmo_geopy/core.py
def get_outlier_stations(
    self, *, low_alpha=None, high_alpha=None, station_outlier_threshold=None
):
    """
    Get outlier stations.

    Flags stations with a high proportion of measurements that deviate
    suspiciously from a normal distribution (based on a modified z-score
    using robust Qn variance estimators), e.g., due to radiative errors in
    non-shaded areas or other measurement errors.

    Parameters
    ----------
    low_alpha, high_alpha : numeric, optional
        Lower/upper tail values respectively (in proportion from 0 to 1)
        beyond which the null hypothesis is rejected (i.e., the measurement
        does not follow a normal distribution and can be considered an
        outlier). If None, the respective values from
        `settings.OUTLIER_LOW_ALPHA` and `settings.OUTLIER_HIGH_ALPHA` are
        used.
    station_outlier_threshold : numeric, optional
        Maximum proportion (from 0 to 1) of outlier measurements after which
        the respective station may be flagged as faulty. If None, the value
        from `settings.STATION_OUTLIER_THRESHOLD` is used.

    Returns
    -------
    outlier_stations : `pandas.Series`
        Boolean series indicating whether a station (index) is considered an
        outlier (indicated by a value of `True`).
    """

    def modified_z_score(x):
        # robust z-score: centered on the median, scaled by the Qn estimator
        return (x - x.median()) / scale.qn_scale(x.dropna())

    if low_alpha is None:
        low_alpha = settings.OUTLIER_LOW_ALPHA
    if high_alpha is None:
        high_alpha = settings.OUTLIER_HIGH_ALPHA
    if station_outlier_threshold is None:
        station_outlier_threshold = settings.STATION_OUTLIER_THRESHOLD
    # transpose so that each row is a snapshot and each column a station
    measurement_df = pd.DataFrame(self.ts_gdf.drop("geometry", axis=1).T)
    valid_df = ~measurement_df.isna()
    lower_z = norm.ppf(low_alpha)
    upper_z = norm.ppf(high_alpha)
    # measurements whose z-score stays strictly within (lower_z, upper_z)
    within_df = measurement_df.apply(modified_z_score, axis=1).apply(
        lambda z: z.between(lower_z, upper_z, inclusive="neither"), axis=1
    )
    # an outlier is a non-NaN measurement outside the accepted band
    outlier_df = ~within_df & valid_df
    outlier_prop_ser = outlier_df.sum() / valid_df.sum()
    return outlier_prop_ser > station_outlier_threshold
CWSRecorder
CWSRecorder.
Source code in netatmo_geopy/core.py
class CWSRecorder(object):
    """CWSRecorder.

    Records snapshots of citizen weather station (CWS) measurements within a
    bounding box, optionally on a periodic schedule, and dumps them to files.
    """

    def __init__(
        self,
        lon_sw,
        lat_sw,
        lon_ne,
        lat_ne,
        *,
        dst_dir=None,
        client_id=None,
        client_secret=None,
        username=None,
        password=None,
        time_unit=None,
        interval=None,
        at=None,
        until=None,
        datetime_format=None,
        snapshot_file_ext=None,
        save_responses=None,
        save_responses_dir=None,
    ):
        """
        Initialize a CWS recorder for a given region.

        Parameters
        ----------
        lon_sw, lat_sw, lon_ne, lat_ne : numeric
            Latitude/longitude coordinates of the bounds of the region of interest
        dst_dir : str or pathlib.Path object, optional
            Path to the directory where the recorded snapshots are to be dumped. Only
            used when the `dump_snapshot_gdf` method is called, ignored otherwise. If
            None, the value from `settings.RECORDER_DST_DIR` is used.
        client_id, client_secret, username, password : str, optional
            Authentication credentials for Netatmo. If None, the respective values set
            in the "NETATMO_CLIENT_ID", "NETATMO_CLIENT_SECRET", "NETATMO_USERNAME" and
            "NETATMO_PASSWORD" environment variables are used.
        time_unit : str {"second", "seconds", "minute", "minutes", "hour", "hours", \
                "day", "days", "week", "weeks", "monday", "tuesday", "wednesday", \
                "thursday", "friday", "saturday", "sunday"}, optional
            Time unit. If None, no snapshots are taken periodically - snapshots are only
            taken by manually calling `get_snapshot_gdf` or `dump_snapshot_gdf`.
        interval : int, optional
            Quantity of the time unit set in `time_unit`, altogether defining the
            interval between snapshots. If None, the default value from the `schedule`
            library, i.e., 1, is used. Ignored if `time_unit` is None.
        at : str, optional
            Time string defining the particular time when snapshots are taken. See also
            https://schedule.readthedocs.io/en/stable/reference.html#schedule.Job.at.
            Ignored if `time_unit` is None. The following formats are accepted:

            * for daily jobs -> HH:MM:SS or HH:MM
            * for hourly jobs -> MM:SS or :MM
            * for minute jobs -> :SS.
        until : datetime.datetime, datetime.timedelta, datetime.time or str, optional
            Latest time (in the future) when a snapshot will be taken. If None, the
            periodic snapshots are taken indefinitely. Ignored if `time_unit` is None.
            The following formats are accepted:

            * datetime.datetime
            * datetime.timedelta
            * datetime.time
            * string in one of the following formats: "%Y-%m-%d %H:%M:%S",
              "%Y-%m-%d %H:%M", "%Y-%m-%d", "%H:%M:%S", "%H:%M" as defined by
              `datetime.strptime` behaviour.
        datetime_format : str, optional
            Datetime format string. Used to name the geo-data frame columns and the
            snapshot file dumps. If None, the value from `settings.DATETIME_FORMAT` is
            used.
        snapshot_file_ext : str, optional
            File extension used when dumping recorded snapshot files, which must match
            an OGR vector format driver (see `fiona.supported_drivers`). If None, the
            value from `settings.SNAPSHOT_FILE_EXT` is used.
        save_responses : bool, optional
            Whether the JSON responses from the Netatmo public data API calls are
            stored. If None, the value from `settings.SAVE_RESPONSES` is used.
        save_responses_dir : str or pathlib.Path object, optional
            Path to the directory where the JSON responses are to be stored. If None,
            the value from `settings.SAVE_RESPONSES_DIR` is used. Ignored if
            `save_responses` is False.
        """
        super(CWSRecorder, self).__init__()
        self.lon_sw = lon_sw
        self.lat_sw = lat_sw
        self.lon_ne = lon_ne
        self.lat_ne = lat_ne
        # FIX: apply the documented `settings.RECORDER_DST_DIR` default -
        # previously a None `dst_dir` was stored verbatim, breaking
        # `dump_snapshot_gdf` despite the docstring's promise
        if dst_dir is None:
            dst_dir = settings.RECORDER_DST_DIR
        self.dst_dir = dst_dir
        # auth
        self._client_id = client_id
        self._client_secret = client_secret
        self._username = username
        self._password = password
        # IO
        if datetime_format is None:
            datetime_format = settings.DATETIME_FORMAT
        self.datetime_format = datetime_format
        if snapshot_file_ext is None:
            snapshot_file_ext = settings.SNAPSHOT_FILE_EXT
        self.snapshot_file_ext = snapshot_file_ext
        if save_responses is None:
            save_responses = settings.SAVE_RESPONSES
        self.save_responses = save_responses
        if save_responses_dir is None:
            save_responses_dir = settings.SAVE_RESPONSES_DIR
        self.save_responses_dir = save_responses_dir
        # schedule
        if time_unit:
            if interval:
                caller = schedule.every(interval)
            else:
                caller = schedule.every()
            caller = getattr(caller, time_unit)
            if at:
                caller = caller.at(at)
            if until:
                caller = caller.until(until)
            caller.do(self.dump_snapshot_gdf)
            # block until the scheduler has no jobs left (e.g., the job was
            # cancelled after reaching `until`)
            while schedule.get_jobs():
                schedule.run_pending()
                time.sleep(1)

    def get_snapshot_gdf(self):
        """Get current CWS temperature snapshot."""
        # query the Netatmo public-data endpoint for the configured bounds
        response_json = _get_public_data(
            self.lon_sw,
            self.lat_sw,
            self.lon_ne,
            self.lat_ne,
            client_id=self._client_id,
            client_secret=self._client_secret,
            username=self._username,
            password=self._password,
        )
        snapshot_gdf = _gdf_from_response_json(response_json, self.datetime_format)
        if self.save_responses:
            # also keep the raw JSON response alongside the snapshot
            dst_response_filepath = path.join(
                self.save_responses_dir, f"{_get_basename(snapshot_gdf)}.json"
            )
            with open(dst_response_filepath, "w") as dst:
                json.dump(response_json, dst)
            utils.log(f"Dumped response to file '{dst_response_filepath}'")
        return snapshot_gdf

    def dump_snapshot_gdf(self):
        """Get current CWS temperature snapshot and dump it to a file."""
        snapshot_gdf = self.get_snapshot_gdf()
        dst_filepath = path.join(
            self.dst_dir, f"{_get_basename(snapshot_gdf)}.{self.snapshot_file_ext}"
        )
        snapshot_gdf.to_file(dst_filepath)
        utils.log(f"Dumped snapshot geo-data frame to file '{dst_filepath}'")
__init__(self, lon_sw, lat_sw, lon_ne, lat_ne, *, dst_dir=None, client_id=None, client_secret=None, username=None, password=None, time_unit=None, interval=None, at=None, until=None, datetime_format=None, snapshot_file_ext=None, save_responses=None, save_responses_dir=None)
special
Initialize a CWS recorder for a given region.
Parameters
lon_sw, lat_sw, lon_ne, lat_ne : numeric
Latitude/longitude coordinates of the bounds of the region of interest
dst_dir : str or pathlib.Path object, optional
Path to the directory where the recorded snapshots are to be dumped. Only
used when the dump_snapshot_gdf
method is called, ignored otherwise. If
None, the value from settings.RECORDER_DST_DIR
.
client_id, client_secret, username, password : str, optional
Authentication credentials for Netatmo. If None, the respective values set
in the "NETATMO_CLIENT_ID", "NETATMO_CLIENT_SECRET", "NETATMO_USERNAME" and
"NETATMO_PASSWORD" environment variables are used.
time_unit : str {"second", "seconds", "minute", "minutes", "hour", "hours", "day", "days", "week", "weeks", "monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"}, optional
Time unit. If None, no snapshots are taken periodically - snapshots are only
taken by manually calling get_snapshot_gdf
or dump_snapshot_gdf
.
interval : int, optional
Quantity of the time unit set in time_unit
, altogether defining the
interval between snapshots. If None, the default value from the schedule
library, i.e., 1, is used. Ignored if time_unit
is None.
at : str, optional
Time string defining the particular time when snapshots are taken. See also
https://schedule.readthedocs.io/en/stable/reference.html#schedule.Job.at.
Ignored if time_unit
is None. The following formats are accepted:
* for daily jobs -> HH:MM:SS or HH:MM
* for hourly jobs -> MM:SS or :MM
* for minute jobs -> :SS.
until : datetime.datetime, datetime.timedelta, datetime.time or str, optional
Latest time (in the future) when a snapshot will be taken. If None, the
periodic snapshots are taken indefinitely. Ignored if time_unit
is None.
The following formats are accepted:
* datetime.datetime
* datetime.timedelta
* datetime.time
* string in one of the following formats: "%Y-%m-%d %H:%M:%S",
"%Y-%m-%d %H:%M", "%Y-%m-%d", "%H:%M:%S", "%H:%M" as defined by
`datetime.strptime` behaviour.
datetime_format : str, optional
Datetime format string. Used to name the geo-data frame columns and the
snapshot file dumps. If None, the value from settings.DATETIME_FORMAT
is
used.
snapshot_file_ext : str, optional
File extension used when dumping recorded snapshot files, which must match
an OGR vector format driver (see fiona.supported_drivers
). If None, the
value from settings.SNAPSHOT_FILE_EXT
is used.
save_responses : bool, optional
Whether the JSON responses from the Netatmo public data API calls are
stored. If None, the value from settings.SAVE_RESPONSES
is used.
save_responses_dir : str or pathlib.Path object, optional.
Path to the directory where the JSON responses are to be stored. If None,
the value from settings.SAVE_RESPONSES_DIR
is used. Ignored if
save_responses
is False.
Source code in netatmo_geopy/core.py
def __init__(
    self,
    lon_sw,
    lat_sw,
    lon_ne,
    lat_ne,
    *,
    dst_dir=None,
    client_id=None,
    client_secret=None,
    username=None,
    password=None,
    time_unit=None,
    interval=None,
    at=None,
    until=None,
    datetime_format=None,
    snapshot_file_ext=None,
    save_responses=None,
    save_responses_dir=None,
):
    """
    Initialize a CWS recorder for a given region.

    Parameters
    ----------
    lon_sw, lat_sw, lon_ne, lat_ne : numeric
        Latitude/longitude coordinates of the bounds of the region of interest
    dst_dir : str or pathlib.Path object, optional
        Directory where recorded snapshots are dumped. Only used when the
        `dump_snapshot_gdf` method is called, ignored otherwise. If None,
        the value from `settings.RECORDER_DST_DIR`.
    client_id, client_secret, username, password : str, optional
        Netatmo authentication credentials. If None, the respective values
        set in the "NETATMO_CLIENT_ID", "NETATMO_CLIENT_SECRET",
        "NETATMO_USERNAME" and "NETATMO_PASSWORD" environment variables are
        used.
    time_unit : str {"second", "seconds", "minute", "minutes", "hour", "hours", \
            "day", "days", "week", "weeks", "monday", "tuesday", "wednesday", \
            "thursday", "friday", "saturday", "sunday"}, optional
        Time unit. If None, no snapshots are taken periodically - snapshots
        are only taken by manually calling `get_snapshot_gdf` or
        `dump_snapshot_gdf`.
    interval : int, optional
        Quantity of the time unit set in `time_unit`, altogether defining
        the interval between snapshots. If None, the default value from the
        `schedule` library, i.e., 1, is used. Ignored if `time_unit` is
        None.
    at : str, optional
        Time string defining the particular time when snapshots are taken.
        See also
        https://schedule.readthedocs.io/en/stable/reference.html#schedule.Job.at.
        Ignored if `time_unit` is None. The following formats are accepted:

        * for daily jobs -> HH:MM:SS or HH:MM
        * for hourly jobs -> MM:SS or :MM
        * for minute jobs -> :SS.
    until : datetime.datetime, datetime.timedelta, datetime.time or str, optional
        Latest time (in the future) when a snapshot will be taken. If None,
        the periodic snapshots are taken indefinitely. Ignored if
        `time_unit` is None. The following formats are accepted:

        * datetime.datetime
        * datetime.timedelta
        * datetime.time
        * string in one of the following formats: "%Y-%m-%d %H:%M:%S",
          "%Y-%m-%d %H:%M", "%Y-%m-%d", "%H:%M:%S", "%H:%M" as defined by
          `datetime.strptime` behaviour.
    datetime_format : str, optional
        Datetime format string, used to name the geo-data frame columns and
        the snapshot file dumps. If None, the value from
        `settings.DATETIME_FORMAT` is used.
    snapshot_file_ext : str, optional
        File extension used when dumping recorded snapshot files, which
        must match an OGR vector format driver (see
        `fiona.supported_drivers`). If None, the value from
        `settings.SNAPSHOT_FILE_EXT` is used.
    save_responses : bool, optional
        Whether the JSON responses from the Netatmo public data API calls
        are stored. If None, the value from `settings.SAVE_RESPONSES` is
        used.
    save_responses_dir : str or pathlib.Path object, optional
        Directory where the JSON responses are stored. If None, the value
        from `settings.SAVE_RESPONSES_DIR` is used. Ignored if
        `save_responses` is False.
    """
    super(CWSRecorder, self).__init__()
    # bounding box of the region of interest
    self.lon_sw = lon_sw
    self.lat_sw = lat_sw
    self.lon_ne = lon_ne
    self.lat_ne = lat_ne
    self.dst_dir = dst_dir
    # auth
    self._client_id = client_id
    self._client_secret = client_secret
    self._username = username
    self._password = password
    # IO: fall back to the package settings for unset options
    self.datetime_format = (
        datetime_format if datetime_format is not None else settings.DATETIME_FORMAT
    )
    self.snapshot_file_ext = (
        snapshot_file_ext
        if snapshot_file_ext is not None
        else settings.SNAPSHOT_FILE_EXT
    )
    self.save_responses = (
        save_responses if save_responses is not None else settings.SAVE_RESPONSES
    )
    self.save_responses_dir = (
        save_responses_dir
        if save_responses_dir is not None
        else settings.SAVE_RESPONSES_DIR
    )
    # schedule periodic snapshots when a time unit is given
    if time_unit:
        job = schedule.every(interval) if interval else schedule.every()
        job = getattr(job, time_unit)
        if at:
            job = job.at(at)
        if until:
            job = job.until(until)
        job.do(self.dump_snapshot_gdf)
        # block until the scheduler has no jobs left (e.g., the job was
        # cancelled after reaching `until`)
        while schedule.get_jobs():
            schedule.run_pending()
            time.sleep(1)
dump_snapshot_gdf(self)
Get current CWS temperature snapshot and dump it to a file.
Source code in netatmo_geopy/core.py
def dump_snapshot_gdf(self):
    """Get current CWS temperature snapshot and dump it to a file."""
    gdf = self.get_snapshot_gdf()
    # name the dump after the snapshot, with the configured file extension
    basename = _get_basename(gdf)
    dst_filepath = path.join(self.dst_dir, f"{basename}.{self.snapshot_file_ext}")
    gdf.to_file(dst_filepath)
    utils.log(f"Dumped snapshot geo-data frame to file '{dst_filepath}'")
get_snapshot_gdf(self)
Get current CWS temperature snapshot.
Source code in netatmo_geopy/core.py
def get_snapshot_gdf(self):
    """Get current CWS temperature snapshot."""
    # query the Netatmo public-data endpoint for the configured bounds
    api_json = _get_public_data(
        self.lon_sw,
        self.lat_sw,
        self.lon_ne,
        self.lat_ne,
        client_id=self._client_id,
        client_secret=self._client_secret,
        username=self._username,
        password=self._password,
    )
    gdf = _gdf_from_response_json(api_json, self.datetime_format)
    if self.save_responses:
        # keep the raw JSON response alongside the recorded snapshot
        response_filepath = path.join(
            self.save_responses_dir, f"{_get_basename(gdf)}.json"
        )
        with open(response_filepath, "w") as dst:
            json.dump(api_json, dst)
        utils.log(f"Dumped response to file '{response_filepath}'")
    return gdf
plot_snapshot(snapshot_gdf, *, snapshot_column=None, ax=None, cmap=None, legend=None, legend_position=None, legend_size=None, legend_pad=None, title=None, add_basemap=None, attribution=None, subplot_kws=None, plot_kws=None, set_title_kws=None, add_basemap_kws=None, append_axes_kws=None)
Plot a snapshot of station measurements.
Parameters
snapshot_gdf : geopandas.GeoDataFrame
Geo-data frame of CWS temperature measurements.
snapshot_column : str, optional
Column of CWS temperature measurements to plot. If None, the first column (other
than geometry
) is used.
ax : matplotlib.axes.Axes
instance, optional
Plot in given axis. If None creates a new figure.
cmap : str or matplotlib.colors.Colormap
instance, optional
Colormap of the plot. If None, the value from settings.PLOT_CMAP
is used.
legend : bool, optional
Whether a legend should be added to the plot. If None, the value from
settings.PLOT_LEGEND
is used.
legend_position : str {"left", "right", "bottom", "top"}, optional
Position of the legend axes, passed to
mpl_toolkits.axes_grid1.axes_divider.AxesDivider.append_axes
. If None, the
value from settings.PLOT_LEGEND_POSITION
is used.
legend_size : numeric or str, optional
Size of the legend axes, passed to
mpl_toolkits.axes_grid1.axes_divider.AxesDivider.append_axes
. If None, the
value from settings.PLOT_LEGEND_SIZE
is used.
legend_pad : numeric or str, optional
Padding between the plot and legend axes, passed to
mpl_toolkits.axes_grid1.axes_divider.AxesDivider.append_axes
. If None, the
value from settings.PLOT_LEGEND_PAD
is used.
title : bool or str, optional
Whether a title should be added to the plot. If True, the timestamp of the
snapshot (geo-data frame column) is used. It is also possible to pass a string
so that it is used as title label (instead of the timestamp). If None, the value
from settings.PLOT_TITLE
is used.
add_basemap : bool, optional
Whether a basemap should be added to the plot using contextily.add_basemap
. If
None, the value from settings.PLOT_ADD_BASEMAP
is used.
attribution : str or bool, optional
Attribution text for the basemap source, added to the bottom of the plot, passed
to contextily.add_basemap
. If False, no attribution is added. If None, the
value from settings.PLOT_ATTRIBUTION
is used.
subplot_kws, plot_kws, set_title_kws, add_basemap_kws, append_axes_kws : dict, optional
Keyword arguments passed to matplotlib.pyplot.subplots
,
geopandas.GeoDataFrame.plot
, matplotlib.axes.Axes.set_title
,
contextily.add_basemap
and
mpl_toolkits.axes_grid1.axes_divider.AxesDivider.append_axes
respectively.
Returns
ax : matplotlib.axes.Axes
Axes with the plot drawn onto it.
Source code in netatmo_geopy/core.py
def plot_snapshot(  # noqa: C901
    snapshot_gdf,
    *,
    snapshot_column=None,
    ax=None,
    cmap=None,
    legend=None,
    legend_position=None,
    legend_size=None,
    legend_pad=None,
    title=None,
    add_basemap=None,
    attribution=None,
    subplot_kws=None,
    plot_kws=None,
    set_title_kws=None,
    add_basemap_kws=None,
    append_axes_kws=None,
):
    """
    Plot a snapshot of station measurements.
    Parameters
    ----------
    snapshot_gdf : geopandas.GeoDataFrame
        Geo-data frame of CWS temperature measurements.
    snapshot_column : str, optional
        Column of CWS temperature measurements to plot. If None, the first column (other
        than `geometry`) is used.
    ax : `matplotlib.axes.Axes` instance, optional
        Plot in given axis. If None creates a new figure.
    cmap : str or `matplotlib.colors.Colormap` instance, optional
        Colormap of the plot. If None, the value from `settings.PLOT_CMAP` is used.
    legend : bool, optional
        Whether a legend should be added to the plot. If None, the value from
        `settings.PLOT_LEGEND` is used.
    legend_position : str {"left", "right", "bottom", "top"}, optional
        Position of the legend axes, passed to
        `mpl_toolkits.axes_grid1.axes_divider.AxesDivider.append_axes`. If None, the
        value from `settings.PLOT_LEGEND_POSITION` is used.
    legend_size : numeric or str, optional
        Size of the legend axes, passed to
        `mpl_toolkits.axes_grid1.axes_divider.AxesDivider.append_axes`. If None, the
        value from `settings.PLOT_LEGEND_SIZE` is used.
    legend_pad : numeric or str, optional
        Padding between the plot and legend axes, passed to
        `mpl_toolkits.axes_grid1.axes_divider.AxesDivider.append_axes`. If None, the
        value from `settings.PLOT_LEGEND_PAD` is used.
    title : bool or str, optional
        Whether a title should be added to the plot. If True, the timestamp of the
        snapshot (geo-data frame column) is used. It is also possible to pass a string
        so that it is used as title label (instead of the timestamp). If None, the value
        from `settings.PLOT_TITLE` is used.
    add_basemap : bool, optional
        Whether a basemap should be added to the plot using `contextily.add_basemap`. If
        None, the value from `settings.PLOT_ADD_BASEMAP` is used.
    attribution : str or bool, optional
        Attribution text for the basemap source, added to the bottom of the plot, passed
        to `contextily.add_basemap`. If False, no attribution is added. If None, the
        value from `settings.PLOT_ATTRIBUTION` is used.
    subplot_kws, plot_kws, set_title_kws, add_basemap_kws, append_axes_kws : dict, \
optional
        Keyword arguments passed to `matplotlib.pyplot.subplots`,
        `geopandas.GeoDataFrame.plot`, `matplotlib.axes.Axes.set_title`,
        `contextily.add_basemap` and
        `mpl_toolkits.axes_grid1.axes_divider.AxesDivider.append_axes` respectively.
    Returns
    -------
    ax : `matplotlib.axes.Axes`
        Axes with the plot drawn onto it.
    """
    # if no column is provided, we plot the "first" column other than "geometry"
    if snapshot_column is None:
        snapshot_column = snapshot_gdf.columns.drop("geometry")[0]
    # create a new figure/axis only when the caller did not provide one
    if ax is None:
        if subplot_kws is None:
            subplot_kws = {}
        # the figure handle is not needed, only the axis
        _, ax = plt.subplots(**subplot_kws)
    # plot arguments: work on a copy so `pop` never mutates the caller's dict
    if plot_kws is None:
        _plot_kws = {}
    else:
        _plot_kws = plot_kws.copy()
    if cmap is None:
        cmap = _plot_kws.pop("cmap", settings.PLOT_CMAP)
    if legend is None:
        legend = _plot_kws.pop("legend", settings.PLOT_LEGEND)
    # plot
    if legend:
        # place the colorbar in a dedicated axis appended next to the plot
        divider = make_axes_locatable(ax)
        if legend_position is None:
            legend_position = settings.PLOT_LEGEND_POSITION
        if legend_size is None:
            legend_size = settings.PLOT_LEGEND_SIZE
        if append_axes_kws is None:
            _append_axes_kws = {}
        else:
            _append_axes_kws = append_axes_kws.copy()
        if legend_pad is None:
            legend_pad = _append_axes_kws.pop("pad", settings.PLOT_LEGEND_PAD)
        _plot_kws["cax"] = divider.append_axes(
            legend_position, legend_size, pad=legend_pad, **_append_axes_kws
        )
    snapshot_gdf.plot(
        column=snapshot_column, cmap=cmap, ax=ax, legend=legend, **_plot_kws
    )
    # title
    if title is None:
        title = settings.PLOT_TITLE
    if title:
        # True -> use the snapshot column (timestamp); any other truthy value
        # is used verbatim as the label. The explicit fallback fixes a
        # NameError that the previous if/elif raised for truthy non-str
        # values (e.g. a number).
        title_label = snapshot_column if title is True else title
        if set_title_kws is None:
            set_title_kws = {}
        ax.set_title(title_label, **set_title_kws)
    # basemap
    if add_basemap is None:
        add_basemap = settings.PLOT_ADD_BASEMAP
    if add_basemap:
        # add_basemap arguments: copy so `pop` never mutates the caller's dict
        if add_basemap_kws is None:
            _add_basemap_kws = {}
        else:
            _add_basemap_kws = add_basemap_kws.copy()
        if attribution is None:
            attribution = _add_basemap_kws.pop("attribution", settings.PLOT_ATTRIBUTION)
        # add basemap
        cx.add_basemap(
            ax=ax,
            crs=snapshot_gdf.crs,
            attribution=attribution,
            **_add_basemap_kws,
        )
    return ax
settings
Settings.
utils
Utils (mostly from osmnx.utils
).
log(message, level=None, name=None, filename=None)
Write a message to the logger.
This logs to file and/or prints to the console (terminal), depending on
the current configuration of settings.LOG_FILE
and settings.LOG_CONSOLE
.
Parameters
message : str
    The message to log.
level : int
    One of Python's logger.level constants.
name : str
    Name of the logger.
filename : str
    Name of the log file, without file extension.
Source code in netatmo_geopy/utils.py
def log(message, level=None, name=None, filename=None):
    """
    Write a message to the logger.
    This logs to file and/or prints to the console (terminal), depending on
    the current configuration of `settings.LOG_FILE` and `settings.LOG_CONSOLE`.
    Parameters
    ----------
    message : str
        The message to log.
    level : int
        One of Python's logger.level constants.
    name : str
        Name of the logger.
    filename : str
        Name of the log file, without file extension.
    """
    # fall back on the configured defaults for any unspecified option
    level = settings.LOG_LEVEL if level is None else level
    name = settings.LOG_NAME if name is None else name
    filename = settings.LOG_FILENAME if filename is None else filename
    # if logging to file is turned on
    if settings.LOG_FILE:
        # get the current logger (or create a new one, if none), then dispatch
        # the message to the method matching the requested level
        logger = _get_logger(level=level, name=name, filename=filename)
        level_methods = {
            lg.DEBUG: logger.debug,
            lg.INFO: logger.info,
            lg.WARNING: logger.warning,
            lg.ERROR: logger.error,
        }
        log_method = level_methods.get(level)
        if log_method is not None:
            log_method(message)
    # if logging to console (terminal window) is turned on
    if settings.LOG_CONSOLE:
        # prepend timestamp, then convert to ascii so it doesn't break
        # windows terminals
        console_message = (
            unicodedata.normalize("NFKD", str(f"{ts()} {message}"))
            .encode("ascii", errors="replace")
            .decode()
        )
        # print explicitly to terminal in case jupyter notebook is the stdout
        if getattr(sys.stdout, "_original_stdstream_copy", None) is not None:
            # redirect captured pipe back to original
            os.dup2(sys.stdout._original_stdstream_copy, sys.__stdout__.fileno())
            sys.stdout._original_stdstream_copy = None
        with redirect_stdout(sys.__stdout__):
            print(console_message, file=sys.__stdout__, flush=True)
ts(style='datetime', template=None)
Get current timestamp as str.
Parameters
style : str {"datetime", "date", "time"}
    Format the timestamp with this built-in template.
template : str
    If not None, format the timestamp with this template instead of one of
    the built-in styles.
Returns
ts : str
    The string timestamp.
Source code in netatmo_geopy/utils.py
def ts(style="datetime", template=None):
    """
    Get current timestamp as str.
    Parameters
    ----------
    style : str {"datetime", "date", "time"}
        Format the timestamp with this built-in template.
    template : str
        If not None, format the timestamp with this template instead of one of the
        built-in styles.
    Returns
    -------
    ts : str
        The string timestamp.
    """
    # built-in templates, keyed by style name
    builtin_templates = {
        "datetime": "{:%Y-%m-%d %H:%M:%S}",
        "date": "{:%Y-%m-%d}",
        "time": "{:%H:%M:%S}",
    }
    if template is None:
        try:
            template = builtin_templates[style]
        except KeyError:  # pragma: no cover
            raise ValueError(f'unrecognized timestamp style "{style}"')
    return template.format(dt.datetime.now())