Pipeline Data Flow

Phase 1: Preprocessing (runs once per dataset, CPU only)

Raw CSVs (NFS)                        Cache artifacts (NFS -> scratch -> TMPDIR)

data/automotive/<dataset>/
+-- normal_traffic.csv
+-- dos_attack.csv
+-- ...

        |
        v  1. pl.scan_csv() -- lazy, no data loaded yet
        v  2. Column normalization (arbitration_id -> arb_id, data_field -> payload)
        v  3. Hex payload parsing -> 8 Float32 byte columns + Shannon entropy
        v  4. Attack type tagging from filename stem + parent dir
        v  5. Sort by timestamp, .collect()  <-- FIRST MATERIALIZATION
        |
        v  6. Vocabulary -- unique arb_ids -> dense int IDs
        |
        v  7. sliding_window_graphs()
        |
        |   7a: Window assignment (window_size=100, stride=100)
        |
        |   7b: Three lazy frames built from one scan:
        |     +- stats_lf: group_by(_wid, node_id) -> 35 node features
        |     +- edges_base: shift(-1).over(_wid) -> 11 edge features
        |     +- labels_lf: group_by(_wid) -> y, attack_type
        |
        |   7c: Sequential .collect() (saves ~20-30 GB peak)
        |   7d: Bidirectional edge flag via self-join
        |   7e: Clustering coeff + degree entirely in Polars (no NetworkX)
        |   7f: Local ID remapping (bulk Polars join)
        |   7g: Polars -> torch bulk handoff (.to_torch Float32)
        |   7h: Pre-collation -- (Data, slices) built directly from bulk tensors.
        |       RLE boundaries become slice offsets; no list[Data], no collate().
        |       Peak memory ~1x final tensor size.
        |   7i: Graphs presorted by node count before save. Adjacent graphs on
        |       disk have similar size -> NodeBudgetBatchSampler + bucket shuffle
        |       produces sequential mmap page faults; reduces VRAM fragmentation.
        |
        v  8. Returns (Data, slices, num_graphs) from bulk tensors
        v  9. atomic_save() -> torch.save + fsync + rename
        |
        +---> {lake_root}/cache/v9.0.0/{dataset}/processed/data_train.pt
        +---> {lake_root}/cache/v9.0.0/{dataset}/processed/data_test.pt
        +---> {lake_root}/cache/v9.0.0/{dataset}/cache_metadata.json
        +---> {lake_root}/cache/v9.0.0/{dataset}/processed/.complete

num_arb_ids is read at load time from cache_metadata.json. The authoritative value is written from the shared arb-id vocab built in CANBusSource.build() (scans every split's source_dirs before any tensor is constructed) and persisted as {root}/vocab.json. Index 0 is reserved for UNK; real ids start at 1. The earlier per-split node_id.max() + 1 derivation was removed because test subdirs can contain arb_ids absent from train, which under-sized the embedding table relative to the real deployment vocabulary and crashed at inference. See graphids/core/data/vocab.py, graphids/core/data/metadata.py (schema v3; vocab_digest is an invariant cache key), and ~/plans/oov-embedding-handling.md.
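The shared-vocab contract above reduces to a small invariant: union over every split, index 0 reserved for UNK. A stdlib-only sketch (the real builder lives in graphids/core/data/vocab.py; build_vocab and lookup are illustrative names):

```python
def build_vocab(splits: dict) -> dict:
    """Union of arb_ids over every split's data; 0 is reserved for UNK."""
    all_ids = sorted({aid for ids in splits.values() for aid in ids})
    return {aid: i + 1 for i, aid in enumerate(all_ids)}  # real ids start at 1

def lookup(vocab: dict, arb_id: int) -> int:
    return vocab.get(arb_id, 0)  # unseen ids fold into UNK

# 0x4F1 appears only in test -- the failure mode the per-split
# node_id.max() + 1 derivation used to hit
vocab = build_vocab({"train": [0x130, 0x2C0], "test": [0x130, 0x4F1]})
num_arb_ids = len(vocab) + 1  # +1 for the UNK row in the embedding table
```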

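Step 9's atomic_save() follows the standard write-temp / fsync / rename pattern; a generic sketch (the real function wraps torch.save, and its exact signature may differ):

```python
import os
import tempfile

def atomic_save(data: bytes, path: str) -> None:
    """Write to a temp file, fsync, then rename, so a reader never
    observes a partially written cache artifact."""
    d = os.path.dirname(path) or "."
    fd, tmp = tempfile.mkstemp(dir=d, suffix=".tmp")  # same fs as target
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)           # torch.save(obj, f) in the real pipeline
            f.flush()
            os.fsync(f.fileno())    # durable before the rename
        os.replace(tmp, path)       # atomic within one filesystem
    except BaseException:
        os.unlink(tmp)
        raise
```

The temp file must live in the target directory: os.replace is only atomic within a single filesystem, which matters when the cache root spans NFS and scratch mounts.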
Phase 2: Training Data Loading

Storage hierarchy:
  NFS (~50ms/read) -> Scratch/GPFS (~5ms/read) -> TMPDIR/local SSD (~0.1ms/read)
  (staged by _preamble.sh before training starts)

+---------------------------------------------------------------------+
|                     TRAINING LOOP (per epoch)                       |
|                                                                     |
|  1. torch.load(data_train.pt, mmap=True)                           |
|     Memory-mapped tensors, pages fault on access.                   |
|     Done ONCE at setup(), not per epoch.                            |
|                                                                     |
|  2. Train/val split -- torch.randperm(seed=seed)                   |
|     val_fraction=0.2 default (configurable). Done ONCE.            |
|                                                                     |
|  3. PRE-BATCHED STANDARD PATH (dynamic_batching=True)              |
|     First train_dataloader() call only:                             |
|       a) node_budget() probes VRAM -> max nodes per batch           |
|       b) NodeBudgetBatchSampler plans all batches deterministically |
|       c) Batch.from_data_list() collates all batches upfront        |
|     Subsequent epochs: shuffle batch ORDER only, no re-collation    |
|     num_workers=0 -- each __getitem__ is O(1) Batch.clone()         |
|                                                                     |
|  4. Val/test loaders: _build_loader() with NodeBudgetBatchSampler  |
|     + PyG DataLoader, num_workers auto-sized via autosize_workers() |
|     Wrapped in PrefetchLoader for async H2D when GPU available.     |
|                                                                     |
|  5. batch.to(device, non_blocking=True) -- async DMA (~1-2ms)      |
|     (PrefetchLoader issues this via CUDA stream)                    |
|                                                                     |
|  6. GPU forward + backward                                         |
|     VGAE: ~10ms  |  GAT: ~25ms                                     |
|                                                                     |
|  WRITES: MLflowTrainingCallback (per-epoch metrics, peak VRAM),     |
|          ModelCheckpoint (best/last ckpts + SHA256 sidecar)         |
+---------------------------------------------------------------------+
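The batch planning in step 3b is a greedy pack under the probed node budget. A pure-Python sketch of the planning logic only (node_budget() probing and Batch.from_data_list() collation are elided; plan_batches is an illustrative name):

```python
def plan_batches(num_nodes_per_graph, max_nodes):
    """Greedily pack graph indices into batches whose total node count
    stays under max_nodes; an oversized graph gets a batch to itself."""
    batches, current, used = [], [], 0
    for idx, n in enumerate(num_nodes_per_graph):
        if current and used + n > max_nodes:
            batches.append(current)
            current, used = [], 0
        current.append(idx)
        used += n
    if current:
        batches.append(current)
    return batches

plan = plan_batches([40, 60, 30, 80, 10], max_nodes=100)
```

Because the plan depends only on the static node counts and the budget, it is deterministic; that is what lets later epochs shuffle batch order without re-planning or re-collating.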

CURRICULUM PATH (sampler="curriculum"):
  setup(): score normal-class graphs via VGAE, bucket into K difficulty tiers.
  first train_dataloader(): pre-batch each tier + attack tier.
  CurriculumEpochCallback: selects active tiers each epoch (O(1) tier swap).
  Also num_workers=0 -- same pre-batched O(1) clone pattern.
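The tier bucketing in setup() amounts to sorting graphs by difficulty score and chunking into K tiers. A sketch with the VGAE scoring elided (bucket_tiers is an illustrative name; the real tiering may weight or overlap tiers differently):

```python
def bucket_tiers(scores, k):
    """Sort graph indices by difficulty score, then split the sorted
    order into k contiguous tiers (easiest tier first)."""
    order = sorted(range(len(scores)), key=scores.__getitem__)
    size = -(-len(order) // k)  # ceil division
    return [order[i : i + size] for i in range(0, len(order), size)]

# scores stand in for VGAE reconstruction error on normal-class graphs
tiers = bucket_tiers([0.9, 0.1, 0.5, 0.3, 0.7, 0.2], k=3)
```

Since each tier is pre-batched once, CurriculumEpochCallback's per-epoch work is just picking which tiers feed the loader, hence the O(1) tier swap.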

Sampler: NodeBudgetBatchSampler

Replaces PyG's DynamicBatchSampler. Reads num_nodes_per_graph derived from cache slice offsets at zero I/O cost -- avoids 50K mmap'd Data reconstructions per epoch on large datasets. With shuffle=True, uses bucket shuffle (sort by size -> chunk into buckets -> shuffle bucket order + within-bucket), keeping batch-to-batch size variance low for VRAM allocator stability.
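The bucket shuffle described above can be sketched directly; the bucket length is illustrative (the real sampler derives it from its own config):

```python
import random

def bucket_shuffle(sizes, bucket_len, seed):
    """Sort indices by graph size, chunk into buckets, then shuffle
    bucket order and within each bucket. Neighbors in the output stay
    similar in size, so batch-to-batch node counts vary little."""
    rng = random.Random(seed)
    order = sorted(range(len(sizes)), key=sizes.__getitem__)
    buckets = [order[i : i + bucket_len] for i in range(0, len(order), bucket_len)]
    rng.shuffle(buckets)      # randomize bucket order across the epoch
    for b in buckets:
        rng.shuffle(b)        # randomize within each bucket
    return [i for b in buckets for i in b]
```

The seeded RNG keeps epochs reproducible while still decorrelating batch contents; only indices move, so the mmap'd tensors are never touched during shuffling.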

make_graph_loader (sampler.py:28) is the single factory for all loaders: sets spawn multiprocessing context, persistent_workers, and file_system sharing strategy in worker init. Wraps with PrefetchLoader when a GPU device is available.
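The kwarg policy that make_graph_loader centralizes can be sketched in isolation; everything here (loader_kwargs, the worker cap of 8) is illustrative, and the real factory additionally sets the file_system sharing strategy in worker init and wraps with PrefetchLoader:

```python
import os

def loader_kwargs(gpu_available, requested_workers=None):
    """Pick DataLoader kwargs: autosize workers from CPU count unless
    overridden, pin memory only when a GPU will consume the batches,
    and keep workers persistent across epochs when there are any."""
    workers = requested_workers
    if workers is None:  # autosize_workers()-style heuristic (illustrative)
        workers = min(8, max(0, (os.cpu_count() or 1) - 1))
    kw = {
        "num_workers": workers,
        "pin_memory": gpu_available,        # required for async H2D copies
        "persistent_workers": workers > 0,  # avoid respawn cost per epoch
    }
    if workers > 0:
        kw["multiprocessing_context"] = "spawn"  # fork + CUDA is unsafe
    return kw
```

Funneling every loader through one factory is what keeps the pre-batched train path (num_workers=0) and the val/test path (autosized workers) consistent on the settings that actually matter for correctness, like the spawn context.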