Core: Data
All dataset, preprocessing, representation, and discovery machinery.
Current layout
datasets/ - raw source adapters and dataset/source builders.
discovery/ - signal profiles, canonical entities, and provisional hypotheses for cross-vehicle alignment.
preprocessing/ - explicit representations, views, segments, materialization, PyG packing, temporal streams, scaler config, vocab config, and graph transforms.
datamodule/ - training-time loaders and batching policy.
state.py - process-local dataset state for reuse within one Python process.
What to read next
graphids.core.data
data
Data-layer public API.
The runtime datamodules are imported lazily so preprocessing and discovery can be used without importing optional training dependencies.
CANBusDataset
CANBusDataset(root: str | Path, raw_dir: str | Path, *, val_fraction: float, split: str = 'train', source_dirs: list[str] | None = None, seed: int = 42, shared_vocab: dict | None = None, shared_vocab_digest: str | None = None, scaler_cfg: ScalerCfg = _DEFAULT_SCALER_CFG, representation_cfg: GraphRepresentationCfg = _DEFAULT_REPRESENTATION_CFG, transform=None, pre_transform=None)
Bases: BaseGraphDataset
One graph is one sliding window of CAN messages.
Source code in graphids/core/data/datasets/_base.py
CANBusSource (dataclass)
CANBusSource(name: str, lake_root: str | None = None, val_fraction: float = 0.2, seed: int = 42, scaler_cfg: ScalerCfg = ZBenignScalerCfg(), representation_cfg: GraphRepresentationCfg = SnapshotRepresentationCfg(), vocab_scope: Literal['train', 'all'] = 'train')
Bases: BaseGraphSource
Builds train/val/test CANBusDataset splits from the dataset catalog.
DatasetState (dataclass)
Ready-to-serve train/val/test splits.
clear_cache
get_or_build
Return the cached DatasetState for a dataset, building it on first use.
Source code in graphids/core/data/state.py
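A process-local cache of this kind can be as small as a module-level dictionary. The sketch below guesses at the shape of `get_or_build` and `clear_cache` and is not the contents of `state.py`. Here `DatasetState` is reduced to three hypothetical split fields, and the builder is passed in explicitly.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class DatasetState:
    # Hypothetical stand-in: the real class holds ready-to-serve splits.
    train: Any
    val: Any
    test: Any


# Lives only as long as the Python process; nothing is written to disk.
_CACHE: dict[str, DatasetState] = {}


def get_or_build(dataset: str, build: Callable[[str], DatasetState]) -> DatasetState:
    """Return the cached state for `dataset`, calling `build` only on a miss."""
    state = _CACHE.get(dataset)
    if state is None:
        state = build(dataset)
        _CACHE[dataset] = state
    return state


def clear_cache() -> None:
    """Drop every cached state, for example between tests."""
    _CACHE.clear()
```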
datamodule
DataModule primitives for graph and fusion datasets.
GraphDataModule
GraphDataModule(dataset, batch_size: int = 32, num_workers: int | None = None, prefetch_factor: int = 2, dynamic_batching: bool = True, label_filter: str | None = None, difficulty: Callable[..., Tensor] | None = None, scope_label: int = 0, min_steps_per_epoch: int = 1, require_cache: bool = False)
Bases: LightningDataModule
Source code in graphids/core/data/datamodule/graph.py
train_eval_dataloader
TemporalDataModule
Bases: LightningDataModule
Serve temporal event streams with PyG's TemporalDataLoader.
Source code in graphids/core/data/datamodule/temporal.py
fusion
Fusion data module for pre-extracted TensorDict caches.
graph
Lightning data module for graph datasets.
GraphDataModule
GraphDataModule(dataset, batch_size: int = 32, num_workers: int | None = None, prefetch_factor: int = 2, dynamic_batching: bool = True, label_filter: str | None = None, difficulty: Callable[..., Tensor] | None = None, scope_label: int = 0, min_steps_per_epoch: int = 1, require_cache: bool = False)
Bases: LightningDataModule
Source code in graphids/core/data/datamodule/graph.py
train_eval_dataloader
sampler
Offline next-fit decreasing packer for variable-size graphs.
pack_offline
pack_offline(sizes: Tensor, max_num: int, *, edge_sizes: Tensor | None = None, max_edges: int | None = None) -> list[list[int]]
Pack graph indices under node and edge budgets.
The sorted next-fit strategy is intentionally linear after sorting. Exact first-fit gives slightly tighter bins, but it is quadratic on large cached graph datasets and can spend minutes on CPU before the first GPU step.
Source code in graphids/core/data/datamodule/sampler.py
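The next-fit decreasing idea can be sketched in a few lines. This is an illustrative re-implementation, not the source of `pack_offline`. It takes plain Python lists instead of Tensors, and the name `pack_next_fit_decreasing` is hypothetical.

The algorithm sorts graphs by node count, largest first, and keeps exactly one bin open. That bin is closed as soon as the next graph would break either the node budget or the edge budget. A graph that exceeds a budget on its own still gets a bin to itself.

```python
from __future__ import annotations


def pack_next_fit_decreasing(
    sizes: list[int],
    max_num: int,
    *,
    edge_sizes: list[int] | None = None,
    max_edges: int | None = None,
) -> list[list[int]]:
    """Group graph indices into bins under a node budget and an optional edge budget."""
    # Largest graphs first; sorted() is stable, so ties keep their original order.
    order = sorted(range(len(sizes)), key=lambda i: sizes[i], reverse=True)
    bins: list[list[int]] = []
    current: list[int] = []
    nodes = edges = 0
    for i in order:
        n = sizes[i]
        e = edge_sizes[i] if edge_sizes is not None else 0
        over_nodes = nodes + n > max_num
        over_edges = max_edges is not None and edges + e > max_edges
        # Next-fit: never revisit older bins, just close the open one.
        if current and (over_nodes or over_edges):
            bins.append(current)
            current, nodes, edges = [], 0, 0
        current.append(i)
        nodes += n
        edges += e
    if current:
        bins.append(current)
    return bins
```

Because only one bin is open at a time, each graph is placed in constant time. The pass is therefore linear after the sort, which is the trade-off the docstring describes.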
datasets
CANBusDataset
CANBusDataset(root: str | Path, raw_dir: str | Path, *, val_fraction: float, split: str = 'train', source_dirs: list[str] | None = None, seed: int = 42, shared_vocab: dict | None = None, shared_vocab_digest: str | None = None, scaler_cfg: ScalerCfg = _DEFAULT_SCALER_CFG, representation_cfg: GraphRepresentationCfg = _DEFAULT_REPRESENTATION_CFG, transform=None, pre_transform=None)
Bases: BaseGraphDataset
One graph is one sliding window of CAN messages.
Source code in graphids/core/data/datasets/_base.py
CANBusSource (dataclass)
CANBusSource(name: str, lake_root: str | None = None, val_fraction: float = 0.2, seed: int = 42, scaler_cfg: ScalerCfg = ZBenignScalerCfg(), representation_cfg: GraphRepresentationCfg = SnapshotRepresentationCfg(), vocab_scope: Literal['train', 'all'] = 'train')
Bases: BaseGraphSource
Builds train/val/test CANBusDataset splits from the dataset catalog.
TemporalCANBusSource (dataclass)
TemporalCANBusSource(name: str, seed: int, lake_root: str | None = None, val_fraction: float = 0.2, vocab_scope: Literal['train', 'all'] = 'train', representation_cfg: TemporalRepresentationCfg = TemporalRepresentationCfg())
Builds temporal CAN event streams from the dataset catalog.
can
CAN-specific dataset adapters and source primitives.
CANBusDataset
CANBusDataset(root: str | Path, raw_dir: str | Path, *, val_fraction: float, split: str = 'train', source_dirs: list[str] | None = None, seed: int = 42, shared_vocab: dict | None = None, shared_vocab_digest: str | None = None, scaler_cfg: ScalerCfg = _DEFAULT_SCALER_CFG, representation_cfg: GraphRepresentationCfg = _DEFAULT_REPRESENTATION_CFG, transform=None, pre_transform=None)
Bases: BaseGraphDataset
One graph is one sliding window of CAN messages.
Source code in graphids/core/data/datasets/_base.py
CANBusSource (dataclass)
CANBusSource(name: str, lake_root: str | None = None, val_fraction: float = 0.2, seed: int = 42, scaler_cfg: ScalerCfg = ZBenignScalerCfg(), representation_cfg: GraphRepresentationCfg = SnapshotRepresentationCfg(), vocab_scope: Literal['train', 'all'] = 'train')
Bases: BaseGraphSource
Builds train/val/test CANBusDataset splits from the dataset catalog.
TemporalCANBusSource (dataclass)
TemporalCANBusSource(name: str, seed: int, lake_root: str | None = None, val_fraction: float = 0.2, vocab_scope: Literal['train', 'all'] = 'train', representation_cfg: TemporalRepresentationCfg = TemporalRepresentationCfg())
Builds temporal CAN event streams from the dataset catalog.
infer_attack_type
Infer the attack code from filename/path substrings.
Source code in graphids/core/data/datasets/can_bus.py
load_can_rows
Load, normalize, and parse raw CAN CSVs from source dirs.
Source code in graphids/core/data/datasets/can_bus.py
parse_payload
Parse a hex payload into byte_0..byte_7 columns plus the payload's Shannon entropy.
Source code in graphids/core/data/datasets/can_bus.py
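A minimal sketch of the payload split, assuming payloads arrive as hex strings such as `"0A1B2C3D"`. The function name is hypothetical, and several behaviours are assumptions: short payloads are zero-filled in the byte columns, bytes past the eighth are ignored, and entropy is computed only over the bytes actually present. The real `parse_payload` may handle short payloads differently.

```python
import math
from collections import Counter


def parse_payload_sketch(payload: str) -> dict[str, float]:
    """Split a hex CAN payload into byte_0..byte_7 and its Shannon entropy (bits)."""
    data = bytes.fromhex(payload)[:8]  # fromhex tolerates spaces between byte pairs
    row: dict[str, float] = {f"byte_{i}": (data[i] if i < len(data) else 0) for i in range(8)}
    n = len(data)
    counts = Counter(data)
    # H = -sum(p * log2 p) over the distribution of byte values.
    row["entropy"] = sum(-(c / n) * math.log2(c / n) for c in counts.values()) if n else 0.0
    return row
```

Eight distinct byte values give the maximum of 3 bits, and a constant payload gives 0.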
can_bus
CAN bus dataset adapter and schema.
CANBusDataset
CANBusDataset(root: str | Path, raw_dir: str | Path, *, val_fraction: float, split: str = 'train', source_dirs: list[str] | None = None, seed: int = 42, shared_vocab: dict | None = None, shared_vocab_digest: str | None = None, scaler_cfg: ScalerCfg = _DEFAULT_SCALER_CFG, representation_cfg: GraphRepresentationCfg = _DEFAULT_REPRESENTATION_CFG, transform=None, pre_transform=None)
Bases: BaseGraphDataset
One graph is one sliding window of CAN messages.
Source code in graphids/core/data/datasets/_base.py
CANBusSource (dataclass)
CANBusSource(name: str, lake_root: str | None = None, val_fraction: float = 0.2, seed: int = 42, scaler_cfg: ScalerCfg = ZBenignScalerCfg(), representation_cfg: GraphRepresentationCfg = SnapshotRepresentationCfg(), vocab_scope: Literal['train', 'all'] = 'train')
Bases: BaseGraphSource
Builds train/val/test CANBusDataset splits from the dataset catalog.
TemporalCANBusSource (dataclass)
TemporalCANBusSource(name: str, seed: int, lake_root: str | None = None, val_fraction: float = 0.2, vocab_scope: Literal['train', 'all'] = 'train', representation_cfg: TemporalRepresentationCfg = TemporalRepresentationCfg())
Builds temporal CAN event streams from the dataset catalog.
infer_attack_type
Infer the attack code from filename/path substrings.
Source code in graphids/core/data/datasets/can_bus.py
load_can_rows
Load, normalize, and parse raw CAN CSVs from source dirs.
Source code in graphids/core/data/datasets/can_bus.py
parse_payload
Parse a hex payload into byte_0..byte_7 columns plus the payload's Shannon entropy.
Source code in graphids/core/data/datasets/can_bus.py
discovery
Signal-discovery primitives for cross-vehicle ontology building.
CanonicalEntitySpec (dataclass)
CanonicalEntitySpec(canonical_id: str, name: str, aliases: tuple[str, ...] = (), vehicle_aliases: dict[str, tuple[str, ...]] = dict(), kind: Literal['signal', 'message', 'state', 'entity'] = 'signal', description: str | None = None)
One shared semantic entity across vehicles.
CanonicalFeatureFrameSpec (dataclass)
CanonicalFeatureFrameSpec(time_col: str = 'timestamp', vehicle_col: str | None = 'vehicle_id', alias_col: str = 'signal', value_col: str = 'value', keep_cols: tuple[str, ...] = (), feature_cols: tuple[str, ...] = (), unmapped: Literal['raise', 'drop', 'keep'] = 'raise')
How to flatten decoded vehicle rows into canonical feature records.
CanonicalRegistry (dataclass)
Lookup table for canonical entities and vehicle-specific aliases.
lookup_frame
Return a canonical lookup frame for vectorized joins.
Source code in graphids/core/data/discovery/canonical.py
lookup_table
Return alias -> entity mapping using vehicle::alias keys.
Source code in graphids/core/data/discovery/canonical.py
resolve
Resolve an alias to a canonical entity.
Source code in graphids/core/data/discovery/canonical.py
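The `vehicle::alias` key scheme can be illustrated with a small stand-in. The sketch below uses hypothetical classes `Entity` and `Registry`, which are trimmed-down analogues of `CanonicalEntitySpec` and `CanonicalRegistry` rather than their real code. The resolution order is also an assumption: try the vehicle-specific alias first, then fall back to a global alias or the canonical id itself.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Entity:
    # Hypothetical analogue of CanonicalEntitySpec with only the fields used here.
    canonical_id: str
    aliases: tuple[str, ...] = ()
    vehicle_aliases: dict[str, tuple[str, ...]] = field(default_factory=dict)


@dataclass
class Registry:
    entities: list[Entity]

    def lookup_table(self) -> dict[str, str]:
        """Map 'vehicle::alias' keys to canonical ids."""
        table: dict[str, str] = {}
        for ent in self.entities:
            for vehicle, aliases in ent.vehicle_aliases.items():
                for alias in aliases:
                    table[f"{vehicle}::{alias}"] = ent.canonical_id
        return table

    def resolve(self, vehicle: str, alias: str) -> str | None:
        """Prefer a vehicle-specific alias, then fall back to a global one."""
        hit = self.lookup_table().get(f"{vehicle}::{alias}")
        if hit is not None:
            return hit
        for ent in self.entities:
            if alias == ent.canonical_id or alias in ent.aliases:
                return ent.canonical_id
        return None
```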
DataLayerLayout (dataclass)
A single place to point at the three persistent data layers.
DiscoveryStore (dataclass)
DiscoveryStore(root: Path, profiles_name: str = 'signal_profiles.parquet', hypotheses_name: str = 'canonical_hypotheses.parquet', manifest_name: str = 'discovery_manifest.json')
HypothesisRecordSpec (dataclass)
HypothesisRecordSpec(vehicle_id: str, raw_signal: str, candidate_canonical_id: str | None = None, confidence: float = 0.0, status: Literal['unreviewed', 'provisional', 'confirmed', 'rejected'] = 'unreviewed', evidence: tuple[str, ...] = (), profile_path: Path | None = None, feature_digest: str | None = None)
Provisional semantic mapping for a raw signal.
MaterializedViewSpec (dataclass)
MaterializedViewSpec(root: Path, name: str = 'materialized_views', view_kind: ViewKind = 'snapshot', partition_cols: tuple[str, ...] = ('vehicle_id', 'view_kind', 'split'), format: Literal['parquet', 'sqlite', 'duckdb'] = 'parquet', key_cols: tuple[str, ...] = ('vehicle_id', 'canonical_id', 'timestamp'), feature_cols: tuple[str, ...] = (), label_cols: tuple[str, ...] = ('attack', 'attack_type'))
Training-facing view materialization contract.
RawEventTableSpec (dataclass)
RawEventTableSpec(root: Path, name: str = 'raw_can_events', partition_cols: tuple[str, ...] = ('vehicle_id', 'day'), format: Literal['parquet', 'sqlite', 'duckdb'] = 'parquet', primary_time_col: str = 'timestamp', raw_id_col: str = 'arb_id', vehicle_col: str = 'vehicle_id', attack_col: str = 'attack', signal_hint_col: str = 'signal_hint')
Canonical storage for immutable decoded CAN rows.
SignalHypothesisSpec (dataclass)
SignalHypothesisSpec(vehicle_id: str, raw_signal: str, candidate_canonical_id: str | None = None, confidence: float = 0.0, status: Literal['unreviewed', 'provisional', 'confirmed', 'rejected'] = 'unreviewed', evidence: tuple[str, ...] = ())
A provisional cross-vehicle mapping for a raw signal.
SignalProfileSpec (dataclass)
SignalProfileSpec(vehicle_col: str = 'vehicle_id', signal_col: str = 'arb_id', time_col: str = 'timestamp', entropy_col: str = 'entropy', attack_col: str = 'attack', byte_prefix: str = 'byte_', include_attack: bool = True)
How to aggregate raw CAN rows into per-signal profiles.
build_canonical_feature_frame
build_canonical_feature_frame(df: DataFrame, registry: CanonicalRegistry, *, spec: CanonicalFeatureFrameSpec | None = None) -> pl.DataFrame
Normalize decoded rows into a long canonical feature table.
Source code in graphids/core/data/discovery/canonical.py
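The `unmapped` policy of `CanonicalFeatureFrameSpec` (`'raise'`, `'drop'`, `'keep'`) decides what happens to rows whose alias has no canonical entity. Here is a sketch of that decision over plain dictionaries instead of a Polars frame. The column names follow the spec defaults. The function name `to_canonical_records` is hypothetical, and so is keeping unmapped rows with a null `canonical_id` under `'keep'`.

```python
from typing import Any, Literal


def to_canonical_records(
    rows: list[dict[str, Any]],
    lookup: dict[str, str],
    *,
    unmapped: Literal["raise", "drop", "keep"] = "raise",
) -> list[dict[str, Any]]:
    """Flatten decoded rows into (timestamp, vehicle_id, canonical_id, value) records.

    `lookup` maps 'vehicle::alias' keys to canonical ids.
    """
    out: list[dict[str, Any]] = []
    for row in rows:
        key = f"{row['vehicle_id']}::{row['signal']}"
        canonical_id = lookup.get(key)
        if canonical_id is None:
            if unmapped == "raise":
                raise KeyError(f"no canonical entity for {key!r}")
            if unmapped == "drop":
                continue
            # "keep": retain the row with a null canonical id (assumption).
        out.append({
            "timestamp": row["timestamp"],
            "vehicle_id": row["vehicle_id"],
            "canonical_id": canonical_id,
            "value": row["value"],
        })
    return out
```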
build_signal_profiles
Aggregate raw CAN rows into one profile per vehicle/signal pair.
Source code in graphids/core/data/discovery/hypotheses.py
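A per-(vehicle, signal) aggregation can be sketched with the `SignalProfileSpec` default column names: `vehicle_id`, `arb_id`, `timestamp`, `entropy`, and `attack`. The statistics computed here are illustrative assumptions, not the real profile columns: message count, mean inter-arrival period, mean entropy, and attack rate. Byte columns are left out for brevity, and the function name is hypothetical.

```python
from collections import defaultdict
from statistics import mean
from typing import Any


def build_profiles_sketch(rows: list[dict[str, Any]], include_attack: bool = True) -> list[dict[str, Any]]:
    """Aggregate raw rows into one profile dict per (vehicle_id, arb_id) pair."""
    groups: dict[tuple[str, str], list[dict[str, Any]]] = defaultdict(list)
    for row in rows:
        groups[(row["vehicle_id"], row["arb_id"])].append(row)
    profiles = []
    for (vehicle, signal), grp in sorted(groups.items()):
        times = sorted(r["timestamp"] for r in grp)
        gaps = [b - a for a, b in zip(times, times[1:])]  # inter-arrival times
        profile = {
            "vehicle_id": vehicle,
            "arb_id": signal,
            "count": len(grp),
            "mean_period": mean(gaps) if gaps else None,
            "mean_entropy": mean(r["entropy"] for r in grp),
        }
        if include_attack:
            profile["attack_rate"] = mean(r["attack"] for r in grp)
        profiles.append(profile)
    return profiles
```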
initialize_hypotheses
Create an empty hypothesis table aligned to a profile table.
Source code in graphids/core/data/discovery/hypotheses.py
rank_signal_hypotheses
Join profile scores to provisional hypotheses when available.
Source code in graphids/core/data/discovery/ranking.py
rank_signal_profiles
Score per-signal profile rows by cross-vehicle support and stability.
The goal is not to claim a final ontology match; it is to give the discovery layer a concrete relational ranking pass that can surface stable signals for review.
Source code in graphids/core/data/discovery/ranking.py
canonical
Canonical entity and feature-table primitives for cross-vehicle views.
CanonicalEntitySpec (dataclass)
CanonicalEntitySpec(canonical_id: str, name: str, aliases: tuple[str, ...] = (), vehicle_aliases: dict[str, tuple[str, ...]] = dict(), kind: Literal['signal', 'message', 'state', 'entity'] = 'signal', description: str | None = None)
One shared semantic entity across vehicles.
CanonicalFeatureFrameSpec (dataclass)
CanonicalFeatureFrameSpec(time_col: str = 'timestamp', vehicle_col: str | None = 'vehicle_id', alias_col: str = 'signal', value_col: str = 'value', keep_cols: tuple[str, ...] = (), feature_cols: tuple[str, ...] = (), unmapped: Literal['raise', 'drop', 'keep'] = 'raise')
How to flatten decoded vehicle rows into canonical feature records.
CanonicalRegistry (dataclass)
Lookup table for canonical entities and vehicle-specific aliases.
lookup_frame
Return a canonical lookup frame for vectorized joins.
Source code in graphids/core/data/discovery/canonical.py
lookup_table
Return alias -> entity mapping using vehicle::alias keys.
Source code in graphids/core/data/discovery/canonical.py
resolve
Resolve an alias to a canonical entity.
Source code in graphids/core/data/discovery/canonical.py
build_canonical_feature_frame
build_canonical_feature_frame(df: DataFrame, registry: CanonicalRegistry, *, spec: CanonicalFeatureFrameSpec | None = None) -> pl.DataFrame
Normalize decoded rows into a long canonical feature table.
Source code in graphids/core/data/discovery/canonical.py
hypotheses
Signal profile and hypothesis-store primitives for cross-vehicle discovery.
DiscoveryStore (dataclass)
DiscoveryStore(root: Path, profiles_name: str = 'signal_profiles.parquet', hypotheses_name: str = 'canonical_hypotheses.parquet', manifest_name: str = 'discovery_manifest.json')
SignalHypothesisSpec (dataclass)
SignalHypothesisSpec(vehicle_id: str, raw_signal: str, candidate_canonical_id: str | None = None, confidence: float = 0.0, status: Literal['unreviewed', 'provisional', 'confirmed', 'rejected'] = 'unreviewed', evidence: tuple[str, ...] = ())
A provisional cross-vehicle mapping for a raw signal.
SignalProfileSpec (dataclass)
SignalProfileSpec(vehicle_col: str = 'vehicle_id', signal_col: str = 'arb_id', time_col: str = 'timestamp', entropy_col: str = 'entropy', attack_col: str = 'attack', byte_prefix: str = 'byte_', include_attack: bool = True)
How to aggregate raw CAN rows into per-signal profiles.
build_signal_profiles
Aggregate raw CAN rows into one profile per vehicle/signal pair.
Source code in graphids/core/data/discovery/hypotheses.py
initialize_hypotheses
Create an empty hypothesis table aligned to a profile table.
Source code in graphids/core/data/discovery/hypotheses.py
layout
Storage-layout primitives for raw events, views, and hypotheses.
DataLayerLayout (dataclass)
A single place to point at the three persistent data layers.
HypothesisRecordSpec (dataclass)
HypothesisRecordSpec(vehicle_id: str, raw_signal: str, candidate_canonical_id: str | None = None, confidence: float = 0.0, status: Literal['unreviewed', 'provisional', 'confirmed', 'rejected'] = 'unreviewed', evidence: tuple[str, ...] = (), profile_path: Path | None = None, feature_digest: str | None = None)
Provisional semantic mapping for a raw signal.
MaterializedViewSpec (dataclass)
MaterializedViewSpec(root: Path, name: str = 'materialized_views', view_kind: ViewKind = 'snapshot', partition_cols: tuple[str, ...] = ('vehicle_id', 'view_kind', 'split'), format: Literal['parquet', 'sqlite', 'duckdb'] = 'parquet', key_cols: tuple[str, ...] = ('vehicle_id', 'canonical_id', 'timestamp'), feature_cols: tuple[str, ...] = (), label_cols: tuple[str, ...] = ('attack', 'attack_type'))
Training-facing view materialization contract.
RawEventTableSpec (dataclass)
RawEventTableSpec(root: Path, name: str = 'raw_can_events', partition_cols: tuple[str, ...] = ('vehicle_id', 'day'), format: Literal['parquet', 'sqlite', 'duckdb'] = 'parquet', primary_time_col: str = 'timestamp', raw_id_col: str = 'arb_id', vehicle_col: str = 'vehicle_id', attack_col: str = 'attack', signal_hint_col: str = 'signal_hint')
Canonical storage for immutable decoded CAN rows.
ranking
Ranking helpers for cross-vehicle signal discovery.
rank_signal_hypotheses
Join profile scores to provisional hypotheses when available.
Source code in graphids/core/data/discovery/ranking.py
rank_signal_profiles
Score per-signal profile rows by cross-vehicle support and stability.
The goal is not to claim a final ontology match; it is to give the discovery layer a concrete relational ranking pass that can surface stable signals for review.
Source code in graphids/core/data/discovery/ranking.py
extract
Extract and cache fusion features as a TensorDict.
Each upstream model implements extract_features(batch, device) -> dict[str, Tensor], returning per-graph named feature tensors. This module collects those dicts under the model name (vgae, gat, ...), stacks them across batches, and saves the resulting nested TensorDict to disk. There is no flat state vector and no offset bookkeeping; the fusion side reads keys directly.
Invoked by the experiment extraction pipeline. Idempotent on output_dir.
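The collect-and-stack step can be pictured with plain lists standing in for tensors and for the nested TensorDict. In real code, concatenation along the graph axis would be a tensor concatenation. The sketch omits the `device` argument of `extract_features`. The function name `collect_features` and the feature key names in the usage are hypothetical; only the model-name keys (`vgae`, `gat`) come from the text.

```python
from collections import defaultdict
from typing import Callable, Iterable

# Per-model feature function: batch -> {feature name: per-graph values}.
FeatureFn = Callable[[object], dict[str, list[float]]]


def collect_features(batches: Iterable[object], models: dict[str, FeatureFn]) -> dict[str, dict[str, list[float]]]:
    """Gather per-model, per-key features across batches into a nested dict."""
    out = {name: defaultdict(list) for name in models}
    for batch in batches:
        for name, extract in models.items():
            for key, values in extract(batch).items():
                out[name][key].extend(values)  # concatenate along the graph axis
    return {name: dict(feats) for name, feats in out.items()}
```

Consumers then read `features["vgae"]["recon_err"]` by key, with no offsets into a flat vector.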
extract_states
extract_states(*, checkpoints: dict[str, str], dataset: str, output_dir: str, max_samples: int = 150000, max_val_samples: int = 30000, batch_size: int = 256, seed: int = 42, val_fraction: float = 0.2, representation_cfg: GraphRepresentationCfg) -> None
Load model checkpoints, extract and cache fusion features.
Idempotent per-file: each split's cache is checked independently so re-running after adding test splits only extracts the missing files.
Source code in graphids/core/data/extract.py
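Per-file idempotence amounts to checking each split's output independently before doing any work. This sketch uses one hypothetical `<split>.json` file per split as a stand-in for the real cache files. The write-to-temp-then-rename step is an extra safeguard added here, not something the docstring states, and the function name is hypothetical.

```python
import json
from pathlib import Path
from typing import Callable


def extract_missing_splits(output_dir: Path, splits: list[str], extract: Callable[[str], dict]) -> list[str]:
    """Write one cache file per split, skipping splits whose file already exists."""
    output_dir.mkdir(parents=True, exist_ok=True)
    written = []
    for split in splits:
        target = output_dir / f"{split}.json"  # hypothetical file naming
        if target.exists():
            continue  # idempotent: an existing split is never recomputed
        tmp = target.with_suffix(".tmp")
        tmp.write_text(json.dumps(extract(split)))
        tmp.replace(target)  # rename so an interrupted run leaves no partial target
        written.append(split)
    return written
```

If you rerun with an added `test` split, only that split is extracted.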
preprocessing
Preprocessing primitives for cache builds.
CanonicalEntitySpec (dataclass)
CanonicalEntitySpec(canonical_id: str, name: str, aliases: tuple[str, ...] = (), vehicle_aliases: dict[str, tuple[str, ...]] = dict(), kind: Literal['signal', 'message', 'state', 'entity'] = 'signal', description: str | None = None)
One shared semantic entity across vehicles.
CanonicalFeatureFrameSpec (dataclass)
CanonicalFeatureFrameSpec(time_col: str = 'timestamp', vehicle_col: str | None = 'vehicle_id', alias_col: str = 'signal', value_col: str = 'value', keep_cols: tuple[str, ...] = (), feature_cols: tuple[str, ...] = (), unmapped: Literal['raise', 'drop', 'keep'] = 'raise')
How to flatten decoded vehicle rows into canonical feature records.
CanonicalRegistry (dataclass)
Lookup table for canonical entities and vehicle-specific aliases.
lookup_frame
Return a canonical lookup frame for vectorized joins.
Source code in graphids/core/data/discovery/canonical.py
lookup_table
Return alias -> entity mapping using vehicle::alias keys.
Source code in graphids/core/data/discovery/canonical.py
resolve
Resolve an alias to a canonical entity.
Source code in graphids/core/data/discovery/canonical.py
DiscoveryStore (dataclass)
DiscoveryStore(root: Path, profiles_name: str = 'signal_profiles.parquet', hypotheses_name: str = 'canonical_hypotheses.parquet', manifest_name: str = 'discovery_manifest.json')
EdgePolicy (dataclass)
EdgePolicy(name: str, src_col: str = 'node_id', dst_col: str = 'node_id', dst_shift: int = -1, src_alias: str = 'src', dst_alias: str = 'dst')
How to derive directed edges from windowed rows.
EntityRepresentationCfg (dataclass)
EntityRepresentationCfg(kind: Literal['entity'] = 'entity', anchor_column: str = 'node_id', anchor_value: str | int | None = None, history_window_size: int = 100, future_window_size: int = 0)
Entity-centric representation centered on one signal or message family.
EntitySegmentCfg (dataclass)
EntitySegmentCfg(anchor_column: str = 'node_id', anchor_value: str | int | None = None, history_window_size: int = 100, future_window_size: int = 0)
A segment centered on one arbitration ID or message family.
EntityViewCfg (dataclass)
EntityViewCfg(anchor_column: str = 'node_id', anchor_value: str | int | None = None, history_window_size: int = 100, future_window_size: int = 0)
Bases: _ViewCfg
Entity-centric view centered on one signal or message family.
EventChunkViewCfg (dataclass)
EventChunkViewCfg(message_count: int | None = 200, duration_ms: float | None = None, overlap: float = 0.0)
Bases: _ViewCfg
Chunk raw events by message count or duration.
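Count-based chunking with `overlap` can be sketched as follows. This assumes `overlap` is the fraction of each chunk shared with the next one, so consecutive chunk starts are `message_count * (1 - overlap)` events apart. Dropping a trailing partial chunk is also an assumption, as is the function name. Duration-based chunking (`duration_ms`) is not shown.

```python
def chunk_by_count(n_events: int, message_count: int, overlap: float = 0.0) -> list[range]:
    """Return index ranges of full chunks of `message_count` events."""
    if message_count <= 0 or not 0.0 <= overlap < 1.0:
        raise ValueError("message_count must be positive and overlap in [0, 1)")
    # Step between chunk starts; at least 1 so the loop always advances.
    step = max(1, int(message_count * (1.0 - overlap)))
    return [range(start, start + message_count) for start in range(0, n_events - message_count + 1, step)]
```

With the default `message_count=200` and `overlap=0.0`, chunks tile the stream without sharing events.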
GraphRepresentationPlan (dataclass)
GraphRepresentationPlan(kind: Literal['snapshot', 'snapshot_sequence', 'multi_scale', 'temporal', 'entity'], cfg: GraphRepresentationCfg)
Resolved representation kind and config payload.
GraphSegmentPlan (dataclass)
Resolved sample-shape plan for a dataset view.
GraphTransform (dataclass)
GraphTransform(name: str, requires: tuple[str, ...], produces: tuple[str, ...], fn: Callable[[DataFrame, DataFrame], tuple[DataFrame, DataFrame]])
A declarative graph transform with explicit input/output columns.
MultiScaleRepresentationCfg (dataclass)
MultiScaleRepresentationCfg(kind: Literal['multi_scale'] = 'multi_scale', window_sizes: tuple[int, ...] = (50, 100, 200), stride: int = 100)
Parallel snapshots at multiple window sizes.
MultiScaleSegmentCfg (dataclass)
Parallel windows at multiple temporal scales.
MultiScaleViewCfg (dataclass)
Bases: _ViewCfg
Parallel snapshot views at multiple window sizes.
RollingStreamViewCfg (dataclass)
RollingStreamViewCfg(history_messages: int = 500, prediction_horizon: int = 1, update_mode: Literal['append', 'replace'] = 'append')
Bases: _ViewCfg
Online/streaming view with bounded history.
Segmenter
Bases: Protocol
Primitive that turns raw rows into a shaped sample view.
SequenceSegmentCfg (dataclass)
SequenceSegmentCfg(window_size: int, stride: int, sequence_length: int = 4, sequence_stride: int = 1)
A sequence of ordered windows from the same raw stream.
SignalHypothesisSpec (dataclass)
SignalHypothesisSpec(vehicle_id: str, raw_signal: str, candidate_canonical_id: str | None = None, confidence: float = 0.0, status: Literal['unreviewed', 'provisional', 'confirmed', 'rejected'] = 'unreviewed', evidence: tuple[str, ...] = ())
A provisional cross-vehicle mapping for a raw signal.
SignalProfileSpec (dataclass)
SignalProfileSpec(vehicle_col: str = 'vehicle_id', signal_col: str = 'arb_id', time_col: str = 'timestamp', entropy_col: str = 'entropy', attack_col: str = 'attack', byte_prefix: str = 'byte_', include_attack: bool = True)
How to aggregate raw CAN rows into per-signal profiles.
SnapshotRepresentationCfg (dataclass)
SnapshotRepresentationCfg(kind: Literal['snapshot'] = 'snapshot', window_size: int = 100, stride: int = 100)
One graph per sliding window.
SnapshotSequenceRepresentationCfg (dataclass)
SnapshotSequenceRepresentationCfg(kind: Literal['snapshot_sequence'] = 'snapshot_sequence', window_size: int = 100, stride: int = 100, sequence_length: int = 4, sequence_stride: int = 1)
Ordered sequence of snapshot graphs.
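The relationship between the four knobs of `SnapshotSequenceRepresentationCfg` can be sketched with row spans:

- `window_size` and `stride` place the snapshot windows.
- `sequence_length` and `sequence_stride` group consecutive windows into one sample.

The defaults below match the config signature. The function name, the `(start, end)` encoding, and dropping partial trailing windows are assumptions.

```python
def snapshot_sequences(
    n_rows: int,
    window_size: int = 100,
    stride: int = 100,
    sequence_length: int = 4,
    sequence_stride: int = 1,
) -> list[list[tuple[int, int]]]:
    """Group sliding windows into ordered sequences of (start, end) row spans."""
    windows = [(s, s + window_size) for s in range(0, n_rows - window_size + 1, stride)]
    return [
        windows[i : i + sequence_length]
        for i in range(0, len(windows) - sequence_length + 1, sequence_stride)
    ]
```

With the defaults, 1,000 rows yield 10 windows and 7 overlapping sequences of 4 windows each.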
SnapshotSequenceViewCfg (dataclass)
SnapshotSequenceViewCfg(window_size: int = 100, stride: int = 100, sequence_length: int = 4, sequence_stride: int = 1)
Bases: _ViewCfg
An ordered sequence of snapshot graphs.
SnapshotViewCfg (dataclass)
Bases: _ViewCfg
One fixed graph per sliding window.
TemporalGraphSpec (dataclass)
TemporalGraphSpec(edge_policy: EdgePolicy = temporal_edge_policy(), time_col: str = 'timestamp', feature_cols: tuple[str, ...] = (), target_col: str = 'attack', aux_label_cols: tuple[str, ...] = ('attack_type',), binary_target: bool = True)
How to turn ordered rows into a PyG TemporalData object.
TemporalRepresentationCfg (dataclass)
TemporalRepresentationCfg(kind: Literal['temporal'] = 'temporal', time_col: str = 'timestamp', binary_target: bool = True, history_messages: int | None = None)
Event stream representation built as PyG TemporalData.
WindowSegmentCfg (dataclass)
One fixed window of rows.
WindowSegmenter (dataclass)
Default snapshot segmenter: one fixed sliding window per graph.
WindowedRows (dataclass)
Rows plus derived window metadata for snapshot-style segments.
build_canonical_feature_frame
build_canonical_feature_frame(df: DataFrame, registry: CanonicalRegistry, *, spec: CanonicalFeatureFrameSpec | None = None) -> pl.DataFrame
Normalize decoded rows into a long canonical feature table.
Source code in graphids/core/data/discovery/canonical.py
build_graph_tables
build_graph_tables(df: DataFrame, *, node_stat_exprs: list[Expr], label_exprs: list[Expr], edge_policy: EdgePolicy | None = None, edge_stat_exprs: list[Expr], edge_base_cols: list[str], graph_transforms: list[GraphTransform] | None = None, debug_artifacts_dir: str | Path | None = None, segment_cfg: WindowSegmentCfg | SequenceSegmentCfg | MultiScaleSegmentCfg | EntitySegmentCfg | None = None) -> GraphTables
Compose the graph preprocessing primitives into staged graph tables.
Source code in graphids/core/data/preprocessing/materialization.py
build_signal_profiles
Aggregate raw CAN rows into one profile per vehicle/signal pair.
Source code in graphids/core/data/discovery/hypotheses.py
build_temporal_data
Build a PyG TemporalData stream from ordered rows.
Source code in graphids/core/data/preprocessing/temporal.py
default_graph_transforms
Default graph transforms used in cache builds.
Source code in graphids/core/data/preprocessing/graph_ops.py
graph_tables_to_pyg
graph_tables_to_pyg(tables: GraphTables, *, node_col_order: list[str], edge_col_order: tuple[str, ...], label_exprs: list[Expr]) -> tuple[Data, dict, int, int]
Compose staged tables into pre-collated PyG tensors.
Source code in graphids/core/data/preprocessing/pyg.py
initialize_hypotheses
Create an empty hypothesis table aligned to a profile table.
Source code in graphids/core/data/discovery/hypotheses.py
rank_signal_hypotheses
Join profile scores to provisional hypotheses when available.
Source code in graphids/core/data/discovery/ranking.py
rank_signal_profiles
Score per-signal profile rows by cross-vehicle support and stability.
The goal is not to claim a final ontology match; it is to give the discovery layer a concrete relational ranking pass that can surface stable signals for review.
Source code in graphids/core/data/discovery/ranking.py
representation_digest
Short stable digest for paths and cache keys.
Source code in graphids/core/data/preprocessing/representations.py
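A common way to get a short digest that stays stable across processes is to hash a canonical JSON serialisation of the config. Python's built-in `hash()` is salted per process, so it is unsuitable for this. The following is a sketch under assumptions. `SnapshotCfg` is a hypothetical stand-in that mirrors the `SnapshotRepresentationCfg` defaults. SHA-256, the 12-character length, and the JSON canonicalisation are choices made here, not taken from the source.

```python
import hashlib
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class SnapshotCfg:
    # Hypothetical stand-in mirroring the SnapshotRepresentationCfg defaults.
    kind: str = "snapshot"
    window_size: int = 100
    stride: int = 100


def payload(cfg) -> dict:
    """JSON-serializable payload of the config's field values."""
    return asdict(cfg)


def digest(cfg, length: int = 12) -> str:
    """Short hex digest, independent of process and key order."""
    blob = json.dumps(payload(cfg), sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()[:length]
```

Changing any field, such as `window_size`, changes the digest. A cache path keyed on the digest therefore never mixes outputs from different representations.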
representation_kind
Stable label for logging and config routing.
Source code in graphids/core/data/preprocessing/representations.py
representation_payload
Stable JSON-serializable payload for cache identity and metadata.
representation_plan
Wrap a representation config with its stable kind label.
representation_segment
representation_segment(cfg: GraphRepresentationCfg) -> WindowSegmentCfg | SequenceSegmentCfg | MultiScaleSegmentCfg | EntitySegmentCfg
Map a representation config to the corresponding segment primitive.
Source code in graphids/core/data/preprocessing/representations.py
representation_temporal_spec
Map a representation config to the temporal-stream spec.
Source code in graphids/core/data/preprocessing/representations.py
representation_view
Map a representation config to the corresponding public view config.
Source code in graphids/core/data/preprocessing/representations.py
representation_window_defaults
Derive legacy window knobs from the explicit representation config.
Source code in graphids/core/data/preprocessing/representations.py
secondary_graph_transforms
Additional exploratory graph transforms used in feature tests.
Source code in graphids/core/data/preprocessing/graph_ops.py
segment_kind
Stable human-readable label for logging and config routing.
Source code in graphids/core/data/preprocessing/segments.py
segment_plan
temporal_edge_policy
temporal_edge_policy(*, src_col: str = 'node_id', dst_col: str = 'node_id', dst_shift: int = -1) -> EdgePolicy
Temporal adjacency policy: edge from row t to row t + dst_shift.
Source code in graphids/core/data/preprocessing/edge_policy.py
temporal_len
view_kind
Stable human-readable label for config selection and logging.
Source code in graphids/core/data/preprocessing/views.py
curriculum
Curriculum difficulty scorers used by the graph datamodule.
score_random
Uniform random per-graph difficulty for curriculum control runs.
score_vgae
Per-graph reconstruction MSE from a trained VGAE checkpoint.
Higher = harder. Loads the VGAE on CPU, computes per-graph mean MSE via torch_geometric.utils.scatter, and releases the model.
Source code in graphids/core/data/preprocessing/curriculum.py
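The per-graph reduction is a segment mean. Each node carries its error and the index of the graph it belongs to (PyG's `batch` vector), and errors are averaged per index. On tensors this is what a mean-reduced scatter over `batch` computes. The sketch below uses plain lists so the reduction is visible. It assumes node-level MSE values are already computed, and the function name is hypothetical.

```python
def per_graph_mean(node_errors: list[float], batch: list[int]) -> list[float]:
    """Average node-level errors per graph, given each node's graph index."""
    if len(node_errors) != len(batch):
        raise ValueError("node_errors and batch must have the same length")
    num_graphs = max(batch) + 1 if batch else 0
    totals = [0.0] * num_graphs
    counts = [0] * num_graphs
    for err, g in zip(node_errors, batch):
        totals[g] += err
        counts[g] += 1
    # Graphs with no nodes get 0.0 here (a choice made for this sketch).
    return [t / c if c else 0.0 for t, c in zip(totals, counts)]
```

Higher scores mean harder graphs, so a curriculum can sort or bucket graphs by this value.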
edge_policy
Declarative edge construction policies for graph preprocessing.
EdgePolicy (dataclass)
EdgePolicy(name: str, src_col: str = 'node_id', dst_col: str = 'node_id', dst_shift: int = -1, src_alias: str = 'src', dst_alias: str = 'dst')
How to derive directed edges from windowed rows.
temporal_edge_policy
temporal_edge_policy(*, src_col: str = 'node_id', dst_col: str = 'node_id', dst_shift: int = -1) -> EdgePolicy
Temporal adjacency policy: edge from row t to row t + dst_shift.
Source code in graphids/core/data/preprocessing/edge_policy.py
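The shift-based pairing can be sketched by reading the docstring literally. The source is the node at row t, the destination is the node at row t + dst_shift, and partners outside the window are skipped. The helper name `shifted_edges` is hypothetical.

One caveat: the implementation may instead apply a Polars-style `shift(dst_shift)` to the destination column. In that case `shift(-1)` pulls the value from row t + 1, so the edges would point the opposite way from this sketch. `edge_policy.py` is the authority on direction.

```python
def shifted_edges(nodes: list[str], dst_shift: int = -1) -> list[tuple[str, str]]:
    """Pair row t (source) with row t + dst_shift (destination) within one window."""
    edges = []
    for t, src in enumerate(nodes):
        j = t + dst_shift
        if 0 <= j < len(nodes):  # skip partners outside the window
            edges.append((src, nodes[j]))
    return edges
```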
graph_ops
Composable graph transforms over node/edge preprocessing tables.
GraphTransform (dataclass)
GraphTransform(name: str, requires: tuple[str, ...], produces: tuple[str, ...], fn: Callable[[DataFrame, DataFrame], tuple[DataFrame, DataFrame]])
A declarative graph transform with explicit input/output columns.
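The explicit `requires` and `produces` columns make it possible to validate a transform chain as it runs. In this sketch, dictionaries of columns stand in for DataFrames. The names `Transform`, `Table`, and `apply_transforms` are hypothetical. Checking `requires` against the union of node and edge columns is a simplification chosen for this sketch.

```python
from dataclasses import dataclass
from typing import Callable

Table = dict[str, list]  # column name -> values; stand-in for a DataFrame


@dataclass(frozen=True)
class Transform:
    name: str
    requires: tuple[str, ...]
    produces: tuple[str, ...]
    fn: Callable[[Table, Table], tuple[Table, Table]]


def apply_transforms(nodes: Table, edges: Table, transforms: list[Transform]) -> tuple[Table, Table]:
    """Run transforms in order, failing fast on missing inputs or outputs."""
    for tf in transforms:
        missing = [c for c in tf.requires if c not in set(nodes) | set(edges)]
        if missing:
            raise ValueError(f"{tf.name}: missing input columns {missing}")
        nodes, edges = tf.fn(nodes, edges)
        absent = [c for c in tf.produces if c not in set(nodes) | set(edges)]
        if absent:
            raise ValueError(f"{tf.name}: did not produce {absent}")
    return nodes, edges
```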
default_graph_transforms
Default graph transforms used in cache builds.
Source code in graphids/core/data/preprocessing/graph_ops.py
secondary_graph_transforms
Additional exploratory graph transforms used in feature tests.
Source code in graphids/core/data/preprocessing/graph_ops.py
materialization
Graph table materialization from windowed preprocessing primitives.
build_graph_tables
build_graph_tables(df: DataFrame, *, node_stat_exprs: list[Expr], label_exprs: list[Expr], edge_policy: EdgePolicy | None = None, edge_stat_exprs: list[Expr], edge_base_cols: list[str], graph_transforms: list[GraphTransform] | None = None, debug_artifacts_dir: str | Path | None = None, segment_cfg: WindowSegmentCfg | SequenceSegmentCfg | MultiScaleSegmentCfg | EntitySegmentCfg | None = None) -> GraphTables
Compose the graph preprocessing primitives into staged graph tables.
Source code in graphids/core/data/preprocessing/materialization.py
metadata ¶
Cache metadata contract for dataset builds.
load_metadata ¶
Read and version-gate cache_metadata.json.
Source code in graphids/core/data/preprocessing/metadata.py
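A version gate fails fast on caches that an incompatible build wrote. The sketch below assumes a top-level `"version"` key and an expected value of 3. Both are hypothetical, since the real key name and constant are not documented here.

```python
import json
import tempfile
from pathlib import Path

EXPECTED_VERSION = 3  # hypothetical constant


def load_metadata_sketch(cache_dir: Path, expected_version: int = EXPECTED_VERSION) -> dict:
    path = cache_dir / "cache_metadata.json"
    meta = json.loads(path.read_text())
    found = meta.get("version")
    if found != expected_version:
        # Stale or foreign cache: refuse to load rather than silently misread it.
        raise RuntimeError(f"{path}: cache version {found!r}, expected {expected_version}")
    return meta


with tempfile.TemporaryDirectory() as d:
    (Path(d) / "cache_metadata.json").write_text(json.dumps({"version": 3, "dataset": "demo"}))
    meta = load_metadata_sketch(Path(d))
    (Path(d) / "cache_metadata.json").write_text(json.dumps({"version": 2}))
    try:
        load_metadata_sketch(Path(d))
        stale_rejected = False
    except RuntimeError:
        stale_rejected = True
```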
merge_split_into_metadata ¶
merge_split_into_metadata(cache_dir: Path, split_name: str, split_entry: dict[str, Any], *, invariants: dict[str, Any], dataset_name: str, num_arb_ids: int) -> dict[str, Any]
Merge one split's entry into cache_metadata.json under FileLock.
The first writer seeds the top-level fields. Later writers must match the invariants and the dataset name, or the call raises.
Source code in graphids/core/data/preprocessing/metadata.py
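The seed-then-validate rule can be shown with an in-memory dict. This sketch omits the FileLock and the JSON file I/O that the real function performs. `merge_split_sketch` and its top-level key names (`dataset`, `invariants`, `splits`) are hypothetical.

```python
def merge_split_sketch(meta: dict, split_name: str, split_entry: dict, *,
                       invariants: dict, dataset_name: str, num_arb_ids: int) -> dict:
    if not meta:
        # First writer seeds the top-level fields.
        meta.update(dataset=dataset_name, num_arb_ids=num_arb_ids,
                    invariants=dict(invariants), splits={})
    elif meta["dataset"] != dataset_name or meta["invariants"] != invariants:
        # Later writers must agree with what the first writer recorded.
        raise ValueError(f"split {split_name!r} does not match the existing cache metadata")
    meta["splits"][split_name] = split_entry
    return meta


meta = {}
merge_split_sketch(meta, "train", {"num_graphs": 10}, invariants={"window": 100},
                   dataset_name="demo", num_arb_ids=40)
merge_split_sketch(meta, "val", {"num_graphs": 2}, invariants={"window": 100},
                   dataset_name="demo", num_arb_ids=40)
try:
    merge_split_sketch(meta, "test", {}, invariants={"window": 50},
                       dataset_name="demo", num_arb_ids=40)
    mismatch_raised = False
except ValueError:
    mismatch_raised = True
```

In the real function, holding the lock across the read, check, and write steps keeps two concurrent split builds from both acting as the first writer.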
pipeline ¶
Thin composer over graph preprocessing primitives.
GraphPipeline
dataclass
¶
GraphPipeline(node_stat_exprs: list[Expr], edge_stat_exprs: list[Expr], node_col_order: list[str], edge_col_order: tuple[str, ...], label_exprs: list[Expr], edge_base_cols: list[str], edge_policy: EdgePolicy | None = None, graph_transforms: list[GraphTransform] | None = None, debug_artifacts_dir: str | None = None, representation_cfg: GraphRepresentationCfg = SnapshotRepresentationCfg(), segment_cfg: WindowSegmentCfg | SequenceSegmentCfg | MultiScaleSegmentCfg | EntitySegmentCfg | None = None)
Pure config carrier for graph preprocessing primitives.
pyg ¶
PyG tensor packing primitives for staged graph tables.
graph_tables_to_pyg ¶
graph_tables_to_pyg(tables: GraphTables, *, node_col_order: list[str], edge_col_order: tuple[str, ...], label_exprs: list[Expr]) -> tuple[Data, dict, int, int]
Compose staged tables into pre-collated PyG tensors.
Source code in graphids/core/data/preprocessing/pyg.py
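Pre-collated storage usually means one flat tensor per attribute plus pointer lists that mark where each graph begins and ends, as in PyG's `InMemoryDataset`. The sketch below shows that bookkeeping with Python lists. It assumes edge indices stay local to each graph, which matches the `InMemoryDataset` convention. `collate_sketch` and `get_graph` are hypothetical helpers, and the meaning of the two integers that the real function returns is not modeled.

```python
def collate_sketch(graphs):
    """graphs: list of (node_rows, edge_list); edge indices are local to each graph."""
    x, edge_index = [], []
    slices = {"x": [0], "edge_index": [0]}
    for node_rows, edges in graphs:
        x.extend(node_rows)
        edge_index.extend(edges)  # kept local to the graph, as in InMemoryDataset storage
        slices["x"].append(len(x))
        slices["edge_index"].append(len(edge_index))
    return x, edge_index, slices


def get_graph(x, edge_index, slices, i):
    """Recover graph i by slicing the flat storage with the pointer lists."""
    xs, es = slices["x"], slices["edge_index"]
    return x[xs[i]:xs[i + 1]], edge_index[es[i]:es[i + 1]]


x, edge_index, slices = collate_sketch([
    ([[0.1], [0.2]], [(0, 1)]),
    ([[0.3], [0.4], [0.5]], [(0, 2), (2, 1)]),
])
second = get_graph(x, edge_index, slices, 1)
```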
representations ¶
Explicit graph-representation configs for training and discovery.
EntityRepresentationCfg
dataclass
¶
EntityRepresentationCfg(kind: Literal['entity'] = 'entity', anchor_column: str = 'node_id', anchor_value: str | int | None = None, history_window_size: int = 100, future_window_size: int = 0)
Entity-centric representation centered on one signal or message family.
GraphRepresentationPlan
dataclass
¶
GraphRepresentationPlan(kind: Literal['snapshot', 'snapshot_sequence', 'multi_scale', 'temporal', 'entity'], cfg: GraphRepresentationCfg)
Resolved representation kind and config payload.
MultiScaleRepresentationCfg
dataclass
¶
MultiScaleRepresentationCfg(kind: Literal['multi_scale'] = 'multi_scale', window_sizes: tuple[int, ...] = (50, 100, 200), stride: int = 100)
Parallel snapshots at multiple window sizes.
SnapshotRepresentationCfg
dataclass
¶
SnapshotRepresentationCfg(kind: Literal['snapshot'] = 'snapshot', window_size: int = 100, stride: int = 100)
One graph per sliding window.
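With the defaults `window_size=100` and `stride=100`, windows tile the stream without overlap. A stride smaller than the window size makes them overlap. This is a minimal sketch of the window start positions. `window_starts` is hypothetical, and the sketch assumes a trailing partial window is dropped.

```python
def window_starts(num_rows: int, window_size: int = 100, stride: int = 100) -> list[int]:
    # Assumption: only full windows are kept; the range is empty if num_rows < window_size.
    return list(range(0, num_rows - window_size + 1, stride))


starts = window_starts(350)
overlapping = window_starts(350, window_size=100, stride=50)
```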
SnapshotSequenceRepresentationCfg
dataclass
¶
SnapshotSequenceRepresentationCfg(kind: Literal['snapshot_sequence'] = 'snapshot_sequence', window_size: int = 100, stride: int = 100, sequence_length: int = 4, sequence_stride: int = 1)
Ordered sequence of snapshot graphs.
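A sequence sample groups consecutive snapshot windows, and `sequence_stride` controls how far apart successive samples start. The sketch below works on window indices. `sequence_groups` is hypothetical, and it assumes incomplete trailing sequences are dropped.

```python
def sequence_groups(num_windows: int, sequence_length: int = 4,
                    sequence_stride: int = 1) -> list[list[int]]:
    # Each sample is `sequence_length` consecutive window indices; samples start
    # `sequence_stride` windows apart. Incomplete tails are dropped (assumption).
    last_start = num_windows - sequence_length
    return [list(range(s, s + sequence_length))
            for s in range(0, last_start + 1, sequence_stride)]


groups = sequence_groups(6)
```

With the default `sequence_stride=1`, neighboring samples share all but one window, which yields many training sequences from a short stream.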
TemporalRepresentationCfg
dataclass
¶
TemporalRepresentationCfg(kind: Literal['temporal'] = 'temporal', time_col: str = 'timestamp', binary_target: bool = True, history_messages: int | None = None)
Event stream representation built as PyG TemporalData.
representation_digest ¶
Short stable digest for paths and cache keys.
Source code in graphids/core/data/preprocessing/representations.py
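A stable digest normally comes from hashing a canonical serialization of the config. The sketch below assumes the payload is a dataclass serialized as JSON with sorted keys and fixed separators. It also assumes SHA-256 truncated to 12 hex characters. `SnapshotCfgSketch`, `digest_sketch`, and the truncation length are hypothetical.

```python
import hashlib
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class SnapshotCfgSketch:  # stand-in mirroring SnapshotRepresentationCfg's documented fields
    kind: str = "snapshot"
    window_size: int = 100
    stride: int = 100


def digest_sketch(cfg, length: int = 12) -> str:
    # Canonical JSON (sorted keys, fixed separators) keeps the digest stable across runs.
    payload = json.dumps(asdict(cfg), sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:length]


a = digest_sketch(SnapshotCfgSketch())
b = digest_sketch(SnapshotCfgSketch())
c = digest_sketch(SnapshotCfgSketch(window_size=200))
```

Any change to a field changes the digest, so caches built under different representation settings land in different paths.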
representation_kind ¶
Stable label for logging and config routing.
Source code in graphids/core/data/preprocessing/representations.py
representation_payload ¶
Stable JSON-serializable payload for cache identity and metadata.
representation_plan ¶
Wrap a representation config with its stable kind label.
representation_segment ¶
representation_segment(cfg: GraphRepresentationCfg) -> WindowSegmentCfg | SequenceSegmentCfg | MultiScaleSegmentCfg | EntitySegmentCfg
Map a representation config to the corresponding segment primitive.
Source code in graphids/core/data/preprocessing/representations.py
representation_temporal_spec ¶
Map a representation config to the temporal-stream spec.
Source code in graphids/core/data/preprocessing/representations.py
representation_view ¶
Map a representation config to the corresponding public view config.
Source code in graphids/core/data/preprocessing/representations.py
representation_window_defaults ¶
Derive legacy window knobs from the explicit representation config.
Source code in graphids/core/data/preprocessing/representations.py
scaler ¶
Per-column feature scalers for tensor-based graph preprocessing.
segments ¶
Public segment primitives for alternate dataset views.
EntitySegmentCfg
dataclass
¶
EntitySegmentCfg(anchor_column: str = 'node_id', anchor_value: str | int | None = None, history_window_size: int = 100, future_window_size: int = 0)
A segment centered on one arbitration ID or message family.
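An entity segment collects context around each occurrence of one anchor value. This sketch makes several assumptions: the segment spans `history_window_size` rows before the anchor, the anchor row itself, and `future_window_size` rows after it. It also requires a concrete `anchor_value`, because the behavior for `None` is not documented. `entity_segments` is a hypothetical helper.

```python
def entity_segments(rows, anchor_value, anchor_column="node_id",
                    history_window_size=100, future_window_size=0):
    """Hypothetical helper: one segment per row whose anchor column equals anchor_value."""
    segments = []
    for i, row in enumerate(rows):
        if row[anchor_column] != anchor_value:
            continue
        start = max(0, i - history_window_size)  # clip history at the stream start
        segments.append(rows[start:i + 1 + future_window_size])
    return segments


rows = [{"node_id": n} for n in [1, 2, 3, 2, 4]]
segs = entity_segments(rows, anchor_value=2, history_window_size=1, future_window_size=1)
```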
GraphSegmentPlan
dataclass
¶
Resolved sample-shape plan for a dataset view.
MultiScaleSegmentCfg
dataclass
¶
Parallel windows at multiple temporal scales.
Segmenter ¶
Bases: Protocol
Primitive that turns raw rows into a shaped sample view.
SequenceSegmentCfg
dataclass
¶
SequenceSegmentCfg(window_size: int, stride: int, sequence_length: int = 4, sequence_stride: int = 1)
A sequence of ordered windows from the same raw stream.
WindowSegmentCfg
dataclass
¶
One fixed window of rows.
WindowSegmenter
dataclass
¶
Default snapshot segmenter: one fixed sliding window per graph.
WindowedRows
dataclass
¶
Rows plus derived window metadata for snapshot-style segments.
segment_kind ¶
Stable human-readable label for logging and config routing.
Source code in graphids/core/data/preprocessing/segments.py
segment_plan ¶
temporal ¶
Temporal graph primitives for stream and sequence views.
TemporalGraphSpec
dataclass
¶
TemporalGraphSpec(edge_policy: EdgePolicy = temporal_edge_policy(), time_col: str = 'timestamp', feature_cols: tuple[str, ...] = (), target_col: str = 'attack', aux_label_cols: tuple[str, ...] = ('attack_type',), binary_target: bool = True)
How to turn ordered rows into a PyG TemporalData object.
build_temporal_data ¶
Build a PyG TemporalData stream from ordered rows.
Source code in graphids/core/data/preprocessing/temporal.py
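PyG's `TemporalData` stores an event stream as parallel `src`, `dst`, `t`, `msg`, and `y` tensors. The sketch below fills those columns from ordered rows with plain lists. It assumes the default temporal edge policy links each row to the next one, that each event takes the current row's timestamp and features, and that `binary_target` maps any nonzero attack label to 1. `build_temporal_sketch` is hypothetical.

```python
def build_temporal_sketch(rows, time_col="timestamp", feature_cols=(),
                          target_col="attack", binary_target=True):
    """Return TemporalData-like columns; dst is the next row's node (assumption)."""
    out = {"src": [], "dst": [], "t": [], "msg": [], "y": []}
    for cur, nxt in zip(rows, rows[1:]):
        out["src"].append(cur["node_id"])
        out["dst"].append(nxt["node_id"])
        out["t"].append(cur[time_col])
        out["msg"].append([cur[c] for c in feature_cols])
        label = cur[target_col]
        out["y"].append(int(label != 0) if binary_target else label)
    return out


rows = [
    {"node_id": 1, "timestamp": 0.0, "dlc": 8, "attack": 0},
    {"node_id": 2, "timestamp": 0.5, "dlc": 8, "attack": 3},
    {"node_id": 1, "timestamp": 1.0, "dlc": 2, "attack": 0},
]
stream = build_temporal_sketch(rows, feature_cols=("dlc",))
```

With torch available, each list could be converted with `torch.tensor` and passed to `torch_geometric.data.TemporalData(src=..., dst=..., t=..., msg=..., y=...)`.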
temporal_len ¶
transforms ¶
Reusable graph-transform expressions for dataset schemas.
views ¶
Public view primitives for turning raw CAN/event data into graph samples.
EntityViewCfg
dataclass
¶
EntityViewCfg(anchor_column: str = 'node_id', anchor_value: str | int | None = None, history_window_size: int = 100, future_window_size: int = 0)
Bases: _ViewCfg
Entity-centric view centered on one signal or message family.
EventChunkViewCfg
dataclass
¶
EventChunkViewCfg(message_count: int | None = 200, duration_ms: float | None = None, overlap: float = 0.0)
Bases: _ViewCfg
Chunk raw events by message count or duration.
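For count-based chunking, `overlap` can be read as the fraction of each chunk shared with the next one, so the start advances by `message_count * (1 - overlap)` events. This sketch covers only the count-based path, not `duration_ms`. It keeps a shorter final chunk and stops once a chunk reaches the end of the stream. `chunk_by_count` and these boundary rules are assumptions.

```python
def chunk_by_count(num_events: int, message_count: int = 200, overlap: float = 0.0):
    """Hypothetical helper returning (start, end) event ranges, end exclusive."""
    if not 0.0 <= overlap < 1.0:
        raise ValueError("overlap must be in [0, 1)")
    step = max(1, round(message_count * (1.0 - overlap)))
    chunks, start = [], 0
    while start < num_events:
        end = min(start + message_count, num_events)
        chunks.append((start, end))
        if end == num_events:  # the stream is fully covered
            break
        start += step
    return chunks


plain = chunk_by_count(500)
half_overlap = chunk_by_count(500, overlap=0.5)
```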
MultiScaleViewCfg
dataclass
¶
Bases: _ViewCfg
Parallel snapshot views at multiple window sizes.
RollingStreamViewCfg
dataclass
¶
RollingStreamViewCfg(history_messages: int = 500, prediction_horizon: int = 1, update_mode: Literal['append', 'replace'] = 'append')
Bases: _ViewCfg
Online/streaming view with bounded history.
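A bounded history maps naturally onto `collections.deque` with `maxlen`, which discards the oldest messages as new ones arrive. The sketch below assumes `"append"` extends the history and `"replace"` discards it before each update. Both readings are assumptions, and `prediction_horizon` is not modeled. `RollingHistory` is a hypothetical class.

```python
from collections import deque


class RollingHistory:
    """Hypothetical bounded history buffer for a streaming view."""

    def __init__(self, history_messages: int = 500, update_mode: str = "append"):
        if update_mode not in ("append", "replace"):
            raise ValueError(f"unknown update_mode {update_mode!r}")
        self.update_mode = update_mode
        self.buffer = deque(maxlen=history_messages)  # oldest messages fall off the left

    def update(self, messages):
        if self.update_mode == "replace":
            self.buffer.clear()  # assumption: "replace" drops the previous history
        self.buffer.extend(messages)
        return list(self.buffer)


h = RollingHistory(history_messages=3)
h.update([1, 2])
appended = h.update([3, 4])

r = RollingHistory(history_messages=3, update_mode="replace")
r.update([1, 2])
replaced = r.update([3])
```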
SnapshotSequenceViewCfg
dataclass
¶
SnapshotSequenceViewCfg(window_size: int = 100, stride: int = 100, sequence_length: int = 4, sequence_stride: int = 1)
Bases: _ViewCfg
An ordered sequence of snapshot graphs.
SnapshotViewCfg
dataclass
¶
Bases: _ViewCfg
One fixed graph per sliding window.
view_kind ¶
Stable human-readable label for config selection and logging.
Source code in graphids/core/data/preprocessing/views.py
vocab ¶
Vocabulary scan, digest, persist, and load primitives.
load_vocab ¶
Return (entries, digest) from a persisted vocab file.
persist_vocab ¶
Atomically write the vocab file and return its digest.
Source code in graphids/core/data/preprocessing/vocab.py
scan_arb_ids ¶
Return the sorted unique arb_id values across every CSV under source_dirs.
Source code in graphids/core/data/preprocessing/vocab.py
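A vocabulary scan reads one column from every CSV and deduplicates the values. This sketch makes three assumptions: the column is named `arb_id`, the search recurses into subdirectories, and IDs stay as strings, so the sort is lexicographic. `scan_arb_ids_sketch` is hypothetical.

```python
import csv
import tempfile
from pathlib import Path


def scan_arb_ids_sketch(source_dirs, column="arb_id"):
    ids = set()
    for d in source_dirs:
        for path in sorted(Path(d).rglob("*.csv")):  # recursive search (assumption)
            with path.open(newline="") as f:
                for row in csv.DictReader(f):
                    ids.add(row[column])
    return sorted(ids)


with tempfile.TemporaryDirectory() as root:
    (Path(root) / "a").mkdir()
    (Path(root) / "a" / "drive1.csv").write_text("timestamp,arb_id\n0.0,0x2B0\n0.1,0x1A0\n")
    (Path(root) / "b.csv").write_text("timestamp,arb_id\n0.2,0x1A0\n0.3,0x3C0\n")
    ids = scan_arb_ids_sketch([root])
```

A sorted, deduplicated list gives every ID a deterministic index, so repeated scans over the same files yield the same vocabulary.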
vocab_digest ¶
SHA256 over (id, index) pairs sorted by index.
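The three vocab primitives fit together as shown below. This is a minimal sketch. The digest hashes (id, index) pairs in index order using an `id:index` line encoding, and the file layout is a JSON object with `entries` and `digest` keys. Both formats are assumptions, and all three function names are hypothetical. The atomic write uses a temporary file in the same directory followed by `os.replace`, so readers never see a partially written file.

```python
import hashlib
import json
import os
import tempfile
from pathlib import Path


def vocab_digest_sketch(vocab: dict[str, int]) -> str:
    # Hash (id, index) pairs in index order; the "id:index\n" encoding is an assumption.
    pairs = sorted(vocab.items(), key=lambda kv: kv[1])
    text = "".join(f"{arb_id}:{idx}\n" for arb_id, idx in pairs)
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def persist_vocab_sketch(vocab: dict[str, int], path: Path) -> str:
    digest = vocab_digest_sketch(vocab)
    # Write a temp file in the target directory, then swap it in atomically.
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump({"entries": vocab, "digest": digest}, f)
        os.replace(tmp, path)
    except BaseException:
        os.unlink(tmp)  # clean up the temp file if writing or renaming failed
        raise
    return digest


def load_vocab_sketch(path: Path) -> tuple[dict[str, int], str]:
    data = json.loads(path.read_text())
    return data["entries"], data["digest"]


with tempfile.TemporaryDirectory() as d:
    path = Path(d) / "vocab.json"
    written = persist_vocab_sketch({"0x1A0": 0, "0x2B0": 1}, path)
    entries, loaded = load_vocab_sketch(path)
```

Sorting by index makes the digest independent of dict insertion order, so two processes that build the same mapping agree on its digest.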