Source code for intake_erddap.erddap

"""Reader implementations for intake-erddap."""

from logging import getLogger
from typing import List, Union

import cf_pandas  # noqa: F401
import fsspec
import pandas as pd
import requests
import xarray as xr

from erddapy import ERDDAP
from intake.readers.readers import BaseReader


log = getLogger("intake-erddap")


class ERDDAPReader(BaseReader):
    """
    ERDDAP Reader (Base Class). This class represents the abstract base class
    for an intake data reader object for ERDDAP. Clients should use either
    ``TableDAPReader`` or ``GridDAPReader``.

    Parameters
    ----------

    dataset_id : str
        The unique datasetID value returned from ERDDAP.
    protocol : str
        Either `'griddap'` or `'tabledap'`.
    variables : list of str
    constraints : dict
        The query constraints to apply to TableDAP requests.
    metadata : dict
    erddap_client : class, optional
        The client object to use for connections to ERDDAP. Must conform to
        the `erddapy.ERDDAP` interface.
    http_client : class, optional
        The client object to use for HTTP requests. Must conform to the
        `requests` interface.
    open_kwargs : dict, optional
        Keyword arguments to pass on to the open function like `e.to_pandas`
        for a DataFrame. For example, {"parse_dates": True}

    Note
    ----
    Caches entire dataframe in memory.
    """

    output_instance = "xarray:Dataset"

    def get_client(
        self, server, protocol, dataset_id, variables, constraints, client=ERDDAP, **_
    ) -> ERDDAP:
        """Return an initialized ERDDAP Client."""
        e = client(server=server)
        e.protocol = protocol
        e.dataset_id = dataset_id
        e.variables = variables
        e.constraints = constraints
        return e


[docs] class TableDAPReader(ERDDAPReader): """Creates a Data Reader for an ERDDAP TableDAP Dataset. Parameters ---------- server : str URL to the ERDDAP service. Example: ``"https://coastwatch.pfeg.noaa.gov/erddap"`` Note ---- Do not include a trailing slash. dataset_id : str The dataset identifier from ERDDAP. variables : list of str, optional A list of variables to retrieve from the dataset. constraints : dict, optional A mapping of conditions and constraints. Example: ``{"time>=": "2022-01-02T12:00:00Z", "lon>": -140, "lon<": 0}`` metadata : dict, optional Additional metadata to include with the reader passed from the catalog. erddap_client : type, optional A class that implements an interface like erdappy's ERDDAP class. The reader will rely on this client to interface with ERDDAP for most requests. http_client : module or object, optional An object or module that implements an HTTP Client similar to request's interface. The reader will use this object to make HTTP requests to ERDDAP in some cases. mask_failed_qartod : bool, False WARNING ALPHA FEATURE. If True and `*_qc_agg` columns associated with data columns are available, data values associated with QARTOD flags other than 1 and 2 will be nan'ed out. Has not been thoroughly tested. dropna : bool, False. WARNING ALPHA FEATURE. If True, rows with data columns of nans will be dropped from data frame. Has not been thoroughly tested. cache_kwargs : dict, optional WARNING ALPHA FEATURE. If you want to have the data you access stored locally in a cache, use this keyword to input a dictionary of keywords. The cache is set up using ``fsspec``'s simple cache. Example configuration is ``cache_kwargs=dict(cache_storage="/tmp/fnames/", same_names=True)``. Examples -------- Readers are normally returned from a catalog object, but a Reader can be instantiated directly: >>> reader = TableDAPReader("https://erddap.senors.axds.co/erddap", ... "gov_usgs_waterdata_441759103261203") Getting a pandas DataFrame from the reader: >>> ds = reader.read() Once the dataset object has been instantiated, the dataset's full metadata is available in the reader. >>> reader.metadata {'info_url': 'https://erddap.sensors.axds.co/erddap/info/gov_usgs_waterdata_404513098181201...', 'catalog_dir': '', 'variables': {'time': {'_CoordinateAxisType': 'Time', 'actual_range': [1430828100.0, 1668079800.0], 'axis': 'T', 'ioos_category': 'Time', 'long_name': 'Time', 'standard_name': 'time', 'time_origin': '01-JAN-1970 00:00:00', 'units': 'seconds since 1970-01-01T00:00:00Z'}, ... """ output_instance = "pandas:DataFrame" def _read( self, server, dataset_id, variables=None, mask_failed_qartod=False, dropna=False, cache_kwargs=None, open_kwargs=None, constraints=None, **kw, ): open_kwargs = open_kwargs or {} variables = variables or [] kw.pop("protocol", None) protocol = kw.pop("protocol", "tabledap") # check for variables in user-input list that are not available for the dataset meta2 = self._get_dataset_metadata(server, dataset_id) variables_diff = set(variables) - set(meta2["variables"].keys()) if len(variables_diff) > 0: variables = [var for var in variables if var not in variables_diff] e = self.get_client( server, protocol, dataset_id, variables=variables, constraints=constraints or {}, **kw, ) if cache_kwargs is not None: if "response" in open_kwargs: response = open_kwargs["response"] open_kwargs.pop("response") url = e.get_download_url(response=response) else: url = e.get_download_url( response="csvp" ) # should this be the default or csv? try: with fsspec.open(f"simplecache://::{url}", **(cache_kwargs or {})) as f: dataframe: pd.DataFrame = pd.read_csv(f, **open_kwargs) except OSError as e: # might get file name too long print(e) print( "If your filenames are too long, input only a few variables" "to return or input into cache kwargs `same_names=False`" ) else: dataframe: pd.DataFrame = e.to_pandas( requests_kwargs={"timeout": 60}, **open_kwargs ) if mask_failed_qartod: dataframe = self.run_mask_failed_qartod(dataframe) if dropna: dataframe = self.run_dropna(dataframe) return dataframe @staticmethod def data_cols(df): """Columns that are not axes, coordinates, nor qc_agg columns.""" # find data columns which are what we'll use in the final step to drop nan's # don't include dimension/coordinates-type columns (dimcols) nor qc_agg columns (qccols) dimcols = df.cf.axes_cols + df.cf.coordinates_cols qccols = list(df.columns[df.columns.str.contains("_qc_agg")]) datacols = [col for col in df.columns if col not in dimcols + qccols] return datacols def run_mask_failed_qartod(self, df): """Nan data values for which corresponding qc_agg columns is not equal to 1 or 2. To get this to work you may need to specify the "qc_agg" columns to come along specifically in the variables input. """ # if a data column has an associated qc column, use it to weed out bad data by # setting it to nan. for datacol in self.data_cols(df): qccol = f"{datacol}_qc_agg" if qccol in df.columns: df.loc[~df[qccol].isin([1, 2]), datacol] = pd.NA df.drop(columns=[qccol], inplace=True) return df def run_dropna(self, df): """Drop nan rows based on the data columns.""" return df.dropna(subset=self.data_cols(df)) def _get_dataset_metadata(self, server, dataset_id) -> dict: """Fetch and return the metadata document for the dataset.""" url = f"{server}/info/{dataset_id}/index.json" resp = requests.get(url) resp.raise_for_status() metadata: dict = {"variables": {}} for rowtype, varname, attrname, dtype, value in resp.json()["table"]["rows"]: if rowtype != "attribute": continue try: value = self._parse_metadata_value(value=value, dtype=dtype) except ValueError: log.warning(f"could not convert {dtype} {varname}:{attrname} = {value}") continue if varname == "NC_GLOBAL": metadata[attrname] = value else: if varname not in metadata["variables"]: metadata["variables"][varname] = {} metadata["variables"][varname][attrname] = value return metadata def _parse_metadata_value( self, value: str, dtype: str ) -> Union[int, float, str, List[int], List[float]]: """Return the value from ERDDAPs metadata table parsed into a Python type.""" newvalue: Union[int, float, str, List[int], List[float]] = value if dtype in ("int", "double", "float") and "," in value: tmp = [i.strip() for i in value.split(",")] if dtype == "int": newvalue = [int(i) for i in tmp] if dtype in ("float", "double"): newvalue = [float(i) for i in tmp] elif dtype == "int": newvalue = int(value) elif dtype in ("float", "double"): newvalue = float(value) return newvalue
[docs] class GridDAPReader(ERDDAPReader): """Creates a Data Reader for an ERDDAP GridDAP Dataset. Parameters ---------- server : str URL to the ERDDAP service. Example: ``"https://coastwatch.pfeg.noaa.gov/erddap"`` Note ---- Do not include a trailing slash. dataset_id : str The dataset identifier from ERDDAP. constraints : dict, optional A mapping of conditions and constraints. chunks : None or int or dict or str, optional If chunks is provided, it is used to load the new dataset into dask arrays. chunks=-1 loads the dataset with dask using a single chunk for all arrays. chunks={} loads the dataset with dask using engine preferred chunks if exposed by the backend, otherwise with a single chunk for all arrays. chunks='auto' will use dask auto chunking taking into account the engine preferred chunks. See dask chunking for more details. xarray_kwargs : dict, optional Arguments to be passed to the xarray open_dataset function. Examples -------- Readers are normally returned from a catalog object, but a reader can be instantiated directly: >>> reader = GridDAPReader("https://coastwatch.pfeg.noaa.gov/erddap", "charmForecast1day", ... chunks={"time": 1}) Getting an xarray dataset from the reader object: >>> ds = reader.read() Once the dataset object has been instantiated, the dataset's full metadata is available in the reader. >>> reader.metadata {'catalog_dir': '', 'dims': {'time': 1182, 'latitude': 391, 'longitude': 351}, 'data_vars': {'pseudo_nitzschia': ['time', 'latitude', 'longitude'], 'particulate_domoic': ['time', 'latitude', 'longitude'], 'cellular_domoic': ['time', 'latitude', 'longitude'], 'chla_filled': ['time', 'latitude', 'longitude'], 'r555_filled': ['time', 'latitude', 'longitude'], 'r488_filled': ['time', 'latitude', 'longitude']}, 'coords': ('time', 'latitude', 'longitude'), 'acknowledgement': ... """ # def __init__( # self, # server: str, # dataset_id: str, # constraints: dict = None, # chunks: Union[None, int, dict, str] = None, # xarray_kwargs: dict = None, # **kwargs, # ): # self._server = server # self._chunks = chunks # self._constraints = constraints or {} # self._xarray_kwargs = xarray_kwargs or {} # # Initialized by the private getter _get_schema # self.urlpath = f"{server}/griddap/{dataset_id}" # # https://github.com/python/mypy/issues/6799 # kwargs.pop("protocol", None) # super().__init__(dataset_id=dataset_id, protocol="griddap", **kwargs) # type: ignore def _read( self, server: str, dataset_id: str, constraints: dict = None, chunks: Union[None, int, dict, str] = None, xarray_kwargs: dict = None, **kw, ): constraints = constraints or {} chunks = chunks or {} xarray_kwargs = xarray_kwargs or {} urlpath = f"{server}/griddap/{dataset_id}" ds = xr.open_dataset(urlpath, chunks=chunks, **xarray_kwargs) # _NCProperties is an internal property which xarray does not yet deal # with specially, so we remove it here to prevent it from causing # problems for clients. if "_NCProperties" in ds.attrs: del ds.attrs["_NCProperties"] return ds