import copy
import hashlib
import struct
import sys
import tempfile
import warnings
from collections.abc import Iterable
from concurrent.futures import ThreadPoolExecutor, as_completed
from diskcache import Cache
from pathlib import Path
from typing import (
Optional,
TypeVar,
Union,
)
import aiohttp
import orjson
import requests
from rapidfuzz import fuzz, process
from ._config import (
API_BASE_URL,
DISCOVERY_ATTRIBUTES,
DISCOVERY_FACETS,
MAGIC,
QUERY_FACETS,
SAMPLE_QUERY_FACETS,
VERSION,
CACHE_DIR,
MATERIAL_SAMPLE_TYPE_MAP,
)
from ._exceptions import EcoPlotsError
from ._nlp_utils import (
resolve_facet,
resolve_region_type,
validate_facet,
)
from ._utils import _ensure_ecoproj_path, _parse_date, _validate_spatial_input
SelfType = TypeVar("SelfType", bound="EcoPlotsBase")
[docs]
class EcoPlotsBase:
"""Base class providing internal filter management and API query building.
This class implements the low-level mechanics for holding, validating and
converting user-supplied filters into API-ready query filters. It is
intended for internal use by ecoplots subclasses and is not part of the
user-facing public API.
Attributes:
_filters: Human/canonical filter values as provided.
_query_filters: API-ready filter values (usually URLs).
"""
def __init__(
    self,
    filterset: Optional[dict] = None,
    query_filters: Optional[dict] = None,
    mode: str = "observations",
):
    """Initialize an EcoPlotsBase instance.

    Only internal state is set up here; no filter validation runs in the
    constructor (``select()`` and ``remove()`` call ``_validate_filters()``).

    The supplied mappings are shallow-copied, and in ``samples`` mode the
    ``dataset`` lists are rebuilt instead of edited in place, so the
    caller's own dicts and ``dataset`` lists are never modified.

    Args:
        filterset: Optional mapping of facet -> list of human/canonical values
            to pre-populate the instance.
        query_filters: Optional mapping of facet -> list of API-ready values
            (eg. URLs) to pre-populate the instance.
        mode: Operational mode, either "observations" (default) or "samples".
    """
    self._base_url = API_BASE_URL
    self._mode = mode
    # Copy the outer mappings so later key insertions/removals do not leak
    # back into the dicts owned by the caller.
    self._filters = dict(filterset) if filterset else {}
    self._query_filters = dict(query_filters) if query_filters else {}

    # In samples mode a persistent dataset selection is enforced on both the
    # human-facing and the query-side filters.
    if self._mode == "samples":
        persistent_label = "TERN Ecosystem Surveillance"
        persistent_uri = "http://linked.data.gov.au/dataset/ausplots"
        for store, required in (
            (self._filters, persistent_label),
            (self._query_filters, persistent_uri),
        ):
            if "dataset" not in store:
                store["dataset"] = [required]
                continue
            existing = store["dataset"]
            # Build a fresh list: the incoming list may be shared with the
            # caller (or with the instance this one was copied from).
            values = [existing] if isinstance(existing, str) else list(existing)
            if required not in values:
                values.insert(0, required)
            store["dataset"] = values
@staticmethod
def _display_warning(message: str) -> None:
"""Display a clean, formatted warning message in Jupyter/IPython environments.
This method provides a cleaner alternative to Python's default warnings.warn()
which includes verbose file paths and line numbers. In Jupyter notebooks,
it prints a styled warning message directly.
Args:
message: The warning message to display.
"""
# Check if we're in IPython/Jupyter
try:
get_ipython # type: ignore # noqa: F821
# In Jupyter/IPython - use clean print with styling
print(f"\n⚠️ Warning: {message}\n", file=sys.stderr) # noqa: T201
except NameError:
# Not in IPython - use standard warnings
warnings.warn(message, UserWarning, stacklevel=4)
def __str__(self) -> str:
    """Render a boxed, human-readable summary of the instance.

    The output is a multi-line string drawn with Unicode box characters. It
    contains a header (class name, capitalised mode and ``VERSION``), one
    entry per facet in ``_filters`` (long values are wrapped onto
    continuation rows) or a "No filters applied" row, and - when any exist -
    the number of entries in ``_query_filters``.

    Returns:
        A formatted string summarizing the instance and its filter state.
    """
    BOX_WIDTH = 78
    INDENT = " "  # prefix placed in front of wrapped continuation rows
    inner_width = BOX_WIDTH - 2
    rule = "═" * BOX_WIDTH
    class_name = self.__class__.__name__

    def boxed(text: str) -> str:
        # Left-align ``text`` inside the vertical borders.
        return f"║ {text:<{inner_width}} ║"

    rows = [
        f"╔{rule}╗",
        f"║ {class_name} {self._mode.capitalize():<{BOX_WIDTH - 2 - len(class_name) - 1}} ║",
        f"║ Version: {VERSION:<{BOX_WIDTH - 11}} ║",
        f"╠{rule}╣",
    ]

    if len(self._filters) > 0:
        rows.append(boxed("Active Filters:"))
        for key, value in self._filters.items():
            text = str(value)
            prefix = f" • {key}: "
            budget = BOX_WIDTH - 2 - len(prefix)
            if len(text) <= budget:
                rows.append(boxed(f"{prefix}{text}"))
                continue
            # Value is too long: emit the head next to the facet name, then
            # fixed-width slices of the tail on continuation rows.
            rows.append(boxed(f"{prefix}{text[:budget]}"))
            tail = text[budget:]
            step = BOX_WIDTH - 2 - len(INDENT)
            for start in range(0, len(tail), step):
                rows.append(boxed(f"{INDENT}{tail[start:start + step]}"))
    else:
        rows.append(boxed("No filters applied"))

    resolved_total = len(self._query_filters)
    if resolved_total > 0:
        rows.append(boxed(""))
        rows.append(boxed(f"Resolved Query Filters: {resolved_total}"))

    rows.append(f"╚{rule}╝")
    return "\n".join(rows)
def __repr__(self) -> str:
"""Return a string representation that can reconstruct the instance.
The representation is a valid Python expression that could be used to
recreate an equivalent instance with the same filter configuration.
Returns:
A string representation suitable for debugging and reconstruction.
"""
# Format filters for readability
filters_repr = repr(self._filters) if self._filters else "{}"
query_filters_repr = repr(self._query_filters) if self._query_filters else "{}"
return (
f"{self.__class__.__name__} {repr(self._mode)}("
f"filterset={filters_repr}, "
f"query_filters={query_filters_repr}, "
)
def __eq__(self, other) -> bool:
"""Compare two instances for structural equality.
Two instances are considered equal when they are of the exact same
runtime type and both their internal _filters and _query_filters dicts
compare equal.
Args:
other: Another object to compare against.
Returns:
True if both type and internal filter state match, False otherwise.
Examples:
Basic usage:
.. code-block:: python
from terndata.ecoplots import EcoPlots
ec1 = EcoPlots()
ec1.select(site_id="TCFTNS0002")
ec2 = EcoPlots()
ec2.select(site_id="TCFTNS0002")
ec1 == ec2 # True
"""
if type(self) is not type(other): # noqa: PIE789
return False
return self._filters == other._filters and self._query_filters == other._query_filters
def __bool__(self) -> bool:
"""Truthiness of the instance.
The instance is considered truthy only when it has both human-facing
_filters and resolved/_query_filters populated. This reflects that a
fully-formed selection requires both sides.
Returns:
True if both _filters and _query_filters are non-empty, False otherwise.
Examples:
Basic usage:
.. code-block:: python
from terndata.ecoplots import EcoPlots
ec = EcoPlots()
bool(ec) # False
ec.select(site_id="TCFTNS0002")
_ = ec.preview() # or: _ = ec.get_data(); resolves query-side filters
bool(ec) # True
"""
return bool(self._filters) and bool(self._query_filters)
def __len__(self) -> int:
"""Return the count of selected filter values.
The length is computed as the total number of individual values across
all facets in self._filters (i.e. counts values, not facets).
Returns:
Total number of selected filter values (int).
Examples:
Basic usage:
.. code-block:: python
from terndata.ecoplots import EcoPlots
ec = EcoPlots()
ec.select(site_id=["TCFTNS0002"], dataset="TERN Surveillance")
len(ec) # 2
"""
return sum(len(v) for v in self._filters.values())
def __copy__(self):
    """Create a shallow copy of the instance.

    The new instance is built through ``type(self)(...)`` from shallow
    copies of the ``_filters`` and ``_query_filters`` mappings, and then
    inherits this instance's ``_mode`` so that a ``samples`` instance does
    not fall back to the constructor's default mode when copied.

    Returns:
        A new instance (same type) with shallow-copied internal state.

    Examples:
        .. code-block:: python

            import copy
            from terndata.ecoplots import EcoPlots

            ec1 = EcoPlots().select(site_id="TCFTNS0002")
            ec2 = copy.copy(ec1)
            ec1 is ec2  # False
            ec1 == ec2  # True (same filter state)
    """
    clone = type(self)(
        filterset=copy.copy(self._filters),
        query_filters=copy.copy(self._query_filters),
    )
    # Assigned after construction so this works regardless of whether the
    # concrete class accepts a ``mode`` keyword.
    clone._mode = self._mode
    return clone
def __deepcopy__(self, memo):
    """Create a deep copy of the instance.

    The new instance is built through ``type(self)(...)`` from deep copies
    of the ``_filters`` and ``_query_filters`` mappings, and then inherits
    this instance's ``_mode`` so that a ``samples`` instance does not fall
    back to the constructor's default mode when copied.

    Args:
        memo: Memoization dictionary passed by the copy protocol.

    Returns:
        A new instance (same type) with deeply-copied internal state.

    Examples:
        .. code-block:: python

            import copy
            from terndata.ecoplots import EcoPlots

            ec1 = EcoPlots().select(site_id=["TCFTNS0002", "TCFTNS0003"])
            ec2 = copy.deepcopy(ec1)
            ec1 is ec2  # False
            ec1 == ec2  # True
    """
    clone = type(self)(
        filterset=copy.deepcopy(self._filters, memo),
        query_filters=copy.deepcopy(self._query_filters, memo),
    )
    # Assigned after construction so this works regardless of whether the
    # concrete class accepts a ``mode`` keyword.
    clone._mode = self._mode
    return clone
def __contains__(self, item: str) -> bool:
    """Check if a facet is currently applied.

    The facet name is first checked against ``QUERY_FACETS``. A known facet
    is reported as present only when it appears in both ``_filters`` and
    ``_query_filters``.

    Args:
        item: Facet name to check.

    Returns:
        True if the facet is present in both _filters and _query_filters.

    Raises:
        KeyError: If the provided facet name is not in ``QUERY_FACETS``.

    Examples:
        .. code-block:: python

            ec = EcoPlots().select(site_id="TCFTNS0002")
            "site_id" in ec  # True once the facet is resolved
            # "not_a_facet" in ec  -> raises KeyError
    """
    recognised = item in QUERY_FACETS
    if not recognised:
        raise KeyError(f"Invalid key `{item}`. Allowed: " + ", ".join(QUERY_FACETS))
    return all(item in store for store in (self._filters, self._query_filters))
def __getitem__(self, item: str) -> list:
    """Retrieve the human-visible values stored for a facet.

    Args:
        item: Facet name to retrieve.

    Returns:
        The value stored in ``_filters`` for the facet.

    Raises:
        KeyError: If the facet is not in ``QUERY_FACETS`` or is not present
            in the current filters.

    Examples:
        .. code-block:: python

            from terndata.ecoplots import EcoPlots

            ec = EcoPlots().select(site_id=["TCFTNS0002", "TCFTNS0003"])
            ec["site_id"]  # ["TCFTNS0002", "TCFTNS0003"]
    """
    if item not in QUERY_FACETS:
        raise KeyError(f"Invalid key `{item}`. Allowed: " + ", ".join(QUERY_FACETS))
    stored = self._filters
    if item in stored:
        return stored[item]
    raise KeyError(f"Key `{item}` not present in current instance.")
def __setitem__(self, facet: str, values: Union[str, list[str]]):
    """Assign values to a facet by delegating to ``select()``.

    The facet name is checked against ``QUERY_FACETS`` and the pair is then
    forwarded as ``select(filters={facet: values})``.

    Args:
        facet: Facet name to set (must be listed in QUERY_FACETS).
        values: Single value or iterable of values to apply to the facet.

    Raises:
        KeyError: If the facet is not allowed.

    Examples:
        .. code-block:: python

            from terndata.ecoplots import EcoPlots

            ec = EcoPlots()
            ec["site_id"] = "TCFTNS0002"
            ec["dataset"] = "TERN Surveillance"
    """
    if facet in QUERY_FACETS:
        self.select(filters={facet: values})
        return
    raise KeyError(f"Invalid key `{facet}`. Allowed: " + ", ".join(QUERY_FACETS))
def __delitem__(self, key) -> None:
    """Delete a facet, or specific values of a facet, via ``remove()``.

    Args:
        key: Either a single facet name, or a 2-tuple ``(facet, values)``.

    Raises:
        KeyError: If the facet is not among the facets allowed for the
            current mode, or a tuple key does not have exactly two items.

    Examples:
        .. code-block:: python

            from terndata.ecoplots import EcoPlots

            ec = EcoPlots()
            del ec['site_id']                  # remove entire facet
            del ec['site_id', 'TCFTNS0002']    # remove a single value
            del ec['site_id', ['A', 'B', 'C']] # remove multiple values
    """
    permitted = SAMPLE_QUERY_FACETS if self._mode == "samples" else QUERY_FACETS

    # Normalise both key shapes to a (facet, values) pair; a bare facet name
    # means "drop the whole facet", which remove() expresses as None.
    if not isinstance(key, tuple):
        facet, values = key, None
    elif len(key) == 2:
        facet, values = key
    else:
        raise KeyError("Expected ('facet', values) for value deletion.")

    if facet not in permitted:
        raise KeyError(f"Unknown facet {facet!r}. Allowed: {', '.join(permitted)}")
    self.remove(filters={facet: values})
[docs]
def select(self: SelfType, filters: Optional[dict] = None, **kwargs) -> SelfType:
    """Add/merge filters and validate them.

    Accepts either a dict or keyword arguments (keyword arguments win when
    the same key appears in both). The call is atomic: if merging raises,
    or ``_validate_filters()`` raises or returns a falsy result, both
    ``_filters`` and ``_query_filters`` are restored to their state from
    before the call.

    Args:
        filters: Mapping like ``{"site_id": [...], "dataset": [...]}``.
        **kwargs: Alternative way to pass filters, e.g. ``site_id="ABC"``.

            Special keyword filters (handled separately from the generic
            list merge):

            - ``spatial``: checked with ``_validate_spatial_input`` and
              stored as given, replacing any existing spatial filter.
            - ``has_image`` (``bool``, *samples* mode only): a single
              boolean, optionally wrapped in a one-element list/tuple.
            - ``soil_subsite_id`` (``int`` or ``list[int]``, *samples* mode
              only): values are converted with ``int()`` and merged without
              duplicates. In samples mode ``soil_subsite`` is accepted as
              an alias.
            - ``soil_depth_range`` (``[min, max]`` or
              ``{"min": x, "max": y}``, *samples* mode only): both bounds
              must be numeric and ``max`` must be greater than ``min``.
            - ``date_from`` / ``date_to``: passed through ``_parse_date``
              (non-string input is converted with ``str()`` first).

    Raises:
        EcoPlotsError: Unknown filter keys.
        EcoPlotsError: ``region`` provided without current or new ``region_type``.
        EcoPlotsError: A special filter has an invalid value or is used
            outside ``samples`` mode.

    Returns:
        self for chaining.
    """
    # Merge filters from dict and kwargs.
    input_filters = {}
    if filters:
        input_filters.update(filters)
    if kwargs:
        input_filters.update(kwargs)

    if self._mode == "samples":
        alias_map = {
            "soil_subsite": "soil_subsite_id",
        }
        normalized_input = {}
        for key, value in input_filters.items():
            canonical_key = alias_map.get(key, key)
            if canonical_key in normalized_input and value is not None:
                existing = normalized_input[canonical_key]
                if isinstance(existing, list):
                    # Build a new list: ``existing`` may be the caller's own
                    # list object and must not be extended in place.
                    extra = list(value) if isinstance(value, (list, tuple)) else [value]
                    normalized_input[canonical_key] = [*existing, *extra]
                else:
                    normalized_input[canonical_key] = value
            else:
                normalized_input[canonical_key] = value
        input_filters = normalized_input

    # 1. Determine allowed facets based on mode.
    allowed_facets = SAMPLE_QUERY_FACETS if self._mode == "samples" else QUERY_FACETS

    # 2. Validate allowed keys.
    invalid_keys = set(input_filters) - set(allowed_facets)
    if invalid_keys:
        raise EcoPlotsError(f"Invalid filter keys: {invalid_keys}. Allowed: {allowed_facets}")

    # 3. Validate region logic.
    if "region" in input_filters:
        region_type_now = "region_type" in input_filters
        region_type_before = "region_type" in self._filters
        if not (region_type_now or region_type_before):
            raise EcoPlotsError("'region_type' must be provided before or with 'region'.")

    # 4. Snapshot both filter stores so any failure below can be undone.
    filters_backup = copy.deepcopy(self._filters)
    query_filters_backup = copy.deepcopy(self._query_filters)

    try:
        # 5. Merge filters (generic facets are always stored as lists).
        for k, v in input_filters.items():
            if v is None:
                continue

            if k == "has_image":
                if self._mode != "samples":
                    raise EcoPlotsError("'has_image' filter is only available in 'samples' mode.")
                if isinstance(v, (list, tuple)):
                    if len(v) != 1:
                        raise EcoPlotsError("'has_image' accepts a single boolean value.")
                    v = v[0]
                if not isinstance(v, bool):
                    raise EcoPlotsError("'has_image' must be a boolean (True/False).")
                self._filters["has_image"] = v
                self._query_filters["has_image"] = v
                continue

            if k == "spatial":
                _validate_spatial_input(v)
                # Replace any existing spatial filter.
                self._filters["spatial"] = v
                continue

            if k == "soil_subsite_id":
                if self._mode != "samples":
                    raise EcoPlotsError(
                        "'soil_subsite_id' filter is only available in 'samples' mode."
                    )
                vals = list(v) if isinstance(v, (list, tuple)) else [v]
                normalized_subsite_ids = []
                for raw in vals:
                    try:
                        subsite_id = int(raw)
                    except (TypeError, ValueError) as err:
                        raise EcoPlotsError(
                            "'soil_subsite_id' must be an integer or list of integers."
                        ) from err
                    normalized_subsite_ids.append(subsite_id)
                existing_ids = self._filters.get("soil_subsite_id", [])
                if not isinstance(existing_ids, list):
                    existing_ids = [existing_ids]
                merged = list(existing_ids)
                for sid in normalized_subsite_ids:
                    if sid not in merged:
                        merged.append(sid)
                self._filters["soil_subsite_id"] = merged
                continue

            if k == "soil_depth_range":
                if self._mode != "samples":
                    raise EcoPlotsError(
                        "'soil_depth_range' filter is only available in 'samples' mode."
                    )
                if isinstance(v, dict):
                    min_depth = v.get("min")
                    max_depth = v.get("max")
                elif isinstance(v, (list, tuple)) and len(v) == 2:
                    min_depth, max_depth = v
                else:
                    raise EcoPlotsError(
                        "'soil_depth_range' must be [min, max], (min, max), "
                        "or {'min': x, 'max': y}."
                    )
                try:
                    min_depth = float(min_depth)
                    max_depth = float(max_depth)
                except (TypeError, ValueError) as err:
                    raise EcoPlotsError(
                        "'soil_depth_range' min/max must be numeric values."
                    ) from err
                if max_depth <= min_depth:
                    raise EcoPlotsError(
                        "Invalid 'soil_depth_range': max must be greater than min."
                    )
                self._filters["soil_depth_range"] = [min_depth, max_depth]
                continue

            if k in ("date_from", "date_to"):
                self._filters[k] = _parse_date(v if isinstance(v, str) else str(v))
                continue

            if not isinstance(v, (list, tuple)):
                v = [v]
            if k in self._filters:
                self._filters[k].extend(list(v))
            else:
                self._filters[k] = list(v)

        # 6. Validate the merged filters.
        validation_passed = self._validate_filters()
    except Exception:
        # Undo partial merges so a failed call leaves no trace.
        self._filters = filters_backup
        self._query_filters = query_filters_backup
        raise

    if not validation_passed:
        # Validation reported failure: roll both stores back together so the
        # query-side filters keep matching the human-facing ones.
        self._filters = filters_backup
        self._query_filters = query_filters_backup
    return self
[docs]
def remove(self: SelfType, filters: Optional[dict] = None, **kwargs) -> SelfType:
    """Remove whole facets or specific values (same ergonomics as ``select``).

    Accepts either a dict or keyword arguments. For each facet:

    - value is ``None`` → remove the **entire facet**
    - value is a string → remove that **single value**
    - value is a list/tuple → remove **those values**

    Facets whose stored value is not a list (for example ``has_image`` or
    ``date_from``) are treated as holding exactly one value; naming that
    value removes the facet.

    The removal loop is atomic: if any requested removal is invalid,
    ``_filters`` is restored to its state from before the call and the
    error is re-raised.

    Args:
        filters: Mapping like ``{"site_id": ["TCFTNS0002"], "dataset": None}``.
        **kwargs: Alternative way to pass removals, e.g. ``site_id="TCFTNS0002"``.

    Raises:
        EcoPlotsError: Unknown filter keys (not among the facets allowed for
            the current mode).
        EcoPlotsError: If ``dataset`` is targeted while in ``samples`` mode.
        KeyError: Facet not present in current filters.
        EcoPlotsError: Specific values requested for ``spatial`` (only
            whole-facet removal is supported).
        EcoPlotsError: Specific values requested but not found for that facet.

    Returns:
        self (chainable)
    """
    # Merge inputs (dict + kwargs), exactly like select().
    input_filters = {}
    if filters:
        input_filters.update(filters)
    if kwargs:
        input_filters.update(kwargs)

    # 1. Determine allowed facets based on mode and validate the keys.
    allowed_facets = SAMPLE_QUERY_FACETS if self._mode == "samples" else QUERY_FACETS
    invalid_keys = set(input_filters) - set(allowed_facets)
    if invalid_keys:
        raise EcoPlotsError(f"Invalid filter keys: {invalid_keys}. Allowed: {allowed_facets}")

    # Protect persistent dataset in samples mode.
    if self._mode == "samples" and "dataset" in input_filters:
        raise EcoPlotsError(
            "Cannot remove 'dataset' when in 'samples' mode. The dataset 'TERN Surveillance' is required and protected."
        )

    filters_backup = copy.deepcopy(self._filters)
    removed_facets = set()

    # 2. Apply removals; any error undoes the removals already applied.
    try:
        for facet, vals in input_filters.items():
            if facet not in self._filters:
                raise KeyError(f"Facet {facet!r} not present in filters.")

            if vals is None:
                # Remove entire facet.
                self._filters.pop(facet, None)
                removed_facets.add(facet)
                continue

            if facet == "spatial":
                raise EcoPlotsError(
                    "Cannot remove specific values from 'spatial' filter; "
                    "use None to clear entire facet."
                )

            # Normalize the requested values to a list.
            if isinstance(vals, (str, bytes)) or not isinstance(vals, Iterable):
                vals = [vals]
            else:
                vals = list(vals)

            existing = self._filters[facet]
            # Scalar-valued facets (bool flag, date string, ...) are viewed
            # as a one-element list so membership tests compare whole values
            # instead of iterating a string or failing on a bool.
            holds_list = isinstance(existing, list)
            current = existing if holds_list else [existing]

            missing = [v for v in vals if v not in current]
            if missing:
                raise EcoPlotsError(f"Values not found in facet {facet!r}: {missing}")

            remaining = [v for v in current if v not in vals]
            if not remaining:
                self._filters.pop(facet, None)
                removed_facets.add(facet)
            elif holds_list:
                self._filters[facet] = remaining
            # A scalar facet with nothing removed keeps its original value.
    except Exception:
        self._filters = filters_backup
        raise

    # 3. Handle region invariants: without a region_type, 'region' is dropped.
    if "region_type" in removed_facets and "region" in self._filters:
        self._filters.pop("region", None)

    # 4. Rebuild API query filters from the remaining filters.
    self._query_filters = {}
    if self._filters:
        self._validate_filters()
    return self
[docs]
def clear(self: SelfType) -> SelfType:
"""Clear all filters from the instance.
The method mutates the instance and returns it to allow fluent/chained calls.
Returns:
self (chainable)
Notes:
In ``samples`` mode the ``TERN Ecosystem Surveillance`` dataset
filter is preserved; only user-added filters are cleared.
"""
# Preserve persistent dataset when in samples mode
if self._mode == "samples":
self._filters = {"dataset": ["TERN Ecosystem Surveillance"]}
self._query_filters = {"dataset": ["http://linked.data.gov.au/dataset/ausplots"]}
else:
self._filters = {}
self._query_filters = {}
return self
[docs]
def from_date(self: SelfType, date: str) -> SelfType:
    """Set an earliest-date filter.

    Thin wrapper equivalent to ``select(date_from=date)``; chain it with
    :meth:`to_date` to bound a date range. Parsing and validation of the
    date string are handled inside ``select()``.

    Args:
        date: Start date string.

    Returns:
        self (chainable)

    Raises:
        EcoPlotsError: Propagated from ``select()`` when the value is rejected.

    Examples:
        .. code-block:: python

            ec.select(site_id="ABC").from_date("01/01/2020").to_date("31/12/2022")
    """
    updated = self.select(date_from=date)
    return updated
[docs]
def to_date(self: SelfType, date: str) -> SelfType:
    """Set a latest-date filter.

    Thin wrapper equivalent to ``select(date_to=date)``; chain it with
    :meth:`from_date` to bound a date range. Parsing and validation of the
    date string are handled inside ``select()``.

    Args:
        date: End date string.

    Returns:
        self (chainable)

    Raises:
        EcoPlotsError: Propagated from ``select()`` when the value is rejected.

    Examples:
        .. code-block:: python

            ec.select(site_id="ABC").from_date("01/01/2020").to_date("31/12/2022")
    """
    updated = self.select(date_to=date)
    return updated
[docs]
def get_filter(self, facet: Optional[str] = None) -> Union[list, dict, None]:
    """Return the stored filter value for one facet, or all filters.

    Args:
        facet: Facet name to look up; it is resolved with ``resolve_facet``
            against ``QUERY_FACETS``. When omitted (or falsy), the whole
            ``_filters`` mapping is returned.

    Raises:
        EcoPlotsError: If the facet name cannot be resolved.

    Returns:
        The value stored for the resolved facet, or ``None`` if that facet
        is not currently applied. Without *facet*, the ``_filters`` dict
        itself (not a copy).
    """
    if not facet:
        return self._filters

    resolved = resolve_facet(facet, QUERY_FACETS)
    if not resolved:
        raise EcoPlotsError(
            f"Invalid facet name `{facet}`. Allowed facets: " + ", ".join(QUERY_FACETS)
        )
    return self._filters.get(resolved)
[docs]
def get_api_query_filters(self, facet: Optional[str] = None) -> Union[list, dict, None]:
    """Return the API-side query filter for one facet, or all of them.

    Args:
        facet: Facet name to look up; it is resolved with ``resolve_facet``
            against ``QUERY_FACETS``. When omitted (or falsy), the whole
            ``_query_filters`` mapping is returned.

    Raises:
        EcoPlotsError: If the facet name cannot be resolved.

    Returns:
        The value stored for the resolved facet, or ``None`` if that facet
        is not currently applied. Without *facet*, the ``_query_filters``
        dict itself (not a copy).
    """
    if not facet:
        return self._query_filters

    resolved = resolve_facet(facet, QUERY_FACETS)
    if not resolved:
        raise EcoPlotsError(
            f"Invalid facet name `{facet}`. Allowed facets: " + ", ".join(QUERY_FACETS)
        )
    return self._query_filters.get(resolved)
[docs]
def discover(
    self,
    discovery_facet: str,
    region_type: Optional[str] = None,
) -> dict:
    """Resolve a facet and POST the current query to its discovery endpoint.

    Args:
        discovery_facet: Facet to discover; resolved with ``resolve_facet``
            against ``DISCOVERY_FACETS``.
        region_type: Optional region type. Only used when the resolved facet
            is ``region``, in which case it is resolved with
            ``resolve_region_type`` and appended as a query-string parameter.

    Returns:
        Parsed JSON payload returned by the discovery endpoint.

    Raises:
        EcoPlotsError: If the facet cannot be resolved.

    Notes:
        - The request body is a deep copy of ``_query_filters``.
        - A 60-second request timeout is used, and HTTP error statuses raise
          via ``raise_for_status()``.
    """
    facet_param = resolve_facet(discovery_facet, DISCOVERY_FACETS)
    if not facet_param:
        raise EcoPlotsError(f"Invalid discovery facet: {discovery_facet}")

    url = f"{self._base_url}/api/v1.0/discovery/{facet_param}"
    if facet_param == "region" and region_type:
        url += f"?region_type={resolve_region_type(region_type)}"

    body = {"query": copy.deepcopy(self._query_filters)}
    response = requests.post(url, json=body, timeout=60)
    response.raise_for_status()
    return orjson.loads(response.content)
[docs]
def discover_samples(
    self,
    discovery_facet: str,
    region_type: Optional[str] = None,
) -> Union[list, dict]:
    """Resolve and call the discovery endpoint for samples.

    All branching is done on the facet name returned by ``resolve_facet``,
    so any spelling that ``resolve_facet`` accepts is handled the same way
    as the canonical name.

    Args:
        discovery_facet: Facet to discover (must resolve via SAMPLE_DISCOVERY_FACETS).
        region_type: Region type used when discovering regions.
            Required when the resolved facet is ``region``.

    Returns:
        For ``dataset``: a fixed one-element list. For ``region_type``,
        ``region``, ``material_sample_type``, ``site_id``,
        ``used_procedure`` and ``sample_name``: the list of aggregation
        buckets with ``doc_count`` removed (an empty list if the buckets
        node is not a list). Except for ``sample_name``, each bucket's
        original ``key`` is moved to ``uri`` and ``key`` is replaced by a
        label looked up in the matching map (``"N/A"`` when absent). For any
        other facet the parsed response is returned unchanged.

    Raises:
        EcoPlotsError: If the facet cannot be resolved, or region_type is
            missing/unresolvable when discovering regions.

    Notes:
        - Internal use only
        - A 60-second request timeout is enforced.
    """
    from ._config import SAMPLE_DISCOVERY_FACETS

    facet_param = resolve_facet(discovery_facet, SAMPLE_DISCOVERY_FACETS)
    if not facet_param:
        raise EcoPlotsError(f"Invalid discovery facet: {discovery_facet}")

    if facet_param == "dataset":
        # Hardcoded: samples mode always uses this single dataset.
        return [{
            "key": "TERN Ecosystem Surveillance",
            "uri": "http://linked.data.gov.au/dataset/ausplots",
        }]

    url = f"{self._base_url}/api/v1.0/ui/facet/samples"

    # Build the query from the currently resolved filters.
    query = {}
    for name in SAMPLE_DISCOVERY_FACETS:
        # When discovering regions the region_type comes from the argument,
        # not from the stored filters.
        if facet_param == "region" and name == "region_type":
            continue
        if name in self._query_filters:
            query[name] = self._query_filters[name]
    # Additional query-side sample filters.
    for name in ("soil_subsite_id", "soil_depth_range", "date_from", "date_to"):
        if name in self._query_filters:
            query[name] = self._query_filters[name]

    if facet_param == "region":
        if not region_type:
            raise EcoPlotsError("region_type is required when discovering regions")
        _facet, urls, _matched, _unmatched, _corrected = validate_facet(
            "region_type", [region_type]
        )
        if not urls:
            raise EcoPlotsError(f"Could not resolve region_type: {region_type}")
        query["region_type"] = urls

    payload = {"query": query, "has_image": self._query_filters.get("has_image", False)}
    params = [("facet", facet_param)] if facet_param == "sample_name" else None

    resp = requests.post(
        url,
        params=params,
        json=payload,
        timeout=60,
    )
    resp.raise_for_status()
    parsed = orjson.loads(resp.text)

    # Locate the bucket list; the nesting differs between facet families.
    if facet_param in ("region_type", "region"):
        buckets = parsed["aggregations"][facet_param]["buckets"]
    elif facet_param in ("material_sample_type", "site_id", "used_procedure", "sample_name"):
        buckets = parsed["aggregations"][facet_param]["value"]["buckets"]
    else:
        return parsed

    if not isinstance(buckets, list):
        return []

    # Pick the key -> label map used to relabel buckets.
    relabel = facet_param != "sample_name"
    label_map = {}
    if facet_param == "material_sample_type":
        label_map = MATERIAL_SAMPLE_TYPE_MAP
    elif relabel:
        with Cache(CACHE_DIR) as cache:
            label_map = cache.get(facet_param, {})

    for bucket in buckets:
        if not isinstance(bucket, dict):
            continue
        bucket.pop("doc_count", None)
        if relabel:
            key = bucket.get("key")
            bucket["uri"] = key
            bucket["key"] = label_map.get(key, "N/A")
    return buckets
[docs]
def discover_soil_depth_range(self):
    """Discover soil depth range aggregates for the current query.

    POSTs a deep copy of ``_query_filters`` to the ``/samples/soildepth``
    endpoint (30-second timeout) and summarises the ``aggregations`` node
    of the response in a single-row table.

    Returns:
        geopandas.GeoDataFrame: One row with the columns
        ``overall_depth_min_meter``, ``overall_depth_max_meter``,
        ``min_depth_range`` and ``max_depth_range``. A range column is
        ``None`` when either of its two aggregate values is missing.

    Raises:
        EcoPlotsError: If called outside ``samples`` mode, or if geopandas
            cannot be imported.
    """
    if self._mode != "samples":
        raise EcoPlotsError("Soil depth range discovery is only available in 'samples' mode.")
    try:
        import geopandas
    except ImportError:
        raise EcoPlotsError(
            "geopandas is required for discover_soil_depth_range(). "
            "Install it with: pip install geopandas"
        )

    body = {"query": copy.deepcopy(self._query_filters)}
    endpoint = f"{self._base_url}/api/v1.0/samples/soildepth"
    response = requests.post(endpoint, json=body, timeout=30)
    response.raise_for_status()

    parsed = orjson.loads(response.content)
    aggs = parsed.get("aggregations", {}) if isinstance(parsed, dict) else {}

    def agg_value(name: str):
        # Each aggregate is expected as {"value": ...}; anything else -> None.
        node = aggs.get(name, {})
        return node.get("value") if isinstance(node, dict) else None

    def span_text(low_name: str, high_name: str):
        low, high = agg_value(low_name), agg_value(high_name)
        if low is None or high is None:
            return None
        return f"{low}-{high} m"

    summary = {
        "overall_depth_min_meter": agg_value("min_soil_depth_min"),
        "overall_depth_max_meter": agg_value("max_soil_depth_max"),
        "min_depth_range": span_text("min_soil_depth_min", "min_soil_depth_max"),
        "max_depth_range": span_text("max_soil_depth_min", "max_soil_depth_max"),
    }
    return geopandas.GeoDataFrame([summary])
[docs]
def discover_soilpit(self):
    """Discover soil pit distribution for the current samples query.

    Posts the current query filters to the ``/samples/soilpit`` endpoint and
    tabulates the buckets found under
    ``aggregations.soil_subsite_id.value.buckets`` in the response.

    Returns:
        pandas.DataFrame: Columns are ``soilpit`` (the bucket ``key``) and
        ``counts`` (the bucket ``doc_count``). The frame is empty, but still
        has both columns, when the response carries no usable bucket list.

    Raises:
        EcoPlotsError: If called outside ``samples`` mode, or if pandas
            cannot be imported.
        requests.HTTPError: If the endpoint answers with an error status.
    """
    if self._mode != "samples":
        raise EcoPlotsError("Soil pit discovery is only available in 'samples' mode.")

    # pandas is an optional dependency, so it is imported on demand.
    try:
        import pandas as pd
    except ImportError as exc:
        raise EcoPlotsError(
            "pandas is required for discover_soilpit(). Install it with: pip install pandas"
        ) from exc

    resp = requests.post(
        f"{self._base_url}/api/v1.0/samples/soilpit",
        json={"query": copy.deepcopy(self._query_filters)},
        timeout=30,
    )
    resp.raise_for_status()

    # Walk down the nested response one level at a time. Any level that is
    # missing, null or not a mapping ends the walk with ``None`` instead of
    # raising AttributeError, so a sparse response yields an empty table.
    node = orjson.loads(resp.content)
    for key in ("aggregations", "soil_subsite_id", "value", "buckets"):
        node = node.get(key) if isinstance(node, dict) else None
    buckets = node if isinstance(node, list) else []

    rows = [
        {"soilpit": bucket.get("key"), "counts": bucket.get("doc_count")}
        for bucket in buckets
        if isinstance(bucket, dict)  # skip malformed bucket entries
    ]
    return pd.DataFrame(rows, columns=["soilpit", "counts"])
[docs]
def discover_species(self):
    """Discover species name distribution for the current samples query.

    Posts the current query filters to the ``/samples/speciesname`` endpoint
    and tabulates the buckets found under
    ``aggregations.speciesname.value.buckets`` in the response.

    Notes:
        - The query filters are sent unchanged, including ``has_image``.

    Returns:
        pandas.DataFrame: Columns are ``speciesname`` (the bucket ``key``)
        and ``count`` (the bucket ``doc_count``). The frame is empty, but
        still has both columns, when the response carries no usable bucket
        list.

    Raises:
        EcoPlotsError: If called outside ``samples`` mode, or if pandas
            cannot be imported.
        requests.HTTPError: If the endpoint answers with an error status.
    """
    if self._mode != "samples":
        raise EcoPlotsError("Species discovery is only available in 'samples' mode.")

    # pandas is an optional dependency, so it is imported on demand.
    try:
        import pandas as pd
    except ImportError as exc:
        # The message names this method (it previously referred to a
        # non-existent ``discover_speciesname()``).
        raise EcoPlotsError(
            "pandas is required for discover_species(). Install it with: pip install pandas"
        ) from exc

    resp = requests.post(
        f"{self._base_url}/api/v1.0/samples/speciesname",
        json={"query": copy.deepcopy(self._query_filters)},
        timeout=30,
    )
    resp.raise_for_status()

    # Walk down the nested response one level at a time. Any level that is
    # missing, null or not a mapping ends the walk with ``None`` instead of
    # raising AttributeError, so a sparse response yields an empty table.
    node = orjson.loads(resp.content)
    for key in ("aggregations", "speciesname", "value", "buckets"):
        node = node.get(key) if isinstance(node, dict) else None
    buckets = node if isinstance(node, list) else []

    rows = [
        {"speciesname": bucket.get("key"), "count": bucket.get("doc_count")}
        for bucket in buckets
        if isinstance(bucket, dict)  # skip malformed bucket entries
    ]
    return pd.DataFrame(rows, columns=["speciesname", "count"])
[docs]
async def fetch_data(
    self,
    page_number: Optional[int] = None,
    page_size: Optional[int] = None,
    dformat: str = "geojson",
    **extras,
) -> Union[dict, bytes]:
    """Fetch data for the current query, optionally paginated.

    Posts the current query filters to the ``/data/stream`` endpoint and
    collects the streamed response.

    Args:
        page_number: Page index to request. Only used together with
            ``page_size``.
        page_size: Number of items per page. Only used together with
            ``page_number``.
        dformat: Output format, either "geojson" (default) or "csv".
        **extras: Additional query filters merged into a copy of the current
            query (the instance's own filters are not modified).

    Returns:
        For ``dformat="geojson"``, the parsed JSON payload. For
        ``dformat="csv"``, the UTF-8 encoded CSV text as ``bytes``.

    Raises:
        EcoPlotsError: If an invalid dformat is provided.
        aiohttp.ClientResponseError: If the endpoint answers with an error
            status.

    Notes:
        - Pagination is applied only when both ``page_number`` and
          ``page_size`` are truthy; otherwise neither is sent and the whole
          result set is requested.
        - Timeout is 300s (5 min) when pagination is used; 3000s (50 min) otherwise.
        - Socket read timeout matches total timeout; connection timeout is 30s.
        - Intended for internal use only.
    """
    if dformat not in ("geojson", "csv"):
        raise EcoPlotsError("dformat must be one of 'geojson' or 'csv'")

    payload = {"query": copy.deepcopy(self._query_filters)}
    if extras and isinstance(payload["query"], dict):
        payload["query"].update(extras)

    if page_number and page_size:
        payload["page_number"] = page_number
        payload["page_size"] = page_size
        timeout = aiohttp.ClientTimeout(total=300, sock_read=300, sock_connect=30)
    else:
        # Unpaginated downloads can be large, hence the much longer budget.
        timeout = aiohttp.ClientTimeout(total=3000, sock_read=3000, sock_connect=30)

    chunks = []
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{self._base_url}/api/v1.0/data/stream?dformat={dformat}",
            json=payload,
            timeout=timeout,
            headers={"Accept": "text/event-stream"},
        ) as resp:
            resp.raise_for_status()
            # NOTE(review): line-wise iteration assumes the server emits
            # newline-delimited output; aiohttp's stream reader presumably
            # rejects lines longer than its buffer limit - confirm with very
            # large single-line responses.
            async for line in resp.content:
                line = line.decode('utf-8').strip()
                if line:  # blank lines are dropped
                    chunks.append(line)

    if dformat == "csv":
        return '\n'.join(chunks).encode('utf-8')
    # GeoJSON: the stripped lines are glued back together and parsed as one document.
    return orjson.loads(''.join(chunks))
[docs]
async def fetch_samples_data(self):
    """Fetch sample data from the samples endpoint.

    Posts the current query filters to ``/ui/data/samples`` and turns the
    ``_source`` documents of the returned hits into a table. Each document is
    normalised: bookkeeping fields are dropped, ``igsn_information`` and
    ``visit_start_date`` are flattened to ``igsn`` and ``visit_date``, and
    vocabulary URIs are replaced by labels where a mapping is available.

    Returns:
        geopandas.GeoDataFrame: One row per hit. The frame is empty when
        ``material_sample_type`` has not been selected or when no hit matches
        (a warning is displayed in both cases).

    Raises:
        EcoPlotsError: If geopandas cannot be imported, or if the number of
            selected ``material_sample_type`` values is not exactly one.
        aiohttp.ClientResponseError: If the endpoint answers with an error
            status that the ``has_image`` fallback does not cover.

    Notes:
        - Timeout is 300s (5 min)
        - ``has_image`` is sent as a top-level payload flag, not as a query
          filter. If the server answers that request with a 5xx status, the
          request is retried once without the flag. Hits without images are
          filtered out client-side whenever ``has_image`` is set.
        - Intended for internal use only
    """
    try:
        import geopandas
    except ImportError as exc:
        raise EcoPlotsError(
            "geopandas is required for fetch_samples_data(). "
            "Install it with: pip install geopandas"
        ) from exc

    # A missing material_sample_type is reported as a warning, not an error.
    if "material_sample_type" not in self._query_filters:
        self._display_warning(
            "material_sample_type must be selected to fetch samples data. "
            "Please select a material_sample_type using select()."
        )
        return geopandas.GeoDataFrame()

    material_sample_types = self._query_filters.get("material_sample_type", [])
    if len(material_sample_types) != 1:
        raise EcoPlotsError(
            f"Exactly one material_sample_type must be selected, "
            f"got {len(material_sample_types)}"
        )

    # ``has_image`` travels as a top-level flag, so strip it from the query.
    query = copy.deepcopy(self._query_filters)
    has_image = bool(query.pop("has_image", False))
    payload = {"query": query, "context": "samples"}
    if has_image:
        payload["has_image"] = True

    url = f"{self._base_url}/api/v1.0/ui/data/samples"
    timeout = aiohttp.ClientTimeout(total=300, sock_read=300, sock_connect=30)

    async def _post(session, body):
        """POST *body* to the samples endpoint and return the decoded JSON."""
        async with session.post(url, json=body, timeout=timeout) as resp:
            resp.raise_for_status()
            return await resp.json()

    async with aiohttp.ClientSession() as session:
        try:
            data = await _post(session, payload)
        except aiohttp.ClientResponseError as exc:
            if not (has_image and exc.status >= 500):
                raise
            self._display_warning(
                "Server rejected 'has_image=true'. Retrying without has_image "
                "and filtering image rows client-side."
            )
            # Reuse the already-stripped query so the retry really omits
            # ``has_image`` (both as a flag and as a query filter).
            data = await _post(session, {"query": query, "context": "samples"})

    # Extract hits from response
    hits = data.get("hits", {}).get("hits", [])

    if has_image:
        def _has_sample_images(value):
            """Return True if *value* is a list holding at least one non-blank URL."""
            if not isinstance(value, list):
                return False
            for item in value:
                if isinstance(item, str) and item.strip():
                    return True
                if isinstance(item, dict):
                    for url_value in item.values():
                        if isinstance(url_value, str) and url_value.strip():
                            return True
            return False

        hits = [
            hit for hit in hits
            if _has_sample_images((hit.get("_source", {}) or {}).get("sample_images"))
        ]

    if not hits:
        self._display_warning("No sample data found for the current filters.")
        return geopandas.GeoDataFrame()

    def _extract_value_field(raw_value):
        """Return the ``value`` entry of a dict, or of the first dict in a list."""
        if isinstance(raw_value, list) and raw_value and isinstance(raw_value[0], dict):
            return raw_value[0].get("value")
        if isinstance(raw_value, dict):
            return raw_value.get("value")
        return None

    def _resolve_vocab_value(raw_value, vocab_map):
        """Map value(s) through *vocab_map*, keeping unmapped values unchanged."""
        if isinstance(raw_value, list):
            resolved = [vocab_map.get(v, v) for v in raw_value if v is not None]
            if not resolved:
                return None
            # A single-element list collapses to its scalar.
            return resolved[0] if len(resolved) == 1 else resolved
        if raw_value is None:
            return None
        return vocab_map.get(raw_value, raw_value)

    # Fields whose values are resolved through a vocabulary mapping.
    with Cache(CACHE_DIR) as cache:
        vocab_maps = {
            "used_procedure": cache.get("used_procedure", {}) or {},
            "dataset": cache.get("dataset", {}) or {},
            "feature_type": cache.get("feature_type", {}) or {},
            "site_visit_id": cache.get("site_visit_id", {}) or {},
        }
    vocab_maps["material_sample_type"] = MATERIAL_SAMPLE_TYPE_MAP

    # Fields never exposed in the result (misspelt variants are listed too).
    dropped_fields = {
        "geopoint",
        "tags",
        "dataset_attr_count",
        "direct_site_id",
        "geographic_information",
        "observed_property",
        "obsverved_property",
        "related_samples",
        "sample_attr_count",
        "sample_information",
        "site_attr_count",
        "sites_hierarchy",
        "site_hierarchy",
        "site_hirarchy",
        "site_visit_attr_count",
    }

    records = []
    for hit in hits:
        source = hit.get("_source", {})
        if not isinstance(source, dict):
            continue

        cleaned = {}
        for key, value in source.items():
            # Drop explicit fields and region* columns
            if key in dropped_fields or key.startswith("region"):
                continue

            # Flatten and rename igsn_information -> igsn
            if key == "igsn_information":
                igsn_val = None
                if isinstance(value, list) and value and isinstance(value[0], dict):
                    nested_value = value[0].get("value")
                    if isinstance(nested_value, dict):
                        igsn_val = nested_value.get("label")
                if igsn_val is not None:
                    cleaned["igsn"] = igsn_val
                continue

            # Flatten and rename visit_start_date -> visit_date
            if key == "visit_start_date":
                visit_date = _extract_value_field(value)
                if visit_date is not None:
                    cleaned["visit_date"] = visit_date
                continue

            # Replace vocabulary URIs with labels where a mapping exists
            if key in vocab_maps:
                resolved = _resolve_vocab_value(value, vocab_maps[key])
                if resolved is not None:
                    cleaned[key] = resolved
                continue

            # Remove all null-valued fields
            if value is not None:
                cleaned[key] = value

        records.append(cleaned)

    # Convert list of records to GeoDataFrame
    gdf = geopandas.GeoDataFrame(records)

    # Drop columns that are entirely null after normalization
    if not gdf.empty:
        gdf = gdf.dropna(axis=1, how="all")

    # Drop sample_images when no row holds a non-empty list of images
    if "sample_images" in gdf.columns:
        has_any_sample_image = gdf["sample_images"].apply(
            lambda v: isinstance(v, list) and len(v) > 0
        ).any()
        if not has_any_sample_image:
            gdf = gdf.drop(columns=["sample_images"])

    return gdf
[docs]
def discover_attributes(
    self,
    discovery_attribute: str,
) -> dict:
    """Look up attribute identifiers for one attribute namespace.

    The namespace is resolved against ``DISCOVERY_ATTRIBUTES`` and sent as
    the ``type`` URL parameter to ``/discovery/attributes``, together with a
    copy of the current query filters in the request body.

    Args:
        discovery_attribute: Attribute namespace to discover; it must be
            resolvable via the configured discovery attributes.

    Returns:
        Parsed JSON payload returned by the endpoint.

    Raises:
        EcoPlotsError: If the attribute cannot be resolved.
        requests.HTTPError: If the endpoint answers with an error status.

    Notes:
        - A 30-second request timeout is enforced.
        - Intended for internal use only.
    """
    attribute_type = resolve_facet(discovery_attribute, DISCOVERY_ATTRIBUTES)
    if not attribute_type:
        raise EcoPlotsError(f"Invalid discovery facet: {discovery_attribute}")

    response = requests.post(
        f"{self._base_url}/api/v1.0/discovery/attributes",
        params=[("type", attribute_type)],
        json={"query": copy.deepcopy(self._query_filters)},
        timeout=30,
    )
    response.raise_for_status()
    return orjson.loads(response.content)
[docs]
def summarise_data(self, query_filters: Optional[dict] = None) -> dict:
    """Ask the summary endpoint for a lightweight overview of a query.

    Args:
        query_filters: Filters to summarise. They are deep-copied before use.
            When ``None``, the instance's current query filters are used
            instead.

    Returns:
        Parsed JSON payload returned by the ``/data/summary`` endpoint.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.

    Notes:
        - In ``samples`` mode the payload carries ``context="samples"``.
        - A ``has_image`` entry is moved out of the query and sent as a
          top-level payload field.
        - A 30-second request timeout is enforced.
        - Intended for internal use only.
    """
    chosen_filters = self._query_filters if query_filters is None else query_filters
    query = copy.deepcopy(chosen_filters)
    payload = {"query": query}

    if self._mode == "samples":
        payload["context"] = "samples"

    if "has_image" in query:
        payload["has_image"] = query.pop("has_image")

    response = requests.post(
        f"{self._base_url}/api/v1.0/data/summary",
        json=payload,
        timeout=30,
    )
    response.raise_for_status()
    return orjson.loads(response.content)
# async def stream_data(self, query: dict = {}) -> dict:
# payload = copy.deepcopy(query)
# async with httpx.AsyncClient() as client:
# response = await client.post(f"{self.base_url}/api/v1.0/data/stream", json=payload)
# response.raise_for_status()
# return response.json()
[docs]
def save(self, path: Optional[Union[str, Path]] = None) -> str:
    """Save project state to a single `.ecoproj` file (atomic, checksummed).

    Writes the current `filters` and `query_filters` into a compact binary file
    with a small header and a JSON (orjson) payload. The target location is
    resolved by ``_ensure_ecoproj_path``; missing parent directories are
    created.

    File layout::

        MAGIC (4) | VERSION (1) | SHA256 of payload (32) | LEN (8, big-endian) | payload

    The data is first written to a temporary file in the *same directory* as
    the target and then renamed over it. Keeping both on one filesystem is
    what makes the final replace atomic (a rename across filesystems fails).

    Args:
        path: Optional target path or bare name, interpreted by
            ``_ensure_ecoproj_path``.

    Returns:
        Absolute path to the saved `.ecoproj` file.

    Raises:
        Exception: Any error during the write is re-raised unchanged after
            the temporary file has been removed.
    """
    target = _ensure_ecoproj_path(path).resolve()
    target.parent.mkdir(parents=True, exist_ok=True)

    payload = {"filters": self._filters, "query_filters": self._query_filters}
    # Sorted keys make the serialised payload (and so its checksum) deterministic.
    body = orjson.dumps(payload, option=orjson.OPT_SORT_KEYS)
    sha = hashlib.sha256(body).digest()

    # ``delete=False``: the file must survive closing so it can be renamed.
    tmp = tempfile.NamedTemporaryFile(
        mode="wb",
        dir=target.parent,
        prefix=f".{target.name}.",
        suffix=".tmp",
        delete=False,
    )
    tmp_path = Path(tmp.name)
    try:
        with tmp:  # closes the handle before the rename
            # Header: MAGIC (4) | VERSION (1) | SHA256 (32) | LEN (8, big-endian)
            tmp.write(MAGIC)
            tmp.write(struct.pack(">B", VERSION))
            tmp.write(sha)
            tmp.write(struct.pack(">Q", len(body)))
            tmp.write(body)
        # Atomic replace (overwrites an existing target)
        tmp_path.replace(target)
    except BaseException:
        # Best-effort cleanup; the original error is what the caller sees.
        tmp_path.unlink(missing_ok=True)
        raise

    return str(target)
[docs]
@classmethod
def load(cls: type[SelfType], path: Union[str, Path]) -> SelfType:
    """Load a `.ecoproj` file, validate integrity, and return a new instance.

    Args:
        path: Path to a `.ecoproj` file previously created by :meth:`save`.

    Returns:
        A new instance of the calling class with ``filters`` and
        ``query_filters`` restored from the file. The file stores no mode, so
        the instance is created with the constructor's default mode.

    Raises:
        FileNotFoundError: If the file does not exist.
        EcoPlotsError: If the file does not have a `.ecoproj` suffix, the magic
            header or version is invalid, the file is truncated, or the checksum
            does not match the payload.
    """
    p = Path(path).resolve()
    if not p.exists():
        raise FileNotFoundError(str(p))
    if p.suffix != ".ecoproj":
        raise EcoPlotsError(f"Expected a '.ecoproj' file, got: {p.name}")

    with open(p, "rb") as f:
        # Header: MAGIC (4) | VERSION (1) | SHA256 (32) | LEN (8, big-endian)
        if f.read(4) != MAGIC:
            raise EcoPlotsError("Invalid project file (bad magic).")

        # Each fixed-size field is length-checked before unpacking, so a short
        # file is reported as truncated rather than surfacing struct.error.
        raw_version = f.read(1)
        if len(raw_version) != 1:
            raise EcoPlotsError("Truncated project file.")
        ver = struct.unpack(">B", raw_version)[0]
        if ver != VERSION:
            raise EcoPlotsError(f"Incompatible project version: {ver} (expected {VERSION}).")

        sha = f.read(32)
        raw_length = f.read(8)
        if len(sha) != 32 or len(raw_length) != 8:
            raise EcoPlotsError("Truncated project file.")
        n = struct.unpack(">Q", raw_length)[0]

        # Never ask for more bytes than the file can hold: a corrupt length
        # field could otherwise trigger a huge allocation or an overflow.
        if n > p.stat().st_size - f.tell():
            raise EcoPlotsError("Truncated project file.")
        body = f.read(n)

    if len(body) != n:
        raise EcoPlotsError("Truncated project file.")
    if hashlib.sha256(body).digest() != sha:
        raise EcoPlotsError("Project integrity check failed (checksum mismatch).")

    data = orjson.loads(body)
    return cls(filterset=data.get("filters", {}), query_filters=data.get("query_filters", {}))
def _validate_filters(self) -> bool:
    """Validate the held filters and commit them if they match any records.

    Work is done on copies of ``_filters`` and ``_query_filters``; the
    instance is only updated once every value has been matched and the
    summary endpoint reports at least one record.

    Returns:
        `True` if filters validate and the summary indicates one or more
        matching records; `False` if validation succeeded but the selection
        yields zero records (a warning is issued and nothing is committed).

    Raises:
        EcoPlotsError: If any facet contains values that cannot be matched,
            or if species discovery yields no names to validate against.

    Notes:
        - Intended for internal use only.
    """
    query_filters = copy.deepcopy(self._query_filters)
    all_matched = copy.deepcopy(self._filters)
    all_unmatched: dict = {}

    # These facets are copied into the query as-is, without validation.
    for passthrough in (
        "spatial",
        "soil_subsite_id",
        "soil_depth_range",
        "date_from",
        "date_to",
    ):
        if passthrough in all_matched:
            query_filters[passthrough] = all_matched[passthrough]

    # Species names are checked against what discovery reports for the
    # current filters, rather than through validate_facet.
    if "speciesname" in all_matched:
        requested_species = all_matched.get("speciesname", [])
        if not isinstance(requested_species, (list, tuple)):
            requested_species = [requested_species]

        species_df = self.discover_species()
        available_species = []
        if "speciesname" in species_df.columns:
            available_species = [
                str(v).strip()
                for v in species_df["speciesname"].dropna().tolist()
                if str(v).strip()
            ]
        if not available_species:
            raise EcoPlotsError(
                "Unable to validate 'speciesname': discovery returned no species values "
                "for the current filters."
            )

        species_hits = []
        species_misses = []
        for raw_value in requested_species:
            candidate = str(raw_value).strip()
            if not candidate:
                species_misses.append(raw_value)
                continue
            folded = candidate.casefold()

            # First pass: case-insensitive exact match.
            exact_match = next(
                (name for name in available_species if name.casefold() == folded),
                None,
            )
            if exact_match is not None:
                species_hits.append(exact_match)
                continue

            # Fuzzy fallback to tolerate minor spelling/casing/missing tokens.
            fuzzy = process.extractOne(candidate, available_species, scorer=fuzz.WRatio)
            if fuzzy is None:
                species_misses.append(raw_value)
                continue
            best_name, score, _ = fuzzy
            if score < 80:
                species_misses.append(raw_value)
                continue
            if best_name.casefold() != folded:
                self._display_warning(
                    f"Value '{candidate}' for facet 'speciesname' corrected to "
                    f"'{best_name}'."
                )
            species_hits.append(best_name)

        if species_misses:
            all_unmatched.setdefault("speciesname", []).extend(species_misses)
        else:
            # Deduplicate while preserving order.
            deduped_species = list(dict.fromkeys(species_hits))
            all_matched["speciesname"] = deduped_species
            query_filters["speciesname"] = deduped_species

    not_facet_validated = {
        "spatial",
        "has_image",
        "soil_subsite_id",
        "soil_depth_range",
        "speciesname",
        "date_from",
        "date_to",
    }
    to_validate = {
        facet: values
        for facet, values in self._filters.items()
        if facet not in not_facet_validated
    }

    with ThreadPoolExecutor() as executor:
        # NOTE (kept from the original author): `validate_facet` uses rapidfuzz
        # under the hood, which releases the GIL, so threads give real
        # parallelism for the CPU-bound fuzzy matching here.
        pending = {
            executor.submit(validate_facet, facet, values): facet
            for facet, values in to_validate.items()
        }
        for finished in as_completed(pending):
            facet, urls, matched, unmatched, corrected = finished.result()

            # Merge the newly resolved URLs into those already queried.
            merged_urls = set(query_filters.get(facet, []))
            merged_urls.update(urls)
            query_filters[facet] = list(merged_urls)

            # ensure corrected values are excluded
            all_matched[facet] = [x for x in matched if x not in corrected]

            if unmatched:
                all_unmatched.setdefault(facet, []).extend(unmatched)

    if all_unmatched:
        msg = "The following filter values could not be matched:\n" + "\n".join(
            f"Facet '{facet}': {unmatched}" for facet, unmatched in all_unmatched.items()
        )
        raise EcoPlotsError(msg)

    # The summary is requested without the has_image entry.
    summary_query_filters = copy.deepcopy(query_filters)
    summary_query_filters.pop("has_image", None)
    summary = self.summarise_data(query_filters=summary_query_filters)

    if summary.get("total_doc", 0) == 0:
        self._display_warning(
            "The applied filters result in zero matching records. "
            "Please adjust your filters. Skipping current selection..."
        )
        return False

    # Commit only after everything checked out.
    self._query_filters = query_filters
    self._filters = all_matched
    return True
def _fetch_clusters(self, geojson: Optional[dict] = None) -> dict:
    """Request clustered data points for map visualization.

    Args:
        geojson: Optional GeoJSON polygon to define the area of interest.
            When omitted (or falsy), a fixed rectangle is used instead,
            spanning roughly 107.68 to 159.86 degrees longitude and -9.83 to
            -44.49 degrees latitude.

    Returns:
        Parsed JSON payload returned by the ``/ui/map/clusters`` endpoint.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.

    Notes:
        - The clustering precision is fixed at 3.
        - ``has_image`` is removed from the query; only a value of ``True``
          is forwarded, as a top-level payload flag.
        - A 30-second request timeout is enforced.
        - Intended for internal use only.
    """
    west, east = 107.68366383276675, 159.86061589572708
    north, south = -9.83285528397626, -44.49207177551449
    default_area = {
        "type":"Polygon",
        "coordinates": [
            [
                [west, north],
                [east, north],
                [east, south],
                [west, south],
                [west, north],  # closes the ring
            ]
        ],
    }

    query = copy.deepcopy(self._query_filters)
    payload = {
        "query": query,
        "clustering_precision": 3,
        "geojson": geojson or default_area,
    }

    if self._mode == "samples":
        payload["context"] = "samples"

    if query.pop("has_image", None) is True:
        payload["has_image"] = True

    response = requests.post(
        f"{self._base_url}/api/v1.0/ui/map/clusters",
        json=payload,
        timeout=30,
    )
    response.raise_for_status()
    return orjson.loads(response.content)
def _ensure_required_material_sample_types(self, required_labels: list[str], context: str) -> None:
    """Check that a required material sample type is part of the selection.

    The labels are translated to URIs through ``MATERIAL_SAMPLE_TYPE_MAP``
    (labels missing from the map are ignored) and compared with the URIs
    currently held under ``material_sample_type`` in the query filters.

    Args:
        required_labels: Human-readable material sample type labels where at
            least one must be present in current ``material_sample_type``.
        context: Short workflow name used in error messages.

    Raises:
        EcoPlotsError: If none of the required sample types are selected.
    """
    # The map is keyed by URI; invert it to look labels up.
    uri_by_label = {label: uri for uri, label in MATERIAL_SAMPLE_TYPE_MAP.items()}

    current = self._query_filters.get("material_sample_type", [])
    # A bare string is one selection, not a sequence of characters.
    chosen_uris = {current} if isinstance(current, str) else set(current)

    wanted_uris = {
        uri_by_label[label]
        for label in required_labels
        if label in uri_by_label
    }

    if chosen_uris & wanted_uris:
        return

    # Report the selection by label where known, falling back to the raw URI.
    chosen_labels = [
        MATERIAL_SAMPLE_TYPE_MAP.get(uri, uri) for uri in sorted(chosen_uris)
    ]
    selected_display = ", ".join(chosen_labels) if chosen_labels else "none"
    raise EcoPlotsError(
        f"{context} requires material_sample_type to include at least one of: "
        f"{', '.join(required_labels)}. Currently selected: {selected_display}."
    )