Source code for terndata.ecoplots.ecoplots

"""Provide synchronous and asynchronous clients for the EcoPlots REST API.

This module exposes :class:`~terndata.ecoplots.ecoplots.EcoPlots` (sync) and
:class:`~terndata.ecoplots.ecoplots.AsyncEcoPlots` (async) for discovering,
filtering, previewing, and retrieving ecological plot data. Results can be
returned as ``pandas.DataFrame``/``geopandas.GeoDataFrame`` or raw GeoJSON.
Projects are saveable/loadable via ``.ecoproj`` files for reproducible workflows.

Examples:
    Synchronous:

    .. code-block:: python

        from terndata.ecoplots import EcoPlots

        ec = EcoPlots()
        ec.select(site_id="TCFTNS0002")
        gdf = ec.get_data()

    Asynchronous:

    .. code-block:: python

        from terndata.ecoplots import AsyncEcoPlots

        async def main():
            aec = AsyncEcoPlots()
            aec.select(site_id="TCFTNS0002")
            gdf = await aec.get_data()
            return gdf

        # In a script: asyncio.run(main())
        # In a notebook: await main()
"""

import asyncio
import io
from typing import Optional, Union, cast

import geopandas as gpd
import pandas as pd
from diskcache import Cache

from ._base import EcoPlotsBase
from ._config import CACHE_DIR, MATERIAL_SAMPLE_TYPE_MAP
from ._exceptions import EcoPlotsError
from ._gui import igsn_viewer, sample_image_viewer, spatial_selector
from ._utils import (
    _align_and_concat,
    _run_sync,
    _to_geopandas,
)


class EcoPlots(EcoPlotsBase):
    """High-level Python client for the EcoPlots REST API.

    Provides a small, Pythonic surface for **discovering**, **filtering**,
    **previewing**, and **retrieving** ecological plot data. Returns tidy
    structures for analysis (``pandas.DataFrame``, ``geopandas.GeoDataFrame``)
    or raw GeoJSON.

    The class mirrors the async runtime (`AsyncEcoPlots`) but is synchronous
    for notebook and script workflows. Projects can be serialised and reloaded
    via ``.ecoproj`` files to make analyses reproducible.

    Examples:
        Basic usage:

        .. code-block:: python

            from terndata.ecoplots import EcoPlots

            ecoplots = EcoPlots()
            ecoplots.get_datasources().head()      # discover datasets
            ecoplots.select(site_id="TCFTNS0002")  # add filters (validated & fuzzy-resolved)
            ecoplots.preview().head()              # quick look (first page)
            df = ecoplots.get_data()               # full pull as GeoDataFrame

        Save / load a project:

        .. code-block:: python

            path = ecoplots.save("myproject.ecoproj")
            ecoplots2 = EcoPlots.load(path)
            ecoplots2.get_filter("site_id")  # ['TCFTNS0002']

    See Also:
        :class:`~terndata.ecoplots.ecoplots.AsyncEcoPlots`: Async counterpart
        with the same surface area.
    """

    def __init__(
        self,
        filterset: Optional[dict] = None,
        query_filters: Optional[dict] = None,
        mode: Optional[str] = "observations"
    ):
        """Initialise the EcoPlots client.

        All parameters default to empty/``None``; the typical workflow is to
        create the client first and then apply filters via :meth:`select`.

        Args:
            filterset: Initial filter set. Defaults to None.
            query_filters: Initial query filters. Defaults to None.
            mode: The mode of operation. Defaults to "observations".
        """
        super().__init__(filterset=filterset, query_filters=query_filters, mode=mode)

    def _attribute_table(self, entity: str) -> pd.DataFrame:
        """Build the ``key``/``uri`` table for the attributes of one entity.

        Shared by the ``get_*_attributes`` methods: discovers the attribute
        URIs for *entity* and pairs each with the value stored for it in the
        on-disk ``"attributes"`` cache entry (``None`` when the URI is absent).

        Args:
            entity: Entity name passed to ``discover_attributes`` (e.g.
                ``"dataset"``); the URIs are read from the
                ``"<entity>_attributes"`` key of the response.

        Returns:
            A DataFrame with columns ``key`` and ``uri`` (present even when
            no attributes are found).
        """
        data = self.discover_attributes(entity)
        uris = data.get(f"{entity}_attributes", []) or []

        with Cache(CACHE_DIR) as cache:
            label_map = cache.get("attributes", {}) or {}
            rows = [{"key": label_map.get(uri), "uri": uri} for uri in uris]

        # Explicit columns keep the frame's shape stable for empty results.
        return pd.DataFrame(rows, columns=["key", "uri"])

    def summary(self, dformat: Optional[str] = None) -> Union[pd.DataFrame, dict]:
        """Summarize the EcoPlots data.

        Args:
            dformat: The desired format for the summary. If ``"json"``,
                returns the raw summary ``dict`` from the API. Defaults to
                ``None``, which returns a :class:`pandas.DataFrame`.

        Returns:
            When *dformat* is ``"json"``, the raw summary ``dict`` from the
            API. Otherwise a :class:`pandas.DataFrame` with columns ``metric``
            and ``count`` summarising the current selection. In
            ``observations`` mode the first row is the total document count
            (``observations``) followed by the unique counts; in ``samples``
            mode only the unique counts are reported.

        Raises:
            EcoPlotsError: If the client is in a mode other than
                ``observations`` or ``samples``.
        """
        data = self.summarise_data()
        if dformat == "json":
            return data

        if self._mode == "observations":
            pairs = {"observations": data["total_doc"], **data["unique_count"]}
        elif self._mode == "samples":
            pairs = {**data["unique_count"]}
        else:
            # Previously fell through and failed with UnboundLocalError.
            raise EcoPlotsError(f"Unsupported mode '{self._mode}' for summarising data.")

        return pd.Series(pairs, name="count").rename_axis("metric").reset_index()

    def preview(
        self, dformat: Optional[str] = None
    ) -> Union[gpd.GeoDataFrame, pd.DataFrame, dict, str]:
        """Fetch a small preview of EcoPlots data.  # noqa: DAR401, D415

        Mirrors :meth:`get_data` but limits results to 10 records for a quick
        look. In ``observations`` mode, fetches CSV from up to 2 feature types
        (5 rows each). In ``samples`` mode, calls the samples endpoint and
        returns the first 10 rows; ``"geojson"``/``"json"`` formats are not
        supported in this mode.

        Args:
            dformat: Output format.

                - ``"geojson"`` or ``"json"``: returns the GeoJSON payload
                  (observations only).
                - ``"pandas"`` (or ``"pd"``): returns a
                  :class:`pandas.DataFrame`.
                - ``"geopandas"`` (or ``"gpd"``) (default): returns a
                  :class:`~geopandas.GeoDataFrame`.

        Returns:
            Preview data in the requested format.

        Raises:
            EcoPlotsError: If an invalid dformat is provided.
            RuntimeError: If no feature types found (observations mode).
        """
        if self._mode == "samples":
            if dformat not in (None, "pandas", "geopandas", "pd", "gpd"):
                raise EcoPlotsError(
                    "In 'samples' mode, supported dformat values are: "
                    "'pandas' (or 'pd') and 'geopandas' (or 'gpd')."
                )
            samples_gdf = cast(gpd.GeoDataFrame, _run_sync(self.fetch_samples_data()))
            samples_gdf = samples_gdf.head(10)
            if dformat in ("pandas", "pd"):
                return pd.DataFrame(samples_gdf)
            return samples_gdf

        if dformat in ("geojson", "json"):
            return _run_sync(self.fetch_data(page_number=1, page_size=10))

        if dformat not in (None, "pandas", "geopandas", "pd", "gpd"):
            raise EcoPlotsError(
                "Invalid 'dformat' specified. Supported values are: None, "
                "'pandas' (or 'pd'), 'geopandas' (or 'gpd'), 'geojson', and 'json'."
            )

        # Same strategy as get_data(): one CSV request per feature type,
        # but restricted to the first two feature types.
        feature_types_df = self.get_feature_types()
        if "uri" not in feature_types_df.columns:
            raise RuntimeError("No feature types found; cannot preview data.")

        uris = feature_types_df["uri"].dropna().astype(str).tolist()
        if not uris:
            # No feature types found, return empty gdf
            return gpd.GeoDataFrame()

        preview_uris = uris[:2]

        async def _fetch_preview():
            # First page only, 5 rows per feature type (2 x 5 = 10 rows max).
            tasks = [
                self.fetch_data(page_number=1, page_size=5, dformat="csv", feature_type=[uri])
                for uri in preview_uris
            ]
            return await asyncio.gather(*tasks, return_exceptions=True)

        csv_payloads = _run_sync(_fetch_preview())

        dfs = []
        for payload in csv_payloads:
            # gather() was asked to return exceptions; surface the first one.
            if isinstance(payload, BaseException):
                raise payload
            csv_bytes = cast(bytes, payload)
            dfs.append(pd.read_csv(io.StringIO(csv_bytes.decode("utf-8"))))

        # Limit preview to 10 records maximum
        aligned_df = _align_and_concat(dfs).head(10)

        if dformat in ("pandas", "pd"):
            return aligned_df
        return _to_geopandas(aligned_df)

    def get_datasources(self) -> pd.DataFrame:
        """Get the data sources available for applied filters.

        Returns:
            A DataFrame containing the data sources.

        Raises:
            EcoPlotsError: If the client is in a mode other than
                ``observations`` or ``samples``.
        """
        if self._mode == "observations":
            data = self.discover("dataset")
        elif self._mode == "samples":
            data = self.discover_samples("dataset")
        else:
            raise EcoPlotsError(f"Unsupported mode '{self._mode}' for discovering data sources.")
        return pd.DataFrame(data)

    def get_datasources_attributes(self) -> pd.DataFrame:
        """Get the attributes of data sources from the applied filters.

        Returns:
            A DataFrame containing the attributes of the data sources,
            with columns ``key`` and ``uri``.
        """
        return self._attribute_table("dataset")

    def get_sites(self) -> pd.DataFrame:
        """Get the sites from the applied filters.

        Returns:
            A DataFrame containing the sites.
        """
        if self._mode == "samples":
            data = self.discover_samples("site_id")
        else:
            data = self.discover("site_id")
        return pd.DataFrame(data)

    def get_sites_attributes(self) -> pd.DataFrame:
        """Get the attributes of sites from the applied filters.

        Returns:
            A DataFrame containing the attributes of the sites, with columns
            ``key`` and ``uri``.
        """
        return self._attribute_table("site")

    def get_site_visit_attributes(self) -> pd.DataFrame:
        """Get the attributes of site visits from the applied filters.

        Returns:
            A DataFrame containing the attributes of the site visits, with
            columns ``key`` and ``uri``.
        """
        return self._attribute_table("site_visit")

    def get_region_types(self) -> pd.DataFrame:
        """Get the available region types from the applied filters.

        Returns:
            A DataFrame containing the region types.
        """
        if self._mode == "samples":
            # Samples are discovered through their own endpoint.
            data = self.discover_samples("region_type")
        else:
            data = self.discover("region_type")
        return pd.DataFrame(data)

    def get_regions(self, region_type: str) -> pd.DataFrame:
        """Get the available regions for a specific region type from the applied filters.

        Args:
            region_type: The region type to retrieve regions for.

        Returns:
            A DataFrame containing the regions for the specified region type.
        """
        if self._mode == "samples":
            # The requested region type is forwarded unchanged to the
            # samples discovery endpoint.
            data = self.discover_samples("region", region_type=region_type)
        else:
            data = self.discover("region", region_type=region_type)
        return pd.DataFrame(data)

    def get_feature_types(self) -> pd.DataFrame:
        """Get the feature types from the applied filters.

        Returns:
            A DataFrame containing the feature types.
        """
        return pd.DataFrame(self.discover("feature_type"))

    def get_observed_properties(self) -> pd.DataFrame:
        """Get the observed properties from the applied filters.

        Returns:
            A DataFrame containing the observed properties.
        """
        return pd.DataFrame(self.discover("observed_property"))

    def get_used_procedures(self) -> pd.DataFrame:
        """Get the used procedures available for the current filters.

        Available in both ``observations`` and ``samples`` modes.

        Returns:
            A DataFrame containing the used procedures.
        """
        if self._mode == "samples":
            data = self.discover_samples("used_procedure")
        else:
            data = self.discover("used_procedure")
        return pd.DataFrame(data)

    def get_observation_attributes(self) -> pd.DataFrame:
        """Get the attributes of observations from the applied filters.

        Available only in "observations" mode.

        Returns:
            A DataFrame containing the attributes of the observations, with
            columns ``key`` and ``uri``.

        Raises:
            EcoPlotsError: If called in a mode other than "observations".
        """
        if self._mode != "observations":
            raise EcoPlotsError("Observation attributes are only available in 'observations' mode.")
        return self._attribute_table("observation")

    def get_material_sample_types(self) -> pd.DataFrame:
        """Get the material sample types from the applied filters.

        Available only in "samples" mode.

        Returns:
            A DataFrame containing the material sample types.

        Raises:
            EcoPlotsError: If called in a mode other than "samples".
        """
        if self._mode != "samples":
            raise EcoPlotsError("Material sample types are only available in 'samples' mode.")
        return pd.DataFrame(self.discover_samples("material_sample_type"))

    def get_sample_igsn(self) -> pd.DataFrame:
        """Get sample names and derived IGSN values.

        Available only in ``samples`` mode.

        This method discovers ``sample_name`` values using the current query
        filters, then returns a DataFrame with:

        - ``sample_name``: sample name with alphabetic characters capitalized.
        - ``igsn``: derived as ``10.60792/{sample_name_raw}``.

        Discovered items that are not dicts, or whose ``key`` is not a
        non-empty string, are skipped.

        Returns:
            A DataFrame with columns ``sample_name`` and ``igsn``.

        Raises:
            EcoPlotsError: If called in a mode other than ``samples``.
        """
        if self._mode != "samples":
            raise EcoPlotsError("Sample IGSN lookup is only available in 'samples' mode.")

        data = self.discover_samples("sample_name")
        rows = []
        for item in data:
            if not isinstance(item, dict):
                continue
            sample_name_raw = item.get("key")
            if not isinstance(sample_name_raw, str) or not sample_name_raw:
                continue
            # Upper-case letters only; the IGSN keeps the raw name.
            sample_name = "".join(
                ch.upper() if ch.isalpha() else ch for ch in sample_name_raw
            )
            rows.append(
                {
                    "sample_name": sample_name,
                    "igsn": f"10.60792/{sample_name_raw}",
                }
            )
        return pd.DataFrame(rows, columns=["sample_name", "igsn"])

    def view_sample_igsn(self, igsn: Optional[str] = None):
        """Open an interactive notebook viewer for sample IGSN DOI pages.

        Available only in ``samples`` mode.

        This method discovers sample names, builds IGSN values, and displays
        either:

        - a dropdown + iframe widget (default), or
        - a single iframe for a provided IGSN/DOI value.

        Args:
            igsn: Optional IGSN value or DOI URL. Accepted inputs include
                ``10.60792/...``, ``doi.org/10.60792/...``, and
                ``https://doi.org/10.60792/...``.

        Returns:
            ipywidgets.VBox: Interactive IGSN viewer widget.

        Raises:
            EcoPlotsError: If called in a mode other than ``samples``.
            EcoPlotsError: If no material sample type is selected.
        """
        if self._mode != "samples":
            raise EcoPlotsError("IGSN viewer is only available in 'samples' mode.")

        self._ensure_required_material_sample_types(
            list(MATERIAL_SAMPLE_TYPE_MAP.values()),
            "IGSN viewer",
        )

        # FIXME: aggregation requests sent to Elasticsearch with an empty query
        # result in a 404 response. Which is unlike any other discovery facet.
        # The root cause is unknown and needs to be investigated.
        igsn_df = self.get_sample_igsn()
        return igsn_viewer(igsn_df, igsn=igsn)

    def get_soil_depth_range(self) -> gpd.GeoDataFrame:
        """Get the soil depth range for the current filters.

        Available only in "samples" mode.

        Returns:
            A GeoDataFrame containing aggregated soil depth range values.

        Raises:
            EcoPlotsError: If called in a mode other than "samples".
            EcoPlotsError: If none of the required material sample types are selected.
        """
        if self._mode != "samples":
            raise EcoPlotsError("Soil depth range is only available in 'samples' mode.")

        self._ensure_required_material_sample_types(
            ["Soil Subsite Sample", "Soil Pit Sample"],
            "Soil depth range",
        )
        return cast(gpd.GeoDataFrame, self.discover_soil_depth_range())

    def get_soilpit(self) -> pd.DataFrame:
        """Get soil pit distribution for the current filters.

        Available only in "samples" mode.

        Returns:
            A DataFrame with two columns: ``soilpit`` and ``counts``.

        Raises:
            EcoPlotsError: If called in a mode other than "samples".
            EcoPlotsError: If none of the required material sample types are selected.
        """
        if self._mode != "samples":
            raise EcoPlotsError("Soil pit distribution is only available in 'samples' mode.")

        self._ensure_required_material_sample_types(
            ["Soil Metagenomic Sample", "Soil Subsite Sample"],
            "Soil pit distribution",
        )
        return cast(pd.DataFrame, self.discover_soilpit())

    def get_speciesname(self) -> pd.DataFrame:
        """Get species name distribution for the current filters.

        Available only in "samples" mode. This method preserves all current
        query filters, including ``has_image``.

        Returns:
            A DataFrame with two columns: ``speciesname`` and ``count``.

        Raises:
            EcoPlotsError: If called in a mode other than "samples".
            EcoPlotsError: If none of the required material sample types are selected.
        """
        if self._mode != "samples":
            raise EcoPlotsError("Species distribution is only available in 'samples' mode.")

        self._ensure_required_material_sample_types(
            ["Plant Tissue Sample", "Plant Voucher Specimen"],
            "Species distribution",
        )
        return cast(pd.DataFrame, self.discover_species())

    def get_data(
        self,
        allow_full_download: Optional[bool] = False,
        dformat: Optional[str] = "gpd",
    ) -> Union[gpd.GeoDataFrame, pd.DataFrame, dict, str]:
        """Retrieve EcoPlots data based on the current filters.

        Args:
            allow_full_download: If True, allows downloading the full dataset
                without filters. Defaults to False.
            dformat: Output format.

                - "geojson" or "json": returns the raw GeoJSON payload.
                - "pandas" (or 'pd'): returns a pandas DataFrame.
                - "geopandas" (or 'gpd') (default): returns a GeoDataFrame.
                - ``None``: treated as the default ("gpd").

                In "samples" mode, only "pandas"/"pd" and "geopandas"/"gpd"
                are supported (no "geojson"/"json"). In "samples" mode,
                exactly one ``material_sample_type`` must be selected at a
                time.

        Raises:
            RuntimeError: If no filters are set and allow_full_download is
                False, or if no feature types can be discovered.
            EcoPlotsError: If an invalid dformat is provided.

        Returns:
            Data in the requested format.
        """
        # The error message below (and preview()) advertise None as valid,
        # so accept it here as the default geopandas output.
        if dformat is None:
            dformat = "gpd"

        if self._mode == "samples":
            if dformat not in ("pandas", "geopandas", "pd", "gpd"):
                raise EcoPlotsError(
                    "In 'samples' mode, supported dformat values are: "
                    "'pandas' (or 'pd') and 'geopandas' (or 'gpd')."
                )
            samples_gdf = cast(gpd.GeoDataFrame, _run_sync(self.fetch_samples_data()))
            if dformat in ("pandas", "pd"):
                return pd.DataFrame(samples_gdf)
            return samples_gdf

        if not self._query_filters and not allow_full_download:
            raise RuntimeError(
                "No filters specified! Downloading full EcoPlots dataset "
                "can crash your environment. Proceed with caution!\n"
                "If you are sure, call get_data(allow_full_download=True)."
            )

        if dformat in ("geojson", "json"):
            return _run_sync(self.fetch_data())

        if dformat not in ("pandas", "geopandas", "pd", "gpd"):
            raise EcoPlotsError(
                "Invalid 'dformat' specified. Supported values are: None, "
                "'pandas' (or 'pd'), 'geopandas' (or 'gpd'), 'geojson', and 'json'."
            )

        # For tabular output, request one CSV per feature type and merge.
        feature_types_df = self.get_feature_types()
        if "uri" not in feature_types_df.columns:
            raise RuntimeError("No feature types found; cannot fetch data.")

        uris = feature_types_df["uri"].dropna().astype(str).tolist()
        if not uris:
            # No feature types found, so no data to fetch;
            # return empty gdf
            return gpd.GeoDataFrame()

        dfs = []
        for uri in uris:
            csv_bytes = cast(bytes, _run_sync(self.fetch_data(dformat="csv", feature_type=[uri])))
            dfs.append(pd.read_csv(io.StringIO(csv_bytes.decode("utf-8"))))

        aligned_df = _align_and_concat(dfs)

        if dformat in ("pandas", "pd"):
            return aligned_df
        return _to_geopandas(aligned_df)

    def select_spatial(self, **kwargs):
        """Open the spatial selection widget.

        A minimal map based spatial selector, similar to spatial selection
        tool in EcoPlots Portal.

        Args:
            **kwargs: Additional keyword arguments to pass to the widget.

        Returns:
            ipywidgets.VBox: The widget. Use it in a notebook cell to display.
        """
        return spatial_selector(self, **kwargs)

    def view_sample_images(
        self,
        data: Optional[pd.DataFrame] = None,
        image_column: str = "sample_images",
        sample_id_column: str = "sample_id",
        sample_name_column: str = "sample_name",
        scientific_name_column: str = "scientific_name",
    ):
        """Open an interactive notebook image browser for sample images.

        Args:
            data: Optional DataFrame to browse. If omitted, data is fetched in
                samples mode using current filters.
            image_column: Name of image column in dataframe.
            sample_id_column: Name of sample identifier column in dataframe.
            sample_name_column: Name of sample name column in dataframe.
            scientific_name_column: Name of scientific name column in dataframe.

        Returns:
            ipywidgets.VBox: Interactive viewer widget.

        Raises:
            EcoPlotsError: When *data* is omitted and the client is not in
                ``samples`` mode, ``has_image`` is not set, or the Plant
                Voucher Specimen material sample type is not selected.
        """
        if data is None:
            if self._mode != "samples":
                raise EcoPlotsError("Sample image viewer is only available in 'samples' mode.")

            has_image_selected = bool(self._query_filters.get("has_image", False))
            if not has_image_selected:
                raise EcoPlotsError(
                    "To inspect images, set has_image via select(), for example: "
                    "select(has_image=True)."
                )

            plant_voucher_uri = (
                "http://linked.data.gov.au/def/tern-cv/18317af1-7c83-468d-883e-ba791500c6e3"
            )
            selected_mst = self._query_filters.get("material_sample_type", [])
            if plant_voucher_uri not in selected_mst:
                raise EcoPlotsError(
                    "Image viewer currently requires material_sample_type to be "
                    "'Plant Voucher Specimen'. Please select it before viewing images."
                )

            fetched = self.get_data(dformat="pd")
            # Subclasses may override get_data() as a coroutine function
            # (see AsyncEcoPlots); resolve it synchronously in that case.
            if asyncio.iscoroutine(fetched):
                fetched = _run_sync(fetched)
            data = cast(pd.DataFrame, fetched)

        if not isinstance(data, pd.DataFrame):
            data = pd.DataFrame(data)

        return sample_image_viewer(
            data,
            image_column=image_column,
            sample_id_column=sample_id_column,
            sample_name_column=sample_name_column,
            scientific_name_column=scientific_name_column,
        )
class AsyncEcoPlots(EcoPlots):
    """High-level **async** client for the EcoPlots REST API.

    Provides an awaitable `get_data()` for large/long-running fetches while
    reusing the synchronous ergonomics elsewhere. Ideal for web backends
    (ASGI) or notebooks wanting to parallelise I/O heavy pulls.

    Examples:
        Basic async usage:

        .. code-block:: python

            from terndata.ecoplots import AsyncEcoPlots

            ec = AsyncEcoPlots()
            ec.select(site_id="TCFTNS0002")  # selection etc. is sync but cheap
            gdf = await ec.get_data()        # await the heavy network call

    Notes:
        - Only `get_data()` is async here. Other methods inherited from
          `EcoPlots` are synchronous and **will block**.
        - Safety guard: `get_data()` raises `RuntimeError` when no filters
          are set unless `allow_full_download=True`.
    """

    def __init__(
        self,
        filterset: Optional[dict] = None,
        query_filters: Optional[dict] = None,
        mode: Optional[str] = "observations"
    ):
        """Initialise the AsyncEcoPlots client.

        All parameters default to empty/``None``; the typical workflow is to
        create the client first and then apply filters via :meth:`select`.

        Args:
            filterset: Initial filter set. Defaults to None.
            query_filters: Initial query filters. Defaults to None.
            mode: The mode of operation. Defaults to "observations".
        """
        super().__init__(filterset=filterset, query_filters=query_filters, mode=mode)

    async def get_data(
        self,
        allow_full_download: Optional[bool] = False,
        dformat: Optional[str] = "gpd",
    ) -> Union[gpd.GeoDataFrame, pd.DataFrame, dict, str]:  # noqa: DAR401
        """Retrieve EcoPlots data asynchronously based on the current filters.

        Args:
            allow_full_download: If True, allows downloading the full dataset
                without filters. Defaults to False.
            dformat: Output format.

                - "geojson" or "json": returns the raw GeoJSON payload.
                - "pandas" (or "pd"): returns a pandas DataFrame.
                - "geopandas" (or "gpd") (default): returns a GeoDataFrame.
                - ``None``: treated as the default ("gpd").

                In "samples" mode, only "pandas"/"pd" and "geopandas"/"gpd"
                are supported (no "geojson"/"json"). In "samples" mode,
                exactly one ``material_sample_type`` must be selected at a
                time.

        Raises:
            RuntimeError: If no filters are set and allow_full_download is
                False, or if no feature types can be discovered.
            EcoPlotsError: If an invalid dformat is provided.
            BaseException: Propagated from underlying fetch tasks when data
                retrieval fails.  #noqa: DAR402

        Returns:
            Data in the requested format.
        """
        # None was already accepted in observations mode and is advertised by
        # the error message; normalise it so samples mode behaves the same.
        if dformat is None:
            dformat = "gpd"

        if self._mode == "samples":
            if dformat not in ("pandas", "geopandas", "pd", "gpd"):
                raise EcoPlotsError(
                    "In 'samples' mode, supported dformat values are: "
                    "'pandas' (or 'pd') and 'geopandas' (or 'gpd')."
                )
            samples_gdf = cast(gpd.GeoDataFrame, await self.fetch_samples_data())
            if dformat in ("pandas", "pd"):
                return pd.DataFrame(samples_gdf)
            return samples_gdf

        # NOTE(review): the synchronous EcoPlots.get_data() guards on
        # self._query_filters, whereas this guard reads self._filters.
        # Confirm against EcoPlotsBase that both are kept in sync.
        if not self._filters and not allow_full_download:
            raise RuntimeError(
                "No filters specified! Downloading full EcoPlots dataset "
                "can crash your environment. Proceed with caution!\n"
                "If you are sure, call get_data(allow_full_download=True)."
            )

        if dformat in ("geojson", "json"):
            return await self.fetch_data()

        if dformat not in ("pandas", "geopandas", "pd", "gpd"):
            raise EcoPlotsError(
                "Invalid 'dformat' specified. Supported values are: None, "
                "'pandas' (or 'pd'), 'geopandas' (or 'gpd'), 'geojson', and 'json'."
            )

        # For pandas/geopandas output, request one CSV per feature type and
        # merge. get_feature_types() is inherited and synchronous.
        feature_types_df = self.get_feature_types()
        if "uri" not in feature_types_df.columns:
            raise RuntimeError("No feature types found; cannot fetch data.")

        uris = feature_types_df["uri"].dropna().astype(str).tolist()
        if not uris:
            # No feature types found, so no data to fetch;
            # return empty gdf
            return gpd.GeoDataFrame()

        tasks = [self.fetch_data(dformat="csv", feature_type=[uri]) for uri in uris]
        csv_payloads = await asyncio.gather(*tasks, return_exceptions=True)

        dfs = []
        for payload in csv_payloads:
            # gather() was asked to return exceptions; surface the first one.
            if isinstance(payload, BaseException):
                raise payload
            csv_bytes = cast(bytes, payload)
            dfs.append(pd.read_csv(io.StringIO(csv_bytes.decode("utf-8"))))

        aligned_df = _align_and_concat(dfs)

        if dformat in ("pandas", "pd"):
            return aligned_df
        return _to_geopandas(aligned_df)