Skip to content

Core: Data

All dataset, preprocessing, representation, and discovery machinery.

Current layout

  • datasets/ - raw source adapters and dataset/source builders.
  • discovery/ - signal profiles, canonical entities, and provisional hypotheses for cross-vehicle alignment.
  • preprocessing/ - explicit representations, views, segments, materialization, PyG packing, temporal streams, scaler config, vocab config, and graph transforms.
  • datamodule/ - training-time loaders and batching policy.
  • state.py - in-memory cache of built dataset states, shared within one Python process.

graphids.core.data

data

Data-layer public API.

The runtime datamodules are imported lazily so preprocessing and discovery can be used without importing optional training dependencies.

CANBusDataset

CANBusDataset(root: str | Path, raw_dir: str | Path, *, val_fraction: float, split: str = 'train', source_dirs: list[str] | None = None, seed: int = 42, shared_vocab: dict | None = None, shared_vocab_digest: str | None = None, scaler_cfg: ScalerCfg = _DEFAULT_SCALER_CFG, representation_cfg: GraphRepresentationCfg = _DEFAULT_REPRESENTATION_CFG, transform=None, pre_transform=None)

Bases: BaseGraphDataset

One graph is one sliding window of CAN messages.

Source code in graphids/core/data/datasets/_base.py
def __init__(
    self,
    root: str | Path,
    raw_dir: str | Path,
    *,
    val_fraction: float,
    split: str = "train",
    source_dirs: list[str] | None = None,
    seed: int = 42,
    shared_vocab: dict | None = None,
    shared_vocab_digest: str | None = None,
    scaler_cfg: ScalerCfg = _DEFAULT_SCALER_CFG,
    representation_cfg: GraphRepresentationCfg = _DEFAULT_REPRESENTATION_CFG,
    transform=None,
    pre_transform=None,
):
    """Configure the dataset, load its processed graphs, and subset train/val.

    Args:
        root: Dataset root passed to the base class; ``num_arb_ids`` is read
            from the metadata stored under it.
        raw_dir: Raw-data directory, stored as ``raw_data_dir``.
        val_fraction: Fraction of graphs assigned to ``split="val"``; the
            remainder forms ``split="train"``.
        split: ``"train"`` or ``"val"`` selects a seeded random subset; any
            other value keeps every loaded graph.
        source_dirs: Source subdirectory names, stored on the instance.
        seed: Seed for the train/val permutation.
        shared_vocab: Optional shared vocabulary, stored on the instance.
        shared_vocab_digest: Digest accompanying ``shared_vocab``, stored on
            the instance.
        scaler_cfg: Scaler configuration; its kind is cached as
            ``scaler_strategy``.
        representation_cfg: Graph representation configuration; its kind and
            resolved window size/stride are cached on the instance.
        transform: Forwarded to the base class.
        pre_transform: Forwarded to the base class.
    """
    # Every attribute below is set before ``super().__init__``. Presumably the
    # base class's processing hooks read them, so keep this ordering
    # (NOTE(review): confirm before reordering).
    self.raw_data_dir = Path(raw_dir)
    self.split = split
    self.val_fraction = val_fraction
    self.source_dirs = source_dirs
    self.seed = seed
    self._shared_vocab = shared_vocab
    self._shared_vocab_digest = shared_vocab_digest
    self.scaler_cfg = scaler_cfg
    self.scaler_strategy = scaler_kind(scaler_cfg)
    self.representation_cfg = representation_cfg
    self.representation_kind = representation_kind(representation_cfg)
    self.window_size, self.stride = (
        self._resolved_window_size_stride(representation_cfg)
    )
    super().__init__(str(root), transform, pre_transform)
    # Load the first processed file into this in-memory dataset.
    self.load(self.processed_paths[0])
    self.num_ids = int(load_metadata(Path(self.root))["num_arb_ids"])
    if self.split in ("train", "val"):
        # One seeded permutation is shared by both splits. "val" takes the
        # first int(n * val_fraction) indices and "train" takes the rest, so
        # the two are disjoint complements (assuming both load the same
        # processed data).
        n = len(self)
        perm = torch.randperm(n, generator=torch.Generator().manual_seed(self.seed))
        n_val = int(n * self.val_fraction)
        self._indices = (perm[:n_val] if self.split == "val" else perm[n_val:]).tolist()

CANBusSource dataclass

CANBusSource(name: str, lake_root: str | None = None, val_fraction: float = 0.2, seed: int = 42, scaler_cfg: ScalerCfg = ZBenignScalerCfg(), representation_cfg: GraphRepresentationCfg = SnapshotRepresentationCfg(), vocab_scope: Literal['train', 'all'] = 'train')

Bases: BaseGraphSource

Builds train/val/test CANBusDataset splits from the data catalog.

DatasetState dataclass

DatasetState(train: Any, val: Any, test: dict[str, Any])

Ready-to-serve train/val/test splits.

clear_cache

clear_cache() -> None

Drop all cached states. Intended for test teardown.

Source code in graphids/core/data/state.py
def clear_cache() -> None:
    """Empty the module-level registry of built dataset states (for test teardown)."""
    _REGISTRY.clear()

get_or_build

get_or_build(dataset: _CacheableDataset) -> DatasetState

Return the cached DatasetState for dataset, building and caching it on first request.

Source code in graphids/core/data/state.py
def get_or_build(dataset: _CacheableDataset) -> DatasetState:
    """Fetch the state cached under ``dataset.cache_key``, building it on a miss.

    A miss is any lookup that yields ``None``. In that case ``dataset.build()``
    is called and its result is stored under the key before being returned.
    """
    key = dataset.cache_key
    cached = _REGISTRY.get(key)
    if cached is not None:
        return cached
    built = dataset.build()
    _REGISTRY[key] = built
    return built

datamodule

DataModule primitives for graph and fusion datasets.

GraphDataModule

GraphDataModule(dataset, batch_size: int = 32, num_workers: int | None = None, prefetch_factor: int = 2, dynamic_batching: bool = True, label_filter: str | None = None, difficulty: Callable[..., Tensor] | None = None, scope_label: int = 0, min_steps_per_epoch: int = 1, require_cache: bool = False)

Bases: LightningDataModule

Source code in graphids/core/data/datamodule/graph.py
def __init__(
    self,
    dataset,
    batch_size: int = 32,
    num_workers: int | None = None,
    prefetch_factor: int = 2,
    dynamic_batching: bool = True,
    label_filter: str | None = None,
    difficulty: Callable[..., torch.Tensor] | None = None,
    scope_label: int = 0,
    min_steps_per_epoch: int = 1,
    require_cache: bool = False,
):
    """Record loader/batching options; split and packing state starts empty.

    Every argument is stored unchanged on the instance (``dataset`` as
    ``self.source``). Nothing is loaded here.
    """
    super().__init__()
    self.source = dataset

    # Loader and batching options.
    self.batch_size = batch_size
    self.num_workers = num_workers
    self.prefetch_factor = prefetch_factor
    self.dynamic_batching = dynamic_batching
    self.min_steps_per_epoch = min_steps_per_epoch
    self.require_cache = require_cache

    # Remaining options, consumed outside this constructor.
    self.label_filter = label_filter
    self.difficulty = difficulty
    self.scope_label = scope_label

    # State populated later; unset/empty until then.
    self._train: InMemoryDataset | None = None
    self._val: InMemoryDataset | None = None
    self._tests: dict[str, InMemoryDataset] = {}
    self._train_graphs: list | None = None
    self._train_plans: list[list[int]] | None = None
    self._budget = None
train_eval_dataloader
train_eval_dataloader()

Eval-style train loader for calibration and centroid stats.

Source code in graphids/core/data/datamodule/graph.py
def train_eval_dataloader(self):
    """Return a loader over the training view with shuffling disabled.

    Used for evaluation-style passes over training data, such as calibration
    and centroid statistics.
    """
    view = self._train_view()
    return self._fixed_loader(view, shuffle=False)

TemporalDataModule

TemporalDataModule(dataset, batch_size: int = 256)

Bases: LightningDataModule

Serve temporal event streams with PyG's TemporalDataLoader.

Source code in graphids/core/data/datamodule/temporal.py
def __init__(self, dataset, batch_size: int = 256):
    """Keep the dataset and batch size; all splits start unset.

    Args:
        dataset: Stream source, kept as ``self.source``.
        batch_size: Batch size for the temporal loaders.
    """
    super().__init__()
    self.source = dataset
    self.batch_size = batch_size
    # Populated later.
    self._train = None
    self._val = None
    self._tests: dict[str, object] = {}

fusion

Fusion data module for pre-extracted TensorDict caches.

graph

Lightning data module for graph datasets.

GraphDataModule
GraphDataModule(dataset, batch_size: int = 32, num_workers: int | None = None, prefetch_factor: int = 2, dynamic_batching: bool = True, label_filter: str | None = None, difficulty: Callable[..., Tensor] | None = None, scope_label: int = 0, min_steps_per_epoch: int = 1, require_cache: bool = False)

Bases: LightningDataModule

Source code in graphids/core/data/datamodule/graph.py
def __init__(
    self,
    dataset,
    batch_size: int = 32,
    num_workers: int | None = None,
    prefetch_factor: int = 2,
    dynamic_batching: bool = True,
    label_filter: str | None = None,
    difficulty: Callable[..., torch.Tensor] | None = None,
    scope_label: int = 0,
    min_steps_per_epoch: int = 1,
    require_cache: bool = False,
):
    """Record loader/batching options; split and packing state starts empty.

    Every argument is stored unchanged on the instance (``dataset`` as
    ``self.source``). Nothing is loaded here.
    """
    super().__init__()
    self.source = dataset

    # Loader and batching options.
    self.batch_size = batch_size
    self.num_workers = num_workers
    self.prefetch_factor = prefetch_factor
    self.dynamic_batching = dynamic_batching
    self.min_steps_per_epoch = min_steps_per_epoch
    self.require_cache = require_cache

    # Remaining options, consumed outside this constructor.
    self.label_filter = label_filter
    self.difficulty = difficulty
    self.scope_label = scope_label

    # State populated later; unset/empty until then.
    self._train: InMemoryDataset | None = None
    self._val: InMemoryDataset | None = None
    self._tests: dict[str, InMemoryDataset] = {}
    self._train_graphs: list | None = None
    self._train_plans: list[list[int]] | None = None
    self._budget = None
train_eval_dataloader
train_eval_dataloader()

Eval-style train loader for calibration and centroid stats.

Source code in graphids/core/data/datamodule/graph.py
def train_eval_dataloader(self):
    """Return a loader over the training view with shuffling disabled.

    Used for evaluation-style passes over training data, such as calibration
    and centroid statistics.
    """
    view = self._train_view()
    return self._fixed_loader(view, shuffle=False)

sampler

Offline next-fit decreasing packer for variable-size graphs.

pack_offline
pack_offline(sizes: Tensor, max_num: int, *, edge_sizes: Tensor | None = None, max_edges: int | None = None) -> list[list[int]]

Pack graph indices under node and edge budgets.

The sorted next-fit strategy is intentionally linear after sorting. Exact first-fit gives slightly tighter bins, but it is quadratic on large cached graph datasets and can spend minutes on CPU before the first GPU step.

Source code in graphids/core/data/datamodule/sampler.py
def pack_offline(
    sizes: torch.Tensor,
    max_num: int,
    *,
    edge_sizes: torch.Tensor | None = None,
    max_edges: int | None = None,
) -> list[list[int]]:
    """Pack graph indices under node and edge budgets.

    The sorted next-fit strategy is intentionally linear after sorting. Exact
    first-fit gives slightly tighter bins, but it is quadratic on large cached
    graph datasets and can spend minutes on CPU before the first GPU step.
    """
    if max_num <= 0:
        raise ValueError(f"max_num must be positive, got {max_num}")
    if edge_sizes is not None:
        if len(edge_sizes) != len(sizes):
            raise ValueError(
                f"edge_sizes length ({len(edge_sizes)}) != sizes length ({len(sizes)})"
            )
        if max_edges is None or max_edges <= 0:
            raise ValueError("max_edges must be a positive int when edge_sizes is given")

    sizes = sizes.to(torch.long)
    es = edge_sizes.to(torch.long) if edge_sizes is not None else None
    order = torch.argsort(sizes, descending=True).tolist()

    bins: list[_Bin] = []
    current = _Bin()
    skipped = 0
    for i in order:
        n_i = int(sizes[i])
        e_i = int(es[i]) if es is not None else 0
        if n_i > max_num or (max_edges is not None and e_i > max_edges):
            skipped += 1
            continue
        fits_current = current.n_sum + n_i <= max_num and (
            max_edges is None or current.e_sum + e_i <= max_edges
        )
        if current.indices and not fits_current:
            bins.append(current)
            current = _Bin()
        current.indices.append(i)
        current.n_sum += n_i
        current.e_sum += e_i

    if current.indices:
        bins.append(current)

    if skipped:
        log.warning(
            "sampler_skipped_oversize",
            n_skipped=skipped,
            n_total=len(sizes),
            max_nodes=max_num,
            max_edges=max_edges,
        )
    return [b.indices for b in bins]

temporal

Lightning data module for temporal PyG event streams.

TemporalDataModule
TemporalDataModule(dataset, batch_size: int = 256)

Bases: LightningDataModule

Serve temporal event streams with PyG's TemporalDataLoader.

Source code in graphids/core/data/datamodule/temporal.py
def __init__(self, dataset, batch_size: int = 256):
    """Keep the dataset and batch size; all splits start unset.

    Args:
        dataset: Stream source, kept as ``self.source``.
        batch_size: Batch size for the temporal loaders.
    """
    super().__init__()
    self.source = dataset
    self.batch_size = batch_size
    # Populated later.
    self._train = None
    self._val = None
    self._tests: dict[str, object] = {}

datasets

CANBusDataset

CANBusDataset(root: str | Path, raw_dir: str | Path, *, val_fraction: float, split: str = 'train', source_dirs: list[str] | None = None, seed: int = 42, shared_vocab: dict | None = None, shared_vocab_digest: str | None = None, scaler_cfg: ScalerCfg = _DEFAULT_SCALER_CFG, representation_cfg: GraphRepresentationCfg = _DEFAULT_REPRESENTATION_CFG, transform=None, pre_transform=None)

Bases: BaseGraphDataset

One graph is one sliding window of CAN messages.

Source code in graphids/core/data/datasets/_base.py
def __init__(
    self,
    root: str | Path,
    raw_dir: str | Path,
    *,
    val_fraction: float,
    split: str = "train",
    source_dirs: list[str] | None = None,
    seed: int = 42,
    shared_vocab: dict | None = None,
    shared_vocab_digest: str | None = None,
    scaler_cfg: ScalerCfg = _DEFAULT_SCALER_CFG,
    representation_cfg: GraphRepresentationCfg = _DEFAULT_REPRESENTATION_CFG,
    transform=None,
    pre_transform=None,
):
    """Configure the dataset, load its processed graphs, and subset train/val.

    Args:
        root: Dataset root passed to the base class; ``num_arb_ids`` is read
            from the metadata stored under it.
        raw_dir: Raw-data directory, stored as ``raw_data_dir``.
        val_fraction: Fraction of graphs assigned to ``split="val"``; the
            remainder forms ``split="train"``.
        split: ``"train"`` or ``"val"`` selects a seeded random subset; any
            other value keeps every loaded graph.
        source_dirs: Source subdirectory names, stored on the instance.
        seed: Seed for the train/val permutation.
        shared_vocab: Optional shared vocabulary, stored on the instance.
        shared_vocab_digest: Digest accompanying ``shared_vocab``, stored on
            the instance.
        scaler_cfg: Scaler configuration; its kind is cached as
            ``scaler_strategy``.
        representation_cfg: Graph representation configuration; its kind and
            resolved window size/stride are cached on the instance.
        transform: Forwarded to the base class.
        pre_transform: Forwarded to the base class.
    """
    # Every attribute below is set before ``super().__init__``. Presumably the
    # base class's processing hooks read them, so keep this ordering
    # (NOTE(review): confirm before reordering).
    self.raw_data_dir = Path(raw_dir)
    self.split = split
    self.val_fraction = val_fraction
    self.source_dirs = source_dirs
    self.seed = seed
    self._shared_vocab = shared_vocab
    self._shared_vocab_digest = shared_vocab_digest
    self.scaler_cfg = scaler_cfg
    self.scaler_strategy = scaler_kind(scaler_cfg)
    self.representation_cfg = representation_cfg
    self.representation_kind = representation_kind(representation_cfg)
    self.window_size, self.stride = (
        self._resolved_window_size_stride(representation_cfg)
    )
    super().__init__(str(root), transform, pre_transform)
    # Load the first processed file into this in-memory dataset.
    self.load(self.processed_paths[0])
    self.num_ids = int(load_metadata(Path(self.root))["num_arb_ids"])
    if self.split in ("train", "val"):
        # One seeded permutation is shared by both splits. "val" takes the
        # first int(n * val_fraction) indices and "train" takes the rest, so
        # the two are disjoint complements (assuming both load the same
        # processed data).
        n = len(self)
        perm = torch.randperm(n, generator=torch.Generator().manual_seed(self.seed))
        n_val = int(n * self.val_fraction)
        self._indices = (perm[:n_val] if self.split == "val" else perm[n_val:]).tolist()

CANBusSource dataclass

CANBusSource(name: str, lake_root: str | None = None, val_fraction: float = 0.2, seed: int = 42, scaler_cfg: ScalerCfg = ZBenignScalerCfg(), representation_cfg: GraphRepresentationCfg = SnapshotRepresentationCfg(), vocab_scope: Literal['train', 'all'] = 'train')

Bases: BaseGraphSource

Builds train/val/test CANBusDataset splits from the data catalog.

TemporalCANBusSource dataclass

TemporalCANBusSource(name: str, seed: int, lake_root: str | None = None, val_fraction: float = 0.2, vocab_scope: Literal['train', 'all'] = 'train', representation_cfg: TemporalRepresentationCfg = TemporalRepresentationCfg())

Builds temporal CAN event streams from the data catalog.

can

CAN-specific dataset adapters and source primitives.

CANBusDataset
CANBusDataset(root: str | Path, raw_dir: str | Path, *, val_fraction: float, split: str = 'train', source_dirs: list[str] | None = None, seed: int = 42, shared_vocab: dict | None = None, shared_vocab_digest: str | None = None, scaler_cfg: ScalerCfg = _DEFAULT_SCALER_CFG, representation_cfg: GraphRepresentationCfg = _DEFAULT_REPRESENTATION_CFG, transform=None, pre_transform=None)

Bases: BaseGraphDataset

One graph is one sliding window of CAN messages.

Source code in graphids/core/data/datasets/_base.py
def __init__(
    self,
    root: str | Path,
    raw_dir: str | Path,
    *,
    val_fraction: float,
    split: str = "train",
    source_dirs: list[str] | None = None,
    seed: int = 42,
    shared_vocab: dict | None = None,
    shared_vocab_digest: str | None = None,
    scaler_cfg: ScalerCfg = _DEFAULT_SCALER_CFG,
    representation_cfg: GraphRepresentationCfg = _DEFAULT_REPRESENTATION_CFG,
    transform=None,
    pre_transform=None,
):
    """Configure the dataset, load its processed graphs, and subset train/val.

    Args:
        root: Dataset root passed to the base class; ``num_arb_ids`` is read
            from the metadata stored under it.
        raw_dir: Raw-data directory, stored as ``raw_data_dir``.
        val_fraction: Fraction of graphs assigned to ``split="val"``; the
            remainder forms ``split="train"``.
        split: ``"train"`` or ``"val"`` selects a seeded random subset; any
            other value keeps every loaded graph.
        source_dirs: Source subdirectory names, stored on the instance.
        seed: Seed for the train/val permutation.
        shared_vocab: Optional shared vocabulary, stored on the instance.
        shared_vocab_digest: Digest accompanying ``shared_vocab``, stored on
            the instance.
        scaler_cfg: Scaler configuration; its kind is cached as
            ``scaler_strategy``.
        representation_cfg: Graph representation configuration; its kind and
            resolved window size/stride are cached on the instance.
        transform: Forwarded to the base class.
        pre_transform: Forwarded to the base class.
    """
    # Every attribute below is set before ``super().__init__``. Presumably the
    # base class's processing hooks read them, so keep this ordering
    # (NOTE(review): confirm before reordering).
    self.raw_data_dir = Path(raw_dir)
    self.split = split
    self.val_fraction = val_fraction
    self.source_dirs = source_dirs
    self.seed = seed
    self._shared_vocab = shared_vocab
    self._shared_vocab_digest = shared_vocab_digest
    self.scaler_cfg = scaler_cfg
    self.scaler_strategy = scaler_kind(scaler_cfg)
    self.representation_cfg = representation_cfg
    self.representation_kind = representation_kind(representation_cfg)
    self.window_size, self.stride = (
        self._resolved_window_size_stride(representation_cfg)
    )
    super().__init__(str(root), transform, pre_transform)
    # Load the first processed file into this in-memory dataset.
    self.load(self.processed_paths[0])
    self.num_ids = int(load_metadata(Path(self.root))["num_arb_ids"])
    if self.split in ("train", "val"):
        # One seeded permutation is shared by both splits. "val" takes the
        # first int(n * val_fraction) indices and "train" takes the rest, so
        # the two are disjoint complements (assuming both load the same
        # processed data).
        n = len(self)
        perm = torch.randperm(n, generator=torch.Generator().manual_seed(self.seed))
        n_val = int(n * self.val_fraction)
        self._indices = (perm[:n_val] if self.split == "val" else perm[n_val:]).tolist()
CANBusSource dataclass
CANBusSource(name: str, lake_root: str | None = None, val_fraction: float = 0.2, seed: int = 42, scaler_cfg: ScalerCfg = ZBenignScalerCfg(), representation_cfg: GraphRepresentationCfg = SnapshotRepresentationCfg(), vocab_scope: Literal['train', 'all'] = 'train')

Bases: BaseGraphSource

Builds train/val/test CANBusDataset splits from the data catalog.

TemporalCANBusSource dataclass
TemporalCANBusSource(name: str, seed: int, lake_root: str | None = None, val_fraction: float = 0.2, vocab_scope: Literal['train', 'all'] = 'train', representation_cfg: TemporalRepresentationCfg = TemporalRepresentationCfg())

Builds temporal CAN event streams from the data catalog.

infer_attack_type
infer_attack_type(csv: Path) -> int

Infer the attack code from filename/path substrings.

Source code in graphids/core/data/datasets/can_bus.py
def infer_attack_type(csv: Path) -> int:
    """Return the code of the first ``ATTACK_TYPE_CODES`` keyword found in the path.

    The keyword search runs case-insensitively over the file stem and the
    parent directory name, in the mapping's iteration order. Returns ``0``
    when no keyword matches.
    """
    haystack = f"{csv.stem.lower()} {csv.parent.name.lower()}"
    return next(
        (code for keyword, code in ATTACK_TYPE_CODES.items() if keyword in haystack),
        0,
    )
load_can_rows
load_can_rows(raw_dir: Path, source_dirs: list[str]) -> pl.DataFrame

Load, normalize, and parse raw CAN CSVs from source dirs.

Source code in graphids/core/data/datasets/can_bus.py
def load_can_rows(raw_dir: Path, source_dirs: list[str]) -> pl.DataFrame:
    """Load, normalize, and parse raw CAN CSVs from source dirs.

    Processing steps:

    1. Scan every ``*.csv`` found recursively under ``raw_dir / sub``, for
       each ``sub`` in ``source_dirs``, visiting files in sorted order.
    2. Tag each file with an ``attack_type`` code from ``infer_attack_type``
       and a ``vehicle_id`` equal to ``sub``.
    3. Concatenate all files and sort by ``timestamp``. Rows with equal
       timestamps keep their concatenation order.
    4. Rename ``arbitration_id``/``data_field`` to ``arb_id``/``payload`` when
       present.
    5. Expand the payload with ``parse_payload``.

    Args:
        raw_dir: Directory containing the source subdirectories.
        source_dirs: Subdirectory names under ``raw_dir`` to load.

    Returns:
        The collected, parsed frame.

    Raises:
        ValueError: If ``source_dirs`` is empty or no CSVs are found.
        FileNotFoundError: If a declared source directory does not exist.
    """
    if not source_dirs:
        raise ValueError("source_dirs is empty; cannot load CAN rows")
    frames: list[pl.LazyFrame] = []
    for sub in source_dirs:
        sub_path = raw_dir / sub
        if not sub_path.is_dir():
            raise FileNotFoundError(f"declared source_dir {sub!r} missing under {raw_dir}")
        for csv_path in sorted(sub_path.rglob("*.csv")):
            at = infer_attack_type(csv_path)
            frames.append(
                pl.scan_csv(csv_path).with_columns(
                    pl.lit(at).alias("attack_type"),
                    pl.lit(sub).alias("vehicle_id"),
                )
            )
    if not frames:
        raise ValueError(f"no CSVs under any of {source_dirs!r} in {raw_dir}")

    # NOTE(review): a vertical concat assumes every CSV shares one column
    # layout; the renames below are applied only after concatenation.
    # maintain_order=True keeps equal-timestamp rows (e.g. from different
    # vehicles) in concatenation order, so the row order and everything
    # windowed/split from it are reproducible.
    combined = pl.concat(frames).sort("timestamp", maintain_order=True)
    cols = combined.collect_schema().names()
    renames = {
        old: new
        for old, new in (("arbitration_id", "arb_id"), ("data_field", "payload"))
        if old in cols
    }
    if renames:
        combined = combined.rename(renames)
    return parse_payload(combined).collect()
parse_payload
parse_payload(lf: LazyFrame) -> pl.LazyFrame

Parse the hex payload into byte_0..byte_7 columns plus a Shannon-entropy column.

Source code in graphids/core/data/datasets/can_bus.py
def parse_payload(lf: pl.LazyFrame) -> pl.LazyFrame:
    """Expand hex ``payload`` into ``byte_0..7`` columns and an ``entropy`` column.

    Frames that already contain ``byte_0`` are returned untouched.

    Each byte comes from a two-character hex slice of ``payload``; a missing
    or unparsable slice becomes ``0``. ``entropy`` is ``-sum(p * log(p))``
    over the strictly positive bytes, where ``p = byte / row_sum`` and the
    row sum is floored at ``1e-12``.
    """
    if "byte_0" in lf.collect_schema().names():
        return lf

    def _hex_byte(idx: int) -> pl.Expr:
        # Characters [2*idx, 2*idx + 2) of the payload, parsed as base-16.
        return (
            pl.col("payload")
            .str.slice(idx * 2, 2)
            .str.to_integer(base=16, strict=False)
            .fill_null(0)
            .cast(pl.Float32)
            .alias(f"byte_{idx}")
        )

    with_bytes = lf.with_columns([_hex_byte(idx) for idx in range(N_BYTES)])

    byte_exprs = [pl.col(name) for name in BYTE_COLS]
    total = pl.sum_horizontal(byte_exprs).clip(1e-12, None)
    terms = []
    for col in byte_exprs:
        share = col / total
        # Zero bytes contribute nothing (and would otherwise hit log(0)).
        terms.append(pl.when(col > 0).then(-share * share.log()).otherwise(0.0))
    return with_bytes.with_columns(pl.sum_horizontal(terms).alias("entropy"))

can_bus

CAN bus dataset adapter and schema.

CANBusDataset
CANBusDataset(root: str | Path, raw_dir: str | Path, *, val_fraction: float, split: str = 'train', source_dirs: list[str] | None = None, seed: int = 42, shared_vocab: dict | None = None, shared_vocab_digest: str | None = None, scaler_cfg: ScalerCfg = _DEFAULT_SCALER_CFG, representation_cfg: GraphRepresentationCfg = _DEFAULT_REPRESENTATION_CFG, transform=None, pre_transform=None)

Bases: BaseGraphDataset

One graph is one sliding window of CAN messages.

Source code in graphids/core/data/datasets/_base.py
def __init__(
    self,
    root: str | Path,
    raw_dir: str | Path,
    *,
    val_fraction: float,
    split: str = "train",
    source_dirs: list[str] | None = None,
    seed: int = 42,
    shared_vocab: dict | None = None,
    shared_vocab_digest: str | None = None,
    scaler_cfg: ScalerCfg = _DEFAULT_SCALER_CFG,
    representation_cfg: GraphRepresentationCfg = _DEFAULT_REPRESENTATION_CFG,
    transform=None,
    pre_transform=None,
):
    """Configure the dataset, load its processed graphs, and subset train/val.

    Args:
        root: Dataset root passed to the base class; ``num_arb_ids`` is read
            from the metadata stored under it.
        raw_dir: Raw-data directory, stored as ``raw_data_dir``.
        val_fraction: Fraction of graphs assigned to ``split="val"``; the
            remainder forms ``split="train"``.
        split: ``"train"`` or ``"val"`` selects a seeded random subset; any
            other value keeps every loaded graph.
        source_dirs: Source subdirectory names, stored on the instance.
        seed: Seed for the train/val permutation.
        shared_vocab: Optional shared vocabulary, stored on the instance.
        shared_vocab_digest: Digest accompanying ``shared_vocab``, stored on
            the instance.
        scaler_cfg: Scaler configuration; its kind is cached as
            ``scaler_strategy``.
        representation_cfg: Graph representation configuration; its kind and
            resolved window size/stride are cached on the instance.
        transform: Forwarded to the base class.
        pre_transform: Forwarded to the base class.
    """
    # Every attribute below is set before ``super().__init__``. Presumably the
    # base class's processing hooks read them, so keep this ordering
    # (NOTE(review): confirm before reordering).
    self.raw_data_dir = Path(raw_dir)
    self.split = split
    self.val_fraction = val_fraction
    self.source_dirs = source_dirs
    self.seed = seed
    self._shared_vocab = shared_vocab
    self._shared_vocab_digest = shared_vocab_digest
    self.scaler_cfg = scaler_cfg
    self.scaler_strategy = scaler_kind(scaler_cfg)
    self.representation_cfg = representation_cfg
    self.representation_kind = representation_kind(representation_cfg)
    self.window_size, self.stride = (
        self._resolved_window_size_stride(representation_cfg)
    )
    super().__init__(str(root), transform, pre_transform)
    # Load the first processed file into this in-memory dataset.
    self.load(self.processed_paths[0])
    self.num_ids = int(load_metadata(Path(self.root))["num_arb_ids"])
    if self.split in ("train", "val"):
        # One seeded permutation is shared by both splits. "val" takes the
        # first int(n * val_fraction) indices and "train" takes the rest, so
        # the two are disjoint complements (assuming both load the same
        # processed data).
        n = len(self)
        perm = torch.randperm(n, generator=torch.Generator().manual_seed(self.seed))
        n_val = int(n * self.val_fraction)
        self._indices = (perm[:n_val] if self.split == "val" else perm[n_val:]).tolist()
CANBusSource dataclass
CANBusSource(name: str, lake_root: str | None = None, val_fraction: float = 0.2, seed: int = 42, scaler_cfg: ScalerCfg = ZBenignScalerCfg(), representation_cfg: GraphRepresentationCfg = SnapshotRepresentationCfg(), vocab_scope: Literal['train', 'all'] = 'train')

Bases: BaseGraphSource

Builds train/val/test CANBusDataset splits from the data catalog.

TemporalCANBusSource dataclass
TemporalCANBusSource(name: str, seed: int, lake_root: str | None = None, val_fraction: float = 0.2, vocab_scope: Literal['train', 'all'] = 'train', representation_cfg: TemporalRepresentationCfg = TemporalRepresentationCfg())

Builds temporal CAN event streams from the data catalog.

infer_attack_type
infer_attack_type(csv: Path) -> int

Infer the attack code from filename/path substrings.

Source code in graphids/core/data/datasets/can_bus.py
def infer_attack_type(csv: Path) -> int:
    """Return the code of the first ``ATTACK_TYPE_CODES`` keyword found in the path.

    The keyword search runs case-insensitively over the file stem and the
    parent directory name, in the mapping's iteration order. Returns ``0``
    when no keyword matches.
    """
    haystack = f"{csv.stem.lower()} {csv.parent.name.lower()}"
    return next(
        (code for keyword, code in ATTACK_TYPE_CODES.items() if keyword in haystack),
        0,
    )
load_can_rows
load_can_rows(raw_dir: Path, source_dirs: list[str]) -> pl.DataFrame

Load, normalize, and parse raw CAN CSVs from source dirs.

Source code in graphids/core/data/datasets/can_bus.py
def load_can_rows(raw_dir: Path, source_dirs: list[str]) -> pl.DataFrame:
    """Load, normalize, and parse raw CAN CSVs from source dirs.

    Processing steps:

    1. Scan every ``*.csv`` found recursively under ``raw_dir / sub``, for
       each ``sub`` in ``source_dirs``, visiting files in sorted order.
    2. Tag each file with an ``attack_type`` code from ``infer_attack_type``
       and a ``vehicle_id`` equal to ``sub``.
    3. Concatenate all files and sort by ``timestamp``. Rows with equal
       timestamps keep their concatenation order.
    4. Rename ``arbitration_id``/``data_field`` to ``arb_id``/``payload`` when
       present.
    5. Expand the payload with ``parse_payload``.

    Args:
        raw_dir: Directory containing the source subdirectories.
        source_dirs: Subdirectory names under ``raw_dir`` to load.

    Returns:
        The collected, parsed frame.

    Raises:
        ValueError: If ``source_dirs`` is empty or no CSVs are found.
        FileNotFoundError: If a declared source directory does not exist.
    """
    if not source_dirs:
        raise ValueError("source_dirs is empty; cannot load CAN rows")
    frames: list[pl.LazyFrame] = []
    for sub in source_dirs:
        sub_path = raw_dir / sub
        if not sub_path.is_dir():
            raise FileNotFoundError(f"declared source_dir {sub!r} missing under {raw_dir}")
        for csv_path in sorted(sub_path.rglob("*.csv")):
            at = infer_attack_type(csv_path)
            frames.append(
                pl.scan_csv(csv_path).with_columns(
                    pl.lit(at).alias("attack_type"),
                    pl.lit(sub).alias("vehicle_id"),
                )
            )
    if not frames:
        raise ValueError(f"no CSVs under any of {source_dirs!r} in {raw_dir}")

    # NOTE(review): a vertical concat assumes every CSV shares one column
    # layout; the renames below are applied only after concatenation.
    # maintain_order=True keeps equal-timestamp rows (e.g. from different
    # vehicles) in concatenation order, so the row order and everything
    # windowed/split from it are reproducible.
    combined = pl.concat(frames).sort("timestamp", maintain_order=True)
    cols = combined.collect_schema().names()
    renames = {
        old: new
        for old, new in (("arbitration_id", "arb_id"), ("data_field", "payload"))
        if old in cols
    }
    if renames:
        combined = combined.rename(renames)
    return parse_payload(combined).collect()
parse_payload
parse_payload(lf: LazyFrame) -> pl.LazyFrame

Parse the hex payload into byte_0..byte_7 columns plus a Shannon-entropy column.

Source code in graphids/core/data/datasets/can_bus.py
def parse_payload(lf: pl.LazyFrame) -> pl.LazyFrame:
    """Expand hex ``payload`` into ``byte_0..7`` columns and an ``entropy`` column.

    Frames that already contain ``byte_0`` are returned untouched.

    Each byte comes from a two-character hex slice of ``payload``; a missing
    or unparsable slice becomes ``0``. ``entropy`` is ``-sum(p * log(p))``
    over the strictly positive bytes, where ``p = byte / row_sum`` and the
    row sum is floored at ``1e-12``.
    """
    if "byte_0" in lf.collect_schema().names():
        return lf

    def _hex_byte(idx: int) -> pl.Expr:
        # Characters [2*idx, 2*idx + 2) of the payload, parsed as base-16.
        return (
            pl.col("payload")
            .str.slice(idx * 2, 2)
            .str.to_integer(base=16, strict=False)
            .fill_null(0)
            .cast(pl.Float32)
            .alias(f"byte_{idx}")
        )

    with_bytes = lf.with_columns([_hex_byte(idx) for idx in range(N_BYTES)])

    byte_exprs = [pl.col(name) for name in BYTE_COLS]
    total = pl.sum_horizontal(byte_exprs).clip(1e-12, None)
    terms = []
    for col in byte_exprs:
        share = col / total
        # Zero bytes contribute nothing (and would otherwise hit log(0)).
        terms.append(pl.when(col > 0).then(-share * share.log()).otherwise(0.0))
    return with_bytes.with_columns(pl.sum_horizontal(terms).alias("entropy"))

discovery

Signal-discovery primitives for cross-vehicle ontology building.

CanonicalEntitySpec dataclass

CanonicalEntitySpec(canonical_id: str, name: str, aliases: tuple[str, ...] = (), vehicle_aliases: dict[str, tuple[str, ...]] = dict(), kind: Literal['signal', 'message', 'state', 'entity'] = 'signal', description: str | None = None)

One shared semantic entity across vehicles.

CanonicalFeatureFrameSpec dataclass

CanonicalFeatureFrameSpec(time_col: str = 'timestamp', vehicle_col: str | None = 'vehicle_id', alias_col: str = 'signal', value_col: str = 'value', keep_cols: tuple[str, ...] = (), feature_cols: tuple[str, ...] = (), unmapped: Literal['raise', 'drop', 'keep'] = 'raise')

How to flatten decoded vehicle rows into canonical feature records.

CanonicalRegistry dataclass

CanonicalRegistry(entities: tuple[CanonicalEntitySpec, ...])

Lookup table for canonical entities and vehicle-specific aliases.

lookup_frame
lookup_frame() -> pl.DataFrame

Return a canonical lookup frame for vectorized joins.

Source code in graphids/core/data/discovery/canonical.py
def lookup_frame(self) -> pl.DataFrame:
    """Return a canonical lookup frame for vectorized joins.

    The frame has one row per ``(vehicle_key, alias_key)`` pair:

    - Global keys (canonical id, display name, declared aliases) use
      ``vehicle_key == "*"``.
    - Vehicle-specific aliases use the vehicle name normalized with
      ``_norm``.

    Alias keys are normalized with ``_norm`` as well. When a pair is declared
    more than once, the last declaration wins.

    The column schema is fixed up front. Without it, an empty registry would
    produce a column-less frame and ``unique`` would fail on the missing key
    columns; with it, the result is an empty frame with the expected columns.
    """
    schema = {
        "vehicle_key": pl.Utf8,
        "alias_key": pl.Utf8,
        "canonical_id": pl.Utf8,
        "canonical_name": pl.Utf8,
        "kind": pl.Utf8,
    }
    rows: list[dict[str, str]] = []
    for spec in self.entities:
        for key in (spec.canonical_id, spec.name, *spec.aliases):
            rows.append(
                {
                    "vehicle_key": "*",
                    "alias_key": _norm(key),
                    "canonical_id": spec.canonical_id,
                    "canonical_name": spec.name,
                    "kind": spec.kind,
                }
            )
        for vehicle, aliases in spec.vehicle_aliases.items():
            for alias in aliases:
                rows.append(
                    {
                        "vehicle_key": _norm(vehicle),
                        "alias_key": _norm(alias),
                        "canonical_id": spec.canonical_id,
                        "canonical_name": spec.name,
                        "kind": spec.kind,
                    }
                )
    return pl.DataFrame(rows, schema=schema).unique(["vehicle_key", "alias_key"], keep="last")
lookup_table
lookup_table() -> dict[str, CanonicalEntitySpec]

Return alias -> entity mapping using vehicle::alias keys.

Source code in graphids/core/data/discovery/canonical.py
def lookup_table(self) -> dict[str, CanonicalEntitySpec]:
    """Build a flat ``"<vehicle>::<alias>"`` -> entity mapping.

    Global keys (canonical id, display name, declared aliases) are filed under
    the ``*`` vehicle. Per-vehicle aliases are filed under the normalized
    vehicle name. Both parts of every key go through ``_norm``, and each entry
    is added via ``self._insert``.
    """
    table: dict[str, CanonicalEntitySpec] = {}
    for entity in self.entities:
        for global_key in (entity.canonical_id, entity.name, *entity.aliases):
            self._insert(table, f"*::{_norm(global_key)}", entity)
        for vehicle_name, vehicle_aliases in entity.vehicle_aliases.items():
            for alias in vehicle_aliases:
                self._insert(table, f"{_norm(vehicle_name)}::{_norm(alias)}", entity)
    return table
resolve
resolve(alias: str, *, vehicle: str | None = None) -> CanonicalEntitySpec

Resolve an alias to a canonical entity.

Source code in graphids/core/data/discovery/canonical.py
def resolve(self, alias: str, *, vehicle: str | None = None) -> CanonicalEntitySpec:
    """Map ``alias`` to its canonical entity, preferring vehicle-specific aliases.

    When ``vehicle`` is given, the ``"<vehicle>::<alias>"`` entry is tried
    first. Otherwise, or on a miss, the global ``"*::<alias>"`` entry is used.

    Raises:
        KeyError: If neither lookup finds an entity.
    """
    table = self.lookup_table()
    key = _norm(alias)
    candidates = [f"*::{key}"]
    if vehicle is not None:
        candidates.insert(0, f"{_norm(vehicle)}::{key}")
    for candidate in candidates:
        match = table.get(candidate)
        if match is not None:
            return match
    raise KeyError(f"unknown canonical alias: {alias!r}")

DataLayerLayout dataclass

DataLayerLayout(root: Path)

A single place to point at the three persistent data layers.

DiscoveryStore dataclass

DiscoveryStore(root: Path, profiles_name: str = 'signal_profiles.parquet', hypotheses_name: str = 'canonical_hypotheses.parquet', manifest_name: str = 'discovery_manifest.json')

File-backed signal-profile and hypothesis tables.

rank_hypotheses
rank_hypotheses() -> pl.DataFrame

Return ranked profiles joined to the stored hypothesis table.

Source code in graphids/core/data/discovery/hypotheses.py
def rank_hypotheses(self) -> pl.DataFrame:
    """Rank the stored signal profiles against the stored hypothesis table."""
    profiles = self.load_profiles()
    hypotheses = self.load_hypotheses()
    return rank_signal_hypotheses(profiles, hypotheses)
rank_profiles
rank_profiles() -> pl.DataFrame

Return ranked signal profiles from the stored profile table.

Source code in graphids/core/data/discovery/hypotheses.py
def rank_profiles(self) -> pl.DataFrame:
    """Rank the signal profiles currently held in the profile table."""
    profiles = self.load_profiles()
    return rank_signal_profiles(profiles)

HypothesisRecordSpec dataclass

HypothesisRecordSpec(vehicle_id: str, raw_signal: str, candidate_canonical_id: str | None = None, confidence: float = 0.0, status: Literal['unreviewed', 'provisional', 'confirmed', 'rejected'] = 'unreviewed', evidence: tuple[str, ...] = (), profile_path: Path | None = None, feature_digest: str | None = None)

Provisional semantic mapping for a raw signal.

MaterializedViewSpec dataclass

MaterializedViewSpec(root: Path, name: str = 'materialized_views', view_kind: ViewKind = 'snapshot', partition_cols: tuple[str, ...] = ('vehicle_id', 'view_kind', 'split'), format: Literal['parquet', 'sqlite', 'duckdb'] = 'parquet', key_cols: tuple[str, ...] = ('vehicle_id', 'canonical_id', 'timestamp'), feature_cols: tuple[str, ...] = (), label_cols: tuple[str, ...] = ('attack', 'attack_type'))

Training-facing view materialization contract.

RawEventTableSpec dataclass

RawEventTableSpec(root: Path, name: str = 'raw_can_events', partition_cols: tuple[str, ...] = ('vehicle_id', 'day'), format: Literal['parquet', 'sqlite', 'duckdb'] = 'parquet', primary_time_col: str = 'timestamp', raw_id_col: str = 'arb_id', vehicle_col: str = 'vehicle_id', attack_col: str = 'attack', signal_hint_col: str = 'signal_hint')

Canonical storage for immutable decoded CAN rows.

SignalHypothesisSpec dataclass

SignalHypothesisSpec(vehicle_id: str, raw_signal: str, candidate_canonical_id: str | None = None, confidence: float = 0.0, status: Literal['unreviewed', 'provisional', 'confirmed', 'rejected'] = 'unreviewed', evidence: tuple[str, ...] = ())

A provisional cross-vehicle mapping for a raw signal.

SignalProfileSpec dataclass

SignalProfileSpec(vehicle_col: str = 'vehicle_id', signal_col: str = 'arb_id', time_col: str = 'timestamp', entropy_col: str = 'entropy', attack_col: str = 'attack', byte_prefix: str = 'byte_', include_attack: bool = True)

How to aggregate raw CAN rows into per-signal profiles.

build_canonical_feature_frame

build_canonical_feature_frame(df: DataFrame, registry: CanonicalRegistry, *, spec: CanonicalFeatureFrameSpec | None = None) -> pl.DataFrame

Normalize decoded rows into a long canonical feature table.

Source code in graphids/core/data/discovery/canonical.py
def build_canonical_feature_frame(
    df: pl.DataFrame,
    registry: CanonicalRegistry,
    *,
    spec: CanonicalFeatureFrameSpec | None = None,
) -> pl.DataFrame:
    """Normalize decoded rows into a long canonical feature table.

    Input shape:
        When ``spec.feature_cols`` is non-empty, the wide input is unpivoted so
        that every feature column becomes one ``(alias_col, value_col)`` row.
        ``time_col``, ``keep_cols`` and ``vehicle_col`` (those present in ``df``,
        deduplicated) are kept as index columns. Otherwise ``df`` must already
        be long and carry ``alias_col`` and ``value_col``.

    Alias resolution:
        Each alias is resolved against ``registry.lookup_frame()``. A
        vehicle-specific alias wins over the global (``"*"``) alias for the same
        key. Alias and vehicle values are normalized with the same ``_norm``
        used to build the registry keys.

    Unresolved rows, handled per ``spec.unmapped``:
        * ``"raise"``: raise ``KeyError`` listing up to five unmapped aliases.
        * ``"drop"``: discard them.
        * any other value (``"keep"``): keep them, using the raw alias text as
          ``canonical_id``.

    Returns:
        The (possibly unpivoted) frame with ``canonical_id``, ``canonical_name``
        and ``kind`` columns appended, in the input row order. Name and kind
        are null for ids that are not registry ids.

    Raises:
        ValueError: if ``alias_col`` or ``value_col`` is missing.
        KeyError: if ``spec.unmapped == "raise"`` and some alias is unresolved.
    """
    spec = spec or CanonicalFeatureFrameSpec()
    feature_cols = tuple(spec.feature_cols)
    if feature_cols:
        index_cols = [spec.time_col, *spec.keep_cols]
        if spec.vehicle_col:
            index_cols.append(spec.vehicle_col)
        # First occurrence wins; only columns that exist in df are kept.
        index_cols = [c for c in dict.fromkeys(index_cols) if c in df.columns]
        frame = df.unpivot(
            on=list(feature_cols),
            index=index_cols,
            variable_name=spec.alias_col,
            value_name=spec.value_col,
        )
    else:
        frame = df.clone()
    if spec.alias_col not in frame.columns:
        raise ValueError(f"missing alias column {spec.alias_col!r}")
    if spec.value_col not in frame.columns:
        raise ValueError(f"missing value column {spec.value_col!r}")

    def _norm_keys(col: str) -> pl.Expr:
        # Normalize exactly like the registry keys (plain lowercasing can
        # disagree with ``_norm``). Only distinct values go through Python,
        # so this stays cheap on long frames. Nulls stay null.
        distinct = frame.get_column(col).cast(pl.Utf8).unique().drop_nulls().to_list()
        if not distinct:
            return pl.lit(None, dtype=pl.Utf8)
        return pl.col(col).cast(pl.Utf8).replace_strict(
            {value: _norm(value) for value in distinct},
            default=None,
            return_dtype=pl.Utf8,
        )

    lookup = registry.lookup_frame()
    # Exact (vehicle, alias) candidates. The lookup also holds "*" rows, which
    # match here when no vehicle column exists (every row is keyed "*").
    exact_ids = lookup.select(
        pl.col("vehicle_key").alias("_vehicle_key"),
        pl.col("alias_key").alias("_alias_key"),
        pl.col("canonical_id").alias("_exact_id"),
    )
    global_ids = lookup.filter(pl.col("vehicle_key") == "*").select(
        pl.col("alias_key").alias("_alias_key"),
        pl.col("canonical_id").alias("_global_id"),
    )

    has_vehicle = bool(spec.vehicle_col) and spec.vehicle_col in frame.columns
    # The row index restores the input order after the joins below.
    keyed = frame.with_row_index("_row_nr").with_columns(
        _norm_keys(spec.alias_col).alias("_alias_key"),
        (_norm_keys(spec.vehicle_col) if has_vehicle else pl.lit("*")).alias("_vehicle_key"),
    )
    resolved = (
        keyed.join(exact_ids, on=["_vehicle_key", "_alias_key"], how="left")
        .join(global_ids, on="_alias_key", how="left")
        # Both candidates live in the same frame, so the fallback can never be
        # misaligned by join row order.
        .with_columns(pl.coalesce("_exact_id", "_global_id").alias("canonical_id"))
        .drop(["_alias_key", "_vehicle_key", "_exact_id", "_global_id"])
    )

    if spec.unmapped == "raise":
        missing = resolved.filter(pl.col("canonical_id").is_null())
        if missing.height:
            sample = missing.sort("_row_nr").select(spec.alias_col).head(5).to_series().to_list()
            raise KeyError(f"unmapped canonical aliases: {sample!r}")
    elif spec.unmapped == "drop":
        resolved = resolved.filter(pl.col("canonical_id").is_not_null())
    else:
        resolved = resolved.with_columns(
            pl.coalesce(pl.col("canonical_id"), pl.col(spec.alias_col).cast(pl.Utf8)).alias("canonical_id")
        )

    # Name/kind come from the entity definition keyed by canonical_id; later
    # duplicates win, matching a dict built over ``registry.entities``.
    entities = pl.DataFrame(
        {
            "canonical_id": [e.canonical_id for e in registry.entities],
            "canonical_name": [e.name for e in registry.entities],
            "kind": [e.kind for e in registry.entities],
        },
        schema={"canonical_id": pl.Utf8, "canonical_name": pl.Utf8, "kind": pl.Utf8},
    ).unique("canonical_id", keep="last")
    return (
        resolved.drop(["canonical_name", "kind"], strict=False)
        .join(entities, on="canonical_id", how="left")
        .sort("_row_nr")
        .drop("_row_nr")
    )

build_signal_profiles

build_signal_profiles(df: DataFrame, spec: SignalProfileSpec | None = None) -> pl.DataFrame

Aggregate raw CAN rows into one profile per vehicle/signal pair.

Source code in graphids/core/data/discovery/hypotheses.py
def build_signal_profiles(df: pl.DataFrame, spec: SignalProfileSpec | None = None) -> pl.DataFrame:
    """Aggregate raw CAN rows into one profile per vehicle/signal pair.

    Rows are grouped by ``(spec.vehicle_col, spec.signal_col)``. Every profile
    has ``msg_count`` and a ``signal_key`` of the form ``"<vehicle>::<signal>"``.
    The remaining columns depend on which inputs are present:

    * ``spec.time_col``: ``timestamp_min``, ``timestamp_max``, ``duration``,
      and inter-arrival statistics ``iat_mean``/``iat_std``. Diffs are taken
      in time order within each group.
    * ``spec.entropy_col``: ``entropy_mean`` and ``entropy_std``.
    * byte columns (selected by ``spec.byte_prefix``): per-byte ``_mean``,
      ``_std``, ``_min``, ``_max`` and ``_range``. Also ``change_rate``,
      ``skewness`` and ``kurtosis``, each averaged across bytes; skewness and
      kurtosis are clipped to [-10, 10].
    * ``spec.attack_col`` (only when ``spec.include_attack``): ``attack_max``
      and ``attack_rate``.

    Standard deviations are reported as 0 when a group has too few values to
    estimate one. Profiles come back ordered by vehicle, then signal.

    Raises:
        ValueError: if the vehicle or signal column is missing.
    """
    spec = spec or SignalProfileSpec()
    missing = [c for c in (spec.vehicle_col, spec.signal_col) if c not in df.columns]
    if missing:
        raise ValueError(f"build_signal_profiles missing columns: {missing}")

    # Sort so per-group diff() runs in time order; group_by keeps each group's
    # rows in frame order. Vehicle/signal were validated above, so this is
    # never empty.
    sort_cols = [c for c in (spec.vehicle_col, spec.signal_col, spec.time_col) if c in df.columns]
    df = df.sort(sort_cols)

    def _spread(expr: pl.Expr) -> pl.Expr:
        # Sample std comes back null (not NaN) for groups with fewer than two
        # values, and NaN only for NaN input. Both mean "no measurable spread".
        return expr.std().fill_nan(0).fill_null(0).cast(pl.Float64)

    byte_cols = _byte_cols(df, prefix=spec.byte_prefix)
    group_cols = [spec.vehicle_col, spec.signal_col]
    aggs: list[pl.Expr] = [pl.len().cast(pl.Int64).alias("msg_count")]
    if spec.time_col in df.columns:
        aggs.extend(
            [
                pl.col(spec.time_col).min().cast(pl.Float64).alias("timestamp_min"),
                pl.col(spec.time_col).max().cast(pl.Float64).alias("timestamp_max"),
                (pl.col(spec.time_col).max() - pl.col(spec.time_col).min())
                .cast(pl.Float64)
                .alias("duration"),
                pl.col(spec.time_col).diff().mean().cast(pl.Float64).alias("iat_mean"),
                _spread(pl.col(spec.time_col).diff()).alias("iat_std"),
            ]
        )
    if spec.entropy_col in df.columns:
        aggs.extend(
            [
                pl.col(spec.entropy_col).mean().cast(pl.Float64).alias("entropy_mean"),
                _spread(pl.col(spec.entropy_col)).alias("entropy_std"),
            ]
        )
    if byte_cols:
        aggs.extend(
            [
                *[pl.col(c).mean().cast(pl.Float64).alias(f"{c}_mean") for c in byte_cols],
                *[_spread(pl.col(c)).alias(f"{c}_std") for c in byte_cols],
                *[pl.col(c).min().cast(pl.Float64).alias(f"{c}_min") for c in byte_cols],
                *[pl.col(c).max().cast(pl.Float64).alias(f"{c}_max") for c in byte_cols],
                *[(pl.col(c).max() - pl.col(c).min()).cast(pl.Float64).alias(f"{c}_range") for c in byte_cols],
                # Fraction of consecutive messages in which each byte changed.
                pl.mean_horizontal(
                    *[(pl.col(c).diff().abs().drop_nulls() > 0).mean().fill_null(0) for c in byte_cols]
                ).cast(pl.Float64).alias("change_rate"),
                pl.mean_horizontal(
                    *[pl.col(c).skew().fill_nan(0).fill_null(0).clip(-10, 10) for c in byte_cols]
                ).cast(pl.Float64).alias("skewness"),
                pl.mean_horizontal(
                    *[pl.col(c).kurtosis().fill_nan(0).fill_null(0).clip(-10, 10) for c in byte_cols]
                ).cast(pl.Float64).alias("kurtosis"),
            ]
        )
    if spec.include_attack and spec.attack_col in df.columns:
        aggs.extend(
            [
                pl.col(spec.attack_col).max().cast(pl.Int64).alias("attack_max"),
                pl.col(spec.attack_col).mean().cast(pl.Float64).alias("attack_rate"),
            ]
        )
    # maintain_order on the pre-sorted frame gives a deterministic profile order.
    out = df.group_by(group_cols, maintain_order=True).agg(*aggs)
    out = out.with_columns(
        pl.concat_str(
            [pl.col(spec.vehicle_col).cast(pl.Utf8), pl.col(spec.signal_col).cast(pl.Utf8)],
            separator="::",
        ).alias("signal_key")
    )
    return out

initialize_hypotheses

initialize_hypotheses(profiles: DataFrame) -> pl.DataFrame

Create an empty hypothesis table aligned to a profile table.

Source code in graphids/core/data/discovery/hypotheses.py
def initialize_hypotheses(profiles: pl.DataFrame) -> pl.DataFrame:
    """Create an empty hypothesis table aligned to a profile table.

    One row per profile row, keyed by ``vehicle_id``, ``arb_id`` and
    ``signal_key``. Each row starts with no candidate, confidence ``0.0``,
    status ``"unreviewed"`` and empty evidence.

    Raises:
        ValueError: if any key column is missing from ``profiles``.
    """
    key_cols = ["vehicle_id", "arb_id", "signal_key"]
    absent = [name for name in key_cols if name not in profiles.columns]
    if absent:
        raise ValueError(f"initialize_hypotheses missing columns: {absent}")
    blank_fields = [
        pl.lit(None, dtype=pl.Utf8).alias("candidate_canonical_id"),
        pl.lit(0.0).cast(pl.Float64).alias("confidence"),
        pl.lit("unreviewed").alias("status"),
        pl.lit("").cast(pl.Utf8).alias("evidence"),
    ]
    return profiles.select(key_cols).with_columns(blank_fields)

rank_signal_hypotheses

rank_signal_hypotheses(profiles: DataFrame, hypotheses: DataFrame | None = None) -> pl.DataFrame

Join profile scores to provisional hypotheses when available.

Source code in graphids/core/data/discovery/ranking.py
def rank_signal_hypotheses(
    profiles: pl.DataFrame,
    hypotheses: pl.DataFrame | None = None,
) -> pl.DataFrame:
    """Join profile scores to provisional hypotheses when available.

    ``profiles`` is first ranked with ``rank_signal_profiles``. If
    ``hypotheses`` is missing or empty, that ranking is returned unchanged.

    Otherwise the ranking's group column (chosen by ``_pick_group_col``) is
    left-joined to ``hypotheses``. The join uses the same-named column if it
    exists, or else ``raw_signal``. Any of ``vehicle_id``, ``raw_signal``,
    ``candidate_canonical_id``, ``status`` and ``confidence`` present in
    ``hypotheses`` are carried over. A signal with several hypothesis rows
    appears once per row. If neither join key exists, the plain ranking is
    returned.

    The joined result is re-sorted into ranking order, because a join is not
    guaranteed to preserve the left frame's row order.
    """
    ranked = rank_signal_profiles(profiles)
    if hypotheses is None or hypotheses.is_empty():
        return ranked

    join_col = _pick_group_col(profiles)
    if join_col in hypotheses.columns:
        right_key = join_col
    elif "raw_signal" in hypotheses.columns:
        right_key = "raw_signal"
    else:
        return ranked

    carried = [
        c
        for c in ("vehicle_id", "raw_signal", "candidate_canonical_id", "status", "confidence")
        if c in hypotheses.columns and c != right_key
    ]
    joined = ranked.join(
        hypotheses.select(right_key, *carried),
        left_on=join_col,
        right_on=right_key,
        how="left",
    )
    # Stable sort: rows sharing every key keep the order the join produced.
    return joined.sort(
        ["ranking_score", "vehicle_support", "profile_rows", join_col],
        descending=[True, True, True, False],
        maintain_order=True,
    )

rank_signal_profiles

rank_signal_profiles(profiles: DataFrame) -> pl.DataFrame

Score per-signal profile rows by cross-vehicle support and stability.

The goal is not to claim a final ontology match; it is to give the discovery layer a concrete relational ranking pass that can surface stable signals for review.

Source code in graphids/core/data/discovery/ranking.py
def rank_signal_profiles(profiles: pl.DataFrame) -> pl.DataFrame:
    """Score per-signal profile rows by cross-vehicle support and stability.

    The goal is not to claim a final ontology match; it is to give the
    discovery layer a concrete relational ranking pass that can surface
    stable signals for review.

    Rows are grouped by the column chosen by ``_pick_group_col``. Each group
    yields one row containing:

    * ``vehicle_support``: number of distinct ``vehicle_id`` values;
    * ``profile_rows``: number of profile rows;
    * averaged profile statistics.

    Missing optional statistic columns are treated as constant ``0.0``. The
    score is::

        vehicle_support * (1 + ln(msg_count_mean + 1))
            / (1 + msg_count_std) / (1 + entropy_std)
            / (1 + |change_rate|) / (1 + clip(attack_rate, 0, 1))

    The output is sorted by ``ranking_score``, ``vehicle_support`` and
    ``profile_rows``, all descending. Remaining ties are broken by the group
    column (ascending), so the order is deterministic across runs.

    Raises:
        ValueError: if the group column or ``vehicle_id`` is missing.
    """
    group_col = _pick_group_col(profiles)
    required = {group_col, "vehicle_id"}
    missing = sorted(required.difference(profiles.columns))
    if missing:
        raise ValueError(f"rank_signal_profiles missing columns: {missing}")

    optional_defaults = {
        "msg_count": 0.0,
        "entropy_mean": 0.0,
        "entropy_std": 0.0,
        "change_rate": 0.0,
        "attack_rate": 0.0,
        "skewness": 0.0,
        "kurtosis": 0.0,
    }
    for col, default in optional_defaults.items():
        if col not in profiles.columns:
            profiles = profiles.with_columns(pl.lit(default).cast(pl.Float64).alias(col))

    support = (
        profiles.group_by(group_col)
        .agg(
            pl.n_unique("vehicle_id").alias("vehicle_support"),
            pl.len().alias("profile_rows"),
            pl.col("msg_count").mean().fill_null(0).cast(pl.Float64).alias("msg_count_mean"),
            pl.col("msg_count").std().fill_null(0).cast(pl.Float64).alias("msg_count_std"),
            pl.col("entropy_mean").mean().fill_null(0).cast(pl.Float64).alias("entropy_mean"),
            pl.col("entropy_std").mean().fill_null(0).cast(pl.Float64).alias("entropy_std"),
            pl.col("change_rate").mean().fill_null(0).cast(pl.Float64).alias("change_rate"),
            pl.col("attack_rate").mean().fill_null(0).cast(pl.Float64).alias("attack_rate"),
            pl.col("skewness").mean().fill_null(0).cast(pl.Float64).alias("skewness"),
            pl.col("kurtosis").mean().fill_null(0).cast(pl.Float64).alias("kurtosis"),
        )
        .with_columns(
            (
                pl.col("vehicle_support").cast(pl.Float64)
                * (1.0 + (pl.col("msg_count_mean") + 1.0).log())
                / (1.0 + pl.col("msg_count_std"))
                / (1.0 + pl.col("entropy_std"))
                / (1.0 + pl.col("change_rate").abs())
                / (1.0 + pl.col("attack_rate").clip(0.0, 1.0))
            ).alias("ranking_score")
        )
        # group_by output order is arbitrary, so break full ties explicitly.
        .sort(
            ["ranking_score", "vehicle_support", "profile_rows", group_col],
            descending=[True, True, True, False],
        )
    )
    return support

canonical

Canonical entity and feature-table primitives for cross-vehicle views.

CanonicalEntitySpec dataclass
CanonicalEntitySpec(canonical_id: str, name: str, aliases: tuple[str, ...] = (), vehicle_aliases: dict[str, tuple[str, ...]] = dict(), kind: Literal['signal', 'message', 'state', 'entity'] = 'signal', description: str | None = None)

One shared semantic entity across vehicles.

CanonicalFeatureFrameSpec dataclass
CanonicalFeatureFrameSpec(time_col: str = 'timestamp', vehicle_col: str | None = 'vehicle_id', alias_col: str = 'signal', value_col: str = 'value', keep_cols: tuple[str, ...] = (), feature_cols: tuple[str, ...] = (), unmapped: Literal['raise', 'drop', 'keep'] = 'raise')

How to flatten decoded vehicle rows into canonical feature records.

CanonicalRegistry dataclass
CanonicalRegistry(entities: tuple[CanonicalEntitySpec, ...])

Lookup table for canonical entities and vehicle-specific aliases.

lookup_frame
lookup_frame() -> pl.DataFrame

Return a canonical lookup frame for vectorized joins.

Source code in graphids/core/data/discovery/canonical.py
def lookup_frame(self) -> pl.DataFrame:
    """Return a canonical lookup frame for vectorized joins.

    The frame has one row per ``(vehicle_key, alias_key)`` pair. Its columns
    are ``vehicle_key``, ``alias_key``, ``canonical_id``, ``canonical_name``
    and ``kind``, all strings.

    Global names use ``vehicle_key == "*"``. These are each entity's
    ``canonical_id``, ``name`` and ``aliases``. Entries from
    ``vehicle_aliases`` use the vehicle name as ``vehicle_key``. Keys are
    normalized with ``_norm``.

    When several entities claim the same pair, the one appearing later in
    ``entities`` wins.

    NOTE(review): ``lookup_table`` routes entries through ``self._insert``,
    whose collision policy may differ from "last wins" here; confirm the two
    agree.
    """
    columns = ("vehicle_key", "alias_key", "canonical_id", "canonical_name", "kind")
    rows: list[dict[str, str]] = []
    for spec in self.entities:
        for key in (spec.canonical_id, spec.name, *spec.aliases):
            rows.append(
                {
                    "vehicle_key": "*",
                    "alias_key": _norm(key),
                    "canonical_id": spec.canonical_id,
                    "canonical_name": spec.name,
                    "kind": spec.kind,
                }
            )
        for vehicle, aliases in spec.vehicle_aliases.items():
            for alias in aliases:
                rows.append(
                    {
                        "vehicle_key": _norm(vehicle),
                        "alias_key": _norm(alias),
                        "canonical_id": spec.canonical_id,
                        "canonical_name": spec.name,
                        "kind": spec.kind,
                    }
                )
    # An explicit schema keeps the columns (and the dedup below) valid even
    # when the registry has no entities. Without it, ``pl.DataFrame([])`` has
    # no columns and ``unique`` fails.
    frame = pl.DataFrame(rows, schema={name: pl.Utf8 for name in columns})
    return frame.unique(["vehicle_key", "alias_key"], keep="last")
lookup_table
lookup_table() -> dict[str, CanonicalEntitySpec]

Return alias -> entity mapping using vehicle::alias keys.

Source code in graphids/core/data/discovery/canonical.py
def lookup_table(self) -> dict[str, CanonicalEntitySpec]:
    """Return alias -> entity mapping using ``vehicle::alias`` keys.

    Key formats:
        * Global names (each entity's ``canonical_id``, ``name`` and
          ``aliases``) are keyed ``"*::<alias>"``.
        * Per-vehicle aliases are keyed ``"<vehicle>::<alias>"``.

    Both key parts are normalized with ``_norm``. Every entry is added via
    ``self._insert``; its collision handling is defined elsewhere.
    """
    mapping: dict[str, CanonicalEntitySpec] = {}
    for entity in self.entities:
        for name in (entity.canonical_id, entity.name, *entity.aliases):
            self._insert(mapping, f"*::{_norm(name)}", entity)
        for vehicle, vehicle_names in entity.vehicle_aliases.items():
            for name in vehicle_names:
                self._insert(mapping, f"{_norm(vehicle)}::{_norm(name)}", entity)
    return mapping
resolve
resolve(alias: str, *, vehicle: str | None = None) -> CanonicalEntitySpec

Resolve an alias to a canonical entity.

Source code in graphids/core/data/discovery/canonical.py
def resolve(self, alias: str, *, vehicle: str | None = None) -> CanonicalEntitySpec:
    """Resolve an alias to a canonical entity.

    When ``vehicle`` is given, the vehicle-specific alias is tried first. The
    global (``"*"``) entry is the fallback.

    NOTE: the lookup table is rebuilt on every call. Build it once via
    ``lookup_table`` when resolving many aliases.

    Raises:
        KeyError: if no matching key is present.
    """
    table = self.lookup_table()
    norm_alias = _norm(alias)
    search_order = [] if vehicle is None else [f"{_norm(vehicle)}::{norm_alias}"]
    search_order.append(f"*::{norm_alias}")
    for key in search_order:
        hit = table.get(key)
        if hit is not None:
            return hit
    raise KeyError(f"unknown canonical alias: {alias!r}")
build_canonical_feature_frame
build_canonical_feature_frame(df: DataFrame, registry: CanonicalRegistry, *, spec: CanonicalFeatureFrameSpec | None = None) -> pl.DataFrame

Normalize decoded rows into a long canonical feature table.

Source code in graphids/core/data/discovery/canonical.py
def build_canonical_feature_frame(
    df: pl.DataFrame,
    registry: CanonicalRegistry,
    *,
    spec: CanonicalFeatureFrameSpec | None = None,
) -> pl.DataFrame:
    """Normalize decoded rows into a long canonical feature table.

    Input shape:
        When ``spec.feature_cols`` is non-empty, the wide input is unpivoted so
        that every feature column becomes one ``(alias_col, value_col)`` row.
        ``time_col``, ``keep_cols`` and ``vehicle_col`` (those present in ``df``,
        deduplicated) are kept as index columns. Otherwise ``df`` must already
        be long and carry ``alias_col`` and ``value_col``.

    Alias resolution:
        Each alias is resolved against ``registry.lookup_frame()``. A
        vehicle-specific alias wins over the global (``"*"``) alias for the same
        key. Alias and vehicle values are normalized with the same ``_norm``
        used to build the registry keys.

    Unresolved rows, handled per ``spec.unmapped``:
        * ``"raise"``: raise ``KeyError`` listing up to five unmapped aliases.
        * ``"drop"``: discard them.
        * any other value (``"keep"``): keep them, using the raw alias text as
          ``canonical_id``.

    Returns:
        The (possibly unpivoted) frame with ``canonical_id``, ``canonical_name``
        and ``kind`` columns appended, in the input row order. Name and kind
        are null for ids that are not registry ids.

    Raises:
        ValueError: if ``alias_col`` or ``value_col`` is missing.
        KeyError: if ``spec.unmapped == "raise"`` and some alias is unresolved.
    """
    spec = spec or CanonicalFeatureFrameSpec()
    feature_cols = tuple(spec.feature_cols)
    if feature_cols:
        index_cols = [spec.time_col, *spec.keep_cols]
        if spec.vehicle_col:
            index_cols.append(spec.vehicle_col)
        # First occurrence wins; only columns that exist in df are kept.
        index_cols = [c for c in dict.fromkeys(index_cols) if c in df.columns]
        frame = df.unpivot(
            on=list(feature_cols),
            index=index_cols,
            variable_name=spec.alias_col,
            value_name=spec.value_col,
        )
    else:
        frame = df.clone()
    if spec.alias_col not in frame.columns:
        raise ValueError(f"missing alias column {spec.alias_col!r}")
    if spec.value_col not in frame.columns:
        raise ValueError(f"missing value column {spec.value_col!r}")

    def _norm_keys(col: str) -> pl.Expr:
        # Normalize exactly like the registry keys (plain lowercasing can
        # disagree with ``_norm``). Only distinct values go through Python,
        # so this stays cheap on long frames. Nulls stay null.
        distinct = frame.get_column(col).cast(pl.Utf8).unique().drop_nulls().to_list()
        if not distinct:
            return pl.lit(None, dtype=pl.Utf8)
        return pl.col(col).cast(pl.Utf8).replace_strict(
            {value: _norm(value) for value in distinct},
            default=None,
            return_dtype=pl.Utf8,
        )

    lookup = registry.lookup_frame()
    # Exact (vehicle, alias) candidates. The lookup also holds "*" rows, which
    # match here when no vehicle column exists (every row is keyed "*").
    exact_ids = lookup.select(
        pl.col("vehicle_key").alias("_vehicle_key"),
        pl.col("alias_key").alias("_alias_key"),
        pl.col("canonical_id").alias("_exact_id"),
    )
    global_ids = lookup.filter(pl.col("vehicle_key") == "*").select(
        pl.col("alias_key").alias("_alias_key"),
        pl.col("canonical_id").alias("_global_id"),
    )

    has_vehicle = bool(spec.vehicle_col) and spec.vehicle_col in frame.columns
    # The row index restores the input order after the joins below.
    keyed = frame.with_row_index("_row_nr").with_columns(
        _norm_keys(spec.alias_col).alias("_alias_key"),
        (_norm_keys(spec.vehicle_col) if has_vehicle else pl.lit("*")).alias("_vehicle_key"),
    )
    resolved = (
        keyed.join(exact_ids, on=["_vehicle_key", "_alias_key"], how="left")
        .join(global_ids, on="_alias_key", how="left")
        # Both candidates live in the same frame, so the fallback can never be
        # misaligned by join row order.
        .with_columns(pl.coalesce("_exact_id", "_global_id").alias("canonical_id"))
        .drop(["_alias_key", "_vehicle_key", "_exact_id", "_global_id"])
    )

    if spec.unmapped == "raise":
        missing = resolved.filter(pl.col("canonical_id").is_null())
        if missing.height:
            sample = missing.sort("_row_nr").select(spec.alias_col).head(5).to_series().to_list()
            raise KeyError(f"unmapped canonical aliases: {sample!r}")
    elif spec.unmapped == "drop":
        resolved = resolved.filter(pl.col("canonical_id").is_not_null())
    else:
        resolved = resolved.with_columns(
            pl.coalesce(pl.col("canonical_id"), pl.col(spec.alias_col).cast(pl.Utf8)).alias("canonical_id")
        )

    # Name/kind come from the entity definition keyed by canonical_id; later
    # duplicates win, matching a dict built over ``registry.entities``.
    entities = pl.DataFrame(
        {
            "canonical_id": [e.canonical_id for e in registry.entities],
            "canonical_name": [e.name for e in registry.entities],
            "kind": [e.kind for e in registry.entities],
        },
        schema={"canonical_id": pl.Utf8, "canonical_name": pl.Utf8, "kind": pl.Utf8},
    ).unique("canonical_id", keep="last")
    return (
        resolved.drop(["canonical_name", "kind"], strict=False)
        .join(entities, on="canonical_id", how="left")
        .sort("_row_nr")
        .drop("_row_nr")
    )

hypotheses

Signal profile and hypothesis-store primitives for cross-vehicle discovery.

DiscoveryStore dataclass
DiscoveryStore(root: Path, profiles_name: str = 'signal_profiles.parquet', hypotheses_name: str = 'canonical_hypotheses.parquet', manifest_name: str = 'discovery_manifest.json')

File-backed signal-profile and hypothesis tables.

rank_hypotheses
rank_hypotheses() -> pl.DataFrame

Return ranked profiles joined to the stored hypothesis table.

Source code in graphids/core/data/discovery/hypotheses.py
def rank_hypotheses(self) -> pl.DataFrame:
    """Return ranked profiles joined to the stored hypothesis table.

    Loads the stored profiles, then the stored hypotheses, and hands both to
    ``rank_signal_hypotheses``.
    """
    stored_profiles = self.load_profiles()
    stored_hypotheses = self.load_hypotheses()
    return rank_signal_hypotheses(stored_profiles, stored_hypotheses)
rank_profiles
rank_profiles() -> pl.DataFrame

Return ranked signal profiles from the stored profile table.

Source code in graphids/core/data/discovery/hypotheses.py
def rank_profiles(self) -> pl.DataFrame:
    """Return ranked signal profiles from the stored profile table.

    Thin wrapper: loads the stored profiles and scores them with
    ``rank_signal_profiles``.
    """
    stored_profiles = self.load_profiles()
    return rank_signal_profiles(stored_profiles)
SignalHypothesisSpec dataclass
SignalHypothesisSpec(vehicle_id: str, raw_signal: str, candidate_canonical_id: str | None = None, confidence: float = 0.0, status: Literal['unreviewed', 'provisional', 'confirmed', 'rejected'] = 'unreviewed', evidence: tuple[str, ...] = ())

A provisional cross-vehicle mapping for a raw signal.

SignalProfileSpec dataclass
SignalProfileSpec(vehicle_col: str = 'vehicle_id', signal_col: str = 'arb_id', time_col: str = 'timestamp', entropy_col: str = 'entropy', attack_col: str = 'attack', byte_prefix: str = 'byte_', include_attack: bool = True)

How to aggregate raw CAN rows into per-signal profiles.

build_signal_profiles
build_signal_profiles(df: DataFrame, spec: SignalProfileSpec | None = None) -> pl.DataFrame

Aggregate raw CAN rows into one profile per vehicle/signal pair.

Source code in graphids/core/data/discovery/hypotheses.py
def build_signal_profiles(df: pl.DataFrame, spec: SignalProfileSpec | None = None) -> pl.DataFrame:
    """Aggregate raw CAN rows into one profile per vehicle/signal pair.

    Rows are grouped by ``(spec.vehicle_col, spec.signal_col)``. Every profile
    has ``msg_count`` and a ``signal_key`` of the form ``"<vehicle>::<signal>"``.
    The remaining columns depend on which inputs are present:

    * ``spec.time_col``: ``timestamp_min``, ``timestamp_max``, ``duration``,
      and inter-arrival statistics ``iat_mean``/``iat_std``. Diffs are taken
      in time order within each group.
    * ``spec.entropy_col``: ``entropy_mean`` and ``entropy_std``.
    * byte columns (selected by ``spec.byte_prefix``): per-byte ``_mean``,
      ``_std``, ``_min``, ``_max`` and ``_range``. Also ``change_rate``,
      ``skewness`` and ``kurtosis``, each averaged across bytes; skewness and
      kurtosis are clipped to [-10, 10].
    * ``spec.attack_col`` (only when ``spec.include_attack``): ``attack_max``
      and ``attack_rate``.

    Standard deviations are reported as 0 when a group has too few values to
    estimate one. Profiles come back ordered by vehicle, then signal.

    Raises:
        ValueError: if the vehicle or signal column is missing.
    """
    spec = spec or SignalProfileSpec()
    missing = [c for c in (spec.vehicle_col, spec.signal_col) if c not in df.columns]
    if missing:
        raise ValueError(f"build_signal_profiles missing columns: {missing}")

    # Sort so per-group diff() runs in time order; group_by keeps each group's
    # rows in frame order. Vehicle/signal were validated above, so this is
    # never empty.
    sort_cols = [c for c in (spec.vehicle_col, spec.signal_col, spec.time_col) if c in df.columns]
    df = df.sort(sort_cols)

    def _spread(expr: pl.Expr) -> pl.Expr:
        # Sample std comes back null (not NaN) for groups with fewer than two
        # values, and NaN only for NaN input. Both mean "no measurable spread".
        return expr.std().fill_nan(0).fill_null(0).cast(pl.Float64)

    byte_cols = _byte_cols(df, prefix=spec.byte_prefix)
    group_cols = [spec.vehicle_col, spec.signal_col]
    aggs: list[pl.Expr] = [pl.len().cast(pl.Int64).alias("msg_count")]
    if spec.time_col in df.columns:
        aggs.extend(
            [
                pl.col(spec.time_col).min().cast(pl.Float64).alias("timestamp_min"),
                pl.col(spec.time_col).max().cast(pl.Float64).alias("timestamp_max"),
                (pl.col(spec.time_col).max() - pl.col(spec.time_col).min())
                .cast(pl.Float64)
                .alias("duration"),
                pl.col(spec.time_col).diff().mean().cast(pl.Float64).alias("iat_mean"),
                _spread(pl.col(spec.time_col).diff()).alias("iat_std"),
            ]
        )
    if spec.entropy_col in df.columns:
        aggs.extend(
            [
                pl.col(spec.entropy_col).mean().cast(pl.Float64).alias("entropy_mean"),
                _spread(pl.col(spec.entropy_col)).alias("entropy_std"),
            ]
        )
    if byte_cols:
        aggs.extend(
            [
                *[pl.col(c).mean().cast(pl.Float64).alias(f"{c}_mean") for c in byte_cols],
                *[_spread(pl.col(c)).alias(f"{c}_std") for c in byte_cols],
                *[pl.col(c).min().cast(pl.Float64).alias(f"{c}_min") for c in byte_cols],
                *[pl.col(c).max().cast(pl.Float64).alias(f"{c}_max") for c in byte_cols],
                *[(pl.col(c).max() - pl.col(c).min()).cast(pl.Float64).alias(f"{c}_range") for c in byte_cols],
                # Fraction of consecutive messages in which each byte changed.
                pl.mean_horizontal(
                    *[(pl.col(c).diff().abs().drop_nulls() > 0).mean().fill_null(0) for c in byte_cols]
                ).cast(pl.Float64).alias("change_rate"),
                pl.mean_horizontal(
                    *[pl.col(c).skew().fill_nan(0).fill_null(0).clip(-10, 10) for c in byte_cols]
                ).cast(pl.Float64).alias("skewness"),
                pl.mean_horizontal(
                    *[pl.col(c).kurtosis().fill_nan(0).fill_null(0).clip(-10, 10) for c in byte_cols]
                ).cast(pl.Float64).alias("kurtosis"),
            ]
        )
    if spec.include_attack and spec.attack_col in df.columns:
        aggs.extend(
            [
                pl.col(spec.attack_col).max().cast(pl.Int64).alias("attack_max"),
                pl.col(spec.attack_col).mean().cast(pl.Float64).alias("attack_rate"),
            ]
        )
    # maintain_order on the pre-sorted frame gives a deterministic profile order.
    out = df.group_by(group_cols, maintain_order=True).agg(*aggs)
    out = out.with_columns(
        pl.concat_str(
            [pl.col(spec.vehicle_col).cast(pl.Utf8), pl.col(spec.signal_col).cast(pl.Utf8)],
            separator="::",
        ).alias("signal_key")
    )
    return out
initialize_hypotheses
initialize_hypotheses(profiles: DataFrame) -> pl.DataFrame

Create an empty hypothesis table aligned to a profile table.

Source code in graphids/core/data/discovery/hypotheses.py
def initialize_hypotheses(profiles: pl.DataFrame) -> pl.DataFrame:
    """Create an empty hypothesis table aligned to a profile table.

    One row per profile row, keyed by ``vehicle_id``, ``arb_id`` and
    ``signal_key``. Each row starts with no candidate, confidence ``0.0``,
    status ``"unreviewed"`` and empty evidence.

    Raises:
        ValueError: if any key column is missing from ``profiles``.
    """
    key_cols = ["vehicle_id", "arb_id", "signal_key"]
    absent = [name for name in key_cols if name not in profiles.columns]
    if absent:
        raise ValueError(f"initialize_hypotheses missing columns: {absent}")
    blank_fields = [
        pl.lit(None, dtype=pl.Utf8).alias("candidate_canonical_id"),
        pl.lit(0.0).cast(pl.Float64).alias("confidence"),
        pl.lit("unreviewed").alias("status"),
        pl.lit("").cast(pl.Utf8).alias("evidence"),
    ]
    return profiles.select(key_cols).with_columns(blank_fields)

layout

Storage-layout primitives for raw events, views, and hypotheses.

DataLayerLayout dataclass
DataLayerLayout(root: Path)

A single place to point at the three persistent data layers.

HypothesisRecordSpec dataclass
HypothesisRecordSpec(vehicle_id: str, raw_signal: str, candidate_canonical_id: str | None = None, confidence: float = 0.0, status: Literal['unreviewed', 'provisional', 'confirmed', 'rejected'] = 'unreviewed', evidence: tuple[str, ...] = (), profile_path: Path | None = None, feature_digest: str | None = None)

Provisional semantic mapping for a raw signal.

MaterializedViewSpec dataclass
MaterializedViewSpec(root: Path, name: str = 'materialized_views', view_kind: ViewKind = 'snapshot', partition_cols: tuple[str, ...] = ('vehicle_id', 'view_kind', 'split'), format: Literal['parquet', 'sqlite', 'duckdb'] = 'parquet', key_cols: tuple[str, ...] = ('vehicle_id', 'canonical_id', 'timestamp'), feature_cols: tuple[str, ...] = (), label_cols: tuple[str, ...] = ('attack', 'attack_type'))

Training-facing view materialization contract.

RawEventTableSpec dataclass
RawEventTableSpec(root: Path, name: str = 'raw_can_events', partition_cols: tuple[str, ...] = ('vehicle_id', 'day'), format: Literal['parquet', 'sqlite', 'duckdb'] = 'parquet', primary_time_col: str = 'timestamp', raw_id_col: str = 'arb_id', vehicle_col: str = 'vehicle_id', attack_col: str = 'attack', signal_hint_col: str = 'signal_hint')

Canonical storage for immutable decoded CAN rows.

ranking

Ranking helpers for cross-vehicle signal discovery.

rank_signal_hypotheses
rank_signal_hypotheses(profiles: DataFrame, hypotheses: DataFrame | None = None) -> pl.DataFrame

Join profile scores to provisional hypotheses when available.

Source code in graphids/core/data/discovery/ranking.py
def rank_signal_hypotheses(
    profiles: pl.DataFrame,
    hypotheses: pl.DataFrame | None = None,
) -> pl.DataFrame:
    """Join profile scores to provisional hypotheses when available.

    ``profiles`` is first ranked with ``rank_signal_profiles``. If
    ``hypotheses`` is missing or empty, that ranking is returned unchanged.

    Otherwise the ranking's group column (chosen by ``_pick_group_col``) is
    left-joined to ``hypotheses``. The join uses the same-named column if it
    exists, or else ``raw_signal``. Any of ``vehicle_id``, ``raw_signal``,
    ``candidate_canonical_id``, ``status`` and ``confidence`` present in
    ``hypotheses`` are carried over. A signal with several hypothesis rows
    appears once per row. If neither join key exists, the plain ranking is
    returned.

    The joined result is re-sorted into ranking order, because a join is not
    guaranteed to preserve the left frame's row order.
    """
    ranked = rank_signal_profiles(profiles)
    if hypotheses is None or hypotheses.is_empty():
        return ranked

    join_col = _pick_group_col(profiles)
    if join_col in hypotheses.columns:
        right_key = join_col
    elif "raw_signal" in hypotheses.columns:
        right_key = "raw_signal"
    else:
        return ranked

    carried = [
        c
        for c in ("vehicle_id", "raw_signal", "candidate_canonical_id", "status", "confidence")
        if c in hypotheses.columns and c != right_key
    ]
    joined = ranked.join(
        hypotheses.select(right_key, *carried),
        left_on=join_col,
        right_on=right_key,
        how="left",
    )
    # Stable sort: rows sharing every key keep the order the join produced.
    return joined.sort(
        ["ranking_score", "vehicle_support", "profile_rows", join_col],
        descending=[True, True, True, False],
        maintain_order=True,
    )
rank_signal_profiles
rank_signal_profiles(profiles: DataFrame) -> pl.DataFrame

Score per-signal profile rows by cross-vehicle support and stability.

The goal is not to claim a final ontology match; it is to give the discovery layer a concrete relational ranking pass that can surface stable signals for review.

Source code in graphids/core/data/discovery/ranking.py
def rank_signal_profiles(profiles: pl.DataFrame) -> pl.DataFrame:
    """Score per-signal profile rows by cross-vehicle support and stability.

    The goal is not to claim a final ontology match; it is to give the
    discovery layer a concrete relational ranking pass that can surface
    stable signals for review.

    Rows are grouped by the column chosen by ``_pick_group_col``. Each group
    yields one row containing:

    * ``vehicle_support``: number of distinct ``vehicle_id`` values;
    * ``profile_rows``: number of profile rows;
    * averaged profile statistics.

    Missing optional statistic columns are treated as constant ``0.0``. The
    score is::

        vehicle_support * (1 + ln(msg_count_mean + 1))
            / (1 + msg_count_std) / (1 + entropy_std)
            / (1 + |change_rate|) / (1 + clip(attack_rate, 0, 1))

    The output is sorted by ``ranking_score``, ``vehicle_support`` and
    ``profile_rows``, all descending. Remaining ties are broken by the group
    column (ascending), so the order is deterministic across runs.

    Raises:
        ValueError: if the group column or ``vehicle_id`` is missing.
    """
    group_col = _pick_group_col(profiles)
    required = {group_col, "vehicle_id"}
    missing = sorted(required.difference(profiles.columns))
    if missing:
        raise ValueError(f"rank_signal_profiles missing columns: {missing}")

    optional_defaults = {
        "msg_count": 0.0,
        "entropy_mean": 0.0,
        "entropy_std": 0.0,
        "change_rate": 0.0,
        "attack_rate": 0.0,
        "skewness": 0.0,
        "kurtosis": 0.0,
    }
    for col, default in optional_defaults.items():
        if col not in profiles.columns:
            profiles = profiles.with_columns(pl.lit(default).cast(pl.Float64).alias(col))

    support = (
        profiles.group_by(group_col)
        .agg(
            pl.n_unique("vehicle_id").alias("vehicle_support"),
            pl.len().alias("profile_rows"),
            pl.col("msg_count").mean().fill_null(0).cast(pl.Float64).alias("msg_count_mean"),
            pl.col("msg_count").std().fill_null(0).cast(pl.Float64).alias("msg_count_std"),
            pl.col("entropy_mean").mean().fill_null(0).cast(pl.Float64).alias("entropy_mean"),
            pl.col("entropy_std").mean().fill_null(0).cast(pl.Float64).alias("entropy_std"),
            pl.col("change_rate").mean().fill_null(0).cast(pl.Float64).alias("change_rate"),
            pl.col("attack_rate").mean().fill_null(0).cast(pl.Float64).alias("attack_rate"),
            pl.col("skewness").mean().fill_null(0).cast(pl.Float64).alias("skewness"),
            pl.col("kurtosis").mean().fill_null(0).cast(pl.Float64).alias("kurtosis"),
        )
        .with_columns(
            (
                pl.col("vehicle_support").cast(pl.Float64)
                * (1.0 + (pl.col("msg_count_mean") + 1.0).log())
                / (1.0 + pl.col("msg_count_std"))
                / (1.0 + pl.col("entropy_std"))
                / (1.0 + pl.col("change_rate").abs())
                / (1.0 + pl.col("attack_rate").clip(0.0, 1.0))
            ).alias("ranking_score")
        )
        # group_by output order is arbitrary, so break full ties explicitly.
        .sort(
            ["ranking_score", "vehicle_support", "profile_rows", group_col],
            descending=[True, True, True, False],
        )
    )
    return support

extract

Extract and cache fusion features as a TensorDict.

Each upstream model implements extract_features(batch, device) -> dict[str, Tensor] returning per-graph named feature tensors. This module collects those dicts under the model name (vgae, gat, ...), stacks across batches, and saves the resulting nested TensorDict to disk. No flat state vector, no offsets — the fusion side reads keys directly.

Invoked by the experiment extraction pipeline. Idempotent on output_dir.

extract_states

extract_states(*, checkpoints: dict[str, str], dataset: str, output_dir: str, max_samples: int = 150000, max_val_samples: int = 30000, batch_size: int = 256, seed: int = 42, val_fraction: float = 0.2, representation_cfg: GraphRepresentationCfg) -> None

Load model checkpoints, extract and cache fusion features.

Idempotent per-file: each split's cache is checked independently so re-running after adding test splits only extracts the missing files.

Source code in graphids/core/data/extract.py
def extract_states(
    *,
    checkpoints: dict[str, str],
    dataset: str,
    output_dir: str,
    max_samples: int = 150_000,
    max_val_samples: int = 30_000,
    batch_size: int = 256,
    seed: int = 42,
    val_fraction: float = 0.2,
    representation_cfg: GraphRepresentationCfg,
) -> None:
    """Load model checkpoints, extract and cache fusion features.

    Idempotent per-file: each split's cache is checked independently so
    re-running after adding test splits only extracts the missing files.

    Args:
        checkpoints: model type -> checkpoint path. Each entry is loaded via
            ``safe_load_checkpoint`` and passed to ``_extract_states``.
        dataset: dataset name passed to ``CANBusSource``.
        output_dir: directory receiving the train, val and
            ``<test split>_states.pt`` cache files (created if missing).
        max_samples: sample cap forwarded to ``_extract_states`` for train.
        max_val_samples: sample cap forwarded to ``_extract_states`` for val.
            Test splits are always extracted in full (cap = split length).
        batch_size: batch size forwarded to ``_extract_states``.
        seed: seed for the data source.
        val_fraction: validation fraction for the data source.
        representation_cfg: graph representation for the data source.
    """
    from graphids.core.data.datamodule.graph import GraphDataModule
    from graphids.core.data.datasets.can_bus import CANBusSource
    from graphids.core.models.base import safe_load_checkpoint

    # Build DM first so test split names are known before the idempotency check.
    source = CANBusSource(
        name=dataset,
        seed=seed,
        val_fraction=val_fraction,
        representation_cfg=representation_cfg,
    )
    dm = GraphDataModule(dataset=source, dynamic_batching=False)
    dm.setup(None)

    out = Path(output_dir)
    train_path = out / TRAIN_FILENAME
    val_path = out / VAL_FILENAME
    test_paths = {name: out / f"{name}_states.pt" for name in dm.test_datasets}

    def _version_ok(p: Path) -> bool:
        """True when ``p`` exists, loads, and carries the current cache version."""
        if not p.exists():
            return False
        try:
            # NOTE: weights_only=False unpickles arbitrary objects; acceptable
            # only because these files are written by this function itself.
            blob = torch.load(p, map_location="cpu", weights_only=False)
        except Exception as exc:
            # Best-effort: a corrupt or partial file is treated as stale and
            # re-extracted, but no longer silently.
            log.warning("cache_unreadable", path=str(p), error=repr(exc))
            return False
        return isinstance(blob, dict) and blob.get("version") == CACHE_VERSION

    # Evaluate each file exactly once: torch.load materializes the entire
    # cached TensorDict, so re-checking per split would load valid files twice.
    fresh = {p: _version_ok(p) for p in (train_path, val_path, *test_paths.values())}
    if all(fresh.values()):
        log.info("cache_hit", output_dir=str(out), version=CACHE_VERSION)
        return

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    models = {}
    for model_type, ckpt_path in checkpoints.items():
        log.info("loading_model", model_type=model_type, ckpt=ckpt_path)
        module = safe_load_checkpoint(model_type, Path(ckpt_path), map_location=device)
        module.to(device).eval()
        models[model_type] = module

    train_ds, val_ds = dm.train_dataset, dm.val_dataset

    # Stash schema's attack code → name map so the fusion test path can emit
    # ``auroc_per_attack/{name}`` keys (looked up in FusionDataModule).
    schema = getattr(type(train_ds), "SCHEMA", None)
    names_map = getattr(schema, "attack_type_names", None) if schema is not None else None
    blob_extras = {"version": CACHE_VERSION, "attack_type_names": dict(names_map or {0: "benign"})}

    out.mkdir(parents=True, exist_ok=True)

    # NOTE(review): list(ds) materializes every graph of the split before
    # _extract_states applies its sample cap; confirm memory is acceptable.
    if not fresh[train_path]:
        log.info("extracting_train", n_graphs=len(train_ds), max_samples=max_samples)
        train_td = _extract_states(models, list(train_ds), device, max_samples, batch_size).cpu()
        torch.save({"td": train_td.to_dict(), **blob_extras}, train_path)

    if not fresh[val_path]:
        log.info("extracting_val", n_graphs=len(val_ds), max_samples=max_val_samples)
        val_td = _extract_states(models, list(val_ds), device, max_val_samples, batch_size).cpu()
        torch.save({"td": val_td.to_dict(), **blob_extras}, val_path)

    for name, test_ds in dm.test_datasets.items():
        p = test_paths[name]
        if not fresh[p]:
            n = len(test_ds)
            log.info("extracting_test", split=name, n_graphs=n)
            test_td = _extract_states(models, list(test_ds), device, n, batch_size).cpu()
            torch.save({"td": test_td.to_dict(), **blob_extras}, p)

    log.info("states_saved", output_dir=str(out), version=CACHE_VERSION)

preprocessing

Preprocessing primitives for cache build.

CanonicalEntitySpec dataclass

CanonicalEntitySpec(canonical_id: str, name: str, aliases: tuple[str, ...] = (), vehicle_aliases: dict[str, tuple[str, ...]] = dict(), kind: Literal['signal', 'message', 'state', 'entity'] = 'signal', description: str | None = None)

One shared semantic entity across vehicles.

CanonicalFeatureFrameSpec dataclass

CanonicalFeatureFrameSpec(time_col: str = 'timestamp', vehicle_col: str | None = 'vehicle_id', alias_col: str = 'signal', value_col: str = 'value', keep_cols: tuple[str, ...] = (), feature_cols: tuple[str, ...] = (), unmapped: Literal['raise', 'drop', 'keep'] = 'raise')

How to flatten decoded vehicle rows into canonical feature records.

CanonicalRegistry dataclass

CanonicalRegistry(entities: tuple[CanonicalEntitySpec, ...])

Lookup table for canonical entities and vehicle-specific aliases.

lookup_frame
lookup_frame() -> pl.DataFrame

Return a canonical lookup frame for vectorized joins.

Source code in graphids/core/data/discovery/canonical.py
def lookup_frame(self) -> pl.DataFrame:
    """Return a canonical lookup frame for vectorized joins.

    Columns (all strings): ``vehicle_key``, ``alias_key``, ``canonical_id``,
    ``canonical_name``, ``kind``.

    Each entity contributes two kinds of rows:

    * Global rows (``vehicle_key == "*"``) for its canonical id, its name and
      each of its aliases.
    * One row per vehicle-specific alias, keyed by the normalized vehicle name.

    All keys go through ``_norm``. If a (vehicle_key, alias_key) pair is
    declared more than once, the last declaration is kept. An empty registry
    yields an empty frame with these columns rather than raising.
    """
    columns = ("vehicle_key", "alias_key", "canonical_id", "canonical_name", "kind")
    rows: list[dict[str, str]] = []
    for spec in self.entities:
        for key in (spec.canonical_id, spec.name, *spec.aliases):
            rows.append(
                {
                    "vehicle_key": "*",
                    "alias_key": _norm(key),
                    "canonical_id": spec.canonical_id,
                    "canonical_name": spec.name,
                    "kind": spec.kind,
                }
            )
        for vehicle, aliases in spec.vehicle_aliases.items():
            for alias in aliases:
                rows.append(
                    {
                        "vehicle_key": _norm(vehicle),
                        "alias_key": _norm(alias),
                        "canonical_id": spec.canonical_id,
                        "canonical_name": spec.name,
                        "kind": spec.kind,
                    }
                )
    # Explicit schema: with no rows, pl.DataFrame([]) has no columns and the
    # unique() call below would fail on the missing key columns.
    frame = pl.DataFrame(rows, schema={name: pl.Utf8 for name in columns})
    # maintain_order=True makes the output row order deterministic.
    return frame.unique(["vehicle_key", "alias_key"], keep="last", maintain_order=True)
lookup_table
lookup_table() -> dict[str, CanonicalEntitySpec]

Return alias -> entity mapping using vehicle::alias keys.

Source code in graphids/core/data/discovery/canonical.py
def lookup_table(self) -> dict[str, CanonicalEntitySpec]:
    """Build a flat ``"<vehicle>::<alias>"`` -> entity dictionary.

    The canonical id, name and aliases of every entity are registered under
    the wildcard vehicle ``*``. Each vehicle-specific alias is registered
    under its own normalized vehicle name. Both halves of each key pass
    through ``_norm``, and every insertion goes through ``self._insert``.
    """
    mapping: dict[str, CanonicalEntitySpec] = {}
    for entity in self.entities:
        for global_name in (entity.canonical_id, entity.name, *entity.aliases):
            self._insert(mapping, f"*::{_norm(global_name)}", entity)
        for vehicle_name, vehicle_specific in entity.vehicle_aliases.items():
            for raw_alias in vehicle_specific:
                self._insert(mapping, f"{_norm(vehicle_name)}::{_norm(raw_alias)}", entity)
    return mapping
resolve
resolve(alias: str, *, vehicle: str | None = None) -> CanonicalEntitySpec

Resolve an alias to a canonical entity.

Source code in graphids/core/data/discovery/canonical.py
def resolve(self, alias: str, *, vehicle: str | None = None) -> CanonicalEntitySpec:
    """Map ``alias`` to its canonical entity.

    When ``vehicle`` is given, a vehicle-specific registration is tried first.
    The wildcard (``*``) registration is the fallback.

    Raises:
        KeyError: if neither registration exists.
    """
    table = self.lookup_table()
    key = _norm(alias)
    probes = [f"*::{key}"] if vehicle is None else [f"{_norm(vehicle)}::{key}", f"*::{key}"]
    for probe in probes:
        hit = table.get(probe)
        if hit is not None:
            return hit
    raise KeyError(f"unknown canonical alias: {alias!r}")

DiscoveryStore dataclass

DiscoveryStore(root: Path, profiles_name: str = 'signal_profiles.parquet', hypotheses_name: str = 'canonical_hypotheses.parquet', manifest_name: str = 'discovery_manifest.json')

File-backed signal-profile and hypothesis tables.

rank_hypotheses
rank_hypotheses() -> pl.DataFrame

Return ranked profiles joined to the stored hypothesis table.

Source code in graphids/core/data/discovery/hypotheses.py
def rank_hypotheses(self) -> pl.DataFrame:
    """Rank the stored profiles and attach the stored hypothesis rows.

    Loads both tables from this store and delegates to
    ``rank_signal_hypotheses``.
    """
    profiles = self.load_profiles()
    hypotheses = self.load_hypotheses()
    return rank_signal_hypotheses(profiles, hypotheses)
rank_profiles
rank_profiles() -> pl.DataFrame

Return ranked signal profiles from the stored profile table.

Source code in graphids/core/data/discovery/hypotheses.py
def rank_profiles(self) -> pl.DataFrame:
    """Load the stored profile table and rank it with ``rank_signal_profiles``."""
    profiles = self.load_profiles()
    return rank_signal_profiles(profiles)

EdgePolicy dataclass

EdgePolicy(name: str, src_col: str = 'node_id', dst_col: str = 'node_id', dst_shift: int = -1, src_alias: str = 'src', dst_alias: str = 'dst')

How to derive directed edges from windowed rows.

EntityRepresentationCfg dataclass

EntityRepresentationCfg(kind: Literal['entity'] = 'entity', anchor_column: str = 'node_id', anchor_value: str | int | None = None, history_window_size: int = 100, future_window_size: int = 0)

Entity-centric representation centered on one signal or message family.

EntitySegmentCfg dataclass

EntitySegmentCfg(anchor_column: str = 'node_id', anchor_value: str | int | None = None, history_window_size: int = 100, future_window_size: int = 0)

A segment centered on one arbitration ID or message family.

EntityViewCfg dataclass

EntityViewCfg(anchor_column: str = 'node_id', anchor_value: str | int | None = None, history_window_size: int = 100, future_window_size: int = 0)

Bases: _ViewCfg

Entity-centric view centered on one signal or message family.

EventChunkViewCfg dataclass

EventChunkViewCfg(message_count: int | None = 200, duration_ms: float | None = None, overlap: float = 0.0)

Bases: _ViewCfg

Chunk raw events by message count or duration.

GraphRepresentationPlan dataclass

GraphRepresentationPlan(kind: Literal['snapshot', 'snapshot_sequence', 'multi_scale', 'temporal', 'entity'], cfg: GraphRepresentationCfg)

Resolved representation kind and config payload.

GraphSegmentPlan dataclass

GraphSegmentPlan(kind: Literal['window', 'sequence', 'multi_scale', 'entity'], cfg: SegmentCfg)

Resolved sample-shape plan for a dataset view.

GraphTransform dataclass

GraphTransform(name: str, requires: tuple[str, ...], produces: tuple[str, ...], fn: Callable[[DataFrame, DataFrame], tuple[DataFrame, DataFrame]])

A declarative graph transform with explicit input/output columns.

MultiScaleRepresentationCfg dataclass

MultiScaleRepresentationCfg(kind: Literal['multi_scale'] = 'multi_scale', window_sizes: tuple[int, ...] = (50, 100, 200), stride: int = 100)

Parallel snapshots at multiple window sizes.

MultiScaleSegmentCfg dataclass

MultiScaleSegmentCfg(window_sizes: tuple[int, ...], stride: int)

Parallel windows at multiple temporal scales.

MultiScaleViewCfg dataclass

MultiScaleViewCfg(window_sizes: tuple[int, ...] = (50, 100, 200), stride: int = 100)

Bases: _ViewCfg

Parallel snapshot views at multiple window sizes.

RollingStreamViewCfg dataclass

RollingStreamViewCfg(history_messages: int = 500, prediction_horizon: int = 1, update_mode: Literal['append', 'replace'] = 'append')

Bases: _ViewCfg

Online/streaming view with bounded history.

Segmenter

Bases: Protocol

Primitive that turns raw rows into a shaped sample view.

SequenceSegmentCfg dataclass

SequenceSegmentCfg(window_size: int, stride: int, sequence_length: int = 4, sequence_stride: int = 1)

A sequence of ordered windows from the same raw stream.

SignalHypothesisSpec dataclass

SignalHypothesisSpec(vehicle_id: str, raw_signal: str, candidate_canonical_id: str | None = None, confidence: float = 0.0, status: Literal['unreviewed', 'provisional', 'confirmed', 'rejected'] = 'unreviewed', evidence: tuple[str, ...] = ())

A provisional cross-vehicle mapping for a raw signal.

SignalProfileSpec dataclass

SignalProfileSpec(vehicle_col: str = 'vehicle_id', signal_col: str = 'arb_id', time_col: str = 'timestamp', entropy_col: str = 'entropy', attack_col: str = 'attack', byte_prefix: str = 'byte_', include_attack: bool = True)

How to aggregate raw CAN rows into per-signal profiles.

SnapshotRepresentationCfg dataclass

SnapshotRepresentationCfg(kind: Literal['snapshot'] = 'snapshot', window_size: int = 100, stride: int = 100)

One graph per sliding window.

SnapshotSequenceRepresentationCfg dataclass

SnapshotSequenceRepresentationCfg(kind: Literal['snapshot_sequence'] = 'snapshot_sequence', window_size: int = 100, stride: int = 100, sequence_length: int = 4, sequence_stride: int = 1)

Ordered sequence of snapshot graphs.

SnapshotSequenceViewCfg dataclass

SnapshotSequenceViewCfg(window_size: int = 100, stride: int = 100, sequence_length: int = 4, sequence_stride: int = 1)

Bases: _ViewCfg

An ordered sequence of snapshot graphs.

SnapshotViewCfg dataclass

SnapshotViewCfg(window_size: int = 100, stride: int = 100)

Bases: _ViewCfg

One fixed graph per sliding window.

TemporalGraphSpec dataclass

TemporalGraphSpec(edge_policy: EdgePolicy = temporal_edge_policy(), time_col: str = 'timestamp', feature_cols: tuple[str, ...] = (), target_col: str = 'attack', aux_label_cols: tuple[str, ...] = ('attack_type',), binary_target: bool = True)

How to turn ordered rows into a PyG TemporalData object.

TemporalRepresentationCfg dataclass

TemporalRepresentationCfg(kind: Literal['temporal'] = 'temporal', time_col: str = 'timestamp', binary_target: bool = True, history_messages: int | None = None)

Event stream representation built as PyG TemporalData.

WindowSegmentCfg dataclass

WindowSegmentCfg(window_size: int, stride: int)

One fixed window of rows.

WindowSegmenter dataclass

WindowSegmenter(window_size: int, stride: int)

Default snapshot segmenter: one fixed sliding window per graph.

WindowedRows dataclass

WindowedRows(rows: DataFrame, n_rows: int, n_windows: int, max_wid: int)

Rows plus derived window metadata for snapshot-style segments.

build_canonical_feature_frame

build_canonical_feature_frame(df: DataFrame, registry: CanonicalRegistry, *, spec: CanonicalFeatureFrameSpec | None = None) -> pl.DataFrame

Normalize decoded rows into a long canonical feature table.

Source code in graphids/core/data/discovery/canonical.py
def build_canonical_feature_frame(
    df: pl.DataFrame,
    registry: CanonicalRegistry,
    *,
    spec: CanonicalFeatureFrameSpec | None = None,
) -> pl.DataFrame:
    """Normalize decoded rows into a long canonical feature table.

    When ``spec.feature_cols`` is non-empty, the wide input is unpivoted
    first. The result has one row per feature, with the feature name in
    ``spec.alias_col`` and its value in ``spec.value_col``, indexed by the
    time, keep and vehicle columns that exist in ``df``. Otherwise ``df`` must
    already be long-form.

    Each alias is resolved against ``registry.lookup_frame()``. A match on the
    lower-cased ``spec.vehicle_col`` takes precedence over the global ``"*"``
    entry. Three columns are appended: ``canonical_id``, ``canonical_name``
    and ``kind``. Input row order is preserved.

    Unresolved aliases follow ``spec.unmapped``:

    * ``"raise"``: raise ``KeyError`` showing up to five aliases.
    * ``"drop"``: remove the row.
    * ``"keep"``: ``canonical_id`` falls back to the raw alias, and
      ``canonical_name``/``kind`` are null unless that alias happens to be a
      registered canonical id.

    Raises:
        ValueError: if the alias or value column is missing.
        KeyError: if ``spec.unmapped == "raise"`` and some alias is unresolved.
    """
    spec = spec or CanonicalFeatureFrameSpec()
    feature_cols = tuple(spec.feature_cols)
    if feature_cols:
        index_cols = [spec.time_col, *spec.keep_cols]
        if spec.vehicle_col:
            index_cols.append(spec.vehicle_col)
        # Keep index columns that exist in ``df``, de-duplicated in first-seen order.
        index_cols = [c for i, c in enumerate(index_cols) if c in df.columns and c not in index_cols[:i]]
        frame = df.unpivot(
            on=list(feature_cols),
            index=index_cols,
            variable_name=spec.alias_col,
            value_name=spec.value_col,
        )
    else:
        frame = df.clone()
    if spec.alias_col not in frame.columns:
        raise ValueError(f"missing alias column {spec.alias_col!r}")
    if spec.value_col not in frame.columns:
        raise ValueError(f"missing value column {spec.value_col!r}")

    # A row index restores the input order after the joins below, so the
    # result does not depend on the row order a join happens to produce.
    row_col = "__canonical_row"
    # NOTE(review): aliases and vehicles are only lower-cased here, while
    # ``registry.lookup_frame()`` builds its keys with ``_norm``. If ``_norm``
    # does more than lower-casing, some aliases will not resolve.
    keyed = frame.with_row_index(row_col).with_columns(
        pl.col(spec.alias_col).cast(pl.Utf8).str.to_lowercase().alias("_alias_key"),
        pl.lit("*").alias("_vehicle_key"),
    )
    if spec.vehicle_col and spec.vehicle_col in keyed.columns:
        keyed = keyed.with_columns(
            pl.col(spec.vehicle_col).cast(pl.Utf8).str.to_lowercase().alias("_vehicle_key")
        )

    lookup = registry.lookup_frame()
    # ``lookup_frame`` is unique on (vehicle_key, alias_key), so neither left
    # join below can duplicate input rows.
    exact_ids = lookup.select(
        pl.col("vehicle_key").alias("_vehicle_key"),
        pl.col("alias_key").alias("_alias_key"),
        pl.col("canonical_id").alias("_cid_exact"),
    )
    global_ids = lookup.filter(pl.col("vehicle_key") == "*").select(
        pl.col("alias_key").alias("_alias_key"),
        pl.col("canonical_id").alias("_cid_global"),
    )
    # Resolve both candidates inside one frame so the vehicle-specific id and
    # its global fallback are coalesced for the same row by construction.
    out = (
        keyed.join(exact_ids, on=["_vehicle_key", "_alias_key"], how="left")
        .join(global_ids, on="_alias_key", how="left")
        .with_columns(pl.coalesce("_cid_exact", "_cid_global").alias("canonical_id"))
        .sort(row_col)
        .drop([row_col, "_alias_key", "_vehicle_key", "_cid_exact", "_cid_global"])
    )

    if spec.unmapped == "raise":
        missing = out.filter(pl.col("canonical_id").is_null())
        if missing.height:
            sample = missing.select(spec.alias_col).head(5).to_series().to_list()
            raise KeyError(f"unmapped canonical aliases: {sample!r}")
    elif spec.unmapped == "drop":
        out = out.filter(pl.col("canonical_id").is_not_null())
    else:
        # "keep": fall back to the raw alias as the canonical id.
        out = out.with_columns(
            pl.when(pl.col("canonical_id").is_null())
            .then(pl.col(spec.alias_col).cast(pl.Utf8))
            .otherwise(pl.col("canonical_id"))
            .alias("canonical_id")
        )

    # Name/kind come from the registry by id. Dict construction means the last
    # entity wins on duplicate ids; unregistered ids map to null.
    names = {e.canonical_id: e.name for e in registry.entities}
    kinds = {e.canonical_id: e.kind for e in registry.entities}
    if not names:
        return out.with_columns(
            pl.lit(None, dtype=pl.Utf8).alias("canonical_name"),
            pl.lit(None, dtype=pl.Utf8).alias("kind"),
        )
    return out.with_columns(
        pl.col("canonical_id").replace_strict(names, default=None, return_dtype=pl.Utf8).alias("canonical_name"),
        pl.col("canonical_id").replace_strict(kinds, default=None, return_dtype=pl.Utf8).alias("kind"),
    )

build_graph_tables

build_graph_tables(df: DataFrame, *, node_stat_exprs: list[Expr], label_exprs: list[Expr], edge_policy: EdgePolicy | None = None, edge_stat_exprs: list[Expr], edge_base_cols: list[str], graph_transforms: list[GraphTransform] | None = None, debug_artifacts_dir: str | Path | None = None, segment_cfg: WindowSegmentCfg | SequenceSegmentCfg | MultiScaleSegmentCfg | EntitySegmentCfg | None = None) -> GraphTables

Compose the graph preprocessing primitives into staged graph tables.

Source code in graphids/core/data/preprocessing/materialization.py
def build_graph_tables(
    df: pl.DataFrame,
    *,
    node_stat_exprs: list[pl.Expr],
    label_exprs: list[pl.Expr],
    edge_policy: EdgePolicy | None = None,
    edge_stat_exprs: list[pl.Expr],
    edge_base_cols: list[str],
    graph_transforms: list[GraphTransform] | None = None,
    debug_artifacts_dir: str | Path | None = None,
    segment_cfg: WindowSegmentCfg | SequenceSegmentCfg | MultiScaleSegmentCfg | EntitySegmentCfg | None = None,
) -> GraphTables:
    """Dispatch on ``segment_cfg`` and build staged graph tables.

    Each segment config type is handled as follows:

    * ``MultiScaleSegmentCfg``: builds one windowed table set per window
      size. Each set is tagged with ``scale_id``/``scale_window_size``, and
      ``_wid`` values are offset so they stay unique after the sets are
      concatenated. Scales that produce no nodes are skipped.
    * ``SequenceSegmentCfg``: delegates to ``_build_graph_tables_sequence``.
    * ``EntitySegmentCfg``: builds windowed tables of size
      ``history + future + 1``, with stride ``max(1, future or 1)``, tagged
      with the anchor column/value.
    * ``WindowSegmentCfg``: builds plain windowed tables.

    Raises:
        ValueError: if ``segment_cfg`` is None.
        TypeError: for any other config type.
    """
    if segment_cfg is None:
        raise ValueError("build_graph_tables requires an explicit segment_cfg")

    # Keyword arguments every segment-specific builder receives unchanged.
    shared = dict(
        node_stat_exprs=node_stat_exprs,
        label_exprs=label_exprs,
        edge_policy=edge_policy,
        edge_stat_exprs=edge_stat_exprs,
        edge_base_cols=edge_base_cols,
        graph_transforms=graph_transforms,
        debug_artifacts_dir=debug_artifacts_dir,
    )

    if isinstance(segment_cfg, MultiScaleSegmentCfg):
        per_scale: list[GraphTables] = []
        next_wid = 0
        for scale_id, size in enumerate(segment_cfg.window_sizes):
            built = _build_graph_tables_windowed(
                df,
                window_size=size,
                stride=segment_cfg.stride,
                **shared,
                tags={"scale_id": scale_id, "scale_window_size": size},
            )
            if built.node_stats.is_empty():
                continue
            # Renumber this scale's windows after all previously kept scales.
            shifted_wid = (pl.col("_wid") + next_wid).alias("_wid")
            per_scale.append(
                GraphTables(
                    node_stats=built.node_stats.with_columns(shifted_wid),
                    edge_df=built.edge_df.with_columns(shifted_wid),
                    labels=built.labels.with_columns(shifted_wid),
                    n_rows=built.n_rows,
                )
            )
            next_wid += int(built.node_stats.select(pl.col("_wid").max()).item()) + 1
        if not per_scale:
            return GraphTables(pl.DataFrame(), pl.DataFrame(), pl.DataFrame(), len(df))
        return GraphTables(
            node_stats=pl.concat([part.node_stats for part in per_scale], how="vertical"),
            edge_df=pl.concat([part.edge_df for part in per_scale], how="vertical"),
            labels=pl.concat([part.labels for part in per_scale], how="vertical"),
            n_rows=len(df),
        )

    if isinstance(segment_cfg, SequenceSegmentCfg):
        return _build_graph_tables_sequence(df, cfg=segment_cfg, **shared)

    if isinstance(segment_cfg, EntitySegmentCfg):
        anchor = segment_cfg.anchor_value
        return _build_graph_tables_windowed(
            df,
            window_size=segment_cfg.history_window_size + segment_cfg.future_window_size + 1,
            stride=max(1, segment_cfg.future_window_size or 1),
            **shared,
            tags={
                "anchor_column": segment_cfg.anchor_column,
                "anchor_value": "" if anchor is None else anchor,
            },
        )

    if isinstance(segment_cfg, WindowSegmentCfg):
        return _build_graph_tables_windowed(
            df,
            window_size=segment_cfg.window_size,
            stride=segment_cfg.stride,
            **shared,
        )
    raise TypeError(f"unsupported segment config: {type(segment_cfg)!r}")

build_signal_profiles

build_signal_profiles(df: DataFrame, spec: SignalProfileSpec | None = None) -> pl.DataFrame

Aggregate raw CAN rows into one profile per vehicle/signal pair.

Source code in graphids/core/data/discovery/hypotheses.py
def build_signal_profiles(df: pl.DataFrame, spec: SignalProfileSpec | None = None) -> pl.DataFrame:
    """Collapse raw CAN rows into one profile row per (vehicle, signal) pair.

    Rows are sorted by vehicle, signal and, when present, time first, so the
    ``diff``-based statistics see each group in that order. Optional
    statistic families are emitted only when their source columns exist:

    * always: ``msg_count``
    * ``spec.time_col``: ``timestamp_min``, ``timestamp_max``, ``duration``,
      ``iat_mean``, ``iat_std`` (from consecutive time differences)
    * ``spec.entropy_col``: ``entropy_mean``, ``entropy_std``
    * byte columns (as selected by ``_byte_cols`` with ``spec.byte_prefix``):
      per-byte ``_mean``/``_std``/``_min``/``_max``/``_range``, then
      byte-averaged ``change_rate``, ``skewness`` and ``kurtosis`` (the last
      two clipped to [-10, 10] per byte)
    * ``spec.attack_col`` when ``spec.include_attack``: ``attack_max``,
      ``attack_rate``

    A ``signal_key`` column of the form ``"<vehicle>::<signal>"`` is appended.

    Raises:
        ValueError: if the vehicle or signal column is missing.
    """
    cfg = spec or SignalProfileSpec()
    absent = [name for name in (cfg.vehicle_col, cfg.signal_col) if name not in df.columns]
    if absent:
        raise ValueError(f"build_signal_profiles missing columns: {absent}")

    order_by = [name for name in (cfg.vehicle_col, cfg.signal_col, cfg.time_col) if name in df.columns]
    if order_by:
        df = df.sort(order_by)

    byte_cols = _byte_cols(df, prefix=cfg.byte_prefix)
    stats: list[pl.Expr] = [pl.len().cast(pl.Int64).alias("msg_count")]

    if cfg.time_col in df.columns:
        ts = pl.col(cfg.time_col)
        gaps = ts.diff()
        stats += [
            ts.min().cast(pl.Float64).alias("timestamp_min"),
            ts.max().cast(pl.Float64).alias("timestamp_max"),
            (ts.max() - ts.min()).cast(pl.Float64).alias("duration"),
            gaps.mean().cast(pl.Float64).alias("iat_mean"),
            gaps.std().fill_nan(0).cast(pl.Float64).alias("iat_std"),
        ]

    if cfg.entropy_col in df.columns:
        ent = pl.col(cfg.entropy_col)
        stats += [
            ent.mean().cast(pl.Float64).alias("entropy_mean"),
            ent.std().fill_nan(0).cast(pl.Float64).alias("entropy_std"),
        ]

    if byte_cols:
        # Per-byte reductions, emitted as one block per statistic (all means,
        # then all stds, ...) to keep the output column order stable.
        per_byte = (
            ("mean", lambda e: e.mean()),
            ("std", lambda e: e.std().fill_nan(0)),
            ("min", lambda e: e.min()),
            ("max", lambda e: e.max()),
            ("range", lambda e: e.max() - e.min()),
        )
        for suffix, op in per_byte:
            stats += [op(pl.col(c)).cast(pl.Float64).alias(f"{c}_{suffix}") for c in byte_cols]
        changed = [(pl.col(c).diff().abs().drop_nulls() > 0).mean().fill_null(0) for c in byte_cols]
        stats.append(pl.mean_horizontal(*changed).cast(pl.Float64).alias("change_rate"))
        moments = (
            ("skewness", lambda e: e.skew()),
            ("kurtosis", lambda e: e.kurtosis()),
        )
        for out_name, moment in moments:
            clipped = [moment(pl.col(c)).fill_nan(0).fill_null(0).clip(-10, 10) for c in byte_cols]
            stats.append(pl.mean_horizontal(*clipped).cast(pl.Float64).alias(out_name))

    if cfg.include_attack and cfg.attack_col in df.columns:
        atk = pl.col(cfg.attack_col)
        stats += [
            atk.max().cast(pl.Int64).alias("attack_max"),
            atk.mean().cast(pl.Float64).alias("attack_rate"),
        ]

    signal_key = pl.concat_str(
        [pl.col(cfg.vehicle_col).cast(pl.Utf8), pl.col(cfg.signal_col).cast(pl.Utf8)],
        separator="::",
    ).alias("signal_key")
    return df.group_by([cfg.vehicle_col, cfg.signal_col]).agg(*stats).with_columns(signal_key)

build_temporal_data

build_temporal_data(df: DataFrame, spec: TemporalGraphSpec) -> Any

Build a PyG TemporalData stream from ordered rows.

Source code in graphids/core/data/preprocessing/temporal.py
def build_temporal_data(df: pl.DataFrame, spec: TemporalGraphSpec) -> Any:
    """Turn time-ordered rows into a PyG ``TemporalData`` event stream.

    Rows are sorted by ``spec.time_col``. Each row's source is
    ``spec.edge_policy.src_col``. Its destination is
    ``spec.edge_policy.dst_col`` shifted by ``-dst_shift`` rows, and rows left
    without a destination by that shift are dropped. Nulls/NaNs in the time,
    feature, target and auxiliary-label columns are zero-filled.

    Fields set on the result, in order:

    * ``src``, ``dst``
    * ``t`` (float32)
    * ``msg`` (float32, only when ``spec.feature_cols`` is non-empty)
    * ``y`` (int64; binarized as ``target > 0`` when ``spec.binary_target``)
    * every ``spec.aux_label_cols`` entry present in the rows (int64)
    """
    from torch_geometric.data import TemporalData

    policy = spec.edge_policy
    # NOTE(review): polars shift(k) with k > 0 pulls values from earlier rows,
    # so a negative dst_shift makes ``dst`` come from a *previous* row here.
    # Confirm this matches the intended edge direction.
    events = (
        df.sort(spec.time_col)
        .with_columns(
            pl.col(policy.src_col).alias("src"),
            pl.col(policy.dst_col).shift(-policy.dst_shift).alias("dst"),
        )
        .filter(pl.col("dst").is_not_null())
    )

    def _dense(cols, dtype):
        # Zero-fill nulls and NaNs before converting the selection to torch.
        return events.select(cols).fill_null(0).fill_nan(0).to_torch(dtype=dtype)

    fields: dict[str, Any] = {
        "src": _torchize(events, ["src"], dtype=pl.Int64),
        "dst": _torchize(events, ["dst"], dtype=pl.Int64),
        "t": _dense(spec.time_col, pl.Float32).squeeze(-1),
    }
    if spec.feature_cols:
        fields["msg"] = _dense(list(spec.feature_cols), pl.Float32)

    target = _dense(spec.target_col, pl.Int64).squeeze(-1)
    fields["y"] = (target > 0).to(torch.int64) if spec.binary_target else target

    for col in spec.aux_label_cols:
        if col in events.columns:
            fields[col] = _dense(col, pl.Int64).squeeze(-1)

    return TemporalData(**fields)

default_graph_transforms

default_graph_transforms() -> list[GraphTransform]

Default graph transforms used in cache builds.

Source code in graphids/core/data/preprocessing/graph_ops.py
def default_graph_transforms() -> list[GraphTransform]:
    """Return the graph transforms applied by default during cache builds.

    The transforms run in this order:

    1. ``edge_frequency``: adds ``edge_freq``.
    2. ``bidir``: adds ``bidir``.
    3. ``topology``: adds ``clustering_coeff``, ``in_degree`` and
       ``out_degree``.

    Each transform declares the edge-table columns it requires.
    """
    per_window_edges = ("_wid", "src", "dst")
    table = (
        ("edge_frequency", per_window_edges, ("edge_freq",), _add_edge_frequency),
        ("bidir", per_window_edges, ("bidir",), _add_bidir),
        (
            "topology",
            ("_wid", "node_id", "src", "dst"),
            ("clustering_coeff", "in_degree", "out_degree"),
            _add_graph_topology,
        ),
    )
    return [
        GraphTransform(name=label, requires=needs, produces=adds, fn=impl)
        for label, needs, adds, impl in table
    ]

graph_tables_to_pyg

graph_tables_to_pyg(tables: GraphTables, *, node_col_order: list[str], edge_col_order: tuple[str, ...], label_exprs: list[Expr]) -> tuple[Data, dict, int, int]

Compose staged tables into pre-collated PyG tensors.

Source code in graphids/core/data/preprocessing/pyg.py
def graph_tables_to_pyg(
    tables: GraphTables,
    *,
    node_col_order: list[str],
    edge_col_order: tuple[str, ...],
    label_exprs: list[pl.Expr],
) -> tuple[Data, dict, int, int]:
    """Compose staged tables into pre-collated PyG tensors.

    Returns ``(data, slices, num_graphs, n_rows)``:

    * ``data`` holds every graph's rows concatenated.
    * ``slices`` maps each attribute to cumulative offsets that cut it back
      into per-graph pieces. These are node-level (``node_slice``),
      edge-level (``edge_slice``) or one-entry-per-graph (``graph_idx``).

    Graphs are ordered by first appearance of ``_wid`` in
    ``tables.node_stats``. A graph with nodes but no edges gets an empty
    edge slice.

    NOTE(review): node and edge rows are assumed to be contiguous per
    ``_wid`` and in that same graph order; this is not checked here.
    """
    label_names = [e.meta.output_name() for e in label_exprs]
    x = tables.node_stats.select(node_col_order).fill_null(0).fill_nan(0).to_torch(dtype=pl.Float32)
    node_id = tables.node_stats.select("node_id").to_torch(dtype=pl.Int64).squeeze(-1)
    edge_index = tables.edge_df.select("src_local", "dst_local").to_torch(dtype=pl.Int64).t().contiguous()
    edge_attr = tables.edge_df.select(list(edge_col_order)).fill_null(0).fill_nan(0).to_torch(dtype=pl.Float32)

    # One row per graph, in first-appearance order of ``_wid`` among the nodes.
    kept_wids = tables.node_stats.select("_wid").unique(maintain_order=True)
    num_graphs = len(kept_wids)
    node_counts = tables.node_stats.group_by("_wid", maintain_order=True).len()["len"]
    # Edge counts must line up with ``kept_wids`` one-to-one. Grouping edge_df
    # alone skips windows that have no edges, which shifts every later graph's
    # edge slice onto a neighbour's edges. Map counts by ``_wid`` (0 if absent).
    edge_len = tables.edge_df.group_by("_wid").len()
    edge_counts = (
        kept_wids["_wid"]
        .replace_strict(edge_len["_wid"], edge_len["len"], default=0, return_dtype=pl.UInt32)
        .alias("len")
    )
    node_slice = _slices_from_counts(node_counts)
    edge_slice = _slices_from_counts(edge_counts)
    graph_idx = torch.arange(num_graphs + 1, dtype=torch.long)
    labels_aligned = kept_wids.join(tables.labels, on="_wid", how="left").fill_null(0)
    label_tensors = {
        n: labels_aligned.select(n).to_torch(dtype=pl.Int64).squeeze(-1)
        for n in label_names
    }
    extra_tensors: dict[str, torch.Tensor] = {}
    extra_slices: dict[str, torch.Tensor] = {}
    node_optional_cols = {
        "sequence_id": "node_sequence_id",
        "sequence_step": "node_sequence_step",
        "sequence_length": "node_sequence_length",
        "sequence_stride": "node_sequence_stride",
        "snapshot_wid": "node_snapshot_wid",
    }
    for col, attr in node_optional_cols.items():
        if col in tables.node_stats.columns:
            extra_tensors[attr] = _optional_tensor(tables.node_stats, col, dtype=pl.Int64)
            extra_slices[attr] = node_slice

    edge_optional_cols = {
        "sequence_id": "edge_sequence_id",
        "sequence_step": "edge_sequence_step",
        "sequence_length": "edge_sequence_length",
        "sequence_stride": "edge_sequence_stride",
        "snapshot_wid": "edge_snapshot_wid",
    }
    for col, attr in edge_optional_cols.items():
        if col in tables.edge_df.columns:
            extra_tensors[attr] = _optional_tensor(tables.edge_df, col, dtype=pl.Int64)
            extra_slices[attr] = edge_slice

    # Graph-level sequence metadata: one value per graph, sliced by graph_idx.
    graph_optional_cols = ("sequence_id", "sequence_length", "sequence_stride")
    for col in graph_optional_cols:
        if col in labels_aligned.columns:
            extra_tensors[col] = _optional_tensor(labels_aligned, col, dtype=pl.Int64)
            extra_slices[col] = graph_idx

    data = Data(
        x=x,
        edge_index=edge_index,
        edge_attr=edge_attr,
        node_id=node_id,
        **label_tensors,
        **extra_tensors,
    )
    slices = {
        "x": node_slice,
        "edge_index": edge_slice,
        "edge_attr": edge_slice,
        "node_id": node_slice,
        **{n: graph_idx for n in label_names},
        **extra_slices,
    }
    return data, slices, num_graphs, tables.n_rows

initialize_hypotheses

initialize_hypotheses(profiles: DataFrame) -> pl.DataFrame

Create an empty hypothesis table aligned to a profile table.

Source code in graphids/core/data/discovery/hypotheses.py
def initialize_hypotheses(profiles: pl.DataFrame) -> pl.DataFrame:
    """Build an unreviewed hypothesis table keyed like ``profiles``.

    Each profile row yields one hypothesis row holding its ``vehicle_id``,
    ``arb_id`` and ``signal_key`` plus placeholder columns: a null
    ``candidate_canonical_id``, ``confidence`` of 0.0, ``status`` set to
    ``"unreviewed"`` and an empty ``evidence`` string.

    Raises:
        ValueError: if any of the three key columns is absent from ``profiles``.
    """
    key_cols = ("vehicle_id", "arb_id", "signal_key")
    absent = [name for name in key_cols if name not in profiles.columns]
    if absent:
        raise ValueError(f"initialize_hypotheses missing columns: {absent}")

    placeholders = [
        pl.lit(None, dtype=pl.Utf8).alias("candidate_canonical_id"),
        pl.lit(0.0).cast(pl.Float64).alias("confidence"),
        pl.lit("unreviewed").alias("status"),
        pl.lit("").cast(pl.Utf8).alias("evidence"),
    ]
    return profiles.select(*key_cols).with_columns(placeholders)

rank_signal_hypotheses

rank_signal_hypotheses(profiles: DataFrame, hypotheses: DataFrame | None = None) -> pl.DataFrame

Join profile scores to provisional hypotheses when available.

Source code in graphids/core/data/discovery/ranking.py
def rank_signal_hypotheses(
    profiles: pl.DataFrame,
    hypotheses: pl.DataFrame | None = None,
) -> pl.DataFrame:
    """Rank signal profiles and attach provisional hypotheses when possible.

    ``profiles`` is scored with :func:`rank_signal_profiles`. If
    ``hypotheses`` is ``None`` or empty, that ranking is returned as-is.
    Otherwise, whichever of ``vehicle_id``, ``raw_signal``,
    ``candidate_canonical_id``, ``status`` and ``confidence`` exist in
    ``hypotheses`` are left-joined onto the ranking.

    Join key selection:

    * the profile grouping column, when ``hypotheses`` carries it;
    * else ``hypotheses.raw_signal``, matched against the grouping column;
    * else no join, and the plain ranking is returned.

    A left join repeats a ranked row once per matching hypothesis row.
    """
    ranked = rank_signal_profiles(profiles)
    if hypotheses is None or hypotheses.is_empty():
        return ranked

    key = _pick_group_col(profiles)
    wanted = ("vehicle_id", "raw_signal", "candidate_canonical_id", "status", "confidence")
    available = [name for name in wanted if name in hypotheses.columns]

    if key in hypotheses.columns:
        right_key = key
    elif "raw_signal" in hypotheses.columns and key != "raw_signal":
        right_key = "raw_signal"
    else:
        return ranked

    payload = [right_key] + [name for name in available if name != right_key]
    right = hypotheses.select(*payload)
    if right_key == key:
        return ranked.join(right, on=key, how="left")
    return ranked.join(right, left_on=key, right_on=right_key, how="left")

rank_signal_profiles

rank_signal_profiles(profiles: DataFrame) -> pl.DataFrame

Score per-signal profile rows by cross-vehicle support and stability.

The goal is not to claim a final ontology match; it is to give the discovery layer a concrete relational ranking pass that can surface stable signals for review.

Source code in graphids/core/data/discovery/ranking.py
def rank_signal_profiles(profiles: pl.DataFrame) -> pl.DataFrame:
    """Score per-signal profile rows by cross-vehicle support and stability.

    The goal is not to claim a final ontology match; it is to give the
    discovery layer a concrete relational ranking pass that can surface
    stable signals for review.

    Rows are grouped by the column chosen by ``_pick_group_col``. Any of the
    metric columns ``msg_count``, ``entropy_mean``, ``entropy_std``,
    ``change_rate``, ``attack_rate``, ``skewness`` and ``kurtosis`` that is
    missing is treated as a constant 0.0. Per group the result holds:

    * ``vehicle_support``: number of distinct ``vehicle_id`` values;
    * ``profile_rows``: row count;
    * ``msg_count_mean`` and ``msg_count_std``;
    * group means of the other metrics.

    Nulls in all of these aggregates are replaced with 0, and
    ``ranking_score`` is::

        vehicle_support * (1 + ln(msg_count_mean + 1))
        / (1 + msg_count_std) / (1 + entropy_std)
        / (1 + |change_rate|) / (1 + clip(attack_rate, 0, 1))

    The output is sorted descending by ``ranking_score``, then
    ``vehicle_support``, then ``profile_rows``.

    Raises:
        ValueError: if the grouping column or ``vehicle_id`` is missing.
    """
    group_col = _pick_group_col(profiles)
    missing = sorted({group_col, "vehicle_id"}.difference(profiles.columns))
    if missing:
        raise ValueError(f"rank_signal_profiles missing columns: {missing}")

    metric_cols = (
        "msg_count",
        "entropy_mean",
        "entropy_std",
        "change_rate",
        "attack_rate",
        "skewness",
        "kurtosis",
    )
    fill_ins = [
        pl.lit(0.0).cast(pl.Float64).alias(name)
        for name in metric_cols
        if name not in profiles.columns
    ]
    if fill_ins:
        profiles = profiles.with_columns(fill_ins)

    def _avg(src: str, alias: str) -> pl.Expr:
        # Group mean with nulls replaced by 0, emitted as Float64.
        return pl.col(src).mean().fill_null(0).cast(pl.Float64).alias(alias)

    aggregates = [
        pl.n_unique("vehicle_id").alias("vehicle_support"),
        pl.len().alias("profile_rows"),
        _avg("msg_count", "msg_count_mean"),
        pl.col("msg_count").std().fill_null(0).cast(pl.Float64).alias("msg_count_std"),
        *(
            _avg(name, name)
            for name in ("entropy_mean", "entropy_std", "change_rate", "attack_rate", "skewness", "kurtosis")
        ),
    ]

    # Support grows with vehicle coverage and (log) message volume ...
    support_weight = pl.col("vehicle_support").cast(pl.Float64) * (
        1.0 + (pl.col("msg_count_mean") + 1.0).log()
    )
    # ... and is damped by volume/entropy instability, churn and attack share.
    ranking_score = (
        support_weight
        / (1.0 + pl.col("msg_count_std"))
        / (1.0 + pl.col("entropy_std"))
        / (1.0 + pl.col("change_rate").abs())
        / (1.0 + pl.col("attack_rate").clip(0.0, 1.0))
    ).alias("ranking_score")

    return (
        profiles.group_by(group_col)
        .agg(aggregates)
        .with_columns(ranking_score)
        .sort(["ranking_score", "vehicle_support", "profile_rows"], descending=True)
    )

representation_digest

representation_digest(cfg: GraphRepresentationCfg) -> str

Short stable digest for paths and cache keys.

Source code in graphids/core/data/preprocessing/representations.py
def representation_digest(cfg: GraphRepresentationCfg) -> str:
    """Return a short, stable hex digest identifying ``cfg``.

    The config payload is serialized to compact JSON with sorted keys, so the
    same config always yields the same bytes. It is then hashed with SHA-1 and
    truncated to 12 hex characters for use in paths and cache keys.
    """
    canonical = json.dumps(
        representation_payload(cfg),
        sort_keys=True,
        separators=(",", ":"),
    ).encode("utf-8")
    return hashlib.sha1(canonical).hexdigest()[:12]

representation_kind

representation_kind(cfg: GraphRepresentationCfg) -> str

Stable label for logging and config routing.

Source code in graphids/core/data/preprocessing/representations.py
def representation_kind(cfg: GraphRepresentationCfg) -> str:
    """Return the stable kind label of a representation config.

    The label is used for logging and config routing. Types are checked in a
    fixed order, and the first ``isinstance`` match wins.

    Raises:
        TypeError: if ``cfg`` is not a known representation config.
    """
    labelled = (
        (SnapshotRepresentationCfg, "snapshot"),
        (SnapshotSequenceRepresentationCfg, "snapshot_sequence"),
        (MultiScaleRepresentationCfg, "multi_scale"),
        (TemporalRepresentationCfg, "temporal"),
        (EntityRepresentationCfg, "entity"),
    )
    for cfg_type, label in labelled:
        if isinstance(cfg, cfg_type):
            return label
    raise TypeError(f"unsupported representation config: {type(cfg)!r}")

representation_payload

representation_payload(cfg: GraphRepresentationCfg) -> dict[str, object]

Stable JSON-serializable payload for cache identity and metadata.

Source code in graphids/core/data/preprocessing/representations.py
def representation_payload(cfg: GraphRepresentationCfg) -> dict[str, object]:
    """Return ``cfg`` as a plain dict for cache identity and metadata.

    Built with :func:`dataclasses.asdict`, so every field is included and
    nested dataclasses are converted recursively.
    """
    payload: dict[str, object] = asdict(cfg)
    return payload

representation_plan

representation_plan(cfg: GraphRepresentationCfg) -> GraphRepresentationPlan

Wrap a representation config with its stable kind label.

Source code in graphids/core/data/preprocessing/representations.py
def representation_plan(cfg: GraphRepresentationCfg) -> GraphRepresentationPlan:
    """Pair ``cfg`` with its kind label in a ``GraphRepresentationPlan``.

    Raises:
        TypeError: propagated from ``representation_kind`` for unknown configs.
    """
    kind = representation_kind(cfg)
    return GraphRepresentationPlan(kind=kind, cfg=cfg)

representation_segment

representation_segment(cfg: GraphRepresentationCfg) -> WindowSegmentCfg | SequenceSegmentCfg | MultiScaleSegmentCfg | EntitySegmentCfg

Map a representation config to the corresponding segment primitive.

Source code in graphids/core/data/preprocessing/representations.py
def representation_segment(
    cfg: GraphRepresentationCfg,
) -> WindowSegmentCfg | SequenceSegmentCfg | MultiScaleSegmentCfg | EntitySegmentCfg:
    """Map a representation config to the corresponding segment primitive."""
    if isinstance(cfg, SnapshotRepresentationCfg):
        return WindowSegmentCfg(window_size=cfg.window_size, stride=cfg.stride)
    if isinstance(cfg, SnapshotSequenceRepresentationCfg):
        return SequenceSegmentCfg(
            window_size=cfg.window_size,
            stride=cfg.stride,
            sequence_length=cfg.sequence_length,
            sequence_stride=cfg.sequence_stride,
        )
    if isinstance(cfg, MultiScaleRepresentationCfg):
        return MultiScaleSegmentCfg(window_sizes=cfg.window_sizes, stride=cfg.stride)
    if isinstance(cfg, EntityRepresentationCfg):
        return EntitySegmentCfg(
            anchor_column=cfg.anchor_column,
            anchor_value=cfg.anchor_value,
            history_window_size=cfg.history_window_size,
            future_window_size=cfg.future_window_size,
        )
    raise TypeError(
        f"representation {type(cfg).__name__} does not map to a segment primitive"
    )

representation_temporal_spec

representation_temporal_spec(cfg: GraphRepresentationCfg) -> TemporalGraphSpec

Map a representation config to the temporal-stream spec.

Source code in graphids/core/data/preprocessing/representations.py
def representation_temporal_spec(cfg: GraphRepresentationCfg) -> TemporalGraphSpec:
    """Build the temporal-stream spec for a temporal representation.

    Only ``TemporalRepresentationCfg`` is accepted. Its ``time_col`` and
    ``binary_target`` are forwarded. The spec always has no feature columns,
    ``"attack"`` as the target column and ``("attack_type",)`` as auxiliary
    label columns.

    Raises:
        TypeError: for any other representation config.
    """
    if not isinstance(cfg, TemporalRepresentationCfg):
        raise TypeError(
            f"representation {type(cfg).__name__} does not map to a temporal spec"
        )
    return TemporalGraphSpec(
        time_col=cfg.time_col,
        binary_target=cfg.binary_target,
        feature_cols=(),
        target_col="attack",
        aux_label_cols=("attack_type",),
    )

representation_view

representation_view(cfg: GraphRepresentationCfg) -> ViewCfg

Map a representation config to the corresponding public view config.

Source code in graphids/core/data/preprocessing/representations.py
def representation_view(cfg: GraphRepresentationCfg) -> ViewCfg:
    """Map a representation config to the corresponding public view config."""
    if isinstance(cfg, SnapshotRepresentationCfg):
        return SnapshotViewCfg(window_size=cfg.window_size, stride=cfg.stride)
    if isinstance(cfg, SnapshotSequenceRepresentationCfg):
        return SnapshotSequenceViewCfg(
            window_size=cfg.window_size,
            stride=cfg.stride,
            sequence_length=cfg.sequence_length,
            sequence_stride=cfg.sequence_stride,
        )
    if isinstance(cfg, MultiScaleRepresentationCfg):
        return MultiScaleViewCfg(window_sizes=cfg.window_sizes, stride=cfg.stride)
    if isinstance(cfg, TemporalRepresentationCfg):
        return RollingStreamViewCfg(
            history_messages=cfg.history_messages or 500,
            prediction_horizon=1,
        )
    if isinstance(cfg, EntityRepresentationCfg):
        return EntityViewCfg(
            anchor_column=cfg.anchor_column,
            anchor_value=cfg.anchor_value,
            history_window_size=cfg.history_window_size,
            future_window_size=cfg.future_window_size,
        )
    raise TypeError(f"unsupported representation config: {type(cfg)!r}")

representation_window_defaults

representation_window_defaults(cfg: GraphRepresentationCfg) -> tuple[int, int]

Derive legacy window knobs from the explicit representation config.

Source code in graphids/core/data/preprocessing/representations.py
def representation_window_defaults(cfg: GraphRepresentationCfg) -> tuple[int, int]:
    """Derive legacy window knobs from the explicit representation config."""
    if isinstance(cfg, SnapshotRepresentationCfg):
        return cfg.window_size, cfg.stride
    if isinstance(cfg, SnapshotSequenceRepresentationCfg):
        return cfg.window_size, cfg.stride
    if isinstance(cfg, MultiScaleRepresentationCfg):
        return min(cfg.window_sizes), cfg.stride
    if isinstance(cfg, EntityRepresentationCfg):
        return (
            cfg.history_window_size + cfg.future_window_size + 1,
            max(1, cfg.future_window_size or 1),
        )
    if isinstance(cfg, TemporalRepresentationCfg):
        return 100, 100
    raise TypeError(f"unsupported representation config: {type(cfg)!r}")

secondary_graph_transforms

secondary_graph_transforms() -> list[GraphTransform]

Additional exploratory graph transforms used in feature tests.

Source code in graphids/core/data/preprocessing/graph_ops.py
def secondary_graph_transforms() -> list[GraphTransform]:
    """Return the exploratory graph transforms exercised by feature tests.

    There is currently a single ``secondary_node_stats`` transform. It
    declares ``in_degree``/``out_degree`` as inputs and
    ``in_out_ratio``/``neighbor_entropy`` as outputs.
    """
    node_stats = GraphTransform(
        name="secondary_node_stats",
        requires=("in_degree", "out_degree"),
        produces=("in_out_ratio", "neighbor_entropy"),
        fn=_add_secondary_node_stats,
    )
    return [node_stats]

segment_kind

segment_kind(cfg: SegmentCfg) -> str

Stable human-readable label for logging and config routing.

Source code in graphids/core/data/preprocessing/segments.py
def segment_kind(cfg: SegmentCfg) -> str:
    """Stable human-readable label for logging and config routing."""
    if isinstance(cfg, WindowSegmentCfg):
        return "window"
    if isinstance(cfg, SequenceSegmentCfg):
        return "sequence"
    if isinstance(cfg, MultiScaleSegmentCfg):
        return "multi_scale"
    if isinstance(cfg, EntitySegmentCfg):
        return "entity"
    raise TypeError(f"unsupported segment config: {type(cfg)!r}")

segment_plan

segment_plan(cfg: SegmentCfg) -> GraphSegmentPlan

Wrap a segment config with its stable kind label.

Source code in graphids/core/data/preprocessing/segments.py
def segment_plan(cfg: SegmentCfg) -> GraphSegmentPlan:
    """Pair ``cfg`` with its kind label in a ``GraphSegmentPlan``.

    Raises:
        TypeError: propagated from ``segment_kind`` for unknown configs.
    """
    label = segment_kind(cfg)
    return GraphSegmentPlan(kind=label, cfg=cfg)

temporal_edge_policy

temporal_edge_policy(*, src_col: str = 'node_id', dst_col: str = 'node_id', dst_shift: int = -1) -> EdgePolicy

Temporal adjacency policy: edge from row t to row t + dst_shift.

Source code in graphids/core/data/preprocessing/edge_policy.py
def temporal_edge_policy(
    *,
    src_col: str = "node_id",
    dst_col: str = "node_id",
    dst_shift: int = -1,
) -> EdgePolicy:
    """Return a ``temporal_shift`` policy: edge from row ``t`` to row ``t + dst_shift``.

    The source end is read from ``src_col`` and the destination end from
    ``dst_col``. The alias fields are left at the ``EdgePolicy`` defaults.
    """
    policy_fields = {"src_col": src_col, "dst_col": dst_col, "dst_shift": dst_shift}
    return EdgePolicy(name="temporal_shift", **policy_fields)

temporal_len

temporal_len(data: Any) -> int

Return data.num_events when the object exposes it, otherwise len(data).

Source code in graphids/core/data/preprocessing/temporal.py
def temporal_len(data: Any) -> int:
    """Return the number of events in ``data``.

    Uses ``data.num_events`` (converted with ``int``) when the attribute
    exists, otherwise ``len(data)``. The ``len`` fallback is evaluated only
    when needed, so objects that expose ``num_events`` but do not implement
    ``__len__`` are supported.
    """
    # Not getattr(data, "num_events", len(data)): that evaluates len(data) eagerly,
    # even when num_events exists.
    try:
        num_events = data.num_events
    except AttributeError:
        return len(data)
    return int(num_events)

view_kind

view_kind(view: ViewCfg) -> ViewKind

Stable human-readable label for config selection and logging.

Source code in graphids/core/data/preprocessing/views.py
def view_kind(view: ViewCfg) -> ViewKind:
    """Stable human-readable label for config selection and logging."""
    if isinstance(view, SnapshotViewCfg):
        return "snapshot"
    if isinstance(view, SnapshotSequenceViewCfg):
        return "snapshot_sequence"
    if isinstance(view, MultiScaleViewCfg):
        return "multi_scale"
    if isinstance(view, EventChunkViewCfg):
        return "event_chunk"
    if isinstance(view, RollingStreamViewCfg):
        return "rolling_stream"
    if isinstance(view, EntityViewCfg):
        return "entity"
    raise TypeError(f"unsupported view config: {type(view)!r}")

curriculum

Curriculum difficulty scorers used by the graph datamodule.

score_random
score_random(graphs: list, seed: int = 0) -> torch.Tensor

Uniform random per-graph difficulty for curriculum control runs.

Source code in graphids/core/data/preprocessing/curriculum.py
def score_random(graphs: list, seed: int = 0) -> torch.Tensor:
    """Return one uniform ``[0, 1)`` difficulty score per graph.

    Scores are drawn from a dedicated ``torch.Generator`` seeded with
    ``int(seed)``. The same seed and graph count therefore reproduce the same
    scores, and the global torch RNG is not consumed. Intended as a control
    for curriculum runs.
    """
    generator = torch.Generator()
    generator.manual_seed(int(seed))
    return torch.rand(len(graphs), generator=generator)
score_vgae
score_vgae(graphs: list, ckpt_path: str) -> torch.Tensor

Per-graph reconstruction MSE from a trained VGAE checkpoint.

Higher = harder. Loads the VGAE on CPU, computes per-graph mean MSE via torch_geometric.utils.scatter, releases the model.

Source code in graphids/core/data/preprocessing/curriculum.py
@torch.no_grad()
def score_vgae(graphs: list, ckpt_path: str) -> torch.Tensor:
    """Per-graph reconstruction MSE from a trained VGAE checkpoint.

    Higher = harder. Loads the VGAE on CPU, computes per-graph mean MSE
    via ``torch_geometric.utils.scatter``, releases the model.

    Args:
        graphs: PyG graphs to score. They are batched in order (500 per
            batch), and the returned scores follow the same order.
        ckpt_path: path to the VGAE checkpoint; must be non-empty.

    Returns:
        A float tensor with one score per graph in ``graphs``.

    Raises:
        ValueError: if ``ckpt_path`` is empty.
    """
    if not ckpt_path:
        raise ValueError("score_vgae requires a non-empty ckpt_path")

    # Function-local imports keep PyG loader and model code off this module's import path.
    from torch_geometric.loader import DataLoader as PyGDataLoader
    from torch_geometric.utils import scatter

    from graphids.core.models.base import safe_load_checkpoint

    vgae = safe_load_checkpoint("vgae", Path(ckpt_path), map_location="cpu")
    try:
        device = next(vgae.parameters()).device
        was_training = vgae.training
        vgae.eval()
        try:
            scores: list[float] = []
            for batch in PyGDataLoader(graphs, batch_size=500):
                batch = batch.clone().to(device, non_blocking=True)
                cont, _canid, _nbr, _z, _kl, _edge = vgae(batch)
                node_mse = (cont - batch.x).pow(2).mean(dim=1)
                # dim_size pins the output to one entry per graph in the batch. Without it,
                # scatter sizes the result by the largest graph index that owns a node, so
                # trailing node-less graphs would be dropped and scores would misalign.
                graph_mse = scatter(
                    node_mse, batch.batch, dim=0, dim_size=batch.num_graphs, reduce="mean"
                )
                scores.extend(graph_mse.tolist())
        finally:
            # Restore the caller-visible train/eval mode even if scoring fails.
            vgae.train(was_training)
    finally:
        del vgae
        gc.collect()
    return torch.tensor(scores, dtype=torch.float)

edge_policy

Declarative edge construction policies for graph preprocessing.

EdgePolicy dataclass
EdgePolicy(name: str, src_col: str = 'node_id', dst_col: str = 'node_id', dst_shift: int = -1, src_alias: str = 'src', dst_alias: str = 'dst')

How to derive directed edges from windowed rows.

temporal_edge_policy
temporal_edge_policy(*, src_col: str = 'node_id', dst_col: str = 'node_id', dst_shift: int = -1) -> EdgePolicy

Temporal adjacency policy: edge from row t to row t + dst_shift.

Source code in graphids/core/data/preprocessing/edge_policy.py
def temporal_edge_policy(
    *,
    src_col: str = "node_id",
    dst_col: str = "node_id",
    dst_shift: int = -1,
) -> EdgePolicy:
    """Return a ``temporal_shift`` policy: edge from row ``t`` to row ``t + dst_shift``.

    The source end is read from ``src_col`` and the destination end from
    ``dst_col``. The alias fields are left at the ``EdgePolicy`` defaults.
    """
    policy_fields = {"src_col": src_col, "dst_col": dst_col, "dst_shift": dst_shift}
    return EdgePolicy(name="temporal_shift", **policy_fields)

graph_ops

Composable graph transforms over node/edge preprocessing tables.

GraphTransform dataclass
GraphTransform(name: str, requires: tuple[str, ...], produces: tuple[str, ...], fn: Callable[[DataFrame, DataFrame], tuple[DataFrame, DataFrame]])

A declarative graph transform with explicit input/output columns.

default_graph_transforms
default_graph_transforms() -> list[GraphTransform]

Default graph transforms used in cache builds.

Source code in graphids/core/data/preprocessing/graph_ops.py
def default_graph_transforms() -> list[GraphTransform]:
    """Return the graph transforms applied by default in cache builds.

    In order:

    * ``edge_frequency`` produces ``edge_freq``;
    * ``bidir`` produces ``bidir``;
    * ``topology`` produces ``clustering_coeff``, ``in_degree`` and
      ``out_degree``.

    The first two require ``_wid``/``src``/``dst``; ``topology`` also
    requires ``node_id``.
    """
    edge_keys = ("_wid", "src", "dst")
    specs = (
        ("edge_frequency", edge_keys, ("edge_freq",), _add_edge_frequency),
        ("bidir", edge_keys, ("bidir",), _add_bidir),
        (
            "topology",
            ("_wid", "node_id", "src", "dst"),
            ("clustering_coeff", "in_degree", "out_degree"),
            _add_graph_topology,
        ),
    )
    return [
        GraphTransform(name=name, requires=requires, produces=produces, fn=fn)
        for name, requires, produces, fn in specs
    ]
secondary_graph_transforms
secondary_graph_transforms() -> list[GraphTransform]

Additional exploratory graph transforms used in feature tests.

Source code in graphids/core/data/preprocessing/graph_ops.py
def secondary_graph_transforms() -> list[GraphTransform]:
    """Return the exploratory graph transforms exercised by feature tests.

    There is currently a single ``secondary_node_stats`` transform. It
    declares ``in_degree``/``out_degree`` as inputs and
    ``in_out_ratio``/``neighbor_entropy`` as outputs.
    """
    node_stats = GraphTransform(
        name="secondary_node_stats",
        requires=("in_degree", "out_degree"),
        produces=("in_out_ratio", "neighbor_entropy"),
        fn=_add_secondary_node_stats,
    )
    return [node_stats]

materialization

Graph table materialization from windowed preprocessing primitives.

build_graph_tables
build_graph_tables(df: DataFrame, *, node_stat_exprs: list[Expr], label_exprs: list[Expr], edge_policy: EdgePolicy | None = None, edge_stat_exprs: list[Expr], edge_base_cols: list[str], graph_transforms: list[GraphTransform] | None = None, debug_artifacts_dir: str | Path | None = None, segment_cfg: WindowSegmentCfg | SequenceSegmentCfg | MultiScaleSegmentCfg | EntitySegmentCfg | None = None) -> GraphTables

Compose the graph preprocessing primitives into staged graph tables.

Source code in graphids/core/data/preprocessing/materialization.py
def build_graph_tables(
    df: pl.DataFrame,
    *,
    node_stat_exprs: list[pl.Expr],
    label_exprs: list[pl.Expr],
    edge_policy: EdgePolicy | None = None,
    edge_stat_exprs: list[pl.Expr],
    edge_base_cols: list[str],
    graph_transforms: list[GraphTransform] | None = None,
    debug_artifacts_dir: str | Path | None = None,
    segment_cfg: WindowSegmentCfg | SequenceSegmentCfg | MultiScaleSegmentCfg | EntitySegmentCfg | None = None,
) -> GraphTables:
    """Compose the graph preprocessing primitives into staged graph tables.

    Dispatch on ``segment_cfg`` (checked in this order):

    * ``MultiScaleSegmentCfg``: one windowed build per entry of
      ``window_sizes``, tagged with ``scale_id``/``scale_window_size``.
      Non-empty scales are concatenated, with ``_wid`` shifted so window ids
      stay unique across scales. Empty tables are returned if every scale
      is empty.
    * ``SequenceSegmentCfg``: delegated to the sequence builder.
    * ``EntitySegmentCfg``: a windowed build of size
      ``history + future + 1`` and stride ``max(1, future or 1)``, tagged
      with the anchor column and value (``""`` when the value is ``None``).
    * ``WindowSegmentCfg``: a plain windowed build.

    Raises:
        ValueError: if ``segment_cfg`` is ``None``.
        TypeError: for any other segment config type.
    """
    if segment_cfg is None:
        raise ValueError("build_graph_tables requires an explicit segment_cfg")

    # Arguments forwarded unchanged to every builder.
    shared = dict(
        node_stat_exprs=node_stat_exprs,
        label_exprs=label_exprs,
        edge_policy=edge_policy,
        edge_stat_exprs=edge_stat_exprs,
        edge_base_cols=edge_base_cols,
        graph_transforms=graph_transforms,
        debug_artifacts_dir=debug_artifacts_dir,
    )

    if isinstance(segment_cfg, MultiScaleSegmentCfg):
        per_scale: list[GraphTables] = []
        wid_offset = 0
        for scale_id, scale_window_size in enumerate(segment_cfg.window_sizes):
            built = _build_graph_tables_windowed(
                df,
                window_size=scale_window_size,
                stride=segment_cfg.stride,
                tags={"scale_id": scale_id, "scale_window_size": scale_window_size},
                **shared,
            )
            if built.node_stats.is_empty():
                continue
            shift = (pl.col("_wid") + wid_offset).alias("_wid")
            per_scale.append(GraphTables(
                node_stats=built.node_stats.with_columns(shift),
                edge_df=built.edge_df.with_columns(shift),
                labels=built.labels.with_columns(shift),
                n_rows=built.n_rows,
            ))
            # The next scale starts right after this scale's largest (unshifted) window id.
            wid_offset += int(built.node_stats.select(pl.col("_wid").max()).item()) + 1
        if not per_scale:
            return GraphTables(pl.DataFrame(), pl.DataFrame(), pl.DataFrame(), len(df))
        return GraphTables(
            node_stats=pl.concat([t.node_stats for t in per_scale], how="vertical"),
            edge_df=pl.concat([t.edge_df for t in per_scale], how="vertical"),
            labels=pl.concat([t.labels for t in per_scale], how="vertical"),
            n_rows=len(df),
        )

    if isinstance(segment_cfg, SequenceSegmentCfg):
        return _build_graph_tables_sequence(df, cfg=segment_cfg, **shared)

    if isinstance(segment_cfg, EntitySegmentCfg):
        anchor_value = segment_cfg.anchor_value
        return _build_graph_tables_windowed(
            df,
            window_size=segment_cfg.history_window_size + segment_cfg.future_window_size + 1,
            stride=max(1, segment_cfg.future_window_size or 1),
            tags={
                "anchor_column": segment_cfg.anchor_column,
                "anchor_value": "" if anchor_value is None else anchor_value,
            },
            **shared,
        )

    if isinstance(segment_cfg, WindowSegmentCfg):
        return _build_graph_tables_windowed(
            df,
            window_size=segment_cfg.window_size,
            stride=segment_cfg.stride,
            **shared,
        )
    raise TypeError(f"unsupported segment config: {type(segment_cfg)!r}")

metadata

Cache metadata contract for dataset builds.

load_metadata
load_metadata(cache_dir: Path) -> dict[str, Any]

Read and version-gate cache_metadata.json.

Source code in graphids/core/data/preprocessing/metadata.py
def load_metadata(cache_dir: Path) -> dict[str, Any]:
    """Load ``cache_metadata.json`` from ``cache_dir`` and check its schema version.

    Raises:
        FileNotFoundError: if the metadata file does not exist.
        ValueError: if the stored ``metadata_schema_version`` (``None`` when
            absent) differs from ``METADATA_SCHEMA_VERSION``.
    """
    meta_path = cache_dir / "cache_metadata.json"
    if not meta_path.exists():
        raise FileNotFoundError(f"cache_metadata.json missing at {meta_path}; run rebuild-caches")
    loaded = json.loads(meta_path.read_text())
    found_version = loaded.get("metadata_schema_version")
    if found_version == METADATA_SCHEMA_VERSION:
        return loaded
    raise ValueError(
        f"{meta_path} schema {found_version!r} != expected {METADATA_SCHEMA_VERSION}; rebuild caches"
    )
merge_split_into_metadata
merge_split_into_metadata(cache_dir: Path, split_name: str, split_entry: dict[str, Any], *, invariants: dict[str, Any], dataset_name: str, num_arb_ids: int) -> dict[str, Any]

Merge one split's entry into cache_metadata.json under FileLock.

The first writer seeds the top-level fields; later writers must match the stored invariants and dataset name, otherwise a ValueError is raised.

Source code in graphids/core/data/preprocessing/metadata.py
def merge_split_into_metadata(
    cache_dir: Path,
    split_name: str,
    split_entry: dict[str, Any],
    *,
    invariants: dict[str, Any],
    dataset_name: str,
    num_arb_ids: int,
) -> dict[str, Any]:
    """Merge one split's entry into ``cache_metadata.json`` under FileLock.

    First writer seeds top-level fields; later writers must match
    invariants + dataset name or raise.

    Args:
        cache_dir: cache directory, created if missing. It holds both
            ``cache_metadata.json`` and the ``.metadata_lock`` lock file.
        split_name: key under ``"splits"`` at which ``split_entry`` is stored.
            An existing entry with the same name is replaced.
        split_entry: per-split metadata to store.
        invariants: must contain every key in ``INVARIANT_KEYS``. Only those
            keys are copied to the top level.
        dataset_name: must equal the ``dataset`` already recorded, if any.
        num_arb_ids: written to the top level on every call. It is not
            compared with the stored value.

    Returns:
        The full metadata dict that was written to disk.

    Raises:
        ValueError: if any of the following holds:

            * ``invariants`` lacks a required key;
            * the existing file has a schema version that is neither missing
              nor ``METADATA_SCHEMA_VERSION``;
            * a stored invariant differs from the writer's value;
            * the stored dataset name differs from ``dataset_name``.
    """
    cache_dir.mkdir(parents=True, exist_ok=True)
    meta_path = cache_dir / "cache_metadata.json"

    # Validate the caller's invariants before touching the lock or the file.
    missing = [k for k in INVARIANT_KEYS if k not in invariants]
    if missing:
        raise ValueError(f"invariants missing required keys: {missing}")

    # The read-check-write sequence below runs entirely while holding the lock file,
    # so writers sharing this cache_dir cannot interleave their updates.
    with FileLock(str(cache_dir / ".metadata_lock")):
        existing: dict[str, Any] = {}
        if meta_path.exists():
            existing = json.loads(meta_path.read_text())
            # A file with no version recorded (None) is accepted; any other mismatch aborts.
            ver = existing.get("metadata_schema_version")
            if ver not in (None, METADATA_SCHEMA_VERSION):
                raise ValueError(
                    f"{meta_path} schema {ver!r} != {METADATA_SCHEMA_VERSION}; "
                    "delete or rebuild --delete-existing"
                )
            # Invariants are compared only for keys already present in the file.
            for k in INVARIANT_KEYS:
                if k in existing and existing[k] != invariants[k]:
                    raise ValueError(
                        f"{meta_path} invariant mismatch: {k}={existing[k]!r} "
                        f"!= writer {invariants[k]!r}; rebuild caches"
                    )
            if existing.get("dataset") not in (None, dataset_name):
                raise ValueError(
                    f"{meta_path} dataset={existing.get('dataset')!r} != writer {dataset_name!r}"
                )

        # built_at is kept from the first writer. The invariant keys and num_arb_ids
        # come from this call.
        # NOTE(review): num_arb_ids is never compared against the stored value, so a
        # later writer silently overwrites it. Confirm this is intended.
        # splits is copied so that adding this writer's entry does not mutate `existing`.
        meta: dict[str, Any] = {
            "metadata_schema_version": METADATA_SCHEMA_VERSION,
            "dataset": dataset_name,
            "built_at": existing.get("built_at") or datetime.now(UTC).isoformat(),
            "num_arb_ids": num_arb_ids,
            **{k: invariants[k] for k in INVARIANT_KEYS},
            "splits": dict(existing.get("splits") or {}),
        }
        meta["splits"][split_name] = split_entry
        # Aggregate is recomputed over all recorded splits, including this one.
        meta["aggregate"] = _aggregate(meta["splits"])
        atomic_write_text(meta_path, json.dumps(meta, indent=2, sort_keys=True))
        return meta

pipeline

Thin composer over graph preprocessing primitives.

GraphPipeline dataclass
GraphPipeline(node_stat_exprs: list[Expr], edge_stat_exprs: list[Expr], node_col_order: list[str], edge_col_order: tuple[str, ...], label_exprs: list[Expr], edge_base_cols: list[str], edge_policy: EdgePolicy | None = None, graph_transforms: list[GraphTransform] | None = None, debug_artifacts_dir: str | None = None, representation_cfg: GraphRepresentationCfg = SnapshotRepresentationCfg(), segment_cfg: WindowSegmentCfg | SequenceSegmentCfg | MultiScaleSegmentCfg | EntitySegmentCfg | None = None)

Pure config carrier for graph preprocessing primitives.

pyg

PyG tensor packing primitives for staged graph tables.

graph_tables_to_pyg
graph_tables_to_pyg(tables: GraphTables, *, node_col_order: list[str], edge_col_order: tuple[str, ...], label_exprs: list[Expr]) -> tuple[Data, dict, int, int]

Compose staged tables into pre-collated PyG tensors.

Source code in graphids/core/data/preprocessing/pyg.py
def graph_tables_to_pyg(
    tables: GraphTables,
    *,
    node_col_order: list[str],
    edge_col_order: tuple[str, ...],
    label_exprs: list[pl.Expr],
) -> tuple[Data, dict, int, int]:
    """Compose staged tables into pre-collated PyG tensors.

    All node rows are packed into ``x``/``node_id``, and all edge rows into
    ``edge_index``/``edge_attr``. Null and NaN features are replaced with 0.
    There is one graph per ``_wid`` in ``node_stats``, in first-appearance
    order. ``slices`` maps each attribute to per-graph boundary offsets:
    node-level, edge-level, or graph-level for labels and per-graph extras.

    Args:
        tables: staged node/edge/label tables keyed by ``_wid``. Rows of
            ``node_stats`` and ``edge_df`` are assumed to be contiguous per
            ``_wid`` and in the same window order (TODO confirm with the
            builders).
        node_col_order: node feature columns, in ``x`` column order.
        edge_col_order: edge feature columns, in ``edge_attr`` column order.
        label_exprs: label expressions. Their output names become per-graph
            label attributes, and windows without a label row get 0.

    Returns:
        ``(data, slices, num_graphs, tables.n_rows)``.
    """
    label_names = [e.meta.output_name() for e in label_exprs]
    x = tables.node_stats.select(node_col_order).fill_null(0).fill_nan(0).to_torch(dtype=pl.Float32)
    node_id = tables.node_stats.select("node_id").to_torch(dtype=pl.Int64).squeeze(-1)
    # Edge endpoints are already local to their window.
    edge_index = tables.edge_df.select("src_local", "dst_local").to_torch(dtype=pl.Int64).t().contiguous()
    edge_attr = tables.edge_df.select(list(edge_col_order)).fill_null(0).fill_nan(0).to_torch(dtype=pl.Float32)
    kept_wids = tables.node_stats.group_by("_wid", maintain_order=True).first().select("_wid")
    num_graphs = len(kept_wids)
    node_counts = tables.node_stats.group_by("_wid", maintain_order=True).len()["len"]
    # Align edge counts with the graph list, not with the windows that happen to have edges.
    # A window with nodes but no edges must still get a zero-length edge slice; otherwise every
    # later graph would be handed a neighbouring window's edges. Like labels_aligned below, this
    # relies on the left join keeping kept_wids order.
    edge_counts = (
        kept_wids.join(tables.edge_df.group_by("_wid").len(), on="_wid", how="left")
        .fill_null(0)["len"]
    )
    node_slice = _slices_from_counts(node_counts)
    edge_slice = _slices_from_counts(edge_counts)
    graph_idx = torch.arange(num_graphs + 1, dtype=torch.long)
    labels_aligned = kept_wids.join(tables.labels, on="_wid", how="left").fill_null(0)
    label_tensors = {
        n: labels_aligned.select(n).to_torch(dtype=pl.Int64).squeeze(-1)
        for n in label_names
    }
    extra_tensors: dict[str, torch.Tensor] = {}
    extra_slices: dict[str, torch.Tensor] = {}
    # Optional sequence/snapshot bookkeeping columns, emitted only when present.
    node_optional_cols = {
        "sequence_id": "node_sequence_id",
        "sequence_step": "node_sequence_step",
        "sequence_length": "node_sequence_length",
        "sequence_stride": "node_sequence_stride",
        "snapshot_wid": "node_snapshot_wid",
    }
    for col, attr in node_optional_cols.items():
        if col in tables.node_stats.columns:
            extra_tensors[attr] = _optional_tensor(tables.node_stats, col, dtype=pl.Int64)
            extra_slices[attr] = node_slice

    edge_optional_cols = {
        "sequence_id": "edge_sequence_id",
        "sequence_step": "edge_sequence_step",
        "sequence_length": "edge_sequence_length",
        "sequence_stride": "edge_sequence_stride",
        "snapshot_wid": "edge_snapshot_wid",
    }
    for col, attr in edge_optional_cols.items():
        if col in tables.edge_df.columns:
            extra_tensors[attr] = _optional_tensor(tables.edge_df, col, dtype=pl.Int64)
            extra_slices[attr] = edge_slice

    graph_optional_cols = ("sequence_id", "sequence_length", "sequence_stride")
    for col in graph_optional_cols:
        if col in labels_aligned.columns:
            extra_tensors[col] = _optional_tensor(labels_aligned, col, dtype=pl.Int64)
            extra_slices[col] = graph_idx

    data = Data(
        x=x,
        edge_index=edge_index,
        edge_attr=edge_attr,
        node_id=node_id,
        **label_tensors,
        **extra_tensors,
    )
    slices = {
        "x": node_slice,
        "edge_index": edge_slice,
        "edge_attr": edge_slice,
        "node_id": node_slice,
        **{n: graph_idx for n in label_names},
        **extra_slices,
    }
    return data, slices, num_graphs, tables.n_rows

representations

Explicit graph-representation configs for training and discovery.

EntityRepresentationCfg dataclass
EntityRepresentationCfg(kind: Literal['entity'] = 'entity', anchor_column: str = 'node_id', anchor_value: str | int | None = None, history_window_size: int = 100, future_window_size: int = 0)

Entity-centric representation centered on one signal or message family.

GraphRepresentationPlan dataclass
GraphRepresentationPlan(kind: Literal['snapshot', 'snapshot_sequence', 'multi_scale', 'temporal', 'entity'], cfg: GraphRepresentationCfg)

Resolved representation kind and config payload.

MultiScaleRepresentationCfg dataclass
MultiScaleRepresentationCfg(kind: Literal['multi_scale'] = 'multi_scale', window_sizes: tuple[int, ...] = (50, 100, 200), stride: int = 100)

Parallel snapshots at multiple window sizes.

SnapshotRepresentationCfg dataclass
SnapshotRepresentationCfg(kind: Literal['snapshot'] = 'snapshot', window_size: int = 100, stride: int = 100)

One graph per sliding window.

SnapshotSequenceRepresentationCfg dataclass
SnapshotSequenceRepresentationCfg(kind: Literal['snapshot_sequence'] = 'snapshot_sequence', window_size: int = 100, stride: int = 100, sequence_length: int = 4, sequence_stride: int = 1)

Ordered sequence of snapshot graphs.

TemporalRepresentationCfg dataclass
TemporalRepresentationCfg(kind: Literal['temporal'] = 'temporal', time_col: str = 'timestamp', binary_target: bool = True, history_messages: int | None = None)

Event stream representation built as PyG TemporalData.

representation_digest
representation_digest(cfg: GraphRepresentationCfg) -> str

Short stable digest for paths and cache keys.

Source code in graphids/core/data/preprocessing/representations.py
def representation_digest(cfg: GraphRepresentationCfg) -> str:
    """Return a 12-hex-character SHA-1 digest of ``cfg`` for paths and cache keys.

    The payload is serialized as canonical JSON (sorted keys, compact
    separators), so identical payloads always produce the same digest.
    """
    canonical = json.dumps(
        representation_payload(cfg),
        separators=(",", ":"),
        sort_keys=True,
    )
    full_hex = hashlib.sha1(canonical.encode("utf-8")).hexdigest()
    return full_hex[:12]
representation_kind
representation_kind(cfg: GraphRepresentationCfg) -> str

Stable label for logging and config routing.

Source code in graphids/core/data/preprocessing/representations.py
def representation_kind(cfg: GraphRepresentationCfg) -> str:
    """Return the stable kind label for ``cfg`` (used for logging and config routing).

    Raises:
        TypeError: if ``cfg`` is not one of the known representation configs.
    """
    # Checked in this order; the first matching type wins.
    label_by_type = (
        (SnapshotRepresentationCfg, "snapshot"),
        (SnapshotSequenceRepresentationCfg, "snapshot_sequence"),
        (MultiScaleRepresentationCfg, "multi_scale"),
        (TemporalRepresentationCfg, "temporal"),
        (EntityRepresentationCfg, "entity"),
    )
    for cfg_type, label in label_by_type:
        if isinstance(cfg, cfg_type):
            return label
    raise TypeError(f"unsupported representation config: {type(cfg)!r}")
representation_payload
representation_payload(cfg: GraphRepresentationCfg) -> dict[str, object]

Stable JSON-serializable payload for cache identity and metadata.

Source code in graphids/core/data/preprocessing/representations.py
def representation_payload(cfg: GraphRepresentationCfg) -> dict[str, object]:
    """Return ``cfg`` converted to a plain dict with ``dataclasses.asdict``.

    This dict is the payload used for cache identity and run metadata.
    """
    payload: dict[str, object] = asdict(cfg)
    return payload
representation_plan
representation_plan(cfg: GraphRepresentationCfg) -> GraphRepresentationPlan

Wrap a representation config with its stable kind label.

Source code in graphids/core/data/preprocessing/representations.py
def representation_plan(cfg: GraphRepresentationCfg) -> GraphRepresentationPlan:
    """Pair ``cfg`` with its stable kind label in a ``GraphRepresentationPlan``."""
    kind = representation_kind(cfg)
    plan = GraphRepresentationPlan(kind=kind, cfg=cfg)
    return plan
representation_segment
representation_segment(cfg: GraphRepresentationCfg) -> WindowSegmentCfg | SequenceSegmentCfg | MultiScaleSegmentCfg | EntitySegmentCfg

Map a representation config to the corresponding segment primitive.

Source code in graphids/core/data/preprocessing/representations.py
def representation_segment(
    cfg: GraphRepresentationCfg,
) -> WindowSegmentCfg | SequenceSegmentCfg | MultiScaleSegmentCfg | EntitySegmentCfg:
    """Translate ``cfg`` into the segment primitive that shapes its samples.

    Mapping: snapshot -> window segment, snapshot_sequence -> sequence
    segment, multi_scale -> multi-scale segment, entity -> entity segment.
    Fields are copied across unchanged.

    Raises:
        TypeError: for any other config type. The temporal config is not
            handled here.
    """
    if isinstance(cfg, SnapshotRepresentationCfg):
        segment = WindowSegmentCfg(window_size=cfg.window_size, stride=cfg.stride)
    elif isinstance(cfg, SnapshotSequenceRepresentationCfg):
        segment = SequenceSegmentCfg(
            window_size=cfg.window_size,
            stride=cfg.stride,
            sequence_length=cfg.sequence_length,
            sequence_stride=cfg.sequence_stride,
        )
    elif isinstance(cfg, MultiScaleRepresentationCfg):
        segment = MultiScaleSegmentCfg(window_sizes=cfg.window_sizes, stride=cfg.stride)
    elif isinstance(cfg, EntityRepresentationCfg):
        segment = EntitySegmentCfg(
            anchor_column=cfg.anchor_column,
            anchor_value=cfg.anchor_value,
            history_window_size=cfg.history_window_size,
            future_window_size=cfg.future_window_size,
        )
    else:
        raise TypeError(
            f"representation {type(cfg).__name__} does not map to a segment primitive"
        )
    return segment
representation_temporal_spec
representation_temporal_spec(cfg: GraphRepresentationCfg) -> TemporalGraphSpec

Map a representation config to the temporal-stream spec.

Source code in graphids/core/data/preprocessing/representations.py
def representation_temporal_spec(cfg: GraphRepresentationCfg) -> TemporalGraphSpec:
    """Build the ``TemporalGraphSpec`` for a temporal representation config.

    Raises:
        TypeError: if ``cfg`` is not a ``TemporalRepresentationCfg``.
    """
    if not isinstance(cfg, TemporalRepresentationCfg):
        raise TypeError(
            f"representation {type(cfg).__name__} does not map to a temporal spec"
        )
    # The config supplies only the time column and the binary-target flag.
    # Feature columns stay empty, the target/aux label columns are fixed, and
    # edge_policy keeps the TemporalGraphSpec default.
    return TemporalGraphSpec(
        time_col=cfg.time_col,
        binary_target=cfg.binary_target,
        feature_cols=(),
        target_col="attack",
        aux_label_cols=("attack_type",),
    )
representation_view
representation_view(cfg: GraphRepresentationCfg) -> ViewCfg

Map a representation config to the corresponding public view config.

Source code in graphids/core/data/preprocessing/representations.py
def representation_view(cfg: GraphRepresentationCfg) -> ViewCfg:
    """Translate ``cfg`` into the public view config that describes its samples.

    Snapshot, snapshot-sequence, multi-scale and entity configs copy their
    fields onto the matching view. A temporal config becomes a rolling-stream
    view with a one-step prediction horizon.

    Raises:
        TypeError: if ``cfg`` is not one of the known representation configs.
    """
    if isinstance(cfg, SnapshotRepresentationCfg):
        view = SnapshotViewCfg(window_size=cfg.window_size, stride=cfg.stride)
    elif isinstance(cfg, SnapshotSequenceRepresentationCfg):
        view = SnapshotSequenceViewCfg(
            window_size=cfg.window_size,
            stride=cfg.stride,
            sequence_length=cfg.sequence_length,
            sequence_stride=cfg.sequence_stride,
        )
    elif isinstance(cfg, MultiScaleRepresentationCfg):
        view = MultiScaleViewCfg(window_sizes=cfg.window_sizes, stride=cfg.stride)
    elif isinstance(cfg, TemporalRepresentationCfg):
        # NOTE(review): ``or`` substitutes 500 for any falsy value, so an
        # explicit ``history_messages=0`` also becomes 500. Confirm this is intended.
        history = cfg.history_messages or 500
        view = RollingStreamViewCfg(history_messages=history, prediction_horizon=1)
    elif isinstance(cfg, EntityRepresentationCfg):
        view = EntityViewCfg(
            anchor_column=cfg.anchor_column,
            anchor_value=cfg.anchor_value,
            history_window_size=cfg.history_window_size,
            future_window_size=cfg.future_window_size,
        )
    else:
        raise TypeError(f"unsupported representation config: {type(cfg)!r}")
    return view
representation_window_defaults
representation_window_defaults(cfg: GraphRepresentationCfg) -> tuple[int, int]

Derive legacy window knobs from the explicit representation config.

Source code in graphids/core/data/preprocessing/representations.py
def representation_window_defaults(cfg: GraphRepresentationCfg) -> tuple[int, int]:
    """Return the legacy ``(window_size, stride)`` pair implied by ``cfg``.

    Raises:
        TypeError: if ``cfg`` is not one of the known representation configs.
    """
    if isinstance(cfg, (SnapshotRepresentationCfg, SnapshotSequenceRepresentationCfg)):
        return cfg.window_size, cfg.stride
    if isinstance(cfg, MultiScaleRepresentationCfg):
        # The smallest scale stands in as the single legacy window size.
        return min(cfg.window_sizes), cfg.stride
    if isinstance(cfg, EntityRepresentationCfg):
        # Window covers history + future + 1 rows (presumably the anchor row).
        # Stride is the future horizon, clamped to at least 1.
        span = cfg.history_window_size + cfg.future_window_size + 1
        step = max(1, cfg.future_window_size or 1)
        return span, step
    if isinstance(cfg, TemporalRepresentationCfg):
        # Temporal configs carry no window knobs; use fixed 100/100 values.
        return 100, 100
    raise TypeError(f"unsupported representation config: {type(cfg)!r}")

scaler

Per-column feature scalers for tensor-based graph preprocessing.

segments

Public segment primitives for alternate dataset views.

EntitySegmentCfg dataclass
EntitySegmentCfg(anchor_column: str = 'node_id', anchor_value: str | int | None = None, history_window_size: int = 100, future_window_size: int = 0)

A segment centered on one arbitration ID or message family.

GraphSegmentPlan dataclass
GraphSegmentPlan(kind: Literal['window', 'sequence', 'multi_scale', 'entity'], cfg: SegmentCfg)

Resolved sample-shape plan for a dataset view.

MultiScaleSegmentCfg dataclass
MultiScaleSegmentCfg(window_sizes: tuple[int, ...], stride: int)

Parallel windows at multiple temporal scales.

Segmenter

Bases: Protocol

Primitive that turns raw rows into a shaped sample view.

SequenceSegmentCfg dataclass
SequenceSegmentCfg(window_size: int, stride: int, sequence_length: int = 4, sequence_stride: int = 1)

A sequence of ordered windows from the same raw stream.

WindowSegmentCfg dataclass
WindowSegmentCfg(window_size: int, stride: int)

One fixed window of rows.

WindowSegmenter dataclass
WindowSegmenter(window_size: int, stride: int)

Default snapshot segmenter: one fixed sliding window per graph.

WindowedRows dataclass
WindowedRows(rows: DataFrame, n_rows: int, n_windows: int, max_wid: int)

Rows plus derived window metadata for snapshot-style segments.

segment_kind
segment_kind(cfg: SegmentCfg) -> str

Stable human-readable label for logging and config routing.

Source code in graphids/core/data/preprocessing/segments.py
def segment_kind(cfg: SegmentCfg) -> str:
    """Return the stable kind label for segment config ``cfg``.

    Raises:
        TypeError: if ``cfg`` is not one of the known segment configs.
    """
    # Checked in this order; the first matching type wins.
    label_by_type = (
        (WindowSegmentCfg, "window"),
        (SequenceSegmentCfg, "sequence"),
        (MultiScaleSegmentCfg, "multi_scale"),
        (EntitySegmentCfg, "entity"),
    )
    for cfg_type, label in label_by_type:
        if isinstance(cfg, cfg_type):
            return label
    raise TypeError(f"unsupported segment config: {type(cfg)!r}")
segment_plan
segment_plan(cfg: SegmentCfg) -> GraphSegmentPlan

Wrap a segment config with its stable kind label.

Source code in graphids/core/data/preprocessing/segments.py
def segment_plan(cfg: SegmentCfg) -> GraphSegmentPlan:
    """Pair segment config ``cfg`` with its stable kind label in a ``GraphSegmentPlan``."""
    kind = segment_kind(cfg)
    plan = GraphSegmentPlan(kind=kind, cfg=cfg)
    return plan

temporal

Temporal graph primitives for stream and sequence views.

TemporalGraphSpec dataclass
TemporalGraphSpec(edge_policy: EdgePolicy = temporal_edge_policy(), time_col: str = 'timestamp', feature_cols: tuple[str, ...] = (), target_col: str = 'attack', aux_label_cols: tuple[str, ...] = ('attack_type',), binary_target: bool = True)

How to turn ordered rows into a PyG TemporalData object.

build_temporal_data
build_temporal_data(df: DataFrame, spec: TemporalGraphSpec) -> Any

Build a PyG TemporalData stream from ordered rows.

Source code in graphids/core/data/preprocessing/temporal.py
def build_temporal_data(df: pl.DataFrame, spec: TemporalGraphSpec) -> Any:
    """Build a PyG ``TemporalData`` stream from ordered rows.

    Rows are sorted by ``spec.time_col`` with a stable sort, so rows that share
    a timestamp keep their input order. Each event's ``src`` is the row's
    ``edge_policy.src_col`` value. Its ``dst`` is ``edge_policy.dst_col``
    shifted by ``-edge_policy.dst_shift``, i.e. taken ``dst_shift`` rows later
    for a positive shift. Rows whose shifted ``dst`` is null, such as the
    trailing ``dst_shift`` rows, are dropped.

    Args:
        df: Input rows. Must contain the time, src, dst and target columns
            named by ``spec``.
        spec: Column names, edge policy and label handling.

    Returns:
        A ``TemporalData`` with:
        - ``src`` and ``dst``, converted via ``_torchize`` as Int64.
        - ``t`` as float32.
        - ``msg`` as float32, only when ``spec.feature_cols`` is non-empty.
        - ``y`` as int64, binarized as ``target > 0`` when ``spec.binary_target``.
        - One int64 attribute per ``spec.aux_label_cols`` entry present in the rows.
        Null and NaN values in ``t``, ``msg``, target and aux labels become 0.
    """
    from torch_geometric.data import TemporalData

    policy = spec.edge_policy
    # maintain_order=True makes the sort stable. Polars does not preserve the
    # input order of equal sort keys by default, and reordering tied
    # timestamps would change which row's value becomes each event's ``dst``
    # after the shift below, making the edge stream nondeterministic.
    rows = df.sort(spec.time_col, maintain_order=True).with_columns(
        pl.col(policy.src_col).alias("src"),
        pl.col(policy.dst_col).shift(-policy.dst_shift).alias("dst"),
    )
    rows = rows.filter(pl.col("dst").is_not_null())

    src = _torchize(rows, ["src"], dtype=pl.Int64)
    dst = _torchize(rows, ["dst"], dtype=pl.Int64)
    t = rows.select(spec.time_col).fill_null(0).fill_nan(0).to_torch(dtype=pl.Float32).squeeze(-1)

    kwargs: dict[str, Any] = {"src": src, "dst": dst, "t": t}
    if spec.feature_cols:
        kwargs["msg"] = rows.select(list(spec.feature_cols)).fill_null(0).fill_nan(0).to_torch(dtype=pl.Float32)

    target = rows.select(spec.target_col).fill_null(0).fill_nan(0).to_torch(dtype=pl.Int64).squeeze(-1)
    kwargs["y"] = (target > 0).to(torch.int64) if spec.binary_target else target

    # Auxiliary label columns are optional; attach only those present.
    for col in spec.aux_label_cols:
        if col in rows.columns:
            kwargs[col] = rows.select(col).fill_null(0).fill_nan(0).to_torch(dtype=pl.Int64).squeeze(-1)

    return TemporalData(**kwargs)
temporal_len
temporal_len(data: Any) -> int

Return the number of events: the object's num_events attribute if it exposes one, otherwise len(data).

Source code in graphids/core/data/preprocessing/temporal.py
def temporal_len(data: Any) -> int:
    """Return the number of events in ``data``.

    Prefers the object's ``num_events`` attribute. ``len(data)`` is consulted
    only when that attribute is absent or ``None``. This avoids the previous
    eager ``getattr(..., len(data))`` default, which raised ``TypeError`` for
    objects that expose ``num_events`` but do not support ``len()``.

    Raises:
        TypeError: if ``data`` has no usable ``num_events`` and no ``len()``.
    """
    num_events = getattr(data, "num_events", None)
    if num_events is None:
        return len(data)
    return int(num_events)

transforms

Reusable graph-transform expressions for dataset schemas.

views

Public view primitives for turning raw CAN/event data into graph samples.

EntityViewCfg dataclass
EntityViewCfg(anchor_column: str = 'node_id', anchor_value: str | int | None = None, history_window_size: int = 100, future_window_size: int = 0)

Bases: _ViewCfg

Entity-centric view centered on one signal or message family.

EventChunkViewCfg dataclass
EventChunkViewCfg(message_count: int | None = 200, duration_ms: float | None = None, overlap: float = 0.0)

Bases: _ViewCfg

Chunk raw events by message count or duration.

MultiScaleViewCfg dataclass
MultiScaleViewCfg(window_sizes: tuple[int, ...] = (50, 100, 200), stride: int = 100)

Bases: _ViewCfg

Parallel snapshot views at multiple window sizes.

RollingStreamViewCfg dataclass
RollingStreamViewCfg(history_messages: int = 500, prediction_horizon: int = 1, update_mode: Literal['append', 'replace'] = 'append')

Bases: _ViewCfg

Online/streaming view with bounded history.

SnapshotSequenceViewCfg dataclass
SnapshotSequenceViewCfg(window_size: int = 100, stride: int = 100, sequence_length: int = 4, sequence_stride: int = 1)

Bases: _ViewCfg

An ordered sequence of snapshot graphs.

SnapshotViewCfg dataclass
SnapshotViewCfg(window_size: int = 100, stride: int = 100)

Bases: _ViewCfg

One fixed graph per sliding window.

view_kind
view_kind(view: ViewCfg) -> ViewKind

Stable human-readable label for config selection and logging.

Source code in graphids/core/data/preprocessing/views.py
def view_kind(view: ViewCfg) -> ViewKind:
    """Return the stable kind label for ``view`` (used for config selection and logging).

    Raises:
        TypeError: if ``view`` is not one of the known view configs.
    """
    # Checked in this order; the first matching type wins.
    label_by_type = (
        (SnapshotViewCfg, "snapshot"),
        (SnapshotSequenceViewCfg, "snapshot_sequence"),
        (MultiScaleViewCfg, "multi_scale"),
        (EventChunkViewCfg, "event_chunk"),
        (RollingStreamViewCfg, "rolling_stream"),
        (EntityViewCfg, "entity"),
    )
    for view_type, label in label_by_type:
        if isinstance(view, view_type):
            return label
    raise TypeError(f"unsupported view config: {type(view)!r}")

vocab

Vocabulary scan, digest, persist, and load primitives.

load_vocab
load_vocab(path: Path) -> tuple[dict[str, int], str]

Return (entries, digest) from a persisted vocab file.

Source code in graphids/core/data/preprocessing/vocab.py
def load_vocab(path: Path) -> tuple[dict[str, int], str]:
    """Return ``(entries, digest)`` from a persisted vocab file.

    Args:
        path: JSON file with top-level ``"entries"`` and ``"digest"`` keys.

    Returns:
        The id -> index mapping and the stored digest string. Keys are strings,
        as all JSON object keys are. The digest is returned as stored and is
        not re-verified here.

    Raises:
        KeyError: if ``"entries"`` or ``"digest"`` is missing.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    # JSON text is UTF-8 (RFC 8259); read it as such instead of relying on
    # the platform's locale-dependent default encoding.
    payload = json.loads(path.read_text(encoding="utf-8"))
    return payload["entries"], payload["digest"]
persist_vocab
persist_vocab(vocab: dict[Any, int], path: Path) -> str

Atomically write the vocab to path as JSON and return its digest.

Source code in graphids/core/data/preprocessing/vocab.py
def persist_vocab(vocab: dict[Any, int], path: Path) -> str:
    """Atomically write ``vocab`` to ``path`` as JSON and return its digest.

    The document holds three keys:
    - ``digest``: from ``vocab_digest``.
    - ``unk_index``: ``UNK_INDEX``.
    - ``entries``: the mapping with every id stringified, since JSON object
      keys must be strings.
    It is written indented with sorted keys.
    """
    digest = vocab_digest(vocab)
    entries = {str(raw_id): index for raw_id, index in vocab.items()}
    document = {
        "entries": entries,
        "unk_index": UNK_INDEX,
        "digest": digest,
    }
    # sort_keys=True makes the on-disk key order independent of insertion order.
    atomic_write_text(path, json.dumps(document, indent=2, sort_keys=True))
    return digest
scan_arb_ids
scan_arb_ids(raw_dir: Path, source_dirs: list[str]) -> list[Any]

Sorted unique arb_id across every CSV under source_dirs.

Source code in graphids/core/data/preprocessing/vocab.py
def scan_arb_ids(raw_dir: Path, source_dirs: list[str]) -> list[Any]:
    """Return the sorted distinct ``arb_id`` values across all CSVs under ``source_dirs``.

    Each ``raw_dir / <source_dir>`` is searched recursively for ``*.csv`` files
    in sorted path order. The ID column is ``arbitration_id`` when present,
    otherwise ``arb_id``.

    Raises:
        ValueError: if ``source_dirs`` is empty, a CSV has neither ID column,
            or no CSVs are found at all.
        FileNotFoundError: if a source directory does not exist.
    """
    if not source_dirs:
        raise ValueError("source_dirs is empty; cannot scan for arb_ids")

    id_frames: list[pl.LazyFrame] = []
    for subdir in source_dirs:
        root = raw_dir / subdir
        if not root.is_dir():
            raise FileNotFoundError(f"Source dir missing: {root}")
        for csv_file in sorted(root.rglob("*.csv")):
            lazy = pl.scan_csv(csv_file)
            names = lazy.collect_schema().names()
            if "arbitration_id" in names:
                source_col = "arbitration_id"
            elif "arb_id" in names:
                source_col = "arb_id"
            else:
                raise ValueError(
                    f"{csv_file} has neither arbitration_id nor arb_id; got {names!r}"
                )
            # Normalize the column name so all frames concatenate cleanly.
            id_frames.append(lazy.select(pl.col(source_col).alias("arb_id")))

    if not id_frames:
        raise ValueError(f"No CSVs under {source_dirs!r} in {raw_dir}")
    combined = pl.concat(id_frames).collect()
    return combined["arb_id"].unique().sort().to_list()
vocab_digest
vocab_digest(vocab: dict[Any, int]) -> str

SHA256 over (id, index) pairs sorted by index.

Source code in graphids/core/data/preprocessing/vocab.py
def vocab_digest(vocab: dict[Any, int]) -> str:
    """SHA256 over ``(id, index)`` pairs sorted by index, ties broken by id.

    Ids are stringified first, so ``{1: 0}`` and ``{"1": 0}`` hash the same.
    Pairs are ordered by ``(index, str(id))``, which makes the digest depend
    only on the mapping's contents and not on dict insertion order. When
    indices are unique this is the same order as sorting by index alone, so
    such digests are unchanged.

    Returns:
        The hex SHA-256 of the JSON-encoded list of pairs.
    """
    # Sorting by index alone would order duplicate indices by insertion order.
    # persist_vocab writes entries with sort_keys=True, so a reloaded vocab
    # can have a different insertion order than the one originally hashed.
    pairs = sorted(
        ((str(k), v) for k, v in vocab.items()),
        key=lambda kv: (kv[1], kv[0]),
    )
    canon = json.dumps(pairs, sort_keys=True)
    return hashlib.sha256(canon.encode()).hexdigest()

state

Process-level dataset cache.

DatasetState dataclass

DatasetState(train: Any, val: Any, test: dict[str, Any])

Ready-to-serve train/val/test splits.

clear_cache

clear_cache() -> None

Drop all cached states. Intended for test teardown.

Source code in graphids/core/data/state.py
def clear_cache() -> None:
    """Empty the process-level registry of cached dataset states (for test teardown)."""
    registry = _REGISTRY
    registry.clear()

get_or_build

get_or_build(dataset: _CacheableDataset) -> DatasetState

Return the cached DatasetState for dataset, building and caching it on first request.

Source code in graphids/core/data/state.py
def get_or_build(dataset: _CacheableDataset) -> DatasetState:
    """Return the ``DatasetState`` cached under ``dataset.cache_key``.

    On a miss the state is built with ``dataset.build()``, stored in the
    registry and returned. A miss means either no entry or an entry holding
    ``None``.
    """
    cache_key = dataset.cache_key
    cached = _REGISTRY.get(cache_key)
    if cached is not None:
        return cached
    fresh = dataset.build()
    _REGISTRY[cache_key] = fresh
    return fresh