Pipeline Visibility

When a pipeline run spans thousands of Lambda invocations, knowing which chunks succeeded, failed, or got stuck is critical. This page documents the DynamoDB schema that gives per-chunk visibility into every run.

Problem

The original schema tracked only aggregate counters (jobs_completed, jobs_failed) on a single run item. When a Lambda timed out, the chunk was never counted -- neither completed nor failed -- leaving the run permanently stuck. The first full GRS run (22 tiles) produced 221 dead-letter queue (DLQ) messages that were invisible from DynamoDB.

DynamoDB Schema

The phspectra-runs table uses a composite key (PK + SK) to store both run-level and chunk-level items under the same partition.

Key schema

| Attribute | Type | Purpose |
| --- | --- | --- |
| PK | String | run_id (UUID) -- shared between the run item and all its chunk items |
| SK | String | "RUN" for run items, "CHUNK#chunk-{start:07d}" for chunk items |
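
As a minimal sketch of how these keys could be built in Python: the helper names run_item_key and chunk_item_key are hypothetical, and start is assumed to be the chunk's integer starting index, as the {start:07d} format suggests.

```python
RUN_SK = "RUN"
CHUNK_PREFIX = "CHUNK#"


def run_item_key(run_id: str) -> dict:
    """Primary key of the run item (plain Python values)."""
    return {"PK": run_id, "SK": RUN_SK}


def chunk_item_key(run_id: str, start: int) -> dict:
    """Primary key of a chunk item.

    Zero-padding to 7 digits makes the string sort order of SK match
    the numeric order of start within the partition.
    """
    return {"PK": run_id, "SK": f"{CHUNK_PREFIX}chunk-{start:07d}"}


# Hypothetical run ID, for illustration only
print(chunk_item_key("example-run-id", 42))
```

Because every chunk SK shares the "CHUNK#" prefix, a single begins_with condition on SK selects all chunks of a run without touching the run item.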

GSI1 -- query runs by survey

| Attribute | Type | Purpose |
| --- | --- | --- |
| GSI1_PK | String | survey (e.g. "grs-15") |
| GSI1_SK | String | created_at ISO timestamp |

Projection: ALL. GSI1_PK and GSI1_SK are set on run items only, so chunk items never appear in the index.

Run item

Created by the splitter Lambda when it fans out chunks.

| Attribute | Type | Description |
| --- | --- | --- |
| PK | String | Run ID (UUID) |
| SK | String | Always "RUN" |
| GSI1_PK | String | Survey name (for GSI1 queries) |
| GSI1_SK | String | created_at (for GSI1 range queries) |
| survey | String | Survey name |
| created_at | String | ISO 8601 timestamp |
| jobs_total | Number | Total chunks to process |
| jobs_completed | Number | Atomically incremented by each successful worker |
| jobs_failed | Number | Atomically incremented on worker failure |
| n_spectra | Number | Total spectra in the cube |
| n_chunks | Number | Number of chunks created |
| params | String | JSON-encoded fit_gaussians params (if non-empty) |
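
The atomic increments on jobs_completed and jobs_failed could be expressed as an UpdateItem call with an ADD action. The sketch below only builds the request arguments in the low-level client format. The function name counter_update_kwargs is hypothetical, and whether the workers use ADD or a SET expression is an assumption, since the source only says the counters are incremented atomically.

```python
def counter_update_kwargs(run_id: str, succeeded: bool,
                          table_name: str = "phspectra-runs") -> dict:
    """Build UpdateItem arguments that bump one counter on the run item."""
    counter = "jobs_completed" if succeeded else "jobs_failed"
    return {
        "TableName": table_name,
        "Key": {"PK": {"S": run_id}, "SK": {"S": "RUN"}},
        # ADD on a Number attribute is applied atomically by DynamoDB,
        # so concurrent workers do not overwrite each other's increments.
        "UpdateExpression": "ADD #counter :one",
        "ExpressionAttributeNames": {"#counter": counter},
        "ExpressionAttributeValues": {":one": {"N": "1"}},
    }


# Hypothetical run ID, for illustration only
kwargs = counter_update_kwargs("example-run-id", succeeded=True)
print(kwargs["ExpressionAttributeNames"])
```

With boto3 installed, the result could be passed on as boto3.client("dynamodb").update_item(**kwargs).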

Chunk item

Created by the splitter as PENDING, updated by workers as they start and finish.

| Attribute | Type | Description |
| --- | --- | --- |
| PK | String | Run ID (same as the parent run) |
| SK | String | "CHUNK#chunk-{start:07d}" |
| status | String | PENDING / IN_PROGRESS / COMPLETED / FAILED |
| chunk_key | String | S3 key of the .npz file |
| n_spectra | Number | Spectra count in this chunk |
| created_at | String | ISO timestamp (set by splitter) |
| started_at | String | ISO timestamp (set by worker on start) |
| completed_at | String | ISO timestamp (set by worker on finish) |
| duration_ms | Number | Wall-clock processing time |
| output_key | String | Parquet S3 key (on success) |
| error | String | Error message, truncated to 1 KB (on failure) |

Chunk lifecycle

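A chunk starts as PENDING when the splitter creates it, moves to IN_PROGRESS when a worker picks it up, and ends as COMPLETED or FAILED when the worker finishes. When a Lambda times out, the chunk remains IN_PROGRESS with a started_at timestamp but no completed_at. This is exactly the gap the old schema could not detect.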
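
The lifecycle can be sketched as a small transition table. This is an illustration under assumptions, not the pipeline's actual code: the names ALLOWED_TRANSITIONS and can_transition are hypothetical, and retries after SQS redelivery (which may move a chunk back to IN_PROGRESS) are deliberately not modelled.

```python
# Status values come from the chunk item schema; the transition
# rules themselves are an assumption made for illustration.
ALLOWED_TRANSITIONS = {
    "PENDING": {"IN_PROGRESS"},              # worker starts
    "IN_PROGRESS": {"COMPLETED", "FAILED"},  # worker finishes
    "COMPLETED": set(),                      # terminal
    "FAILED": set(),                         # terminal
}


def can_transition(current: str, new: str) -> bool:
    """Return True if a chunk may move from status current to status new."""
    return new in ALLOWED_TRANSITIONS.get(current, set())


print(can_transition("PENDING", "IN_PROGRESS"))
print(can_transition("PENDING", "COMPLETED"))
```

A timed-out worker never performs the second transition, which is why such a chunk stays IN_PROGRESS indefinitely.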

Access Patterns

1. Get a run

Fetch the run-level summary for a known run ID.

GetItem(PK=run_id, SK="RUN")

2. List all chunks for a run

See the status of every chunk in a run.

Query(PK=run_id, SK begins_with "CHUNK#")
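
A single Query response is capped at 1 MB, so a run with thousands of chunks may need several pages. The sketch below follows LastEvaluatedKey until the result set is exhausted. The function name query_all_chunks is hypothetical, and the query callable is injected (for example boto3.client("dynamodb").query) so the loop can be exercised without AWS access.

```python
def query_all_chunks(query, run_id: str,
                     table_name: str = "phspectra-runs") -> list:
    """Collect every chunk item of a run, following pagination."""
    kwargs = {
        "TableName": table_name,
        "KeyConditionExpression": "PK = :pk AND begins_with(SK, :prefix)",
        "ExpressionAttributeValues": {
            ":pk": {"S": run_id},
            ":prefix": {"S": "CHUNK#"},
        },
    }
    items = []
    while True:
        page = query(**kwargs)
        items.extend(page.get("Items", []))
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            # No LastEvaluatedKey means this was the final page
            return items
        # Resume the next page where the previous one stopped
        kwargs["ExclusiveStartKey"] = last_key
```

The same loop applies to the filtered queries for stuck and failed chunks, because DynamoDB applies filter expressions after reading each page.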

3. Find runs by survey

Discover the most recent run for a given survey (used by the CLI after uploading a manifest).

Query(
    IndexName="GSI1",
    GSI1_PK=survey,
    GSI1_SK >= not_before,
    ScanIndexForward=False,
    Limit=1
)

This replaced a full-table Scan with a filter expression.
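
In boto3's low-level client format, the GSI1 query could look like the sketch below. The function name latest_run_query_kwargs is hypothetical. not_before is assumed to be an ISO timestamp string in the same format as created_at, so that string comparison matches chronological order.

```python
def latest_run_query_kwargs(survey: str, not_before: str,
                            table_name: str = "phspectra-runs") -> dict:
    """Build Query arguments for the newest run of a survey."""
    return {
        "TableName": table_name,
        "IndexName": "GSI1",
        "KeyConditionExpression":
            "GSI1_PK = :survey AND GSI1_SK >= :not_before",
        "ExpressionAttributeValues": {
            ":survey": {"S": survey},
            ":not_before": {"S": not_before},
        },
        # Descending GSI1_SK order plus Limit=1 returns the newest run
        "ScanIndexForward": False,
        "Limit": 1,
    }


# Hypothetical timestamp, for illustration only
print(latest_run_query_kwargs("grs-15", "2024-01-01T00:00:00+00:00"))
```

Since only run items carry GSI1_PK and GSI1_SK, the query never has to skip over chunk items.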

4. Detect stuck chunks

Find chunks where the worker started but never finished (Lambda timeout, OOM, etc.).

Query(PK=run_id, SK begins_with "CHUNK#")
-> filter: status = "IN_PROGRESS" AND started_at < (now - 15 min)

If a chunk has been IN_PROGRESS for longer than the Lambda timeout (15 minutes), the worker is dead and the chunk will land in the DLQ after SQS retries.
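
The stuck-chunk filter can also be applied client-side once the chunk items are in memory. A minimal sketch, assuming the items have been converted to plain Python dicts and that started_at is an ISO 8601 string (a missing UTC offset is treated as UTC). The names find_stuck_chunks and parse_iso are hypothetical, and the sample items are invented for illustration.

```python
from datetime import datetime, timedelta, timezone

LAMBDA_TIMEOUT = timedelta(minutes=15)


def parse_iso(ts: str) -> datetime:
    """Parse an ISO 8601 timestamp, treating naive values as UTC."""
    dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)


def find_stuck_chunks(chunks, now, timeout=LAMBDA_TIMEOUT):
    """Return chunks that have been IN_PROGRESS for longer than timeout."""
    cutoff = now - timeout
    return [
        c for c in chunks
        if c.get("status") == "IN_PROGRESS"
        and "started_at" in c
        and parse_iso(c["started_at"]) < cutoff
    ]


# Invented sample data
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
sample = [
    {"SK": "CHUNK#chunk-0000000", "status": "IN_PROGRESS",
     "started_at": "2024-01-01T11:30:00+00:00"},
    {"SK": "CHUNK#chunk-0000100", "status": "IN_PROGRESS",
     "started_at": "2024-01-01T11:50:00+00:00"},
]
print([c["SK"] for c in find_stuck_chunks(sample, now)])
```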

5. Count failures with details

Query(PK=run_id, SK begins_with "CHUNK#")
-> filter: status = "FAILED"

Each failed chunk carries the truncated error message and timing, so you can see why it failed and how long it ran before failing.
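
Once the chunk items are loaded, a per-status tally and a failure report take only a few lines. A sketch assuming plain Python dicts with the attribute names from the chunk item table. The function name summarize_chunks is hypothetical.

```python
from collections import Counter


def summarize_chunks(chunks):
    """Return (status counts, list of failure details) for a run's chunks."""
    counts = Counter(c.get("status", "UNKNOWN") for c in chunks)
    failures = [
        {
            "SK": c["SK"],
            "error": c.get("error", ""),
            "duration_ms": c.get("duration_ms"),
        }
        for c in chunks
        if c.get("status") == "FAILED"
    ]
    return counts, failures
```

Unlike the aggregate counters on the run item, the tally also exposes chunks that are still PENDING or IN_PROGRESS, which is what makes timed-out work visible.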

CLI example queries

These use the AWS CLI to inspect run state directly.

# Get the run summary
aws dynamodb get-item \
  --table-name phspectra-runs \
  --key '{"PK":{"S":"<run-id>"},"SK":{"S":"RUN"}}'

# List all chunks for a run
aws dynamodb query \
  --table-name phspectra-runs \
  --key-condition-expression 'PK = :pk AND begins_with(SK, :prefix)' \
  --expression-attribute-values '{":pk":{"S":"<run-id>"},":prefix":{"S":"CHUNK#"}}'

# Find stuck chunks (IN_PROGRESS for over 15 min)
# Replace <cutoff-iso-timestamp> with now minus 15 minutes, in the same ISO format as started_at
aws dynamodb query \
  --table-name phspectra-runs \
  --key-condition-expression 'PK = :pk AND begins_with(SK, :prefix)' \
  --filter-expression '#s = :status AND started_at < :cutoff' \
  --expression-attribute-names '{"#s":"status"}' \
  --expression-attribute-values '{":pk":{"S":"<run-id>"},":prefix":{"S":"CHUNK#"},":status":{"S":"IN_PROGRESS"},":cutoff":{"S":"<cutoff-iso-timestamp>"}}'

# Find most recent run for a survey
aws dynamodb query \
  --table-name phspectra-runs \
  --index-name GSI1 \
  --key-condition-expression 'GSI1_PK = :survey' \
  --expression-attribute-values '{":survey":{"S":"grs-15"}}' \
  --no-scan-index-forward \
  --limit 1