Source code for haxr.utilities

# Keep type hints unevaluated so Sphinx renders/link public aliases (needed for
# h5py.File) instead of private impl paths.
from __future__ import annotations

from collections.abc import Iterator, Sequence
from typing import TypeAlias

import h5py
import numpy as np
import pandas as pd

H5File: TypeAlias = h5py.File

Float1DLike: TypeAlias = (
    Sequence[float]
    | np.ndarray[tuple[int], np.dtype[np.float32]]
    | np.ndarray[tuple[int], np.dtype[np.float32]]
    | np.ndarray[tuple[int], np.dtype[np.float64]]
    | np.ndarray[tuple[int], np.dtype[np.float64]]
)
Float1D: TypeAlias = np.ndarray[tuple[int], np.dtype[np.float64]]
Float2D: TypeAlias = np.ndarray[tuple[int, int], np.dtype[np.float64]]


[docs] def load_cycle(data: H5File, k: int) -> pd.DataFrame: """Load one radar cycle from an open HDF5 radar file. The cycle boundaries are stored as inclusive cell-index ranges in ``cycle/first`` and ``cycle/last``. This function slices the per-cell datasets (``tod``, ``az1``, ``az2``, ``r1``, ``r2``, ``amp``) for cycle ``k`` and returns them as a DataFrame indexed by the original cell indices. Column names are annotated with units if the underlying dataset defines an HDF5 attribute ``unit`` (e.g. ``tod (second)``). Two derived columns are added: ``az`` is the circular mean of ``az1`` and ``az2`` (degrees, rounded to 3 decimals) and ``r`` is the mid-range between ``r1`` and ``r2`` (rounded to 2 decimals). The derived column names use the units from ``az1`` and ``r1``. For the distinction between *cycles* and *frames*, see :ref:`Cycle vs frame <cycle-vs-frame>`. To iterate frames, use :func:`~haxr.utilities.iter_frames` and :func:`~haxr.utilities.load_frames`. Args: data: Open :class:`h5py.File` containing ``cycle/first``, ``cycle/last``, ``tod``, ``az1``, ``az2``, ``r1``, ``r2``, and ``amp``. k: Cycle index into ``cycle/first`` and ``cycle/last``. Returns: A :class:`pandas:pandas.DataFrame` containing the raw per-cell fields plus the derived columns ``az`` and ``r`` for cycle ``k``. Raises: IndexError: If ``k`` is out of bounds for ``cycle/first`` or ``cycle/last``. KeyError: If required datasets are missing, or if ``az1`` or ``r1`` lack the ``unit`` attribute needed to name the derived columns. Example: .. code-block:: python from haxr import Store from haxr.utilities import load_cycle with Store(base_url=..., cache_dir=...) as store: chunk = store.get_chunk(...) with store.open(chunk.radar_file) as f: df = load_cycle(f, 0) """ i = data["cycle/first"][k] j = data["cycle/last"][k] + 1 cols = dict() for v in ["tod", "az1", "az2", "r1", "r2", "amp"]: unit = data[v].attrs.get("unit") cols[f"{v} ({unit})" if unit else v] = data[v][i:j] az1 = np.deg2rad(data["az1"][i:j]) az2 = np.deg2rad(data["az2"][i:j]) az = np.atan2(np.sin(az1) + np.sin(az2), np.cos(az1) + np.cos(az2)) az = np.rad2deg(az) az = np.where(az < 0, az + 360, az) cols["az"] = np.round(az, 3) cols["r"] = np.round((data["r1"][i:j] + data["r2"][i:j]) / 2, 2) mapper = { v: f"{v} ({unit})" for v in ["az1", "az2", "r1", "r2", "amp"] if data[v].attrs.get("unit") } mapper["az"] = f"az ({data['az1'].attrs['unit']})" mapper["r"] = f"r ({data['r1'].attrs['unit']})" return pd.DataFrame(cols, index=range(i, j)).rename(columns=mapper)
[docs] def load_frames(data: H5File, k: int, *, n: int = 1) -> tuple[pd.DataFrame, int]: """Load up to ``n`` frames starting at cycle index ``k``. A frame is a sparse, non-overlapping subset of cycles as produced by :func:`~haxr.utilities.iter_frames` (see ref:`Cycle vs frame <cycle-vs-frame>`). This function loads the frames via :func:`~haxr.utilities.load_cycle`, concatenates them into a single DataFrame, and adds a 1-based ``frame`` column (``1..m``) identifying which loaded frame each row belongs to. Args: data: Open :class:`h5py.File` containing the cycle datasets required by :func:`~haxr.utilities.load_cycle`. k: Start cycle index (interpreted as the first frame). n: Maximum number of frames to load (must be ``>= 1``). Returns: A tuple ``(df, m)`` where ``df`` is the concatenated DataFrame and ``m`` is the number of frames actually loaded (``m <= n``). The returned DataFrame contains all columns from :func:`~haxr.utilities.load_cycle` plus the ``frame`` column. Raises: ValueError: If no frames are loaded (e.g. ``k`` is out of range), because :func:`pandas.concat` has no objects to concatenate. IndexError: If ``k`` (or a subsequent frame index) is out of bounds. KeyError: If required datasets are missing from ``data``. Example: .. code-block:: python from haxr import Store from haxr.utilities import load_frames with Store(base_url=..., cache_dir=...) as store: chunk = store.get_chunk(...) with store.open(chunk.radar_file) as f: # load 5 frames starting at cycle `k = 100` df, m = load_frames(f, k=100, n=5) first_frame = df[df["frame"] == 1] last_frame = df[df["frame"] == m] ... """ dfs = [load_cycle(data, i) for i in iter_frames(data, k, n=n)] for i, df in enumerate(dfs, start=1): df["frame"] = i return pd.concat(dfs), len(dfs)
[docs] def arg_next_frame(data: H5File, k: int) -> int | None: """Return the index of the next frame after frame ``k``. The radar stream is stored as overlapping cycles with inclusive cell-index bounds in ``cycle/first`` and ``cycle/last``. A frame is defined here as the next cycle that starts strictly after the current cycle ends. This helper therefore returns the first cycle index whose start index is greater than ``cycle/last[k]``. See :ref:`Cycle vs Frame <cycle-vs-frame>` for the underlying definition. This is a low-level primitive used by :func:`~haxr.utilities.iter_frames` and :func:`~haxr.utilities.load_frames`. Args: data: Open :class:`h5py.File` containing the datasets ``cycle/first`` and ``cycle/last``. k: Current cycle index (interpreted as the current frame). Returns: The integer index of the next frame, or ``None`` if no cycle starts after ``cycle/last[k]``. Raises: IndexError: If ``k`` is out of bounds for ``cycle/last``. KeyError: If ``cycle/first`` or ``cycle/last`` is missing from ``data``. """ first = data["cycle/first"] last = data["cycle/last"] idx = np.searchsorted(first, last[k], side="right").item() return idx if idx < first.size else None
[docs] def iter_frames(data: H5File, k: int, n: int | None = None) -> Iterator[int]: """Iterate cycle indices starting at ``k``, jumping by frames. Consecutive radar cycles overlap heavily. This iterator yields a sparse subset of cycle indices by repeatedly applying :func:`~haxr.utilities.arg_next_frame`, i.e., it advances to the first cycle whose start index is strictly greater than the end index of the previously yielded cycle. See :ref:`Cycle vs Frame <cycle-vs-frame>` for why adjacent cycles overlap and how frames are selected. Args: data: Open :class:`h5py.File` containing ``cycle/first`` and ``cycle/last``. k: Start cycle index. n: Maximum number of indices to yield (including ``k``). If ``None`` (default), iterate until no next frame exists. Yields: Cycle indices ``k, k2, k3, ...`` corresponding to successive frames. Raises: KeyError: If ``cycle/first`` or ``cycle/last`` is missing from ``data``. IndexError: If ``k`` (or a subsequent index) is out of bounds. Example: .. code-block:: python from haxr import Store from haxr.utilities import iter_frames, load_cycle with Store(base_url=..., cache_dir=...) as store: chunk = store.get_chunk(...) with store.open(chunk.radar_file) as f: # iterate 5 frames starting at cycle `k = 100` for k in iter_frames(f, k=100, n=5): df = load_cycle(f, k) ... For loading several frames into one DataFrame, use :func:`~haxr.utilities.load_frames`. """ if k < data["cycle/first"].size: yield k if n is None: n = data["cycle/last"].size - k maybe_k = k for _ in range(n - 1): maybe_k = arg_next_frame(data, maybe_k) if maybe_k is not None: yield maybe_k else: break
[docs] def fill_histogram( *, az: Float1DLike, r: Float1DLike, weights: Float1DLike | None = None, az_edges: Float1DLike | None = None, r_edges: Float1DLike | None = None, ) -> tuple[Float2D, Float1D, Float1D]: """Fill a 2D histogram in polar coordinates (azimuth, range). This is a thin wrapper around :func:`numpy.histogram2d`. If ``weights`` is provided, it is passed through as the per-sample weight array; a canonical choice is the per cell amplitude value. If ``weights`` is ``None``, the histogram contains counts. If ``az_edges`` or ``r_edges`` is ``None``, bin edges are inferred from the data. The helper :func:`~haxr.utilities.infer_az_edges` returns a schema compatible with :func:`numpy.linspace` (keys ``start``, ``stop``, ``num``), and :func:`~haxr.utilities.infer_r_edges` returns a schema compatible with :func:`numpy.arange` (keys ``start``, ``step``). These schemas are used to construct ``az_edges`` and ``r_edges`` for this function. Args: az: Azimuth samples (degrees). r: Range samples. weights: Optional per-sample weights (same length as ``az`` and ``r``). az_edges: Optional azimuth bin edges. If ``None``, inferred via :func:`~haxr.utilities.infer_az_edges`. r_edges: Optional range bin edges. If ``None``, inferred via :func:`~haxr.utilities.infer_r_edges`. Returns: A tuple ``(hist, az_edges, r_edges)`` as returned by :func:`numpy.histogram2d`, where ``hist.shape == (len(az_edges) - 1, len(r_edges) - 1)``. Raises: ValueError: If the provided or inferred bins are invalid for :func:`numpy.histogram2d`. KeyError: If inferred edge schemas are empty or missing required keys. """ if az_edges is None: pattern = infer_az_edges(az=az) start: float = pattern["start"] num: int = int(pattern["num"]) stop: float = pattern["stop"] az_edges = np.linspace(start=start, stop=stop, num=num, dtype=np.float64) if r_edges is None: pattern = infer_r_edges(r=r) start: float = pattern["start"] step: float = pattern["step"] stop: float = step + float(np.max(r)) r_edges = np.arange(start, stop=stop, step=step, dtype=np.float64) az_flat = np.asarray(az, dtype=np.float64).ravel() r_flat = np.asarray(r, dtype=np.float64).ravel() weights_flat = ( None if weights is None else np.asarray(weights, dtype=np.float64).ravel() ) az_edges_flat = np.asarray(az_edges, dtype=np.float64).ravel() r_edges_flat = np.asarray(r_edges, dtype=np.float64).ravel() return np.histogram2d( az_flat, r_flat, weights=weights_flat, bins=(az_edges_flat, r_edges_flat) )
[docs] def infer_r_edges(*, r: Float1DLike) -> dict[str, float]: """Infer a range-bin schema for histogramming. The returned schema is meant to be used with :func:`numpy.arange` to construct range bin edges. It contains ``start`` and ``step`` only (no ``stop``), because the appropriate upper bound depends on the use case. A common choice is ``stop = float(np.max(r)) + step`` (as done in :func:`~haxr.utilities.fill_histogram`). The step size is estimated as the median of ``np.diff(r)`` and the first edge is placed half a step before the minimum value. Warning: This function uses heuristics; the inferred edges may be incorrect for irregularly sampled or noisy inputs. If the inferred edges look unreasonable, prefer computing them explicitly. Args: r: Range samples with approximately constant spacing. Returns: A dict with keys ``start`` and ``step`` suitable for:: schema = infer_r_edges(r=r) stop = float(np.max(r)) + schema["step"] r_edges = numpy.arange(stop=stop, **schema) Raises: ValueError: If ``r`` is empty or cannot be reduced (e.g. ``min`` / ``diff`` fails). """ r = np.asarray(r) r0 = r.min() dr = np.median(np.diff(r)).item() return {"start": r0 - dr / 2, "step": dr}
[docs] def infer_az_edges(*, az: Float1DLike) -> dict[str, float | int]: """Infer an azimuth-bin schema for histogramming. The returned schema is meant to be used with :func:`numpy.linspace` to construct azimuth bin edges that cover one full turn. The edges are chosen such that the interval ``[0, 360]`` is fully covered and a wrap-around bin spanning the discontinuity at 360/0 is represented naturally (e.g., a bin ``355 .. 5`` degrees). This helper is primarily intended for :func:`~haxr.utilities.fill_histogram`. If inference fails (e.g., the azimuth step size cannot be determined reliably), an empty dict is returned. Warning: This function uses heuristics; the inferred edges may be incorrect for irregularly sampled or noisy inputs. If the inferred edges look unreasonable, prefer computing them explicitly. Args: az: 1D azimuth samples in degrees, typically in acquisition order. Returns: A schema dict with keys ``start``, ``stop``, and ``num`` suitable for:: schema = infer_az_edges(az=az) az_edges = numpy.linspace(**schema) The returned ``start`` may be negative and ``stop`` may be greater than 360 to ensure wrap-around coverage. Raises: ValueError: If ``az`` is empty or cannot be reduced (e.g. ``max`` / ``diff`` fails). """ az = np.asarray(az) az0 = az[np.argmax(az)] daz = np.diff(az) daz = daz[(daz > 0.01) & (daz < 10.0)] daz, c = np.unique(daz, return_counts=True) if daz.size < 1: return dict() frac = daz / daz[np.argmax(c)] sel = (frac > 0.9) & (frac < 1.1) daz = np.average(daz[sel], weights=c[sel]) az_num = round(360.0 / daz) daz = 360.0 / az_num az_start = (az0 + np.sign(180 - az0) * daz / 2) % daz - daz az_end = az_start + np.ceil((360 - az_start) / daz) * daz if az_end - daz > 360: az_end -= daz assert az_start <= 0, az_start assert az_end >= 360, az_end return { "start": az_start.item(), "num": round((az_end - az_start) / daz) + 1, "stop": az_end.item(), }
[docs] def histogram_to_cartesian_meshgrid( hist: Float2D, az_edges: Float1DLike, r_edges: Float1DLike, ) -> tuple[Float2D, Float2D, Float2D]: """Convert a polar 2D histogram into a Cartesian meshgrid for plotting. Given histogram values binned in polar coordinates (azimuth, range) together with their bin edges, this function computes Cartesian coordinate grids ``x`` and ``y`` for the bin corners. The result is suitable for visualizing the polar histogram in a Cartesian plot (e.g., via :meth:`matplotlib.axes.Axes.pcolormesh`). Azimuth edges are interpreted as degrees and converted to radians. The Cartesian mapping follows the usual radar convention (azimuth measured from North, increasing clockwise): ``x = r * sin(az)`` (East) and ``y = r * cos(az)`` (North). Typically, ``hist``, ``az_edges`` and ``r_edges`` come from :func:`~haxr.utilities.fill_histogram`. Args: hist: 2D histogram values binned over azimuth and range. az_edges: 1D azimuth bin edges in degrees. r_edges: 1D range bin edges. Returns: A tuple ``(x, y, values)`` where ``x`` and ``y`` are 2D arrays of bin-corner coordinates with shape ``(len(az_edges), len(r_edges))`` and ``values`` are the corresponding per-bin histogram values suitable for plotting against these edges. Example: .. code-block:: python hist, az_edges, r_edges = fill_histogram(az=az, r=r, weights=amp) x, y, values = histogram_to_cartesian_meshgrid(hist, az_edges, r_edges) fig, ax = plt.subplots() ax.pcolormesh(x, y, values, shading="auto") ax.set_aspect("equal", adjustable="box") """ az_edges_flat = np.asarray(az_edges, dtype=np.float64).ravel() r_edges_flat = np.asarray(r_edges, dtype=np.float64).ravel() r, az = np.meshgrid(r_edges_flat, np.deg2rad(az_edges_flat), indexing="xy") return r * np.sin(az), r * np.cos(az), hist