"""Module holding the parameterization scripts for the weather generator"""
from __future__ import division
import os
import os.path as osp
import tempfile
import datetime as dt
import six
from functools import partial
from collections import namedtuple
import inspect
import subprocess as spr
from itertools import product, chain
import pandas as pd
import numpy as np
import calendar
import gwgen.utils as utils
from gwgen.utils import docstrings
from psyplot.compat.pycompat import OrderedDict, filterfalse
try:
from pandas.tseries.offsets import MonthEnd
except ImportError:
from pandas.datetools import MonthEnd
def _requirement_property(requirement):
def get_x(self):
return self._requirements[requirement]
return property(
get_x, doc=requirement + " parameterization instance")
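# Illustrative sketch (not part of the original API surface): a task that
# declares ``setup_requires = ['day']`` would typically expose the
# requirement through this factory, e.g. ``day = _requirement_property('day')``,
# so that ``self.day`` returns the instance stored in
# ``self._requirements['day']``.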
class Parameterizer(utils.TaskBase):
"""Base class for parameterization tasks"""
#: The registered parameterization classes (are set up automatically when
#: subclassing this class)
_registry = []
#: A mapping from the keys in the namelist that are modified by this
#: parameterization task to the key as it is used in the task information
namelist_keys = {}
#: A mapping from the keys in the namelist that are modified by this
#: parameterization task to the error as stored in the configuration
error_keys = {}
#: A mapping from the keys in the namelist that are modified by this
#: parameterization task to the key as it is used in the task_config
#: attribute
task_config_keys = {}
@property
def task_data_dir(self):
"""The directory where to store data"""
return self.param_dir
# reimplement make_run_config because of additional full_nml parameter
@docstrings.get_sectionsf('Parameterizer.make_run_config')
    def make_run_config(self, sp, info, full_nml):
        """
        Configure the experiment
        Parameters
        ----------
        %(TaskBase.make_run_config.parameters)s
        full_nml: dict
            The dictionary with all the namelists"""
        pass
@classmethod
    def get_task_for_nml_key(cls, nml_key):
        """Get the task class that modifies the given namelist key"""
try:
return next(para_cls for para_cls in cls._registry[::-1]
if nml_key in para_cls.namelist_keys)
except StopIteration:
raise KeyError("Unknown namelist key %s" % (nml_key, ))
    def get_error(self, nml_key):
        """Get the error stored in the configuration for the given namelist
        key"""
key, d = utils.go_through_dict(
self.error_keys[nml_key],
self.config['parameterization'][self.name])
return d[key]
@classmethod
    def get_config_key(cls, nml_key):
        """Get the :attr:`task_config_keys` key for the given namelist key"""
return cls.task_config_keys.get(nml_key)
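# Example of the registry lookup above (illustrative): since
# ``PrcpDistParams.namelist_keys`` contains ``'thresh'`` and ``'gp_shape'``,
# ``Parameterizer.get_task_for_nml_key('gp_shape')`` returns the
# ``PrcpDistParams`` class.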
class DailyGHCNData(Parameterizer):
"""The parameterizer that reads in the daily data"""
name = 'day'
_datafile = "ghcn_daily.csv"
dbname = 'ghcn_daily'
summary = 'Read in the daily GHCN data'
http_source = (
'ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd_all.tar.gz')
http_single = (
'ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/all/{}')
@property
def default_config(self):
return default_daily_ghcn_config()._replace(
**super(DailyGHCNData, self).default_config._asdict())
@property
def data_dir(self):
return osp.join(super(DailyGHCNData, self).data_dir,
'ghcn', 'ghcnd_all')
@property
def raw_src_files(self):
return list(map(lambda s: osp.join(self.data_dir, s + '.dly'),
self.stations))
flags = ['tmax_m', 'prcp_s', 'tmax_q', 'prcp_m', 'tmin_m', 'tmax_s',
'tmin_s', 'prcp_q', 'tmin_q']
@property
def sql_dtypes(self):
import sqlalchemy
ret = super(DailyGHCNData, self).sql_dtypes
flags = self.flags
ret.update({flag: sqlalchemy.CHAR(length=1) for flag in flags})
return ret
    def setup_from_file(self, *args, **kwargs):
kwargs['index_col'] = ['id', 'year', 'month', 'day']
kwargs['dtype'] = {
'prcp': np.float64,
'prcp_m': str,
'prcp_q': str,
'prcp_s': str,
'tmax': np.float64,
'tmax_m': str,
'tmax_q': str,
'tmax_s': str,
'tmin': np.float64,
'tmin_m': str,
'tmin_q': str,
'tmin_s': str}
super(DailyGHCNData, self).setup_from_file(*args, **kwargs)
for flag in self.flags:
self.data[flag].fillna('', inplace=True)
    def setup_from_db(self, *args, **kwargs):
kwargs['index_col'] = ['id', 'year', 'month', 'day']
super(DailyGHCNData, self).setup_from_db(*args, **kwargs)
for flag in self.flags:
self.data[flag] = self.data[flag].str.strip()
    def setup_from_scratch(self):
from gwgen.parseghcnrow import read_ghcn_file
logger = self.logger
stations = self.stations
logger.debug('Reading daily ghcn data for %s stations', len(stations))
src_dir = self.data_dir
logger.debug(' Data source: %s', src_dir)
files = list(map(lambda s: osp.join(src_dir, s + '.dly'), stations))
self.data = pd.concat(
list(map(read_ghcn_file, files)), copy=False).set_index(
['id', 'year', 'month', 'day'])
self.logger.debug('Done.')
@classmethod
def _modify_parser(cls, parser):
parser.setup_args(default_daily_ghcn_config)
parser, setup_grp, run_grp = super(DailyGHCNData, cls)._modify_parser(
parser)
parser.update_arg('download', short='d', group=setup_grp,
choices=['single', 'all'])
return parser, setup_grp, run_grp
    def init_from_scratch(self):
        """Reimplemented to download the data if it is missing"""
logger = self.logger
logger.debug('Initializing %s', self.name)
stations = self.stations
logger.debug('Reading data for %s stations', len(stations))
src_dir = self.data_dir
logger.debug(' Expected data source: %s', src_dir)
files = self.raw_src_files
missing = list(filterfalse(osp.exists, files))
if missing:
download = self.task_config.download
            msg = '%i required files are missing in %s' % (
                len(missing), src_dir)
if download is None:
raise ValueError(
msg + (". Set download to 'single' or 'all' to download "
"the missing data!"))
elif download == 'single':
logger.debug(msg)
for f in missing:
utils.download_file(
self.http_single.format(osp.basename(f)), f)
else:
import tarfile
logger.debug(msg)
tarfname = self.project_config.get('ghcn_src', src_dir +
'.tar.gz')
if not osp.exists(tarfname):
if download is None:
if six.PY2:
raise IOError(msg)
else:
raise FileNotFoundError(msg)
else:
logger.debug(' Downloading rawdata from %s',
self.http_source)
if not osp.exists(osp.dirname(tarfname)):
os.makedirs(osp.dirname(tarfname))
utils.download_file(self.http_source, tarfname)
self.project_config['ghcn_download'] = dt.datetime.now()
self.project_config['ghcn_src'] = tarfname
taro = tarfile.open(tarfname, 'r|gz')
logger.debug(' Extracting to %s', osp.dirname(src_dir))
taro.extractall(osp.dirname(src_dir))
_DailyGHCNConfig = namedtuple('_DailyGHCNConfig', ['download'])
_DailyGHCNConfig = utils.append_doc(
_DailyGHCNConfig, docstrings.get_sections("""
Parameters
----------
download: { 'single' | 'all' | None }
        What to do if a station's data file is missing. The default is
        ``None``, which raises an error.
Otherwise, if ``'single'``, download the missing file from %s. If ``'all'``
the entire tarball is downloaded from %s
""" % (DailyGHCNData.http_single, DailyGHCNData.http_source),
'_DailyGHCNConfig'))
DailyGHCNConfig = utils.enhanced_config(_DailyGHCNConfig, 'DailyGHCNConfig')
@docstrings.dedent
def default_daily_ghcn_config(
download=None, *args, **kwargs):
"""
The default configuration for :class:`DailyGHCNData` instances.
See also the :attr:`DailyGHCNData.default_config` attribute
Parameters
----------
%(DailyGHCNConfig.parameters)s"""
return DailyGHCNConfig(download, *utils.default_config(*args, **kwargs))
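# A minimal usage sketch (assuming all other configuration entries keep
# their defaults)::
#
#     conf = default_daily_ghcn_config(download='single')
#     conf.download  # -> 'single', i.e. fetch missing station files one by one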
class MonthlyGHCNData(Parameterizer):
"""The parameterizer that calculates the monthly summaries from the daily
data"""
name = 'month'
setup_requires = ['day']
_datafile = "ghcn_monthly.csv"
dbname = 'ghcn_monthly'
summary = "Calculate monthly means from the daily GHCN data"
@property
def sql_dtypes(self):
import sqlalchemy
ret = super(MonthlyGHCNData, self).sql_dtypes
for col in set(self.data.columns).difference(ret):
if 'complete' in col:
ret[col] = sqlalchemy.BOOLEAN
else:
ret[col] = sqlalchemy.REAL
return ret
    def setup_from_file(self, *args, **kwargs):
kwargs['index_col'] = ['id', 'year', 'month']
return super(MonthlyGHCNData, self).setup_from_file(*args, **kwargs)
    def setup_from_db(self, *args, **kwargs):
kwargs['index_col'] = ['id', 'year', 'month']
return super(MonthlyGHCNData, self).setup_from_db(*args, **kwargs)
@staticmethod
    def monthly_summary(df):
n = calendar.monthrange(df.index[0][1], df.index[0][2])[1]
return pd.DataFrame.from_dict(OrderedDict([
('tmin', [df.tmin.mean()]), ('tmax', [df.tmax.mean()]),
('trange', [(df.tmax - df.tmin).mean()]),
('prcp', [df.prcp.sum()]), ('tmin_abs', [df.tmin.min()]),
('tmax_abs', [df.tmax.max()]), ('prcpmax', [df.prcp.max()]),
('tmin_complete', [df.tmin.count() == n]),
('tmax_complete', [df.tmax.count() == n]),
('prcp_complete', [df.prcp.count() == n]),
('wet_day', [df.prcp[df.prcp.notnull() & (df.prcp > 0)].size])]))
    def setup_from_scratch(self):
def year_complete(series):
"""Check whether the data for the given is complete"""
return series.astype(int).sum() == 12
data = self.day.data.groupby(level=['id', 'year', 'month']).apply(
self.monthly_summary)
data.index = data.index.droplevel(-1)
complete_cols = [col for col in data.columns
if col.endswith('complete')]
df_yearly = data[complete_cols].groupby(level=['id', 'year']).agg(
year_complete)
names = data.index.names
data = data.reset_index().merge(
df_yearly[complete_cols].reset_index(), on=['id', 'year'],
suffixes=['', '_year']).set_index(names)
self.data = data
class CompleteMonthlyGHCNData(MonthlyGHCNData):
    """The parameterizer that extracts the complete months from the monthly
    data"""
name = 'cmonth'
setup_requires = ['month']
_datafile = "complete_ghcn_monthly.csv"
dbname = 'complete_ghcn_monthly'
summary = "Extract the complete months from the monthly data"
    def setup_from_scratch(self):
all_months = self.month.data
self.data = all_months[all_months.prcp_complete &
all_months.tmin_complete &
all_months.tmax_complete]
class YearlyCompleteMonthlyGHCNData(CompleteMonthlyGHCNData):
    """The parameterizer that extracts the complete months within complete
    years from the monthly data"""
name = 'yearly_cmonth'
setup_requires = ['month']
_datafile = "yearly_complete_ghcn_monthly.csv"
dbname = 'yearly_complete_ghcn_monthly'
summary = (
"Extract the complete months from the monthly data in complete years")
    def setup_from_scratch(self):
all_months = self.month.data
self.data = all_months[all_months.prcp_complete_year &
all_months.tmin_complete_year &
all_months.tmax_complete_year]
class CompleteDailyGHCNData(DailyGHCNData):
    """The parameterizer that extracts the days of the complete months"""
name = 'cday'
setup_requires = ['day', 'month']
_datafile = "complete_ghcn_daily.csv"
dbname = 'complete_ghcn_daily'
summary = "Get the days of the complete months"
@property
def default_config(self):
return super(DailyGHCNData, self).default_config
@classmethod
def _modify_parser(cls, parser):
return super(DailyGHCNData, cls)._modify_parser(parser)
    def init_from_scratch(self):
pass
    def setup_from_scratch(self):
monthly = self.month.data
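        # selecting zero columns (``[[]]``) from the complete months and
        # merging with an inner join acts as a pure index filter: only days
        # whose (id, year, month) occurs in a complete month survive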
self.data = self.day.data.reset_index().merge(
monthly[monthly.prcp_complete &
monthly.tmin_complete &
monthly.tmax_complete][[]].reset_index(),
how='inner', on=['id', 'year', 'month'], copy=False).set_index(
['id', 'year', 'month', 'day'])
class YearlyCompleteDailyGHCNData(CompleteDailyGHCNData):
    """The parameterizer that extracts the days of the complete months in
    complete years"""
name = 'yearly_cday'
setup_requires = ['day', 'month']
_datafile = "yearly_complete_ghcn_daily.csv"
dbname = 'yearly_complete_ghcn_daily'
summary = "Get the days of the complete months in complete years"
    def setup_from_scratch(self):
monthly = self.month.data
self.data = self.day.data.reset_index().merge(
monthly[monthly.prcp_complete_year &
monthly.tmin_complete_year &
monthly.tmax_complete_year][[]].reset_index(),
how='inner', on=['id', 'year', 'month'], copy=False).set_index(
['id', 'year', 'month', 'day'])
_PrcpConfig = namedtuple('_PrcpConfig', ['thresh', 'threshs2compute'])
_PrcpConfig = utils.append_doc(_PrcpConfig, docstrings.get_sections("""
Parameters
----------
thresh: float
The threshold to use for the configuration
threshs2compute: list of floats
The thresholds to compute during the setup of the data
""", '_PrcpConfig'))
PrcpConfig = utils.enhanced_config(_PrcpConfig, 'PrcpConfig')
@docstrings.dedent
def default_prcp_config(
thresh=5., threshs2compute=[5, 7.5, 10, 12.5, 15, 17.5, 20],
*args, **kwargs):
"""
The default configuration for :class:`PrcpDistParams` instances.
See also the :attr:`PrcpDistParams.default_config` attribute
Parameters
----------
%(PrcpConfig.parameters)s"""
return PrcpConfig(thresh, threshs2compute,
*utils.default_config(*args, **kwargs))
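# Usage sketch (illustrative): the returned namedtuple merges the
# precipitation-specific settings with the common task defaults, e.g.
#
#     conf = default_prcp_config(thresh=10.)
#     conf.thresh  # -> 10.0, the threshold separating gamma and GP domains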
class PrcpDistParams(Parameterizer):
"""The parameterizer to calculate the precipitation distribution parameters
"""
name = 'prcp'
setup_requires = ['cday']
_datafile = "prcp_dist_parameters.csv"
dbname = 'prcp_dist_params'
summary = ('Calculate the precipitation distribution parameters of the '
'hybrid Gamma-GP')
namelist_keys = {'thresh': 'thresh', 'g_scale_coeff': 'slope',
'gp_shape': 'gpshape'}
error_keys = {'gp_shape': 'gpshape_std'}
task_config_keys = {'thresh': 'thresh'}
has_run = True
#: default formatoptions for the
#: :class:`psy_reg.plotters.DensityRegPlotter` plotter
fmt = kwargs = dict(
legend={'loc': 'upper left'},
cmap='w_Reds',
precision=0.1,
xlabel='{desc}',
ylabel='{desc}',
xrange=(0, ['rounded', 95]),
yrange=(0, ['rounded', 95]),
fix=0,
legendlabels=['$\\theta$ = %(slope)1.4f * $\\bar{{p}}_d$, '
'$R^2$ = %(rsquared)1.3f'],
bounds=['minmax', 11, 0, 99],
cbar='',
bins=10,
)
@property
def default_config(self):
return default_prcp_config()._replace(
**super(PrcpDistParams, self).default_config._asdict())
@property
def sql_dtypes(self):
import sqlalchemy
ret = super(PrcpDistParams, self).sql_dtypes
for flag in {'gshape', 'pscale', 'gscale', 'pscale_orig', 'mean_wet',
'thresh', 'pshape'}:
ret[flag] = sqlalchemy.REAL
for flag in {'n', 'ngamma', 'ngp'}:
ret[flag] = sqlalchemy.BIGINT
return ret
@classmethod
def _modify_parser(cls, parser):
parser.setup_args(default_prcp_config)
parser, setup_grp, run_grp = super(PrcpDistParams, cls)._modify_parser(
parser)
parser.update_arg('thresh', short='t', group=run_grp)
parser.append2help('thresh', '. Default: %(default)s')
parser.update_arg('threshs2compute', short='t2c', group=setup_grp,
type=float, nargs='+', metavar='float')
return parser, setup_grp, run_grp
    def setup_from_file(self, *args, **kwargs):
kwargs['index_col'] = ['id', 'month', 'thresh']
return super(PrcpDistParams, self).setup_from_file(*args, **kwargs)
    def setup_from_db(self, *args, **kwargs):
kwargs['index_col'] = ['id', 'month', 'thresh']
return super(PrcpDistParams, self).setup_from_db(*args, **kwargs)
@property
def filtered_data(self):
"""Return the data that only belongs to the specified threshold of the
task"""
sl = (slice(None), slice(None), self.task_config.thresh)
return self.data.sort_index().loc[sl, :]
    def prcp_dist_params(
self, df, threshs=np.array([5, 7.5, 10, 12.5, 15, 17.5, 20])):
from scipy import stats
vals = df.prcp.values[~np.isnan(df.prcp.values)]
N = len(threshs)
n = len(vals) * N
vals = vals[vals > 0]
ngamma = len(vals)
ngp = [np.nan] * N
gshape = np.nan
gscale = np.nan
pshape = [np.nan] * N
pscale = [np.nan] * N
pscale_orig = [np.nan] * N
if ngamma > 10:
# fit the gamma curve. We fix the (unnecessary) location parameter
# to improve the result (see
# http://stackoverflow.com/questions/16963415/why-does-the-gamma-distribution-in-scipy-have-three-parameters)
try:
gshape, _, gscale = stats.gamma.fit(vals, floc=0)
            except Exception:
                self.logger.critical(
                    'Error while calculating gamma parameters for %s!',
', '.join(str(df.index.get_level_values(n).unique())
for n in df.index.names),
exc_info=True)
tmp = tempfile.NamedTemporaryFile(suffix='.csv').name
df.to_csv(tmp)
self.logger.critical('Data stored in %s', tmp)
else:
for i, thresh in enumerate(threshs):
arr = vals[vals >= thresh]
ngp[i] = len(arr)
if ngp[i] > 10:
try:
pshape[i], _, pscale_orig[i] = stats.genpareto.fit(
arr, floc=thresh)
                        except Exception:
self.logger.critical(
'Error while calculating GP parameters for '
'%s!', ', '.join(
str(df.index.get_level_values(n).unique())
for n in df.index.names),
exc_info=True)
tmp = tempfile.NamedTemporaryFile(
suffix='.csv').name
df.to_csv(tmp)
self.logger.critical('Data stored in %s', tmp)
else:
# find the crossover point where the gamma and
# pareto distributions should match this follows
# Neykov et al. (Nat. Hazards Earth Syst. Sci.,
# 14, 2321-2335, 2014) bottom of page 2330 (left
# column)
pscale[i] = (1 - stats.gamma.cdf(
thresh, gshape, scale=gscale))/stats.gamma.pdf(
thresh, gshape, scale=gscale)
return pd.DataFrame.from_dict(OrderedDict([
('n', np.repeat(n, N)), ('ngamma', np.repeat(ngamma, N)),
('mean_wet', np.repeat(vals.mean(), N)),
('ngp', ngp), ('thresh', threshs),
('gshape', np.repeat(gshape, N)),
('gscale', np.repeat(gscale, N)), ('pshape', pshape),
('pscale', pscale),
('pscale_orig', pscale_orig)])).set_index('thresh')
    def setup_from_scratch(self):
self.logger.debug('Calculating precipitation parameters.')
df = self.cday.data
threshs = np.array(self.task_config.threshs2compute)
if self.task_config.thresh not in threshs:
threshs = np.append(threshs, self.task_config.thresh)
func = partial(self.prcp_dist_params, threshs=threshs)
self.data = df.groupby(level=['id', 'month']).apply(func)
self.logger.debug('Done.')
@property
def ds(self):
"""The dataset of the :attr:`data` dataframe"""
import xarray as xr
data = self.filtered_data.set_index('mean_wet')
ds = xr.Dataset.from_dataframe(data)
ds.mean_wet.attrs['long_name'] = 'Mean precip. on wet days'
ds.mean_wet.attrs['units'] = 'mm'
ds.gscale.attrs['long_name'] = 'Gamma scale parameter'
ds.gscale.attrs['units'] = 'mm'
return ds
    def create_project(self, ds):
"""Make the gamma shape - number of wet days plot
Parameters
----------
%(TaskBase.create_project.parameters)s
"""
import seaborn as sns
import psyplot.project as psy
sns.set_style("white")
return psy.plot.densityreg(ds, name='gscale', fmt=self.fmt)
    def make_run_config(self, sp, info, full_nml):
"""
Configure the experiment with information on gamma scale and GP shape
Parameters
----------
%(Parameterizer.make_run_config.parameters)s"""
nml = full_nml.setdefault('weathergen_ctl', OrderedDict())
for key in ['rsquared', 'slope', 'intercept']:
info[key] = float(sp.plotters[0].plot_data[1].attrs[key])
nml['g_scale_coeff'] = float(
sp.plotters[0].plot_data[1].attrs['slope'])
nml['thresh'] = self.task_config.thresh
nml.update(self._gp_nml)
info.update(self._gp_info)
@docstrings.dedent
    def plot_additionals(self, pdf=None):
"""
Plot the histogram of GP shape
Parameters
----------
%(TaskBase.plot_additionals.parameters)s
"""
import seaborn as sns
import matplotlib.pyplot as plt
sns.set_style('darkgrid')
df = self.filtered_data
fig = plt.figure(figsize=(12, 2.5))
fig.subplots_adjust(hspace=0)
ax2 = plt.subplot2grid((4, 2), (0, 1))
ax3 = plt.subplot2grid((4, 2), (1, 1), rowspan=3, sharex=ax2)
pshape = df.pshape[df.pshape.notnull() & (df.ngp > 100)]
sns.boxplot(pshape, ax=ax2, whis=[1, 99], showmeans=True,
meanline=True)
sns.distplot(pshape, hist=True, kde=True, ax=ax3)
ax3.set_xlim(*np.percentile(pshape, [0.1, 99.9]))
ax3.set_xlabel('Generalized Pareto Shape parameter')
ax3.set_ylabel('Counts')
median = float(np.round(pshape.median(), 4))
mean = float(np.round(pshape.mean(), 4))
std = float(np.round(pshape.std(), 4))
median_line = next(
l for l in ax2.lines if np.all(
np.round(l.get_xdata(), 4) == median))
mean_line = next(
l for l in ax2.lines if np.all(np.round(l.get_xdata(), 4) == mean))
ax3.legend(
(median_line, mean_line),
('median = %1.4f' % median, 'mean = %1.4f' % mean), loc='center',
bbox_to_anchor=[0.7, 0.2], bbox_transform=ax3.transAxes)
if pdf:
pdf.savefig(fig, bbox_inches='tight')
else:
plt.savefig(self.pdf_file, bbox_inches='tight')
self._gp_nml = dict(gp_shape=mean)
self._gp_info = dict(gpshape_mean=mean, gpshape_median=median,
gpshape_std=std)
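def _gp_scale_at_crossover(thresh, gshape, gscale):
    """Illustrative helper (not used by the module itself) restating the
    crossover formula from :meth:`PrcpDistParams.prcp_dist_params`:
    following Neykov et al. (2014), the GP scale is chosen such that the GP
    density matches the normalized gamma tail at the threshold,
    ``pscale = (1 - F_gamma(thresh)) / f_gamma(thresh)``"""
    from scipy import stats
    return ((1 - stats.gamma.cdf(thresh, gshape, scale=gscale)) /
            stats.gamma.pdf(thresh, gshape, scale=gscale))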
class MarkovChain(Parameterizer):
"""The parameterizer to calculate the Markov Chain parameters"""
name = 'markov'
setup_requires = ['cday']
_datafile = 'markov_chain_parameters.csv'
dbname = 'markov'
summary = "Calculate the markov chain parameterization"
has_run = True
namelist_keys = ['p101_1', 'p001_1', 'p001_2', 'p11_1', 'p11_2', 'p101_2']
fmt = kwargs = dict(
legend={'loc': 'upper left'},
cmap='w_Reds',
ylabel='%(long_name)s',
xlim=(0, 1),
ylim=(0, 1),
fix=0,
bins=10,
bounds=['minmax', 11, 0, 99],
cbar='',
ci=None,
legendlabels=['$%(symbol)s$ = %(slope)1.4f * %(xname)s, '
'$R^2$ = %(rsquared)1.3f'],
)
@property
def sql_dtypes(self):
import sqlalchemy
ret = super(MarkovChain, self).sql_dtypes
for flag in {'p11', 'p001', 'wetf', 'p101', 'p01'}:
ret[flag] = sqlalchemy.REAL
return ret
    def setup_from_file(self, *args, **kwargs):
kwargs['index_col'] = ['id', 'month']
return super(MarkovChain, self).setup_from_file(*args, **kwargs)
    def setup_from_db(self, *args, **kwargs):
kwargs['index_col'] = ['id', 'month']
return super(MarkovChain, self).setup_from_db(*args, **kwargs)
@classmethod
    def calc_ndays(cls, df):
        if not len(df):
            # NaN placeholders; floats, since NaN cannot be stored in an
            # integer column
            return pd.DataFrame([[np.nan] * 9], columns=[
                'n', 'nwet', 'ndry', 'np11', 'np01', 'np001', 'np001_denom',
                'np101', 'np101_denom'])
vals = df.prcp.values
n = len(vals)
nwet = (vals[:-1] > 0.0).sum() #: number of wet days
ndry = (vals[:-1] == 0.0).sum() #: number of dry days
# ---------------------------------------------------
        # PWW = Prob(Wet then Wet) = P11
        #     = wetwet / (wetwet + wetdry)
        #     = wetwet / nwet
np11 = ((vals[:-1] > 0.0) & (vals[1:] > 0.0)).sum()
# ---------------------------------------------------
        # PWD = Prob(Dry then Wet) = P01
        #     = drywet / (drywet + drydry)
        #     = drywet / ndry
np01 = ((vals[:-1] == 0.0) & (vals[1:] > 0.0)).sum()
# ---------------------------------------------------
# PWDD = Prob(Dry Dry Wet) = P001
# = drydryWET / (drydryWET + drydryDRY)
np001 = ((vals[:-2] == 0.0) & (vals[1:-1] == 0.0) &
(vals[2:] > 0.0)).sum()
np001_denom = np001 + ((vals[:-2] == 0.0) & (vals[1:-1] == 0.0) &
(vals[2:] == 0.0)).sum()
# ---------------------------------------------------
# PWDW = Prob(Wet Dry Wet) = P101
# = wetdryWET / (wetdryWET + wetdryDRY)
np101 = ((vals[:-2] > 0.0) & (vals[1:-1] == 0.0) &
(vals[2:] > 0.0)).sum()
np101_denom = np101 + ((vals[:-2] > 0.0) & (vals[1:-1] == 0.0) &
(vals[2:] == 0.0)).sum()
return pd.DataFrame(
[[n, nwet, ndry, np11, np01, np001, np001_denom, np101,
np101_denom]],
columns=['n', 'nwet', 'ndry', 'np11', 'np01', 'np001',
'np001_denom', 'np101', 'np101_denom'])
@classmethod
    def calculate_probabilities(cls, df):
"""Calculate the transition probabilities for one month across multiple
years"""
        # the month is fixed within this group, so this effectively groups
        # by year: we count the transitions separately for every single
        # month and sum them up afterwards, so that no transition is
        # counted across a month boundary
        g = df.groupby(level=['year', 'month'])
if g.ngroups > 10:
dfs = g.apply(cls.calc_ndays).sum()
return pd.DataFrame.from_dict(OrderedDict([
('p11', [dfs.np11 / dfs.nwet if dfs.nwet > 0 else 0]),
('p01', [dfs.np01 / dfs.ndry if dfs.ndry > 0 else 0]),
('p001', [dfs.np001 / dfs.np001_denom
if dfs.np001_denom > 0 else 0]),
('p101', [dfs.np101 / dfs.np101_denom
if dfs.np101_denom > 0 else 0]),
('wetf', [dfs.nwet / dfs.n if dfs.n > 0 else 0])]))
else:
return pd.DataFrame.from_dict(
{'p11': [], 'p01': [], 'p001': [], 'p101': [], 'wetf': []})
    def setup_from_scratch(self):
self.logger.debug('Calculating markov chain parameters')
df = self.cday.data
data = df.groupby(level=['id', 'month']).apply(
self.calculate_probabilities)
data.index = data.index.droplevel(-1)
self.data = data
self.logger.debug('Done.')
@property
def ds(self):
"""The dataset of the :attr:`data` DataFrame"""
import xarray as xr
ds = xr.Dataset.from_dataframe(self.data.set_index('wetf'))
ds.wetf.attrs['long_name'] = 'Fraction of wet days'
ds.p11.attrs['long_name'] = 'Prob. Wet then Wet'
ds.p101.attrs['long_name'] = 'Prob. Wet then Dry then Wet'
ds.p001.attrs['long_name'] = 'Prob. Dry then Dry then Wet'
ds.p11.attrs['symbol'] = 'p_{11}'
ds.p101.attrs['symbol'] = 'p_{101}'
ds.p001.attrs['symbol'] = 'p_{001}'
return ds
@docstrings.dedent
    def create_project(self, ds):
"""
Create the project of the plots of the transition probabilities
Parameters
----------
%(TaskBase.create_project.parameters)s"""
import matplotlib.pyplot as plt
import seaborn as sns
import psyplot.project as psy
sns.set_style('white')
fig, axes = plt.subplots(3, 1, figsize=(10, 12))
axes = axes.ravel()
sp = psy.plot.densityreg(
ds, name=['p11', 'p101', 'p001'], fmt=self.fmt, ax=axes.ravel(),
share='xlim')
sp(name='p11').update(
fix=[(1., 1.)], legendlabels=[
'$%(symbol)s$ = %(intercept)1.4f + '
'%(slope)1.4f * %(xname)s, $R^2$ = %(rsquared)1.3f'])
sp(ax=axes[-1]).update(xlabel='%(long_name)s')
return sp
@docstrings.dedent
    def make_run_config(self, sp, info, full_nml):
"""
Configure the experiment with the MarkovChain relationships
Parameters
----------
%(Parameterizer.make_run_config.parameters)s"""
nml = full_nml.setdefault('weathergen_ctl', OrderedDict())
for plotter in sp.plotters:
d = info[plotter.data.name] = OrderedDict()
for key in ['rsquared', 'slope', 'intercept']:
d[key] = float(plotter.plot_data[1].attrs[key])
for plotter in sp.plotters:
name = plotter.data.name
nml[name + '_1'] = float(plotter.plot_data[1].attrs.get(
'intercept', 0))
nml[name + '_2'] = float(plotter.plot_data[1].attrs.get('slope'))
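# Worked example for the transition counting in :meth:`MarkovChain.calc_ndays`
# (illustrative): for ``prcp = [1., 1., 0., 0., 1.]`` the pair counts are
# np11 = 1 (days 1-2) and np01 = 1 (days 4-5), the triple counts are
# np001 = 1 (days 3-5) and np101 = 0, and nwet = ndry = 2 over the first
# four days.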
_TempConfig = namedtuple('_TempConfig',
['cutoff',
'tmin_range1', 'tmin_range2',
'tmax_range1', 'tmax_range2'])
_TempConfig = utils.append_doc(_TempConfig, docstrings.get_sections("""
Parameters
----------
cutoff: int
The minimum number of values that is required for fitting the standard
deviation
tmin_range1: list of floats with length 2
The ranges ``[vmin, vmax]`` to use for the extrapolation of minimum
temperatures standard deviation below 0. The fit will be used for all
points below the given ``vmax``
tmin_range2: list of floats with length 2
The ranges ``[vmin, vmax]`` to use for the extrapolation of minimum
temperatures standard deviation above 0. The fit will be used for all
points above the given ``vmin``
tmax_range1: list of floats with length 2
The ranges ``[vmin, vmax]`` to use for the extrapolation of maximum
temperatures standard deviation below 0. The fit will be used for all
points below the given ``vmax``
tmax_range2: list of floats with length 2
The ranges ``[vmin, vmax]`` to use for the extrapolation of maximum
temperatures standard deviation above 0. The fit will be used for all
points above the given ``vmin``
""", '_TempConfig'))
TempConfig = utils.enhanced_config(_TempConfig, 'TempConfig')
@docstrings.dedent
def default_temp_config(
cutoff=10,
tmin_range1=[-50, -40], tmin_range2=[25, 30],
tmax_range1=[-40, -30], tmax_range2=[35, 45],
*args, **kwargs):
"""
The default configuration for :class:`TemperatureParameterizer` instances.
See also the :attr:`PrcpDistParams.default_config` attribute
Parameters
----------
%(TempConfig.parameters)s"""
return TempConfig(cutoff, tmin_range1, tmin_range2, tmax_range1,
tmax_range2, *utils.default_config(*args, **kwargs))
def _range_label(vmin, vmax):
if vmin is None:
return '$%%(symbol)s \\leq %1.4g ^\\circ C$' % vmax
elif vmax is None:
return '$%1.4g ^\\circ C < %%(symbol)s$' % vmin
else:
return '$%1.4g ^\\circ C < %%(symbol)s \\leq %1.4g ^\\circ C$' % (
vmin, vmax)
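# e.g. (illustrative): ``_range_label(None, -40)`` gives
# ``'$%(symbol)s \leq -40 ^\circ C$'`` and ``_range_label(0.0, 25)`` gives
# ``'$0 ^\circ C < %(symbol)s \leq 25 ^\circ C$'``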
class TemperatureParameterizer(Parameterizer):
    """Parameterizer to correlate the monthly mean and standard deviation on
    wet and dry days with the monthly mean"""
name = 'temp'
summary = 'Temperature mean correlations'
setup_requires = ['cday']
_datafile = ['temperature_full.csv', 'temperature.csv']
dbname = ['temperature_full', 'temperature']
has_run = True
namelist_keys = {
'tmin_w1': 'tmin_wet.intercept',
'tmin_w2': 'tmin_wet.slope',
'tmin_d1': 'tmin_dry.intercept',
'tmin_d2': 'tmin_dry.slope',
'tmin_sd_w': 'tminstddev_wet.coeffs',
'tmin_sd_d': 'tminstddev_dry.coeffs',
'tmin_sd_breaks': 'tminstddev.breaks',
'tmax_w1': 'tmax_wet.intercept',
'tmax_w2': 'tmax_wet.slope',
'tmax_d1': 'tmax_dry.intercept',
'tmax_d2': 'tmax_dry.slope',
'tmax_sd_w': 'tmaxstddev_wet.coeffs',
'tmax_sd_d': 'tmaxstddev_dry.coeffs',
'tmax_sd_breaks': 'tmaxstddev.breaks',
}
fmt = dict(
cmap='w_Reds',
precision=0.1,
xrange=(['rounded', 5], ['rounded', 95]),
yrange=(['rounded', 5], ['rounded', 95]),
bounds=['minmax', 11, 0, 99],
cbar='',
bins=10,
legend={'loc': 'upper left'},
legendlabels=[
'$%(symbol)s$ = %(intercept)1.4f + %(slope)1.4f * $%(xsymbol)s$'],
xlabel='on %(state)s days',
)
sd_hist_fmt = dict(
cmap='w_Reds',
precision=0.1,
xrange=(['rounded', 5], ['rounded', 95]),
yrange=(0, ['rounded', 95]),
bounds=['minmax', 11, 0, 99],
cbar='',
bins=10,
xlabel='on %(state)s days'
)
sd_fit_fmt = dict(
legend={'loc': 'upper left'},
xlabel='on %(state)s days',
)
@classmethod
def _modify_parser(cls, parser):
parser.setup_args(default_temp_config)
parser, setup_grp, run_grp = super(
TemperatureParameterizer, cls)._modify_parser(parser)
for arg in ['tmin_range1', 'tmin_range2', 'tmax_range1',
'tmax_range2']:
parser.update_arg(arg, nargs=2)
parser.append2help(arg, '. Default: %(default)s')
parser.append2help('cutoff', '. Default: %(default)s')
return parser, setup_grp, run_grp
@property
def sql_dtypes(self):
def get_names(df):
if df is not None:
return chain(df.columns, df.index.names)
return []
import sqlalchemy
ret = super(TemperatureParameterizer, self).sql_dtypes
for col in set(chain.from_iterable(map(
get_names, self.data))).difference(ret):
ret[col] = [sqlalchemy.REAL, sqlalchemy.REAL]
return ret
@property
def default_config(self):
return default_temp_config()._replace(
**super(TemperatureParameterizer, self).default_config._asdict())
@property
def ds(self):
"""The dataframe of this parameterization task converted to a dataset
"""
import xarray as xr
# full data for std
df = self.data[0]
cols = [col for col in df.columns if (
col.startswith('tmin') or col.startswith('tmax'))]
ds = xr.Dataset.from_dataframe(df[cols].reset_index())
temp_bins = np.arange(-100., 100., 0.1)
ds['temp_bins'] = xr.Variable('temp_bins',
(temp_bins[1:] + temp_bins[:-1]) * 0.5)
for v, t, state in product(['tmin', 'tmax'], ['stddev', ''],
['', 'wet', 'dry']):
vname = v + t + (('_' + state) if state else '')
varo = ds.variables[vname]
what = v[1:]
label = 'std. dev.' if t else 'mean'
varo.attrs['long_name'] = '%s of %s. temperature' % (label, what)
varo.attrs['units'] = 'degC'
varo.attrs['symbol'] = 't_\mathrm{{%s%s%s}}' % (
what, ', sd' if t else '', (', ' + state) if state else '')
varo.attrs['state'] = state or 'all'
# calculate the source for the bars
cutoff = self.task_config.cutoff
if state and t:
g = df.groupby(pd.cut(df[v + '_' + state], temp_bins))
df_v = v + t + '_' + state #: The variable name in df
counts = g[df_v].count().values
ds[vname + '_counts'] = xr.Variable(
('temp_bins', ), counts, attrs=varo.attrs.copy())
means = g[df_v].mean().values
means[counts <= cutoff] = np.nan
ds[vname + '_mean'] = xr.Variable(
('temp_bins', ), means, attrs=varo.attrs.copy())
std = g[df_v].std().values
std[counts <= cutoff] = np.nan
ds[vname + '_sd'] = xr.Variable(
('temp_bins', ), std, attrs=varo.attrs.copy())
# means
df = self.data[1]
for v, t, state in product(['tmin', 'tmax'], ['stddev', ''],
['', 'wet', 'dry']):
vname = v + t + (('_' + state) if state else '')
ds_vname = vname + '_means'
varo = ds[ds_vname] = xr.Variable(
('index_mean', ), np.asarray(df[vname]),
attrs=ds[vname].attrs.copy())
return ds
@staticmethod
    def calc_monthly_props(df):
"""
Calculate the statistics for one single month in one year
"""
prcp_vals = df.prcp.values
wet = prcp_vals > 0.0
dry = prcp_vals == 0
arr_tmin = df.tmin.values
arr_tmax = df.tmax.values
arr_tmin_wet = arr_tmin[wet]
arr_tmin_dry = arr_tmin[dry]
arr_tmax_wet = arr_tmax[wet]
arr_tmax_dry = arr_tmax[dry]
# prcp values
d = {
# wet values
'tmin_wet': arr_tmin_wet.mean(),
'tmax_wet': arr_tmax_wet.mean(),
'tminstddev_wet': arr_tmin_wet.std(),
'tmaxstddev_wet': arr_tmax_wet.std(),
'trange_wet': (arr_tmax_wet - arr_tmin_wet).mean(),
'trangestddev_wet': (arr_tmax_wet - arr_tmin_wet).std(),
# dry values
'tmin_dry': arr_tmin_dry.mean(),
'tmax_dry': arr_tmax_dry.mean(),
'tminstddev_dry': arr_tmin_dry.std(),
'tmaxstddev_dry': arr_tmax_dry.std(),
'trange_dry': (arr_tmax_dry - arr_tmin_dry).mean(),
'trangestddev_dry': (arr_tmax_dry - arr_tmin_dry).std(),
# general mean
'tmin': arr_tmin.mean(),
'tmax': arr_tmax.mean(),
'tminstddev': arr_tmin.std(),
'tmaxstddev': arr_tmax.std(),
            'trange': (arr_tmax - arr_tmin).mean(),
            'trangestddev': (arr_tmax - arr_tmin).std(),
't': ((arr_tmin + arr_tmax) * 0.5).mean(),
'tstddev': ((arr_tmin + arr_tmax) * 0.5).std()}
        d['prcp_wet'] = am = prcp_vals[wet].mean()  # arithmetic mean
        gm = np.exp(np.log(prcp_vals[wet]).mean())  # geometric mean
        # gamma shape parameter via the Greenwood and Durand (1960)
        # polynomial approximation of the maximum likelihood estimate,
        # based on y = log(arithmetic mean / geometric mean)
        y = np.log(am / gm) if am != gm else np.nan
        d['alpha'] = 0.5000876 / y + 0.16488552 - 0.0544274 * y
        d['beta'] = am / d['alpha']
return pd.DataFrame.from_dict(d)
@classmethod
    def calculate_probabilities(cls, df):
"""Calculate the statistics for one month across multiple years"""
        # each group covers one calendar month at one station; we compute
        # the statistics for every year separately and average over the
        # years afterwards
        g = df.groupby(level=['year'])
return g.apply(cls.calc_monthly_props).mean()
    def setup_from_file(self, *args, **kwargs):
kwargs['index_col'] = [['id', 'year', 'month'], ['id', 'month']]
return super(TemperatureParameterizer, self).setup_from_file(
*args, **kwargs)
    def setup_from_db(self, *args, **kwargs):
kwargs['index_col'] = [['id', 'year', 'month'], ['id', 'month']]
return super(TemperatureParameterizer, self).setup_from_db(
*args, **kwargs)
    def setup_from_scratch(self):
self.data = [None, None]
self.data[0] = self.cday.data.groupby(
level=['id', 'year', 'month']).apply(self.calc_monthly_props)
if len(self.data[0]):
self.data[0].index = self.data[0].index.droplevel(-1)
self.data[1] = self.data[0].groupby(level=['id', 'month']).mean()
    def create_project(self, ds):
"""
Create the plots of the wet/dry - mean relationships
Parameters
----------
        %(TaskBase.create_project.parameters)s"""
import matplotlib.pyplot as plt
import seaborn as sns
import psyplot.project as psy
sns.set_style('white')
axes = np.concatenate([
plt.subplots(1, 2, figsize=(12, 4))[1] for _ in range(4)])
for fig in set(ax.get_figure() for ax in axes):
fig.subplots_adjust(bottom=0.25)
middle = (
axes[0].get_position().x0 + axes[1].get_position().x1) / 2.
axes = iter(axes)
variables = ['tmin', 'tmax']
# mean fits
for v in variables:
psy.plot.densityreg(
ds, name=v + '_wet_means', ax=next(axes),
coord=v + '_means',
ylabel='%(long_name)s [$^\circ$C]\non %(state)s days',
text=[(middle, 0.03, '%(xlong_name)s [$^\circ$C]', 'fig', dict(
weight='bold', ha='center'))], fmt=self.fmt)
psy.plot.densityreg(
ds, name=v + '_dry_means', ax=next(axes),
coord=v + '_means',
ylabel='on %(state)s days', fmt=self.fmt)
# sd fits
for v in variables:
ax = next(axes)
base = v + 'stddev'
# ---- wet days----
# density plot
psy.plot.density(
ds, name=base + '_wet', ax=ax,
coord=v + '_wet',
ylabel='%(long_name)s [$^\circ$C]\non %(state)s days',
text=[(middle, 0.03, '%(xlong_name)s [$^\circ$C]', 'fig', dict(
weight='bold', ha='center'))], fmt=self.sd_hist_fmt)
# bars
psy.plot.barplot(ds, name=base + '_wet_mean', ax=ax,
color='k', alpha=0.5, widths='data')
limits = ax.get_xlim()
r1 = getattr(self.task_config, v + '_range1')
r2 = getattr(self.task_config, v + '_range2')
psy.plot.linreg(ds, name=base + '_wet_mean', ax=ax,
temp_bins=[
slice(*r1), # extrapolation < 0
slice(None, 0.0), # polynomial < 0
slice(0.0, None), # polynomial > 0
slice(*r2), # extrapolation > 0
],
line_xlim=[
[limits[0], r1[1]],
[r1[1], 0.],
[0., r2[0]],
[r2[0], limits[1]],
],
fit=['poly1', 'poly5', 'poly5', 'poly1'],
legendlabels=[
_range_label(None, r1[1]),
_range_label(r1[1], 0.0),
_range_label(0.0, r2[0]),
_range_label(r2[0], None),
],
fmt=self.sd_fit_fmt, method='sel')
psy.gcp(True)(ax=ax).share(keys=['xlim', 'ylim'])
# ---- dry days ----
ax = next(axes)
psy.plot.density(
ds, name=base + '_dry', ax=ax,
coord=v + '_dry',
ylabel='%(long_name)s [$^\circ$C]\non %(state)s days',
text=[(middle, 0.03, '%(xlong_name)s [$^\circ$C]', 'fig', dict(
weight='bold', ha='center'))], fmt=self.sd_hist_fmt)
# bars
psy.plot.barplot(ds, name=base + '_dry_mean', ax=ax,
color='k', alpha=0.5, widths='data')
limits = ax.get_xlim()
psy.plot.linreg(ds, name=base + '_dry_mean', ax=ax,
temp_bins=[
slice(*r1), # extrapolation < 0
slice(None, 0.0), # polynomial < 0
slice(0.0, None), # polynomial > 0
slice(*r2), # extrapolation > 0
],
line_xlim=[
[limits[0], r1[1]],
[r1[1], 0.],
[0., r2[0]],
[r2[0], limits[1]],
],
fit=['poly1', 'poly5', 'poly5', 'poly1'],
legendlabels=[
_range_label(None, r1[1]),
_range_label(r1[1], 0.0),
_range_label(0.0, r2[0]),
_range_label(r2[0], None),
],
fmt=self.sd_fit_fmt, method='sel')
psy.gcp(True)(ax=ax).share(keys=['xlim', 'ylim'])
return psy.gcp(True)[:]
@docstrings.dedent
    def make_run_config(self, sp, info, full_nml):
"""
Configure the experiment with the correlations of wet/dry temperature
to mean temperature
Parameters
----------
%(Parameterizer.make_run_config.parameters)s
"""
variables = ['tmin', 'tmax']
states = ['wet', 'dry']
nml = full_nml.setdefault('weathergen_ctl', OrderedDict())
tc = self.task_config
for v, state in product(variables, states):
# means
vname = '%s_%s_means' % (v, state)
nml_name = v + '_' + state[0]
vinfo = info.setdefault(vname, {})
plotter = sp(name=vname).plotters[0]
for key in ['rsquared', 'slope', 'intercept']:
vinfo[key] = float(plotter.plot_data[1].attrs[key])
nml[nml_name + '1'] = float(
plotter.plot_data[1].attrs.get('intercept', 0))
nml[nml_name + '2'] = float(
plotter.plot_data[1].attrs.get('slope'))
# standard deviation
vinfo = info.setdefault('%s_%s_sd' % (v, state), {})
nml[v + '_sd_breaks'] = breaks = [
getattr(tc, v + '_range1')[1], 0.0,
getattr(tc, v + '_range2')[0]]
full_breaks = list(zip([-np.inf] + breaks, breaks + [np.inf]))
sd_coeffs = np.zeros((4, 6))
base = v + 'stddev'
for i, arr in enumerate(
sp.linreg(name='_'.join(
[base, state, 'mean'])).plotters[0].plot_data):
for j in range(6):
sd_coeffs[i, j] = arr.attrs.get('c%i' % j, 0.0)
vinfo[full_breaks[i]] = {
'params': np.round(sd_coeffs[i, :], 8).tolist(),
'rsquared': float(arr.attrs['rsquared'])}
nml[v + '_sd_' + state[0]] = np.round(sd_coeffs, 8).tolist()
_CloudConfig = namedtuple('_CloudConfig', ['args_type'])
_CloudConfig = utils.append_doc(_CloudConfig, docstrings.get_sections("""
Parameters
----------
args_type: str
The type of the stations. One of
ghcn
Stations are GHCN ids
eecra
Stations are EECRA station numbers
""", '_CloudConfig'))
CloudConfig = utils.enhanced_config(_CloudConfig, 'CloudConfig')
@docstrings.dedent
def default_cloud_config(args_type='ghcn', *args, **kwargs):
"""
The default configuration for :class:`CloudParameterizerBase` instances.
See also the :attr:`CloudParameterizerBase.default_config` attribute
Parameters
----------
%(CloudConfig.parameters)s"""
return CloudConfig(args_type, *utils.default_config(*args, **kwargs))
class CloudParameterizerBase(Parameterizer):
"""Abstract base class for cloud parameterizers"""
allow_files = False
@property
def default_config(self):
return default_cloud_config()._replace(
**super(CloudParameterizerBase, self).default_config._asdict())
@property
def args_type(self):
return self.task_config.args_type
@property
def setup_from(self):
if self.allow_files and self.args_type == 'files':
return 'scratch'
return super(CloudParameterizerBase, self).setup_from
@setup_from.setter
def setup_from(self, value):
super(CloudParameterizerBase, self.__class__).setup_from.fset(
self, value)
@property
def sql_dtypes(self):
import sqlalchemy
ret = super(CloudParameterizerBase, self).sql_dtypes
if self.args_type in ['eecra', 'files']:
ret['id'] = sqlalchemy.INTEGER
return ret
@property
def stations(self):
return self._stations
@stations.setter
def stations(self, stations):
at = self.args_type
if self.allow_files and at == 'files':
self._stations = self.eecra_stations = stations
elif at == 'eecra':
self._stations = self.eecra_stations = np.asarray(stations,
dtype=int)
elif at == 'ghcn':
Norig = len(stations)
df_map = self.eecra_ghcn_map()
try:
df_map = df_map.loc[stations].dropna()
except KeyError:
self._stations = df_map.index.values[:0]
self.eecra_stations = df_map.station_id.values[:0].astype(int)
else:
self._stations = df_map.index.values
self.eecra_stations = df_map.station_id.values.astype(int)
                self.logger.debug(
                    'Using %i cloud stations out of the %i given stations',
                    len(self._stations), Norig)
else:
raise ValueError(
"args_type must be one of {'eecra', 'ghcn'%s}! Not %s" % (
", 'files'" if self.allow_files else '', at))
    def eecra_ghcn_map(self):
"""Get a dataframe mapping from GHCN id to EECRA station_id"""
cls = CloudParameterizerBase
if self.args_type == 'eecra':
return pd.DataFrame(self.eecra_stations, columns=['station_id'],
index=pd.Index(self.eecra_stations, name='id'))
try:
return cls._eecra_ghcn_map
except AttributeError:
fname = osp.join(self.data_dir, 'eecra_ghcn_map.csv')
if not osp.exists(fname):
fname = osp.join(
utils.get_module_path(inspect.getmodule(cls)), 'data',
'eecra_ghcn_map.csv')
cls._eecra_ghcn_map = pd.read_csv(fname, index_col='id')
return cls._eecra_ghcn_map
@classmethod
    def filter_stations(cls, stations):
"""Get the GHCN stations that are also in the EECRA dataset
Parameters
----------
stations: np.ndarray
A string array with stations to use
Returns
-------
np.ndarray
The ids in `stations` that can be mapped to the eecra dataset"""
return cls.eecra_ghcn_map().loc[stations].dropna().index.values
@classmethod
def _modify_parser(cls, parser):
parser.setup_args(default_cloud_config)
parser, setup_grp, run_grp = super(
CloudParameterizerBase, cls)._modify_parser(parser)
parser.pop_key('args_type', 'metavar', None)
choices = ['ghcn', 'eecra']
if cls.allow_files:
choices += ['files']
append = (
"\n files\n"
" Arguments are paths to raw EECRA files")
else:
append = ''
parser.update_arg('args_type', short='at', group=setup_grp,
choices=choices)
parser.append2help('args_type', append + '\nDefault: %(default)s')
return parser, setup_grp, run_grp
@classmethod
@docstrings.dedent
    def from_task(cls, task, *args, **kwargs):
"""
%(TaskBase.from_task.summary)s
Parameters
----------
%(TaskBase.from_task.parameters)s
Other Parameters
----------------
%(TaskBase.from_task.other_parameters)s
"""
if cls.allow_files and getattr(task.task_config, 'args_type', None):
kwargs.setdefault('args_type', task.task_config.args_type)
return super(CloudParameterizerBase, cls).from_task(
task, *args, **kwargs)
class HourlyCloud(CloudParameterizerBase):
"""Parameterizer that loads the hourly cloud data from the EECRA database
"""
name = 'hourly_cloud'
summary = 'Hourly cloud data'
_datafile = 'hourly_cloud.csv'
dbname = 'hourly_cloud'
allow_files = True
urls = {
((1971, 1), (1977, 4)): (
'http://cdiac.ess-dive.lbl.gov/ftp/ndp026c/land_197101_197704/'),
((1977, 5), (1982, 10)): (
'http://cdiac.ess-dive.lbl.gov/ftp/ndp026c/land_197705_198210/'),
((1982, 11), (1987, 6)): (
'http://cdiac.ess-dive.lbl.gov/ftp/ndp026c/land_198211_198706/'),
((1987, 7), (1992, 2)): (
'http://cdiac.ess-dive.lbl.gov/ftp/ndp026c/land_198707_199202/'),
((1992, 3), (1996, 12)): (
'http://cdiac.ess-dive.lbl.gov/ftp/ndp026c/land_199203_199612/'),
((1997, 1), (2009, 12)): (
'http://cdiac.ess-dive.lbl.gov/ftp/ndp026c/land_199701_200912/')}
mon_map = dict(zip(
range(1, 13),
"JAN FEB MAR APR MAY JUN JUL AUG SEP OCT NOV DEC".split()))
_continue = False
@property
def sql_dtypes(self):
import sqlalchemy
ret = super(HourlyCloud, self).sql_dtypes
ret.update({"IB": sqlalchemy.SMALLINT,
"lat": sqlalchemy.REAL,
"lon": sqlalchemy.REAL,
"station_id": sqlalchemy.INTEGER,
"LO": sqlalchemy.SMALLINT,
"ww": sqlalchemy.SMALLINT,
"N": sqlalchemy.SMALLINT,
"Nh": sqlalchemy.SMALLINT,
"h": sqlalchemy.SMALLINT,
"CL": sqlalchemy.SMALLINT,
"CM": sqlalchemy.SMALLINT,
"CH": sqlalchemy.SMALLINT,
"AM": sqlalchemy.REAL,
"AH": sqlalchemy.REAL,
"UM": sqlalchemy.SMALLINT,
"UH": sqlalchemy.SMALLINT,
"IC": sqlalchemy.SMALLINT,
"SA": sqlalchemy.REAL,
"RI": sqlalchemy.REAL,
"SLP": sqlalchemy.REAL,
"WS": sqlalchemy.REAL,
"WD": sqlalchemy.SMALLINT,
"AT": sqlalchemy.REAL,
"DD": sqlalchemy.REAL,
"EL": sqlalchemy.SMALLINT,
"IW": sqlalchemy.SMALLINT,
"IP": sqlalchemy.SMALLINT})
return ret
years = range(1971, 2010)
months = range(1, 13)
@property
def raw_dir(self):
"""The directory where we expect the raw files"""
return osp.join(self.data_dir, 'raw')
@property
def data_dir(self):
return osp.join(super(HourlyCloud, self).data_dir, 'eecra')
@property
def raw_src_files(self):
src_dir = osp.join(self.data_dir, 'raw')
return OrderedDict([
(yrmon, osp.join(src_dir, self.eecra_fname(*yrmon)))
for yrmon in product(self.years, self.months)])
@property
def src_files(self):
src_dir = osp.join(self.data_dir, 'stations')
return [
osp.join(src_dir, str(s) + '.csv') for s in self.eecra_stations]
@classmethod
@docstrings.get_sectionsf('HourlyCloud.eecra_fname')
@docstrings.dedent
    def eecra_fname(cls, year, mon, ext=''):
        """The name of the EECRA file
        Parameters
        ----------
        year: int
            The year of the data
        mon: int
            The integer of the month between 1 and 12
        ext: str
            The file name extension to append (e.g. ``'.Z'``)"""
c_mon = cls.mon_map[mon]
c_yr = str(year)
return c_mon + c_yr[-2:] + 'L' + ext
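    # e.g. (illustrative): ``eecra_fname(1987, 7)`` -> ``'JUL87L'`` and
    # ``eecra_fname(1987, 7, '.Z')`` -> ``'JUL87L.Z'``, the compressed file
    # name as served by the pages in :attr:`urls`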
@classmethod
@docstrings.dedent
    def get_eecra_url(cls, year, mon):
"""
Get the download path for the file for a specific year and month
Parameters
----------
%(HourlyCloud.eecra_fname.parameters)s
"""
for (d0, d1), url in cls.urls.items():
if (year, mon) >= d0 and (year, mon) <= d1:
return url + cls.eecra_fname(year, mon, '.Z')
def _download_worker(self, qin, qout):
while self._continue:
yrmon, fname = qin.get()
utils.download_file(self.get_eecra_url(*yrmon), fname)
spr.call(['gzip', '-d', fname])
qout.put((yrmon, osp.splitext(fname)[0]))
qin.task_done()
    def init_from_scratch(self):
        """Reimplemented to download the data if it is missing"""
if self.args_type == 'files':
return
logger = self.logger
logger.debug('Initializing %s', self.name)
stations = self.stations
logger.debug('Reading data for %s stations', len(stations))
raw_dir = self.raw_dir
stations_dir = osp.join(self.data_dir, 'stations')
if not osp.isdir(raw_dir):
os.makedirs(raw_dir)
if not osp.isdir(stations_dir):
os.makedirs(stations_dir)
src_files = self.src_files
eecra_stations = self.eecra_stations
missing = [station_id for station_id, fname in zip(
eecra_stations, src_files) if not osp.exists(fname)]
if missing:
logger.debug(
'files for %i stations are missing. Start extraction...',
len(missing))
from gwgen.parse_eecra import extract_data
self.download_src(raw_dir)
extract_data(missing, raw_dir, stations_dir,
self.years, self.months)
logger.debug('Done')
    def get_data_from_files(self, files):
def save_loc(fname):
try:
return pd.read_csv(fname, index_col='station_id').loc[
station_ids]
except KeyError:
return pd.DataFrame()
station_ids = self.eecra_stations
self.logger.debug('Extracting data for %i stations from %i files',
len(station_ids), len(files))
return pd.concat(
list(map(save_loc, files)), ignore_index=False, copy=False)
    def download_src(self, src_dir, force=False, keep=False):
"""Download the source files from the EECRA ftp server"""
logger = self.logger
logger.debug(' Expected data source: %s', src_dir)
files = self.raw_src_files
missing = {yrmon: fname for yrmon, fname in six.iteritems(files)
if not osp.exists(fname)}
logger.debug('%i raw source files are missing.', len(missing))
for yrmon, fname in six.iteritems(missing):
compressed_fname = fname + '.Z'
if force or not osp.exists(compressed_fname):
utils.download_file(self.get_eecra_url(*yrmon),
compressed_fname)
spr.call(['gzip', '-d', compressed_fname] + (['-k'] * keep))
    def setup_from_file(self, *args, **kwargs):
kwargs['index_col'] = ['id', 'year', 'month', 'day', 'hour']
return super(HourlyCloud, self).setup_from_file(*args, **kwargs)
    def setup_from_db(self, *args, **kwargs):
kwargs['index_col'] = ['id', 'year', 'month', 'day', 'hour']
return super(HourlyCloud, self).setup_from_db(*args, **kwargs)
    def setup_from_scratch(self):
"""Set up the data"""
if self.args_type == 'files':
from gwgen.parse_eecra import parse_file
self.data = pd.concat(list(map(parse_file, self.stations))).rename(
columns={'station_id': 'id'})
self.data.set_index(['id', 'year', 'month', 'day', 'hour'],
inplace=True)
self.data.sort_index(inplace=True)
else:
files = self.src_files
df_map = pd.DataFrame.from_dict(
{'id': self.stations, 'station_id': self.eecra_stations})
self.data = pd.concat(
[pd.read_csv(fname).merge(df_map, on='station_id', how='inner')
for fname in files], ignore_index=False, copy=False)
self.data.set_index(['id', 'year', 'month', 'day', 'hour'],
inplace=True)
self.data.sort_index(inplace=True)
class DailyCloud(CloudParameterizerBase):
"""Parameterizer to calculate the daily cloud values from hourly cloud data
"""
name = 'daily_cloud'
summary = 'Calculate the daily cloud values from hourly cloud data'
_datafile = 'daily_cloud.csv'
dbname = 'daily_cloud'
setup_requires = ['hourly_cloud']
allow_files = True
@staticmethod
    def calculate_daily(df):
ww = df.ww.values
at = df.AT
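        # the selected ww values are (by our reading) the WMO present
        # weather codes indicating precipitation: 50-75 (drizzle, rain and
        # snow), 77 and 79 (snow grains, ice pellets) and 80-99 (showers
        # and thunderstorms); 76 and 78 are not counted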
return pd.DataFrame.from_dict(OrderedDict([
('wet_day', [(
((ww >= 50) & (ww <= 75)) |
(ww == 75) |
(ww == 77) |
(ww == 79) |
((ww >= 80) & (ww <= 99))).any().astype(int)]),
('tmin', [at.min()]),
('tmax', [at.max()]),
('mean_cloud', [df.N.mean() / 8.]),
('wind', [df.WS.mean()])
]))
    def setup_from_file(self, *args, **kwargs):
kwargs['index_col'] = ['id', 'year', 'month', 'day']
return super(DailyCloud, self).setup_from_file(*args, **kwargs)
    def setup_from_db(self, *args, **kwargs):
kwargs['index_col'] = ['id', 'year', 'month', 'day']
return super(DailyCloud, self).setup_from_db(*args, **kwargs)
    def setup_from_scratch(self):
df = self.hourly_cloud.data
data = df.groupby(level=['id', 'year', 'month', 'day']).apply(
self.calculate_daily)
data.index = data.index.droplevel(-1)
self.data = data
class MonthlyCloud(CloudParameterizerBase):
"""Parameterizer to calculate the monthly cloud values from daily cloud"""
name = 'monthly_cloud'
summary = 'Calculate the monthly cloud values from daily cloud data'
_datafile = 'monthly_cloud.csv'
dbname = 'monthly_cloud'
setup_requires = ['daily_cloud']
allow_files = True
@property
def sql_dtypes(self):
import sqlalchemy
ret = super(MonthlyCloud, self).sql_dtypes
for col in set(self.data.columns).difference(ret):
if 'complete' in col:
ret[col] = sqlalchemy.BOOLEAN
elif 'wet_day' in col:
ret[col] = sqlalchemy.SMALLINT
else:
ret[col] = sqlalchemy.REAL
return ret
@staticmethod
    def calculate_monthly(df):
if len(df) > 1:
wet = df.wet_day.values.astype(bool)
            return pd.DataFrame.from_dict(OrderedDict([
                ('wet_day', [df.wet_day.sum()]),
                ('mean_cloud_wet', [df.mean_cloud[wet].mean()]),
                ('mean_cloud_dry', [df.mean_cloud[~wet].mean()]),
                ('mean_cloud', [df.mean_cloud.mean()]),
                ('sd_cloud_wet', [df.mean_cloud[wet].std()]),
                ('sd_cloud_dry', [df.mean_cloud[~wet].std()]),
                ('sd_cloud', [df.mean_cloud.std()]),
                ('wind_wet', [df.wind[wet].mean()]),
                ('wind_dry', [df.wind[~wet].mean()]),
                ('wind', [df.wind.mean()]),
                ('sd_wind_wet', [df.wind[wet].std()]),
                ('sd_wind_dry', [df.wind[~wet].std()]),
                ('sd_wind', [df.wind.std()]),
            ]))
else:
return pd.DataFrame([], columns=[
'wet_day', 'mean_cloud_wet', 'mean_cloud_dry',
'mean_cloud', 'sd_cloud_wet', 'sd_cloud_dry', 'sd_cloud',
'wind_wet', 'wind_dry',
'wind', 'sd_wind_wet', 'sd_wind_dry', 'sd_wind'])
    def setup_from_file(self, *args, **kwargs):
kwargs['index_col'] = ['id', 'year', 'month']
return super(MonthlyCloud, self).setup_from_file(*args, **kwargs)
    def setup_from_db(self, *args, **kwargs):
kwargs['index_col'] = ['id', 'year', 'month']
return super(MonthlyCloud, self).setup_from_db(*args, **kwargs)
    def setup_from_scratch(self):
df = self.daily_cloud.data
g = df.groupby(level=['id', 'year', 'month'])
data = g.apply(self.calculate_monthly)
        # wet_day may have been upcast to float where a group returned an
        # empty dataframe, so force it back to int
data['wet_day'] = data['wet_day'].astype(int)
data.index = data.index.droplevel(-1)
# number of records per month
df_nums = g.count()
df_nums['day'] = 1
s = pd.to_datetime(df_nums.reset_index()[['year', 'month', 'day']])
s.index = df_nums.index
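        # ``MonthEnd(0)`` rolls a first-of-month timestamp forward to the
        # last day of the same month, so the day difference plus one is the
        # number of days in that month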
df_nums['ndays'] = ((s + MonthEnd(0)) - s).dt.days + 1
cols = ['wet_day', 'tmin', 'tmax', 'mean_cloud', 'wind']
complete_cols = [col + '_complete' for col in cols]
for col, tcol in zip(cols, complete_cols):
df_nums[tcol] = df_nums[col] == df_nums.ndays
        df_nums.drop('day', axis=1, inplace=True)
self.data = pd.merge(
data, df_nums[complete_cols], left_index=True,
right_index=True)
def cloud_func(x, a):
"""Function for fitting the mean of wet and dry cloud to the mean of all
cloud
This function returns y with
.. math::
y = ((-a - 1) / (a^2 x - a^2 - a)) - \\frac{1}{a}
Parameters
----------
x: np.ndarray
The x input data
a: float
The parameter as mentioned in the equation above"""
return ((-a - 1) / (a*a*x - a*a - a)) - 1/a
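# Note that for any ``a != 0`` this curve passes through (0, 0) and (1, 1),
# e.g. ``cloud_func(0., 1.) == 0.`` and ``cloud_func(1., 1.) == 1.``, so the
# fit maps the valid cloud fraction range onto itself at the end points.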
def cloud_sd_func(x, a):
"""Function for fitting the standard deviation of wet and dry cloud to the
mean of wet or dry cloud
This function returns y with
.. math::
y = a^2 x (1 - x)
Parameters
----------
x: np.ndarray
The x input data
a: float
The parameter as mentioned in the equation above"""
return a * a * x * (1 - x)
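# This parabola vanishes at ``x = 0`` and ``x = 1`` and peaks at ``a**2 / 4``
# for ``x = 0.5``, reflecting that the cloud fraction standard deviation
# must vanish where the mean is pinned to 0 or 1.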
class CompleteMonthlyCloud(MonthlyCloud):
"""Parameterizer to extract the months with complete clouds"""
name = 'cmonthly_cloud'
setup_requires = ['monthly_cloud']
dbname = 'complete_monthly_cloud'
_datafile = 'complete_monthly_cloud.csv'
summary = "Extract the months with complete cloud data"
cols = ['wet_day', 'mean_cloud'] # not 'tmin', 'tmax'
    def setup_from_scratch(self):
cols = self.cols
complete_cols = [col + '_complete' for col in cols]
self.data = self.monthly_cloud.data[
self.monthly_cloud.data[complete_cols].values.all(axis=1)]
class YearlyCompleteMonthlyCloud(CompleteMonthlyCloud):
    """Parameterizer to extract the months with complete cloud data in
    complete years"""
name = 'yearly_cmonthly_cloud'
setup_requires = ['cmonthly_cloud']
dbname = 'yearly_complete_monthly_cloud'
_datafile = 'yearly_complete_monthly_cloud.csv'
summary = "Extract the months with complete cloud data in complete years"
allow_files = False
cols = ['wet_day', 'mean_cloud'] # not 'tmin', 'tmax'
    def setup_from_scratch(self):
def year_complete(series):
"""Check whether the data for the given is complete"""
return series.astype(int).sum() == 12
all_monthly = self.cmonthly_cloud.data
cols = self.cols
complete_cols = [col + '_complete' for col in cols]
df_yearly = all_monthly[complete_cols].groupby(
level=['id', 'year']).agg(year_complete)
names = all_monthly.index.names
all_monthly = all_monthly.reset_index().merge(
df_yearly[complete_cols].reset_index(), on=['id', 'year'],
suffixes=['', '_year']).set_index(names)
ycomplete_cols = [col + '_complete_year' for col in cols]
        self.data = all_monthly.loc[
            all_monthly[ycomplete_cols].values.all(axis=1)]
class CloudParameterizer(CompleteMonthlyCloud):
    """Parameterizer to correlate the wet/dry cloud fractions with the mean
    cloud fraction"""
name = 'cloud'
summary = 'Parameterize the cloud data'
setup_requires = ['cmonthly_cloud']
_datafile = 'cloud_correlation.csv'
dbname = 'cloud_correlation'
allow_files = False
fmt = dict(
cmap='w_Reds',
xrange=(0, 1),
yrange=(0, 1),
legend=False,
bounds=['minmax', 11, 0, 99],
cbar='',
bins=10,
xlabel='on %(state)s days',
xlim=(0, 1),
ylim=(0, 1),
)
has_run = True
namelist_keys = {
'cldf_w': 'mean_cloud_wet.a',
'cldf_d': 'mean_cloud_dry.a',
'cldf_sd_w': 'sd_cloud_wet.a',
'cldf_sd_d': 'sd_cloud_dry.a'}
error_keys = {
'cldf_w': 'mean_cloud_wet.a_err',
'cldf_d': 'mean_cloud_dry.a_err',
'cldf_sd_w': 'sd_cloud_wet.a_err',
'cldf_sd_d': 'sd_cloud_dry.a_err'}
@property
def sql_dtypes(self):
import sqlalchemy
ret = super(CloudParameterizer, self).sql_dtypes
ret['wet_day'] = sqlalchemy.REAL
return ret
    def setup_from_file(self, *args, **kwargs):
kwargs['index_col'] = ['id', 'month']
return Parameterizer.setup_from_file(self, *args, **kwargs)
    def setup_from_db(self, *args, **kwargs):
kwargs['index_col'] = ['id', 'month']
return Parameterizer.setup_from_db(self, *args, **kwargs)
@property
def ds(self):
"""The dataframe of this parameterization task converted to a dataset
"""
import xarray as xr
ds = xr.Dataset.from_dataframe(self.data.reset_index())
for t, state in product(['sd', 'mean'], ['', 'wet', 'dry']):
vname = t + '_cloud' + (('_' + state) if state else '')
varo = ds.variables[vname]
label = 'std. dev.' if t == 'sd' else 'mean'
varo.attrs['long_name'] = '%s cloud fraction' % (label)
varo.attrs['units'] = '-'
            varo.attrs['symbol'] = r'c_\mathrm{{%s%s}}' % (
                'sd' if t == 'sd' else '', (', ' + state) if state else '')
varo.attrs['state'] = state or 'all'
return ds
    def setup_from_scratch(self):
g = self.cmonthly_cloud.data.groupby(level=['id', 'month'])
data = g.mean()
cols = [col for col in data.columns if '_complete' not in col]
self.data = data[cols]
@docstrings.dedent
    def create_project(self, ds):
        """
        Plot the relationship wet/dry cloud - mean cloud

        Parameters
        ----------
        %(TaskBase.create_project.parameters)s"""
import matplotlib.pyplot as plt
import seaborn as sns
import psyplot.project as psy
sns.set_style('white')
types = ['mean', 'sd']
axes = np.concatenate([
plt.subplots(1, 2, figsize=(12, 4))[1] for _ in range(2)])
for fig in set(ax.get_figure() for ax in axes):
fig.subplots_adjust(bottom=0.25)
middle = (
axes[0].get_position().x0 + axes[1].get_position().x1) / 2.
axes = iter(axes)
fit_funcs = {'mean': cloud_func, 'sd': cloud_sd_func}
for t in types:
psy.plot.densityreg(
ds, name='%s_cloud_wet' % (t), ax=next(axes),
ylabel='%(long_name)s\non %(state)s days',
text=[(middle, 0.03, '%(xlong_name)s', 'fig', dict(
weight='bold', ha='center'))], fmt=self.fmt,
fit=fit_funcs[t],
coord='mean_cloud' + ('_wet' if t == 'sd' else ''))
psy.plot.densityreg(
ds, name='%s_cloud_dry' % (t), ax=next(axes),
ylabel='on %(state)s days', fmt=self.fmt,
fit=fit_funcs[t],
coord='mean_cloud' + ('_dry' if t == 'sd' else ''))
return psy.gcp(True)[:]
@docstrings.dedent
    def make_run_config(self, sp, info, full_nml):
        """
        Configure with the wet/dry cloud - mean cloud correlation

        Parameters
        ----------
        %(Parameterizer.make_run_config.parameters)s
        """
nml = full_nml.setdefault('weathergen_ctl', OrderedDict())
states = ['wet', 'dry']
types = ['mean', 'sd']
for t, state in product(types, states):
vname = '%s_cloud_%s' % (t, state)
nml_name = 'cldf%s_%s' % ("_sd" if t == 'sd' else '', state[:1])
info[vname] = vinfo = {}
plotter = sp(name=vname).plotters[0]
for key in ['a', 'a_err', 'rsquared']:
vinfo[key] = float(plotter.plot_data[1].attrs[key])
nml[nml_name] = float(
plotter.plot_data[1].attrs.get('a', 0))
class CompleteMonthlyWind(CompleteMonthlyCloud):
"""Parameterizer to extract the months with complete wind"""
name = 'cmonthly_wind'
setup_requires = ['monthly_cloud']
dbname = 'complete_monthly_wind'
_datafile = 'complete_monthly_wind.csv'
summary = "Extract the months with complete wind data"
cols = ['wet_day', 'wind'] # not 'tmin', 'tmax'
class YearlyCompleteMonthlyWind(YearlyCompleteMonthlyCloud):
    """Parameterizer to extract the months with complete wind data in
    complete years"""
name = 'yearly_cmonthly_wind'
setup_requires = ['cmonthly_wind']
dbname = 'yearly_complete_monthly_wind'
_datafile = 'yearly_complete_monthly_wind.csv'
summary = "Extract the months with complete wind data in complete years"
cols = ['wet_day', 'wind']
    @property
    def cmonthly_cloud(self):
        # alias to the wind data so that the methods inherited from the
        # cloud parameterizers work unchanged
        return self.cmonthly_wind
def wind_sd_func(x, c1, c2, c3):
    """Fit function for the wind standard deviation: a degree-3 polynomial
    through the origin

    This function returns :math:`y` with

    .. math::

        y = c_1 x + c_2 x^2 + c_3 x^3
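
    Examples
    --------
    A minimal sketch (with synthetic data, not part of gwgen) of recovering
    the coefficients through scipy's curve fitting:

    >>> import numpy as np
    >>> from scipy.optimize import curve_fit
    >>> x = np.linspace(0, 10, 50)
    >>> y = wind_sd_func(x, 0.5, -0.04, 0.001)
    >>> coeffs, cov = curve_fit(wind_sd_func, x, y)
    >>> np.round(coeffs, 3).tolist()
    [0.5, -0.04, 0.001]
    """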
return c1 * x + c2 * x * x + c3 * x * x * x
class WindParameterizer(CompleteMonthlyWind):
    """Parameterizer to fit the wet/dry wind - mean wind relationship"""
name = 'wind'
summary = 'Parameterize the wind data'
setup_requires = ['cmonthly_wind']
_datafile = 'wind_correlation.csv'
dbname = 'wind_correlation'
allow_files = False
fmt = dict(
legend={'loc': 'upper left'},
cmap='w_Reds',
xrange=(0, ['rounded', 95]),
yrange=(0, ['rounded', 95]),
legendlabels=[
'$%(symbol)s = %(intercept)1.4f %(slope)+1.4f \\cdot %(xsymbol)s$'
],
bounds=['minmax', 11, 0, 99],
cbar='',
bins=100,
fix=0,
xlabel='on %(state)s days',
)
has_run = True
namelist_keys = {
'wind_w1': 'wind_wet.intercept',
'wind_w2': 'wind_wet.slope',
'wind_d1': 'wind_dry.intercept',
'wind_d2': 'wind_dry.slope',
'wind_sd_w': 'sd_wind_wet',
'wind_sd_d': 'sd_wind_dry',
}
@property
def sql_dtypes(self):
import sqlalchemy
ret = super(WindParameterizer, self).sql_dtypes
ret['wet_day'] = sqlalchemy.REAL
return ret
    def setup_from_file(self, *args, **kwargs):
        kwargs['index_col'] = ['id', 'month']
        return Parameterizer.setup_from_file(self, *args, **kwargs)

    def setup_from_db(self, *args, **kwargs):
        kwargs['index_col'] = ['id', 'month']
        return Parameterizer.setup_from_db(self, *args, **kwargs)
@property
def ds(self):
"""The dataframe of this parameterization task converted to a dataset
"""
import xarray as xr
df = self.data.reset_index()
ds = xr.Dataset.from_dataframe(df)
ds['wind_bins'] = xr.Variable(
('wind_bins', ), np.arange(0.05, 49.955, 0.1), attrs=dict(
long_name='mean wind speed', units='m/s'))
for t, state in product(['sd_', ''], ['', 'wet', 'dry']):
vname = t + 'wind' + (('_' + state) if state else '')
varo = ds.variables[vname]
label = 'std. dev. of ' if t else 'mean'
varo.attrs['long_name'] = '%s wind speed' % (label)
varo.attrs['units'] = 'm/s'
            varo.attrs['symbol'] = r'w_\mathrm{{%s%s%s}}' % (
                'sd' if t else '', ', ' if state and t else '', state)
varo.attrs['state'] = state or 'all'
if t == 'sd_' and state:
g = df.groupby(pd.cut(df['wind_' + state],
np.arange(0.0, 50.05, 0.1)))
ds[vname + '_mean'] = xr.Variable(
('wind_bins', ), g[vname].mean(), attrs=ds[vname].attrs)
ds[vname + '_std'] = xr.Variable(
('wind_bins', ), g[vname].std(), attrs=ds[vname].attrs)
return ds
    def setup_from_scratch(self):
        def mean(s):
            try:
                return s.mean()
            except Exception:
                return np.nan
        all_data = self.cmonthly_wind.data
        cols = [col for col in all_data.columns if '_complete' not in col]
        g = all_data[cols].groupby(level=['id', 'month'])
        # HACK: g.mean() unexpectedly raises on this data, so aggregate with
        # a fail-safe mean instead
        data = g.agg(mean)
        self.data = data
    @docstrings.dedent
    def create_project(self, ds):
        """
        Plot the relationship wet/dry wind - mean wind

        Parameters
        ----------
        %(TaskBase.create_project.parameters)s"""
import matplotlib.pyplot as plt
import seaborn as sns
import psyplot.project as psy
sns.set_style('white')
types = ['', 'sd_']
axes = np.concatenate([
plt.subplots(1, 2, figsize=(12, 4))[1] for _ in range(2)])
for fig in set(ax.get_figure() for ax in axes):
fig.subplots_adjust(bottom=0.25)
middle = (
axes[0].get_position().x0 + axes[1].get_position().x1) / 2.
axes = iter(axes)
for t in types:
if t == 'sd_':
kws = {'fit': wind_sd_func,
'legendlabels': [
'$%(symbol)s = '
'%(c1)1.3f \\cdot %(xsymbol)s '
'%(c2)+1.3f \\cdot %(xsymbol)s^2 '
'%(c3)+1.3f \\cdot %(xsymbol)s^3$']}
else:
kws = {}
ax = next(axes)
if t == 'sd_':
psy.plot.barplot(ds, name='sd_wind_wet_mean',
widths='data', alpha=0.5, color='k', ax=ax)
psy.plot.densityreg(
ds, name='%swind_wet' % (t, ), ax=ax,
ylabel='%(long_name)s [%(units)s]\non %(state)s days',
text=[(middle, 0.03,
'%(xlong_name)s [%(xunits)s]', 'fig',
dict(weight='bold', ha='center'))], fmt=self.fmt,
coord='wind' + ('_wet' if t == 'sd_' else ''), **kws)
ax = next(axes)
if t == 'sd_':
psy.plot.barplot(ds, name='sd_wind_dry_mean',
widths='data', alpha=0.5, color='k', ax=ax)
psy.plot.densityreg(
ds, name='%swind_dry' % (t, ), ax=ax,
ylabel='on %(state)s days', fmt=self.fmt,
coord='wind' + ('_dry' if t == 'sd_' else ''),
**kws)
return psy.gcp(True)[:]
    @docstrings.dedent
    def make_run_config(self, sp, info, full_nml):
        """
        Configure with the wet/dry wind - mean wind correlation

        Parameters
        ----------
        %(Parameterizer.make_run_config.parameters)s
        """
nml = full_nml.setdefault('weathergen_ctl', OrderedDict())
states = ['wet', 'dry']
for state in states:
# linear fits of means
t = ''
vname = '%swind_%s' % (t, state)
nml_name = 'wind%s_%s' % ("_sd" if t == 'sd_' else '', state[:1])
info[vname] = vinfo = {}
plotter = sp(name=vname).plotters[0]
for key in ['rsquared', 'slope', 'intercept']:
vinfo[key] = float(plotter.plot_data[1].attrs[key])
nml[nml_name + '1'] = float(
plotter.plot_data[1].attrs.get('intercept', 0))
nml[nml_name + '2'] = float(
plotter.plot_data[1].attrs.get('slope'))
# polynomial fits of std
t = 'sd_'
vname = '%swind_%s' % (t, state)
nml_name = 'wind%s_%s' % ("_sd" if t == 'sd_' else '', state[:1])
info[vname] = vinfo = {}
plotter = sp(name=vname).plotters[0]
da = plotter.plot_data[1]
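            # the namelist value is a list of six polynomial coefficients;
            # only c1, c2 and c3 are fitted below, the remaining entries
            # stay zero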
arr = np.zeros(6)
for i in range(1, 4):
arr[i] = da.attrs['c%i' % i]
arr = np.round(arr, 8).tolist()
vinfo['params'] = nml[nml_name] = arr
vinfo['rsquared'] = float(da.attrs['rsquared'])
class CompleteDailyCloud(DailyCloud):
    """The parameterizer that calculates the days in complete months of cloud
    data"""
name = 'cdaily_cloud'
setup_requires = ['daily_cloud', 'monthly_cloud']
_datafile = "complete_daily_cloud.csv"
dbname = 'complete_daily_cloud'
summary = "Get the days of the complete daily cloud months"
cols = ['wet_day', 'tmin', 'tmax', 'mean_cloud', 'wind']
    def init_from_scratch(self):
        # reimplemented as a no-op: the required tasks already provide the
        # source data
        pass
    def setup_from_scratch(self):
monthly = self.monthly_cloud.data
cols = self.cols
complete_cols = [col + '_complete' for col in cols]
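        # merging on a frame without columns (note the ``[[]]``) keeps only
        # the index keys of the complete months, so the inner join below
        # acts as a pure filter on the daily data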
self.data = self.daily_cloud.data.reset_index().merge(
monthly[
monthly[complete_cols].values.all(axis=1)][[]].reset_index(),
how='inner', on=['id', 'year', 'month'], copy=False).set_index(
['id', 'year', 'month', 'day'])
class YearlyCompleteDailyCloud(CompleteDailyCloud):
    """The parameterizer that calculates the days in complete months of cloud
    data within complete years"""
name = 'yearly_cdaily_cloud'
setup_requires = ['cdaily_cloud', 'cmonthly_cloud']
_datafile = "yearly_complete_daily_cloud.csv"
dbname = 'yearly_complete_daily_cloud'
summary = (
"Get the days of the complete daily cloud months in complete years")
allow_files = False
cols = ['wet_day', 'mean_cloud', 'tmin', 'tmax', 'wind']
    def setup_from_scratch(self):
        def year_complete(series):
            """Check whether the data for the given year is complete"""
            return series.astype(int).sum() == 12
        all_monthly = self.cmonthly_cloud.data
        cols = self.cols
        complete_cols = [col + '_complete' for col in cols]
        df_yearly = all_monthly[complete_cols].groupby(
            level=['id', 'year']).agg(year_complete)
        names = all_monthly.index.names
        all_monthly = all_monthly.reset_index().merge(
            df_yearly[complete_cols].reset_index(), on=['id', 'year'],
            suffixes=['', '_year']).set_index(names)
        ycomplete_cols = [col + '_complete_year' for col in cols]
        # ``.loc`` with a boolean mask replaces the deprecated ``.ix`` indexer
        monthly = all_monthly.loc[
            all_monthly[ycomplete_cols].values.all(axis=1)][[]].reset_index()
        self.data = self.cdaily_cloud.data.reset_index().merge(
            monthly, how='inner', on=['id', 'year', 'month'],
            copy=False).set_index(['id', 'year', 'month', 'day'])
class CrossCorrelation(Parameterizer):
    """Class to calculate the cross correlation between the variables"""
    name = 'corr'
    summary = 'Cross correlation between temperature and cloudiness'
    _datafile = 'cross_correlation.csv'
    dbname = 'cross_correlation'
    # choose only complete years to have the maximum length of consecutive
    # days
    setup_requires = ['yearly_cdaily_cloud']
    cols = ['tmin', 'tmax', 'mean_cloud', 'wind']
    namelist_keys = {'a': None, 'b': None}
    # does not have stations in the index --> must be processed serially
    setup_parallel = False
    has_run = True
@property
def sql_dtypes(self):
import sqlalchemy
ret = super(CrossCorrelation, self).sql_dtypes
for col in self.cols:
ret[col + '1'] = ret[col]
ret['variable'] = sqlalchemy.TEXT
return ret
    def setup_from_file(self, **kwargs):
        """Set up the parameterizer from already stored files"""
        kwargs.setdefault('index_col', 'variable')
        self.data = pd.read_csv(self.datafile, **kwargs)
        self.data.columns.name = 'variable'

    def setup_from_db(self, **kwargs):
        """Set up the parameterizer from already created database tables"""
        kwargs.setdefault('index_col', 'variable')
        self.data = pd.read_sql_table(self.dbname, self.engine, **kwargs)
        self.data.columns.name = 'variable'
    def setup_from_scratch(self):
import dask.dataframe as dd
if not self.global_config.get('serial'):
scheduler = 'processes'
nprocs = self.global_config.get('nprocs', 'all')
if nprocs == 'all':
import multiprocessing as mp
nprocs = mp.cpu_count()
kws = {'num_workers': nprocs}
else:
scheduler = 'single-threaded'
kws = {}
df = self.yearly_cdaily_cloud.data.sort_index()
df['wind'] = df['wind'] ** 0.5
        df['date'] = vals = pd.to_datetime(
            df[[]].reset_index().drop('id', axis=1))
df['date'].values[:] = vals
df.set_index('date', inplace=True)
chunksize = self.global_config.get('chunksize', 10 ** 6)
ddf = dd.from_pandas(df, chunksize=chunksize)
cols = self.cols
# m0
self.data = final = ddf[cols].corr().compute(scheduler=scheduler,
**kws)
final.index.name = 'variable'
# shift the data by one row (day)
shifted = df.shift(1)
# set first day of each year to NaN because it might not come from the
# previous year
shifted.iloc[(shifted.index.month == 1) &
(shifted.index.day == 1)] = np.nan
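        # correlating each original column with the shifted frame yields the
        # lag-1 cross correlation corr(x(t), y(t-1)); e.g. the series
        # [1, 2, 3] is paired with its shifted partner [nan, 1, 2]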
dshifted = dd.from_pandas(shifted, chunksize=chunksize)
# m1
for col in cols:
final[col + '1'] = 0
for col2 in cols:
# HACK: rename the shifted column to account for
# https://github.com/dask/dask/issues/4906
final.loc[col2, col + '1'] = ddf[col].corr(
dshifted[col2].rename('shifted')).compute(
scheduler=scheduler, **kws)
final.columns.name = 'variable'
    def run(self, info, full_nml):
        """Calculate the ``a`` and ``b`` matrices for the namelist

        The matrices follow the standard lag-1 autoregression relations
        ``A = M1 * M0^-1`` and ``B * B^T = M0 - M1 * M0^-1 * M1^T``, where
        ``M0`` and ``M1`` denote the lag-0 and lag-1 correlation matrices
        computed in :meth:`setup_from_scratch`.
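
        Examples
        --------
        A minimal numpy sketch (synthetic matrices, purely for illustration)
        of the relations used below:

        >>> import numpy as np
        >>> m0 = np.eye(2)
        >>> m1 = np.array([[0.5, 0.1], [0.1, 0.5]])
        >>> a = np.dot(m1, np.linalg.inv(m0))
        >>> b = np.linalg.cholesky(m0 - np.dot(np.dot(a, m0), a.T))
        >>> bool(np.allclose(np.dot(np.dot(a, m0), a.T) + np.dot(b, b.T), m0))
        True
        """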
cols = self.cols
lag1_cols = [col + '1' for col in cols]
nml = full_nml.setdefault('weathergen_ctl', OrderedDict())
m0 = self.data[cols]
m1 = self.data[lag1_cols].rename(columns=dict(zip(lag1_cols, cols)))
        m0i = np.linalg.inv(m0)
        # a and b are transposed before being passed to the weathergenmod
        # module because the matrix multiplication there would otherwise
        # yield wrong values
        nml['a'] = np.dot(m1, m0i).tolist()
        nml['b'] = np.linalg.cholesky(
            m0 - np.dot(np.dot(m1, m0i), m1.T)).tolist()
info['M0'] = m0.values.tolist()
info['M1'] = m1.values.tolist()
    @classmethod
    def _modify_parser(cls, parser):
        """Reimplemented because there are no run keywords for this task"""
        # temporarily disable ``has_run`` so that the parent class does not
        # add any run options to the parser
        cls.has_run = False
        ret = super(CrossCorrelation, cls)._modify_parser(parser)
        cls.has_run = True
        return ret