Source code for intake_erddap.erddap_cat

"""Catalog implementation for intake-erddap."""

from copy import deepcopy
from datetime import datetime
from logging import getLogger
from typing import (
    Dict,
    List,
    Mapping,
    MutableMapping,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
)
from urllib.error import HTTPError

import pandas as pd
import requests

from erddapy import ERDDAP
from intake.readers.entry import Catalog, DataDescription
from intake.readers.readers import BaseReader

from intake_erddap.cache import CacheStore

from . import utils
from .utils import match_key_to_category


log = getLogger("intake-erddap")


[docs] class ERDDAPCatalogReader(BaseReader): """ Makes data sources out of all datasets the given ERDDAP service Parameters ---------- server : str URL to the ERDDAP service. Example: ``"https://coastwatch.pfeg.noaa.gov/erddap"`` Note ---- Do not include a trailing slash. bbox : tuple of 4 floats, optional For explicit geographic search queries, pass a tuple of four floats in the `bbox` argument. The bounding box parameters are `(min_lon, min_lat, max_lon, max_lat)`. standard_names : list of str, optional For explicit search queries for datasets containing a given standard_name use this argument. Example: `["air_temperature", "air_pressure"]`. variable_names: list of str, optional For explicit search queries for datasets containing a variable with a given name. This can be useful when the client knows of a particular variable name or a convention applied where there is no CF standard name. start_time : datetime, optional For explicit search queries for datasets that contain data after `start_time`. end_time : datetime, optional For explicit search queries for datasets that contain data before `end_time`. search_for : list of str, optional For explicit search queries for datasets that any contain of the terms specified in this keyword argument. kwargs_search : dict, optional Keyword arguments to input to search on the server before making the catalog. Options are: - to search by bounding box: include all of min_lon, max_lon, min_lat, max_lat: (int, float) Longitudes must be between -180 to +180. - to search within a datetime range: include both of min_time, max_time: interpretable datetime string, e.g., "2021-1-1" - to search using a textual keyword: include `search_for` as either a string or a list of strings. Multiple values will be searched individually and combined in the final catalog results. category_search : list, tuple, optional Use this to narrow search by ERDDAP category. The syntax is `(category, key)`, e.g. ("standard_name", "temp"). `category` is the ERDDAP category for filtering results. Good choices for selecting variables are "standard_name" and "variableName". `key` is the custom_criteria key to narrow the search by, which will be matched to the category results using the custom_criteria that must be set up or input by the user, with `cf-pandas`. Currently only a single key can be matched at a time. use_source_constraints : bool, default True Any relevant search parameter defined in kwargs_search will be passed to the source objects as constraints. protocol : str, default "tabledap" One of the two supported ERDDAP Data Access Protocols: "griddap", or "tabledap". "tabledap" will present tabular datasets using pandas, meanwhile "griddap" will use xarray. chunks : dict, optional For griddap protocol, pass a dictionary of chunk sizes for the xarray. xarray_kwargs : dict, optional For griddap protocol, pass a dictionary of kwargs to pass to the xarray.open_dataset method. metadata : dict, optional Extra metadata for the intake catalog. variables : list of str, optional List of variables to limit the dataset to, if available. If you're not sure what variables are available, check info_url for the station, or look up the dataset on the ERDDAP server. query_type : str, default "union" Specifies how the catalog should apply the query parameters. Choices are ``"union"`` or ``"intersection"``. If the ``query_type`` is set to ``"intersection"``, then the set of results will be the intersection of each individual query made to ERDDAP. This is equivalent to a logical AND of the results. If the value is ``"union"`` then the results will be the union of each resulting dataset. This is equivalent to a logical OR. open_kwargs : dict, optional Keyword arguments to pass to the `open` method of the ERDDAP Reader, e.g. pandas read_csv. Response is an optional keyword argument that will be used by ERDDAPY to determine the response format. Default is "csvp" and for TableDAP Readers, "csv" and "csv0" are reasonable choices too. mask_failed_qartod : bool, False WARNING ALPHA FEATURE. If True and `*_qc_agg` columns associated with data columns are available, data values associated with QARTOD flags other than 1 and 2 will be nan'ed out. Has not been thoroughly tested. dropna : bool, False. WARNING ALPHA FEATURE. If True, rows with data columns of nans will be dropped from data frame. Has not been thoroughly tested. cache_kwargs : dict, optional WARNING ALPHA FEATURE. If you want to have the data you access stored locally in a cache, use this keyword to input a dictionary of keywords. The cache is set up using ``fsspec``'s simple cache. Example configuration is ``cache_kwargs=dict(cache_storage="/tmp/fnames/", same_names=True)``. Attributes ---------- search_url : str If a search is performed on the ERDDAP server, the search url is saved as an attribute. server : str The Base URL of the ERDDAP instance. """ name = "erddap_cat" output_instance = "intake.readers.entry:Catalog"
[docs] def __init__( self, server: str, bbox: Optional[Tuple[float, float, float, float]] = None, standard_names: Optional[List[str]] = None, variable_names: Optional[List[str]] = None, start_time: Optional[Union[datetime, str]] = None, end_time: Optional[Union[datetime, str]] = None, search_for: Optional[List[str]] = None, kwargs_search: MutableMapping[ str, Union[str, int, float, Sequence[str]] ] = None, category_search: Optional[Tuple[str, str]] = None, erddap_client: Optional[Type[ERDDAP]] = None, use_source_constraints: bool = True, protocol: str = "tabledap", chunks: Optional[dict] = None, xarray_kwargs: Optional[dict] = None, metadata: dict = None, variables: list = None, query_type: str = "union", cache_period: Optional[Union[int, float]] = 500, open_kwargs: dict = None, mask_failed_qartod: bool = False, dropna: bool = False, cache_kwargs: Optional[dict] = None, **kwargs, ): if server.endswith("/"): server = server[:-1] self._erddap_client = erddap_client or ERDDAP self._entries: Dict[str, Catalog] = {} self._use_source_constraints = use_source_constraints self._protocol = protocol self._chunks = chunks self._xarray_kwargs = xarray_kwargs self._dataset_metadata: Optional[Mapping[str, dict]] = None self._query_type = query_type self.server = server self.search_url = None self.cache_store = CacheStore(cache_period=cache_period) self.open_kwargs = open_kwargs or {} self._mask_failed_qartod = mask_failed_qartod self._dropna = dropna self._cache_kwargs = cache_kwargs if variables is not None: variables = ["time", "latitude", "longitude", "z"] + variables self.variables = variables chunks = chunks or {} xarray_kwargs = xarray_kwargs or {} if kwargs_search is not None: checks = [ ["min_lon", "max_lon", "min_lat", "max_lat"], ["min_time", "max_time"], ] for check in checks: if any(key in kwargs_search for key in check) and not all( key in kwargs_search for key in check ): raise ValueError( f"If any of {check} are input, they all must be input." ) else: kwargs_search = {} # Use deepcopy so we don't mangle objects passed in from clients self.kwargs_search = deepcopy(kwargs_search) if bbox is not None: if not isinstance(bbox, tuple): raise TypeError( f"Expecting a tuple of four floats for argument bbox: {type(bbox)}" ) if len(bbox) != 4: raise ValueError("bbox argument requires a tuple of four floats") self.kwargs_search["min_lon"] = bbox[0] self.kwargs_search["min_lat"] = bbox[1] self.kwargs_search["max_lon"] = bbox[2] self.kwargs_search["max_lat"] = bbox[3] if standard_names is not None: if not isinstance(standard_names, (list, tuple)): raise TypeError( f"Expecting list of strings for standard_names argument: {repr(standard_names)}" ) self.kwargs_search["standard_name"] = standard_names if variable_names is not None: if not isinstance(variable_names, (list, tuple)): raise TypeError( f"Expecting list of strings for variable_names argument: {repr(variable_names)}" ) self.kwargs_search["variableName"] = variable_names if start_time is not None: if not isinstance(start_time, (str, datetime)): raise TypeError( f"Expecting a datetime for start_time argument: {repr(start_time)}" ) if isinstance(start_time, str): start_time = datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%SZ") self.kwargs_search["min_time"] = f"{start_time:%Y-%m-%dT%H:%M:%SZ}" if end_time is not None: if not isinstance(end_time, (str, datetime)): raise TypeError( f"Expecting a datetime for end_time argument: {repr(end_time)}" ) if isinstance(end_time, str): end_time = datetime.strptime(end_time, "%Y-%m-%dT%H:%M:%SZ") self.kwargs_search["max_time"] = f"{end_time:%Y-%m-%dT%H:%M:%SZ}" if search_for is not None: if not isinstance(search_for, (list, tuple)): raise TypeError( f"Expecting list of strings for search_for argument: {repr(search_for)}" ) self.kwargs_search["search_for"] = search_for if category_search is not None: category, key = category_search # Currently just take first match, but there could be more than one. self.kwargs_search[category] = match_key_to_category( self.server, key, category, cache_store=self.cache_store ) metadata = metadata or {} metadata["kwargs_search"] = self.kwargs_search # Clear the cache of old stale data on initialization self.cache_store.clear_cache(cache_period) super(ERDDAPCatalogReader, self).__init__(metadata=metadata, **kwargs)
def _load_df(self) -> pd.DataFrame: frames = [] for url in self.get_search_urls(): try: df = self.cache_store.read_csv(url) except HTTPError as e: if e.code == 404: log.warning(f"search {url} returned HTTP 404") df = pd.DataFrame({"datasetID": []}) else: raise except requests.exceptions.HTTPError as e: if e.response.status_code == 404: log.warning(f"search {url} returned HTTP 404") df = pd.DataFrame({"datasetID": []}) else: raise df.rename(columns={"Dataset ID": "datasetID"}, inplace=True) frames.append(df) if self._query_type == "union": result = pd.concat(frames) result = result.drop_duplicates("datasetID") return result elif self._query_type == "intersection": result = None for frame in frames: if result is None: result = frame else: result = result.merge(frame, how="inner", on="datasetID") return result else: raise ValueError(f"_query_type is unexpected value: {self._query_type}") def _load_metadata(self) -> Mapping[str, dict]: """Returns all of the dataset metadata available from allDatasets API.""" if self._dataset_metadata is None: self._dataset_metadata = utils.get_erddap_metadata( self.server, self.kwargs_search, cache_store=self.cache_store ) return self._dataset_metadata def get_search_urls(self) -> List[str]: """Return the search URLs used in generating the catalog.""" e = self.get_client() urls = [] # cases: # - ks.standard_name is a list # - variableName is a list # - both are lists # Generalize approach: if either are defined, set to list and iterate if not any( [ i in self.kwargs_search for i in ("standard_name", "variableName", "search_for") ] ): search_url = e.get_search_url( response="csv", **self.kwargs_search, items_per_page=100000, ) return [search_url] if "standard_name" in self.kwargs_search: urls.extend( self._get_standard_name_search_urls( utils.as_a_list(self.kwargs_search["standard_name"]) ) ) if "variableName" in self.kwargs_search: urls.extend( self._get_variable_name_search_urls( utils.as_a_list(self.kwargs_search["variableName"]) ) ) if "search_for" in self.kwargs_search: urls.extend( self._get_search_for_search_urls( utils.as_a_list(self.kwargs_search["search_for"]) ) ) return urls def _get_standard_name_search_urls(self, standard_names: List[str]) -> List[str]: """Return the search urls for each standard_name.""" e = self.get_client() urls = [] # mypy is annoying sometimes. assert isinstance(self.kwargs_search, dict) for standard_name in standard_names: params = self.kwargs_search.copy() params.pop("variableName", None) params.pop("search_for", None) params["standard_name"] = standard_name search_url = e.get_search_url( response="csv", **params, items_per_page=100000, ) urls.append(search_url) return urls def _get_variable_name_search_urls(self, variable_names: List[str]) -> List[str]: """Return the search urls for each variable name.""" e = self.get_client() urls = [] # mypy is annoying sometimes. assert isinstance(self.kwargs_search, dict) for variable_name in variable_names: params = self.kwargs_search.copy() params.pop("standard_name", None) params.pop("search_for", None) params["variableName"] = variable_name search_url = e.get_search_url( response="csv", **params, items_per_page=100000, ) urls.append(search_url) return urls def _get_search_for_search_urls(self, search_for: List[str]) -> List[str]: """Return the search urls for each search query.""" e = self.get_client() urls = [] # mypy is annoying sometimes. assert isinstance(self.kwargs_search, dict) for query in search_for: params = self.kwargs_search.copy() params.pop("standard_name", None) params.pop("variableName", None) # query is passed directly in method invocation below params.pop("search_for", None) search_url = e.get_search_url( response="csv", search_for=query, **params, items_per_page=100000, ) urls.append(search_url) return urls def get_client(self) -> ERDDAP: """Return an initialized ERDDAP Client.""" e = self._erddap_client(self.server) e.protocol = self._protocol e.dataset_id = "allDatasets" return e def read(self): dataidkey = "datasetID" e = self.get_client() df = self._load_df() all_metadata = self._load_metadata() self._entries = {} # Remove datasets that are redundant if len(df) > 0: df = df[ (~df["datasetID"].str.startswith("ism-")) * (df["datasetID"] != "allDatasets") ] entries, aliases = {}, {} for index, row in df.iterrows(): dataset_id = row[dataidkey] metadata = all_metadata.get(dataset_id, {}) args = { "server": self.server, "dataset_id": dataset_id, "variables": self.variables, "protocol": self._protocol, "constraints": {}, "open_kwargs": self.open_kwargs, } if self._protocol == "tabledap": args.update( { "mask_failed_qartod": self._mask_failed_qartod, "dropna": self._dropna, "cache_kwargs": self._cache_kwargs, } ) args["constraints"].update(self._get_tabledap_constraints()) datatype = "intake_erddap.erddap:TableDAPReader" elif self._protocol == "griddap": args.update( { "chunks": self._chunks, "xarray_kwargs": self._xarray_kwargs, } ) # no equivalent for griddap, though maybe it works the same? args["constraints"].update(self._get_tabledap_constraints()) datatype = "intake_erddap.erddap:GridDAPReader" else: raise ValueError(f"Unsupported protocol: {self._protocol}") metadata["info_url"] = e.get_info_url(response="csv", dataset_id=dataset_id) entries[dataset_id] = DataDescription( datatype, kwargs={"dataset_id": dataset_id, **args}, metadata=metadata, ) aliases[dataset_id] = dataset_id cat = Catalog( data=entries, aliases=aliases, ) return cat def _get_tabledap_constraints(self) -> Dict[str, Union[str, int, float]]: """Return the constraints dictionary for a tabledap Reader.""" result = {} if self._use_source_constraints and "min_time" in self.kwargs_search: min_time = self.kwargs_search["min_time"] if isinstance(min_time, (str, int, float)): result["time>="] = min_time if self._use_source_constraints and "max_time" in self.kwargs_search: max_time = self.kwargs_search["max_time"] if isinstance(max_time, (str, int, float)): result["time<="] = max_time return result