Source code for gwgen.preproc

# -*- coding: utf-8 -*-
"""Additional routines for preprocessing"""
import tempfile
import os.path as osp
from collections import namedtuple
import numpy as np
import pandas as pd
from psyplot.compat.pycompat import OrderedDict
import gwgen.utils as utils
from gwgen.utils import docstrings


[docs]class CloudPreproc(utils.TaskBase): @property def task_data_dir(self): return osp.join(self.data_dir, 'eecra') _registry = []
[docs]class CloudInventory(CloudPreproc): """A task for computing the EECRA inventory for each station""" name = 'eecra_inventory' summary = 'Compute the inventory of the EECRA stations' http_xstall = 'http://cdiac.ornl.gov/ftp/ndp026c/XSTALL' _datafile = 'eecra_inventory.csv' dbname = 'eecra_inventory' has_run = True @property def setup_parallel(self): return self.setup_from == 'scratch' @property def default_config(self): return default_cloud_inventory_config()._replace( **super(CloudInventory, self).default_config._asdict()) @property def xstall_df(self): """The dataframe corresponding to the XSTALL stations""" use_xstall = self.task_config.xstall if utils.isstring(use_xstall): fname = self.task_config.no_xstall else: fname = tempfile.NamedTemporaryFile().name utils.download_file(self.http_xstall, fname) arr = np.loadtxt(fname, usecols=[1, 2, 3]) df = pd.DataFrame(arr, columns=['station_id', 'lat', 'lon']) df['station_id'] = df.station_id.astype(int) df.set_index('station_id', inplace=True) return df @classmethod def _modify_parser(cls, parser): parser.setup_args(default_cloud_inventory_config) cls.has_run = False parser, setup_grp, run_grp = super(CloudInventory, cls)._modify_parser( parser) parser.update_arg('xstall', group=setup_grp) cls.has_run = True return parser, setup_grp, run_grp def __init__(self, *args, **kwargs): super(CloudInventory, self).__init__(*args, **kwargs) self.__setup = False
[docs] def setup(self, *args, **kwargs): self.__setup = True super(CloudInventory, self).setup(*args, **kwargs)
[docs] def init_from_scratch(self): from gwgen.parameterization import HourlyCloud task = HourlyCloud.from_task(self) task.download_src(task.raw_dir) # make sure the source files exist
[docs] def setup_from_file(self, **kwargs): """Set up the task from already stored files (and avoid locating the stations of this task)""" kwargs = self._split_kwargs(kwargs) for i, datafile in enumerate(utils.safe_list(self.datafile)): self._set_data(pd.read_csv(datafile, **kwargs[i]), i)
[docs] def setup_from_db(self, **kwargs): """Set up the task from datatables already created (and avoid locating the stations of this task)""" kwargs = self._split_kwargs(kwargs) for i, dbname in enumerate(utils.safe_list(self.dbname)): self._set_data(pd.read_sql_query( "SELECT * FROM %s" % (self.dbname), self.engine, **kwargs[i]), i)
[docs] def setup_from_scratch(self): from gwgen.parse_eecra import parse_file def compute(fname): g = parse_file(fname).groupby('station_id')[ ['lat', 'lon', 'year']] df = g.mean() df['counts'] = g.size() std = g.std() df['lon_std'] = std.lon df['lat_std'] = std.lat return df self.data = pd.concat(list(map(compute, self.stations)))
[docs] def write2db(self, *args, **kwargs): if self.__setup: return kwargs.setdefault('if_exists', 'replace') super(CloudInventory, self).write2db(*args, **kwargs)
[docs] def write2file(self, *args, **kwargs): if self.__setup: return super(CloudInventory, self).write2file(*args, **kwargs)
[docs] def run(self, info): self.__setup = False if self.setup_from == 'scratch': df = self.data # we may use a parallel setup which requires a weighted average g = df.groupby(level='station_id') total_counts = g.counts.transform("sum") df['lat'] = df.counts / total_counts * df.lat df['lon'] = df.counts / total_counts * df.lon df['lat_std'] = (df.counts / total_counts) * df.lat_std ** 2 df['lon_std'] = (df.counts / total_counts) * df.lon_std ** 2 eecra = g.agg(OrderedDict([ ('lat', 'sum'), ('lon', 'sum'), ('lat_std', 'sum'), ('lon_std', 'sum'), ('year', ('min', 'max')), ('counts', 'sum')])) eecra.columns = ['lat', 'lon', 'lat_std', 'lon_std', 'firstyear', 'lastyear', 'counts'] eecra[['lat_std', 'lon_std']] **= 0.5 use_xstall = self.task_config.xstall if use_xstall: to_replace = self.xstall_df # keep only matching stations to_replace = to_replace.join(eecra[[]], how='inner') eecra.loc[to_replace.index, ['lat', 'lon']] = to_replace self.data = eecra if self.task_config.to_csv: self.write2file() if self.task_config.to_db: self.write2db()
[docs]class CloudGHCNMap(CloudPreproc): """A task for computing the EECRA inventory for each station""" name = 'eecra_ghcn_map' setup_requires = ['eecra_inventory'] summary = 'Compute the inventory of the EECRA stations' _datafile = 'eecra_ghcn_map.csv' dbname = 'eecra_ghcn_map' has_run = True @property def default_config(self): return default_cloud_ghcn_map_config()._replace( **super(CloudGHCNMap, self).default_config._asdict()) @classmethod def _modify_parser(cls, parser): parser.setup_args(default_cloud_ghcn_map_config) cls.has_run = False parser, setup_grp, run_grp = super(CloudGHCNMap, cls)._modify_parser( parser) parser.update_arg('max_distance', group=setup_grp, short='md') parser.pop_arg('to_db') parser.pop_arg('setup_from') cls.has_run = True return parser, setup_grp, run_grp def __init__(self, *args, **kwargs): super(CloudGHCNMap, self).__init__(*args, **kwargs) self.task_config = self.task_config._replace( setup_from='scratch', to_db=False) self.__setup = False
[docs] def setup(self, *args, **kwargs): self.__setup = True super(CloudGHCNMap, self).setup(*args, **kwargs)
[docs] def init_from_scratch(self): from gwgen.parameterization import HourlyCloud task = HourlyCloud.from_task(self) task.download_src(task.raw_dir) # make sure the source files exist
[docs] def setup_from_scratch(self): """Does nothing but initializing an empty data frame. The real work is done in the :meth:`run` method""" self.data = pd.DataFrame([], columns=['station_id', 'dist'], index=pd.Index([], name='id'))
[docs] def write2db(self, *args, **kwargs): raise NotImplementedError( 'The data is always written to the database!')
[docs] def write2file(self, *args, **kwargs): if self.__setup: return super(CloudGHCNMap, self).write2file(*args, **kwargs)
[docs] def run(self, info): def create_geog(table): conn.execute( 'ALTER TABLE %s ADD COLUMN geog geography(POINT,4326) ;' % ( table)) conn.execute(""" UPDATE %s SET geog = ST_SetSRID(ST_MakePoint(lon,lat),4326);""" % ( table)) conn.execute( 'CREATE INDEX {0}_geog ON {0} USING GIST (geog);'.format(table) ) from gwgen.evaluation import EvaluationPreparation self.__setup = False inv = self.eecra_inventory if not self.engine.has_table(inv.dbname): inv.write2db() conn = self.engine.connect() if 'geog' not in pd.read_sql_query('SELECT * FROM %s LIMIT 0' % ( inv.name), conn).columns: create_geog(inv.dbname) t = EvaluationPreparation.from_task(self) # download inventory t.download_src() ghcn = t.station_list ghcn = ghcn.ix[ghcn.vname == 'PRCP'].set_index('id') ghcn.to_sql('ghcn_inventory', self.engine, if_exists='replace') create_geog('ghcn_inventory') conn.execute("DROP TABLE IF EXISTS eecra_ghcn_map;") conn.execute(""" CREATE TABLE eecra_ghcn_map AS ( SELECT DISTINCT ON (id) id, station_id, dist FROM ( SELECT DISTINCT ON (a.station_id) b.id, a.station_id, ST_Distance(a.geog, b.geog) AS dist FROM eecra_inventory a INNER JOIN ghcn_inventory b ON ST_DWithin( a.geog, b.geog, 1000) ORDER BY a.station_id, ST_Distance(a.geog, b.geog)) foo ORDER BY id, dist);""") self.data = pd.read_sql('eecra_ghcn_map', self.engine, index_col='id') conn.close() if self.task_config.to_csv: self.write2file()
CloudInventoryConfig = namedtuple( 'CloudInventoryConfig', ['xstall'] + list(utils.TaskConfig._fields)) # to_db is set to True by default because it is required docstrings.delete_params('TaskConfig.parameters', 'to_db', 'setup_from') CloudInventoryConfig = utils.append_doc( CloudInventoryConfig, docstrings.get_sections(docstrings.dedents(""" Parameters ---------- xstall: bool or str If True (default), download the XSTALL file from %s. This file contains some estimates of station longitude and latitude. If ``False`` or empty string, the file is not used, otherwise, if set with a string, it is interpreted as the path to the local file %%(TaskConfig.parameters.no_to_db|setup_from)s """ % CloudInventory.http_xstall), 'CloudInventoryConfig')) @docstrings.dedent
[docs]def default_cloud_inventory_config(xstall=True, *args, **kwargs): """ Default config for :class:`CloudInventory` Parameters ---------- %(CloudInventoryConfig.parameters)s""" return CloudInventoryConfig( xstall, *utils.default_config(*args, **kwargs))
CloudGHCNMapConfig = namedtuple( 'CloudGHCNMapConfig', ['max_distance'] + list(utils.TaskConfig._fields)) # to_db is set to True by default because it is required docstrings.delete_params('TaskConfig.parameters', 'to_db', 'setup_from') CloudGHCNMapConfig = utils.append_doc( CloudGHCNMapConfig, docstrings.get_sections(docstrings.dedents(""" Parameters ---------- max_distance: float The maximum distance in meters for which we consider two stations as equal (Default: 1000m) %(TaskConfig.parameters.no_to_db|setup_from)s """), 'CloudGHCNMapConfig')) @docstrings.dedent
[docs]def default_cloud_ghcn_map_config(max_distance=1000., *args, **kwargs): """ Default config for :class:`CloudGHCNMap` Parameters ---------- %(CloudGHCNMapConfig.parameters)s""" return CloudGHCNMapConfig( max_distance, *utils.default_config(*args, **kwargs))