The Ten-Minute Query


I’m sitting at my laptop on a Tuesday morning, waiting. The progress bar on my screen says ‘Query running… 4 minutes, 37 seconds.’ I lean back in my chair and let out this long sigh that probably says more than I intended.

My manager walks past my desk. She glances at my screen, and I can see that look—the one that says she already knows what I’m about to tell her. I don’t need to explain.

“Still pulling that customer report?”

“Yeah.”

She shakes her head. “We need to fix this.”

It was supposed to be an ad-hoc piece of work; we’d been running it for months. But I just nod.

Here’s what was happening: every time someone needed to know ‘how many orders did customer X place?’ or ‘which products sold best last month?’, we’d query this massive table. Three million rows. Every single customer’s name, address, email—repeated on every order row. The system was scanning through all that redundant data every single time.

And then, two weeks ago, it got worse. Someone ran a data import script that had a bug. It replaced half our customer names with NULL. By the time we caught it, twelve reports had already gone out with blank customer names. I had to send apology emails. My team and I had to stay until midnight fixing it.

I thought, there has to be a better way to do this. People have been building data systems for decades—someone must have figured this out.

And someone has: Ralph Kimball, whose dimensional modeling methodology has been the gold standard for data warehouses since the 1990s.



The Problem I Couldn’t Unsee


Let me show you what our data looked like.

order_id | date       | customer_name | address      | email        | product | amount
---------|------------|---------------|--------------|--------------|---------|-------
1        | 2024-11-15 | John Smith    | 123 Main St  | john@...     | Pizza   | 25
2        | 2024-11-15 | John Smith    | 123 Main St  | john@...     | Soda    | 3
3        | 2024-11-15 | Jane Doe      | 456 Oak Ave  | jane@...     | Pizza   | 25
4        | 2024-11-16 | John Smith    | 123 Main St  | john@...     | Burger  | 12

See what’s happening? John Smith’s address and email are stored three times. Multiply that by thousands of customers and millions of orders, and you’ve got:

  • Every query scanning way more data than necessary
  • Storage costs that make your CFO unhappy
  • One bug corrupting your entire history
  • Query times that make you question your career choices

Every single query was like asking someone to find your keys by searching every room in your house, including the ones you never use. Sure, they’ll find the keys eventually. But there’s a better way.



The Kitchen That Changed Everything


Let me paint you a hypothetical scenario. Imagine it’s a Saturday, and you’re standing in your kitchen after just moving house. You can’t find anything. Everything’s still in boxes.

You’re rummaging through the third box, looking for a spatula. You’re pulling out random stuff—a whisk, some Tupperware lids, a pizza cutter, yesterday’s takeout containers, a bag of flour. Everything mixed together. This is chaos.

Now imagine a friend comes over. They take one look at your kitchen and say, “What is this?”

“It’s… I haven’t unpacked yet.”

“You’ve been here a week.” They walk over to the mess. “Here’s what you should do. The stuff that rarely changes—your plates, your pots, your cutting boards—those go in the cabinets. Organised, labelled, easy to find. The stuff that comes and goes constantly—the groceries, the leftovers, what you’re cooking today—that goes in the fridge and on the counter. Don’t mix the permanent stuff with the temporary stuff.”

Two hours later, the kitchen makes sense. You can find anything in seconds. And here’s the wild part—it actually takes up less space because you’re not duplicating storage for the same items.

That’s dimensional modeling.

When your data is like those boxes, everything’s mixed together. Customer information sitting right next to order information, repeated over and over. Every time you need something, you’re digging through all the boxes.

Dimensional modeling says: let’s organise this. Put stable descriptive data—things that rarely change—in one place. We call these dimension tables. Put the events that happen constantly—the transactions, the updates, the measurements—in another place. We call these fact tables. Link them with keys.

Here’s what that transformation looks like:

BEFORE (Messy Boxes - everything mixed together):

order_id | customer_name | address      | product | amount
1        | John Smith    | 123 Main St  | Pizza   | 25
2        | John Smith    | 123 Main St  | Soda    | 3

↑ John’s details repeated every single order!

AFTER (Organised Kitchen): DIM_CUSTOMER (Cabinets - stable stuff, stored ONCE):

customer_key | customer_id | name       | address      | email
1001         | C-123       | John Smith | 123 Main St  | john@...
1002         | C-456       | Jane Doe   | 456 Oak Ave  | jane@...

DIM_DATE (Calendar on the wall - one row per day):

date_key | full_date  | day_name  | month | quarter | year | is_weekend
20241115 | 2024-11-15 | Friday    | Nov   | Q4      | 2024 | N
20241116 | 2024-11-16 | Saturday  | Nov   | Q4      | 2024 | Y

FACT_ORDERS (Fridge/counter - events that happen constantly):

order_key | date_key | customer_key | product_key | amount
1         | 20241115 | 1001         | 5           | 25
2         | 20241115 | 1001         | 8           | 3

Customer details stored one time. Not a thousand times. When you need to answer ‘how much did we sell?’, you scan the compact fact table. When you need ‘which customers bought?’, you JOIN to get the descriptive details.
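
Here’s what a query against that structure might look like in practice. The tables and columns are the ones from the example above; the query itself is just an illustration:

SELECT
    dd.month,
    dc.name        AS customer_name,
    SUM(fo.amount) AS total_spent
FROM fact_orders fo
JOIN dim_customer dc ON fo.customer_key = dc.customer_key  -- descriptive details
JOIN dim_date dd     ON fo.date_key = dd.date_key          -- calendar attributes
WHERE dd.year = 2024
GROUP BY dd.month, dc.name
ORDER BY total_spent DESC;

The fact table stays narrow and cheap to scan; the joins pull in descriptive attributes only when you ask for them.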

Notice something important in that structure? The dimension tables have two types of keys:

  • customer_key (1001, 1002) — This is a surrogate key. A meaningless integer we generate ourselves.
  • customer_id (C-123, C-456) — This is the natural key. The ID from the source system.

Why both? Because source systems are chaotic. They recycle IDs. They change formats. They get merged in acquisitions. The surrogate key is our key—stable, predictable, and under our control. Every dimension table gets a surrogate key as its primary key.

And that date dimension? It’s arguably the most important dimension in the model. Every fact table should have one. It lets you slice data by day of week, month, quarter, fiscal period, holidays—without calculating these on every query.

The core principle: things that rarely change go in dedicated storage, organised and easy to find. Things that happen constantly go in their own space, linked back by keys. Same rule applies to data:

  • Dimensions = Stable descriptive stuff (customers, products, locations, dates)
  • Facts = Events that happen constantly (orders, clicks, updates, measurements)


The Conference Room I’ll Never Forget


I’m sitting in a conference room. Fluorescent lights buzzing. That weird hum they make when it’s too quiet—I’m wondering why we haven’t changed to LEDs. My manager is standing at the whiteboard, and she’s pissed.

“Someone,” she says, not looking at anyone in particular, “pushed bad data to production last night.”

I feel my stomach drop. The data pipeline my team had been working on.

“The revenue report that went to the board this morning showed every transaction as zero dollars. Zero. The board emailed me asking if the company made any money last quarter.”

Oh god. Oh no. I sank down in my chair a little.

Turns out it wasn’t actually our release—it was someone else’s. But in that moment, sitting there, I realised something crucial: we had no safety net. Data went straight from transformation to production. No checks. No validation. If you made a mistake, everyone saw it immediately.

That conference room moment stuck with me. I started researching how other teams prevented this kind of disaster. That’s when I learned about Write-Audit-Publish. And honestly? It’s probably saved me from that same sick feeling countless times since.

Here’s the idea:

WRITE — You finish your transformation. Instead of putting the data straight into production, you write it to a staging branch. Think of it like a draft folder for emails. It’s there, but it’s not official yet.

AUDIT — This is the crucial part. You run every check you can think of:

  • Did I lose any rows?
  • Are there any NULL values where there shouldn’t be?
  • Do all the foreign keys match up?
  • Does the math make sense?
  • Do surrogate keys have duplicates?

It’s like re-reading an important email before you hit send. You check for typos, you verify the recipients, you make sure the attachment is right.

Important: whenever you find an incident, bug, or data problem in the future, add a check for it to this AUDIT layer. That’s how you make sure you never make the same mistake twice.

PUBLISH — Only if everything passes, you merge the branch to production. If anything fails, you stop. You fix it. You try again. The bad data never reaches production. Never reaches that board meeting.

The traditional way to implement this involves copying data between folders or tables. But there’s a better approach using Apache Iceberg’s branching feature. Instead of physically copying data, you create a branch—like in Git. Your audit runs against the branch. If it passes, you merge the branch to main. If it fails, you drop the branch. No data was ever at risk.
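
Here’s a minimal sketch of those mechanics on Spark with Iceberg. The table name, the incoming DataFrame, and the single audit check are placeholders; the full Glue jobs later in this post show the real thing.

# Minimal Write-Audit-Publish sketch using Iceberg branches (illustrative only)

# WRITE: create an isolated branch and write the new data to it
spark.sql("ALTER TABLE db.my_table CREATE BRANCH IF NOT EXISTS staging")
new_data_df.writeTo("db.my_table").option("branch", "staging").overwritePartitions()

# AUDIT: run checks against the branch, never against production
staged = spark.read.option("branch", "staging").table("db.my_table")
if staged.filter(staged.id.isNull()).count() > 0:
    # Audit failed: drop the branch; main is untouched
    spark.sql("ALTER TABLE db.my_table DROP BRANCH staging")
    raise Exception("Audit failed - branch dropped, production unchanged")

# PUBLISH: fast-forward main to the audited branch, then clean up
spark.sql("""
    CALL system.fast_forward(
        table => 'db.my_table',
        branch => 'main',
        to => 'staging'
    )
""")
spark.sql("ALTER TABLE db.my_table DROP BRANCH staging")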

Critical design decision: WAP should happen at each table level, not just at the end of the pipeline. Each dimension job does its own Write-Audit-Publish cycle. Each fact job does its own. This way:

  • Failures are isolated to the specific table that failed
  • You catch issues early, not at the very end
  • Each job is self-contained and independently testable
  • Downstream jobs read from published (main) tables, not staging branches

Let me show you what this looks like when an audit catches something:

AUDIT PHASE (dim_system):

Row count check.......................... PASS
   Source: 1,198 rows
   Target: 1,198 rows

Primary key uniqueness................... FAIL
   Found 13 duplicate system_keys in dim_system!

AUDIT FAILED — BRANCH NOT MERGED

Data remains in staging branch.
Production table unchanged.
Fix the duplicates and re-run the job.

Look at that. The audit caught duplicate surrogate keys before they reached production. Nobody sees this except you. You fix the problem. You run it again. Then it passes, and the branch merges to main.

This is Write-Audit-Publish. It’s the difference between sending an email with a typo versus catching it first. Between serving undercooked food versus checking with a thermometer. Between publishing bad data versus validating before it matters.

That conference room moment? Doesn’t happen with this pattern.



Building This Thing for Real


Alright, enough theory. Let me show you how to actually build this in AWS.

We’re going to use real space exploration data from a game (Elite Dangerous)—systems, planets, updates. It’s way more interesting than order data, and the patterns work exactly the same.

Architecture Overview

Here’s our architecture:

                    +------------------------------------------+
                    |      AWS Step Functions Workflow         |
                    +------------------------------------------+
                                       |
          +----------------------------+----------------------------+
          |                            |                            |
          v                            v                            v
    +------------+             +------------+             +------------+
    | Glue Job:  |             | Glue Job:  |             | Glue Job:  |
    | dim_date   |             | dim_system |             | dim_planet |
    | (WAP)      |             | (WAP)      |             | (WAP)      |
    +------------+             +------------+             +------------+
          |                            |                            |
          +----------------------------+----------------------------+
                                       |
                        (Wait for all dimensions)
                                       |
                                       v
                         +------------------------+
                         |      Glue Job:         |
                         | fact_system_updates    |
                         | (WAP + Incremental)    |
                         +------------------------+

Data Flow:

Landing Layer       | Staging Layer (PSA)              | Semantic Layer
--------------------|----------------------------------|------------------------------
Raw 1:1 from source | Iceberg + data_date partitioning | Production dimensional model
systems/            | stg_systems/                     | dim_date
planets/            | stg_planets/                     | dim_system
updates/            | stg_updates/                     | dim_planet
                    |                                  | fact_system_updates

Let me explain the three layers:

Landing Layer — Raw data, 1:1 from source systems, with no transformations beyond what’s needed to land it. It contains only the latest fetch; if sourcing fails, this layer fails. The goal is to get the data into your ecosystem as quickly as possible. Typically every field lands as string/varchar to avoid data-type issues coming from the source system.

Staging Layer (Persistent Staging Area) — This is where we convert data types and prepare for dimensional modeling. Critically, we partition by data_date—the date the source data was fetched. This gives us:

  • Historical reprocessing capability (reload any past date)
  • Debugging (what did the data look like on January 15th?)
  • Dependency triggers (when a partition is written, trigger downstream jobs)

Semantic Layer — Production-ready dimensional model. This is where business users query. Tables are partitioned appropriately for query performance.

Also notice the job structure: each job does its own WAP cycle. Dimensions build in parallel, each publishing to main when their audits pass. Fact tables wait for all dimensions to publish, then do their own WAP cycle. This ensures:

  • Fact tables read from published dimensions (main branch), not staging branches
  • Foreign key lookups use production-ready surrogate keys
  • Each job is independently testable

Step 1: Setting Up S3 Buckets

I’m in the S3 console. Click “Create bucket”. I’m calling mine space-data-pipeline-demo.

Important settings:

  • Region: ap-southeast-2 (Sydney—pick one close to you)
  • Block all public access: Yes
  • Versioning: Enabled
  • Default encryption: Enabled

Inside this bucket:

space-data-pipeline-demo/
├── landing/              ← Raw 1:1 from source (latest only)
│   ├── systems/
│   ├── planets/
│   └── updates/
├── staging/              ← PSA with data_date partitioning
│   ├── stg_systems/
│   ├── stg_planets/
│   └── stg_updates/
└── semantic/             ← Production dimensional model
    ├── dim_date/
    ├── dim_system/
    ├── dim_planet/
    └── fact_system_updates/

Step 2: Creating the Iceberg Tables

Here’s the DDL for our dimensional model. Notice the partitioning strategies and SCD Type 2 columns.

dim_date DDL

This is a reference dimension—typically loaded once with all dates, then rarely updated. Partitioned by year for query performance.

CREATE TABLE space_exploration.dim_date (
    date_key INT,
    full_date DATE,
    day_of_week STRING,
    day_of_month INT,
    day_of_year INT,
    week_of_year INT,
    month_number INT,
    month_name STRING,
    quarter INT,
    year INT,
    is_weekend STRING,
    is_holiday STRING,
    fiscal_quarter INT,
    fiscal_year INT,
    -- Audit columns
    load_timestamp TIMESTAMP,
    source_system STRING
)
PARTITIONED BY (year)
LOCATION 's3://space-data-pipeline-demo/semantic/dim_date/'
TBLPROPERTIES (
    'table_type' = 'ICEBERG',
    'format' = 'parquet'
);
dim_system DDL (SCD Type 2)

Tracks history with effective_date/expiration_date. Partitioned by effective_year for efficient history queries.

CREATE TABLE space_exploration.dim_system (
    system_key INT,                    -- Surrogate key (generated)
    system_id STRING,                  -- Natural key (from source)
    system_name STRING,
    sector STRING,
    main_star_type STRING,
    coordinates_x DOUBLE,
    coordinates_y DOUBLE,
    coordinates_z DOUBLE,
    discovery_date DATE,
    exploration_status STRING,
    -- SCD Type 2 columns
    effective_date DATE,
    expiration_date DATE,
    current_flag STRING,               -- 'Y' or 'N'
    -- Audit columns
    load_timestamp TIMESTAMP,
    source_system STRING,
    -- Partition column
    effective_year INT
)
PARTITIONED BY (effective_year)
LOCATION 's3://space-data-pipeline-demo/semantic/dim_system/'
TBLPROPERTIES (
    'table_type' = 'ICEBERG',
    'format' = 'parquet'
);
dim_planet DDL (SCD Type 2)

Foreign key to dim_system uses the surrogate key. Partitioned by effective_year.

CREATE TABLE space_exploration.dim_planet (
    planet_key INT,                    -- Surrogate key
    planet_id STRING,                  -- Natural key
    system_key INT,                    -- FK to dim_system (surrogate!)
    planet_name STRING,
    planet_type STRING,
    orbital_period DOUBLE,
    has_atmosphere STRING,
    habitability_score DOUBLE,
    -- SCD Type 2 columns
    effective_date DATE,
    expiration_date DATE,
    current_flag STRING,
    -- Audit columns
    load_timestamp TIMESTAMP,
    source_system STRING,
    -- Partition column
    effective_year INT
)
PARTITIONED BY (effective_year)
LOCATION 's3://space-data-pipeline-demo/semantic/dim_planet/'
TBLPROPERTIES (
    'table_type' = 'ICEBERG',
    'format' = 'parquet'
);
fact_system_updates DDL

Incremental fact table partitioned by date_year. Loaded with delete-then-insert pattern.

CREATE TABLE space_exploration.fact_system_updates (
    update_key BIGINT,                 -- Surrogate key
    date_key INT,                      -- FK to dim_date
    system_key INT,                    -- FK to dim_system (current surrogate!)
    update_timestamp TIMESTAMP,        -- Original precision preserved
    status STRING,
    ships_present INT,                 -- Additive fact
    resource_level DOUBLE,             -- Additive fact
    -- Audit columns
    load_timestamp TIMESTAMP,
    source_system STRING,
    data_date DATE,                    -- Source data date (for reprocessing)
    -- Partition column
    date_year INT
)
PARTITIONED BY (date_year)
LOCATION 's3://space-data-pipeline-demo/semantic/fact_system_updates/'
TBLPROPERTIES (
    'table_type' = 'ICEBERG',
    'format' = 'parquet'
);
Staging Tables DDL (Persistent Staging Area)

Partitioned by data_date for historical reprocessing.

CREATE TABLE space_exploration.stg_systems (
    system_id STRING,
    system_name STRING,
    sector STRING,
    main_star_type STRING,
    coordinates_x DOUBLE,
    coordinates_y DOUBLE,
    coordinates_z DOUBLE,
    discovery_date DATE,
    exploration_status STRING,
    -- PSA columns
    data_date DATE,
    load_timestamp TIMESTAMP
)
PARTITIONED BY (data_date)
LOCATION 's3://space-data-pipeline-demo/staging/stg_systems/'
TBLPROPERTIES ('table_type' = 'ICEBERG', 'format' = 'parquet');

CREATE TABLE space_exploration.stg_planets (
    planet_id STRING,
    system_id STRING,
    planet_name STRING,
    planet_type STRING,
    orbital_period DOUBLE,
    has_atmosphere STRING,
    habitability_score DOUBLE,
    -- PSA columns
    data_date DATE,
    load_timestamp TIMESTAMP
)
PARTITIONED BY (data_date)
LOCATION 's3://space-data-pipeline-demo/staging/stg_planets/'
TBLPROPERTIES ('table_type' = 'ICEBERG', 'format' = 'parquet');

CREATE TABLE space_exploration.stg_updates (
    system_id STRING,
    update_timestamp TIMESTAMP,
    status STRING,
    ships_present INT,
    resource_level DOUBLE,
    -- PSA columns
    data_date DATE,
    load_timestamp TIMESTAMP
)
PARTITIONED BY (data_date)
LOCATION 's3://space-data-pipeline-demo/staging/stg_updates/'
TBLPROPERTIES ('table_type' = 'ICEBERG', 'format' = 'parquet');

Key design decisions:

  • dim_date: Partitioned by year. This is a reference dimension—typically loaded once with all dates from 1900-01-01 to 9999-12-31, then rarely updated.

  • dim_system / dim_planet: SCD Type 2 with effective_date, expiration_date, current_flag. Partitioned by effective_year for efficient history queries. Updated daily.

  • fact_system_updates: Partitioned by date_year. Loaded incrementally using delete-then-insert pattern.

  • Staging tables (PSA): Partitioned by data_date. This lets us reprocess any historical date and provides dependency triggers.



The Heart of the System: Orchestrated Glue Jobs with Per-Table WAP


Each job does its own Write-Audit-Publish cycle. Let me walk you through each one.


Job 1: Build Date Dimension (Reference Dimension)

The date dimension is special—it’s a reference dimension. We generate it ourselves with all calendar attributes, typically as a one-time load. You’d only rerun this if you need to extend the date range or add new attributes (like new holidays).

build_dim_date Glue Job

This job generates a complete date dimension from 2000-01-01 to 2050-12-31 (for demo purposes—production would use a wider range). It creates date attributes like day_of_week, quarter, fiscal_year, and is_weekend, then runs WAP to validate before publishing.

# Glue Job: build_dim_date
# This is typically a ONE-TIME RUN to populate all dates
# Rerun only when extending date range or adding attributes

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime, timedelta

# Initialize
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("="*60)
print("Building Date Dimension (Reference Dimension)")
print("This is typically a ONE-TIME load")
print("="*60)

# Generate comprehensive date range
# Standard practice: 1900-01-01 to 9999-12-31
start_date = datetime(1900, 1, 1)
end_date = datetime(9999, 12, 31)

# For demo purposes, let's use a more reasonable range
# In production, you'd use the full range
start_date = datetime(2000, 1, 1)
end_date = datetime(2050, 12, 31)

print(f"Generating dates from {start_date.date()} to {end_date.date()}")

date_list = []
current = start_date
while current <= end_date:
    # Store plain dates (not datetimes) so the column matches the DATE type in the DDL
    date_list.append((current.date(),))
    current += timedelta(days=1)

date_df = spark.createDataFrame(date_list, ["full_date"])

# Build dimension with rich attributes
dim_date = date_df.select(
    # Surrogate key: YYYYMMDD format
    (year("full_date") * 10000 +
     month("full_date") * 100 +
     dayofmonth("full_date")).cast("int").alias("date_key"),

    col("full_date"),
    date_format("full_date", "EEEE").alias("day_of_week"),
    dayofmonth("full_date").alias("day_of_month"),
    dayofyear("full_date").alias("day_of_year"),
    weekofyear("full_date").alias("week_of_year"),
    month("full_date").alias("month_number"),
    date_format("full_date", "MMMM").alias("month_name"),
    quarter("full_date").alias("quarter"),
    year("full_date").alias("year"),

    # Business-friendly flags
    when(dayofweek("full_date").isin([1, 7]), "Y")
        .otherwise("N").alias("is_weekend"),
    lit("N").alias("is_holiday"),  # Populate from holiday calendar

    # Fiscal calendar (assuming fiscal year = calendar year)
    quarter("full_date").alias("fiscal_quarter"),
    year("full_date").alias("fiscal_year"),

    # Audit columns
    current_timestamp().alias("load_timestamp"),
    lit("GENERATED").alias("source_system")
)

row_count = dim_date.count()
print(f"Generated {row_count:,} date rows")

# ============================================
# WRITE PHASE: Write to staging branch
# ============================================
print("\n--- WRITE PHASE ---")
print("Writing to staging branch...")

spark.sql("ALTER TABLE space_exploration.dim_date CREATE BRANCH IF NOT EXISTS staging")

dim_date.writeTo("space_exploration.dim_date") \
    .option("branch", "staging") \
    .overwritePartitions()

print("Written to staging branch")

# ============================================
# AUDIT PHASE: Validate before publishing
# ============================================
print("\n--- AUDIT PHASE ---")

staging_df = spark.read \
    .option("branch", "staging") \
    .table("space_exploration.dim_date")

audit_failures = []

# Audit 1: Row count
staged_count = staging_df.count()
print(f"Audit 1 - Row count: {staged_count:,}")
if staged_count == 0:
    audit_failures.append("FAIL: dim_date is empty")

# Audit 2: Primary key uniqueness
unique_keys = staging_df.select("date_key").distinct().count()
if staged_count != unique_keys:
    duplicates = staged_count - unique_keys
    audit_failures.append(f"FAIL: {duplicates} duplicate date_keys")
    print(f"Audit 2 - PK uniqueness: FAIL ({duplicates} duplicates)")
else:
    print(f"Audit 2 - PK uniqueness: PASS (all {unique_keys:,} unique)")

# Audit 3: No NULLs in critical fields
null_count = staging_df.filter(
    col("date_key").isNull() | col("full_date").isNull()
).count()
if null_count > 0:
    audit_failures.append(f"FAIL: {null_count} NULL values in critical fields")
    print(f"Audit 3 - NULL check: FAIL ({null_count} NULLs)")
else:
    print("Audit 3 - NULL check: PASS")

# Audit 4: Date range coverage
date_range = staging_df.agg(
    min("full_date").alias("min_date"),
    max("full_date").alias("max_date")
).collect()[0]
print(f"Audit 4 - Date range: {date_range['min_date']} to {date_range['max_date']}")

# ============================================
# PUBLISH PHASE: Merge to main if audits pass
# ============================================
print("\n--- PUBLISH PHASE ---")

if len(audit_failures) > 0:
    print("\nAUDIT FAILED:")
    for failure in audit_failures:
        print(f"  {failure}")
    print("\nBranch NOT merged. Production unchanged.")
    spark.sql("ALTER TABLE space_exploration.dim_date DROP BRANCH staging")
    raise Exception(f"Audit failed with {len(audit_failures)} errors")
else:
    print("\nALL AUDITS PASSED")
    print("Merging staging branch to main...")

    # Fast-forward main to staging
    spark.sql("""
        CALL system.fast_forward(
            table => 'space_exploration.dim_date',
            branch => 'main',
            to => 'staging'
        )
    """)

    # Clean up staging branch
    spark.sql("ALTER TABLE space_exploration.dim_date DROP BRANCH staging")

    print(f"PUBLISHED: dim_date ({staged_count:,} rows)")

print("\n" + "="*60)
print("dim_date job complete")
print("="*60)

job.commit()

Job 2: Build System Dimension (SCD Type 2)

This dimension tracks history. When a system’s attributes change (like exploration_status), we expire the old row and create a new one.

build_dim_system Glue Job

This job implements SCD Type 2 change detection. It compares incoming data against the current dimension, identifies NEW/CHANGED/UNCHANGED records, expires old versions, and creates new versions with updated surrogate keys.

# Glue Job: build_dim_system
# SCD Type 2 implementation with effective/expiration dates

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from pyspark.sql.window import Window

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'DATA_DATE'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

data_date = args['DATA_DATE']  # e.g., '2026-01-17'
print("="*60)
print(f"Building System Dimension (SCD Type 2)")
print(f"Data Date: {data_date}")
print("="*60)

# ============================================
# LOAD: Read from Persistent Staging Area
# ============================================
print("\n--- LOAD PHASE ---")

# Read today's staged data (already landed in PSA)
source_df = spark.read.table("space_exploration.stg_systems") \
    .filter(col("data_date") == data_date)

source_count = source_df.count()
print(f"Source rows from PSA (data_date={data_date}): {source_count:,}")

if source_count == 0:
    print("No data for this date. Exiting.")
    job.commit()
    sys.exit(0)

# Read current production dimension (for SCD2 comparison)
try:
    current_dim = spark.read.table("space_exploration.dim_system") \
        .filter(col("current_flag") == "Y")
    current_count = current_dim.count()
    print(f"Current dimension rows (current_flag='Y'): {current_count:,}")
    has_existing_data = current_count > 0
except:
    print("No existing dimension data (first load)")
    has_existing_data = False
    current_dim = None

# ============================================
# TRANSFORM: SCD Type 2 Logic
# ============================================
print("\n--- TRANSFORM PHASE ---")

if not has_existing_data:
    # First load: all rows are new
    print("First load - all rows are INSERT")

    window_spec = Window.orderBy("system_id")

    new_dim = source_df.select(
        row_number().over(window_spec).alias("system_key"),
        col("system_id"),
        col("system_name"),
        col("sector"),
        col("main_star_type"),
        col("coordinates_x"),
        col("coordinates_y"),
        col("coordinates_z"),
        col("discovery_date"),
        col("exploration_status"),
        lit(data_date).cast("date").alias("effective_date"),
        lit("9999-12-31").cast("date").alias("expiration_date"),
        lit("Y").alias("current_flag"),
        current_timestamp().alias("load_timestamp"),
        lit("ELITE_DANGEROUS").alias("source_system"),
        year(lit(data_date)).alias("effective_year")
    ).distinct()

else:
    # Incremental load: detect changes for SCD2
    print("Incremental load - detecting changes for SCD2")

    # Define which columns to track for changes
    tracked_columns = [
        "system_name", "sector", "main_star_type",
        "coordinates_x", "coordinates_y", "coordinates_z",
        "exploration_status"
    ]

    # Join source with current to detect changes
    comparison = source_df.alias("src").join(
        current_dim.alias("curr"),
        col("src.system_id") == col("curr.system_id"),
        "full_outer"
    )

    # Identify: NEW (no match in current), CHANGED (values differ), UNCHANGED
    change_detection = comparison.withColumn(
        "change_type",
        when(col("curr.system_id").isNull(), "NEW")
        .when(col("src.system_id").isNull(), "DELETED")
        .when(
            (col("src.system_name") != col("curr.system_name")) |
            (col("src.sector") != col("curr.sector")) |
            (col("src.main_star_type") != col("curr.main_star_type")) |
            (col("src.exploration_status") != col("curr.exploration_status")) |
            (abs(col("src.coordinates_x") - col("curr.coordinates_x")) > 0.001) |
            (abs(col("src.coordinates_y") - col("curr.coordinates_y")) > 0.001) |
            (abs(col("src.coordinates_z") - col("curr.coordinates_z")) > 0.001),
            "CHANGED"
        )
        .otherwise("UNCHANGED")
    )

    # Count changes
    change_counts = change_detection.groupBy("change_type").count().collect()
    for row in change_counts:
        print(f"  {row['change_type']}: {row['count']:,}")

    # Get max surrogate key for new key generation
    max_key = current_dim.agg(max("system_key")).collect()[0][0] or 0

    # Build new dimension rows for NEW and CHANGED records
    new_records = change_detection.filter(col("change_type").isin(["NEW", "CHANGED"]))

    window_spec = Window.orderBy("src.system_id")

    new_rows = new_records.select(
        (row_number().over(window_spec) + max_key).alias("system_key"),
        col("src.system_id").alias("system_id"),
        col("src.system_name").alias("system_name"),
        col("src.sector").alias("sector"),
        col("src.main_star_type").alias("main_star_type"),
        col("src.coordinates_x").alias("coordinates_x"),
        col("src.coordinates_y").alias("coordinates_y"),
        col("src.coordinates_z").alias("coordinates_z"),
        col("src.discovery_date").alias("discovery_date"),
        col("src.exploration_status").alias("exploration_status"),
        lit(data_date).cast("date").alias("effective_date"),
        lit("9999-12-31").cast("date").alias("expiration_date"),
        lit("Y").alias("current_flag"),
        current_timestamp().alias("load_timestamp"),
        lit("ELITE_DANGEROUS").alias("source_system"),
        year(lit(data_date)).alias("effective_year")
    )

    # Build expired rows (for CHANGED records, expire the old version)
    changed_ids = change_detection.filter(col("change_type") == "CHANGED") \
        .select(col("curr.system_id").alias("system_id"))

    expired_rows = current_dim.join(changed_ids, "system_id") \
        .withColumn("expiration_date", lit(data_date).cast("date") - 1) \
        .withColumn("current_flag", lit("N"))

    # Unchanged rows stay as-is
    unchanged_rows = current_dim.join(
        change_detection.filter(col("change_type") == "UNCHANGED")
            .select(col("curr.system_id").alias("system_id")),
        "system_id"
    )

    # Historical rows (already expired) - read from full table
    historical_rows = spark.read.table("space_exploration.dim_system") \
        .filter(col("current_flag") == "N")

    # Combine all
    new_dim = new_rows \
        .unionByName(expired_rows, allowMissingColumns=True) \
        .unionByName(unchanged_rows, allowMissingColumns=True) \
        .unionByName(historical_rows, allowMissingColumns=True)

transform_count = new_dim.count()
print(f"Total dimension rows after SCD2: {transform_count:,}")

# ============================================
# WRITE PHASE: Write to staging branch
# ============================================
print("\n--- WRITE PHASE ---")

spark.sql("ALTER TABLE space_exploration.dim_system CREATE BRANCH IF NOT EXISTS staging")

new_dim.writeTo("space_exploration.dim_system") \
    .option("branch", "staging") \
    .overwritePartitions()

print("Written to staging branch")

# ============================================
# AUDIT PHASE
# ============================================
print("\n--- AUDIT PHASE ---")

staging_df = spark.read \
    .option("branch", "staging") \
    .table("space_exploration.dim_system")

audit_failures = []

# Audit 1: Row count
staged_count = staging_df.count()
print(f"Audit 1 - Row count: {staged_count:,}")
if staged_count == 0:
    audit_failures.append("FAIL: dim_system is empty")

# Audit 2: Surrogate key uniqueness
unique_keys = staging_df.select("system_key").distinct().count()
if staged_count != unique_keys:
    duplicates = staged_count - unique_keys
    audit_failures.append(f"FAIL: {duplicates} duplicate system_keys")
    print(f"Audit 2 - PK uniqueness: FAIL ({duplicates} duplicates)")
else:
    print(f"Audit 2 - PK uniqueness: PASS")

# Audit 3: Only one current row per natural key
current_rows = staging_df.filter(col("current_flag") == "Y")
current_count = current_rows.count()
unique_natural_keys = current_rows.select("system_id").distinct().count()
if current_count != unique_natural_keys:
    audit_failures.append(f"FAIL: Multiple current rows for same system_id")
    print(f"Audit 3 - Single current: FAIL")
else:
    print(f"Audit 3 - Single current: PASS ({current_count:,} current rows)")

# Audit 4: No NULLs in critical fields
null_count = staging_df.filter(
    col("system_key").isNull() |
    col("system_id").isNull() |
    col("effective_date").isNull() |
    col("current_flag").isNull()
).count()
if null_count > 0:
    audit_failures.append(f"FAIL: {null_count} NULL values in critical fields")
    print(f"Audit 4 - NULL check: FAIL")
else:
    print("Audit 4 - NULL check: PASS")

# Audit 5: Effective/expiration date logic
invalid_dates = staging_df.filter(
    col("effective_date") > col("expiration_date")
).count()
if invalid_dates > 0:
    audit_failures.append(f"FAIL: {invalid_dates} rows with effective > expiration")
    print(f"Audit 5 - Date logic: FAIL")
else:
    print("Audit 5 - Date logic: PASS")

# ============================================
# PUBLISH PHASE
# ============================================
print("\n--- PUBLISH PHASE ---")

if len(audit_failures) > 0:
    print("\nAUDIT FAILED:")
    for failure in audit_failures:
        print(f"  {failure}")
    print("\nBranch NOT merged. Production unchanged.")
    spark.sql("ALTER TABLE space_exploration.dim_system DROP BRANCH staging")
    raise Exception(f"Audit failed with {len(audit_failures)} errors")
else:
    print("\nALL AUDITS PASSED")
    print("Merging staging branch to main...")

    spark.sql("""
        CALL system.fast_forward(
            table => 'space_exploration.dim_system',
            branch => 'main',
            to => 'staging'
        )
    """)

    spark.sql("ALTER TABLE space_exploration.dim_system DROP BRANCH staging")

    print(f"PUBLISHED: dim_system ({staged_count:,} rows)")

job.commit()

Job 3: Build Planet Dimension (SCD Type 2)

Similar pattern to dim_system, but with a foreign key lookup to the published dim_system.

build_dim_planet Glue Job

This job demonstrates FK lookup to a published dimension. It reads from the main branch of dim_system (not staging), ensuring it uses production-ready surrogate keys. The FK integrity audit validates this relationship.

# Glue Job: build_dim_planet
# SCD Type 2 with FK lookup to published dim_system

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from pyspark.sql.window import Window

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'DATA_DATE'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

data_date = args['DATA_DATE']
print("="*60)
print(f"Building Planet Dimension (SCD Type 2)")
print(f"Data Date: {data_date}")
print("="*60)

# ============================================
# LOAD PHASE
# ============================================
print("\n--- LOAD PHASE ---")

source_df = spark.read.table("space_exploration.stg_planets") \
    .filter(col("data_date") == data_date)

source_count = source_df.count()
print(f"Source rows: {source_count:,}")

if source_count == 0:
    print("No data. Exiting.")
    job.commit()
    sys.exit(0)

# CRITICAL: Read from PUBLISHED dim_system (main branch, not staging)
# This is why dim_planet must run AFTER dim_system publishes
dim_system = spark.read.table("space_exploration.dim_system") \
    .filter(col("current_flag") == "Y") \
    .select("system_key", "system_id")

print(f"dim_system lookup rows: {dim_system.count():,}")

# ============================================
# TRANSFORM PHASE
# ============================================
print("\n--- TRANSFORM PHASE ---")

# Check for existing data
try:
    current_dim = spark.read.table("space_exploration.dim_planet") \
        .filter(col("current_flag") == "Y")
    has_existing = current_dim.count() > 0
    max_key = current_dim.agg(max("planet_key")).collect()[0][0] or 0
except:
    has_existing = False
    max_key = 0

# Join source with dim_system to get surrogate key
source_with_sk = source_df.join(
    dim_system,
    source_df.system_id == dim_system.system_id,
    "left"
).select(
    source_df["*"],
    dim_system.system_key
)

# Track orphaned planets (no matching system)
orphaned = source_with_sk.filter(col("system_key").isNull()).count()
if orphaned > 0:
    print(f"WARNING: {orphaned} planets have no matching system!")

# SCD2 logic (simplified for first load demonstration)
window_spec = Window.orderBy("planet_id")

new_dim = source_with_sk.select(
    (row_number().over(window_spec) + max_key).alias("planet_key"),
    col("planet_id"),
    col("system_key"),
    col("planet_name"),
    col("planet_type"),
    col("orbital_period"),
    col("has_atmosphere"),
    col("habitability_score"),
    lit(data_date).cast("date").alias("effective_date"),
    lit("9999-12-31").cast("date").alias("expiration_date"),
    lit("Y").alias("current_flag"),
    current_timestamp().alias("load_timestamp"),
    lit("ELITE_DANGEROUS").alias("source_system"),
    year(lit(data_date)).alias("effective_year")
).distinct()

# (Full SCD2 change detection would follow same pattern as dim_system)

transform_count = new_dim.count()
print(f"Dimension rows: {transform_count:,}")

# ============================================
# WRITE PHASE
# ============================================
print("\n--- WRITE PHASE ---")

spark.sql("ALTER TABLE space_exploration.dim_planet CREATE BRANCH IF NOT EXISTS staging")

new_dim.writeTo("space_exploration.dim_planet") \
    .option("branch", "staging") \
    .overwritePartitions()

print("Written to staging branch")

# ============================================
# AUDIT PHASE
# ============================================
print("\n--- AUDIT PHASE ---")

staging_df = spark.read \
    .option("branch", "staging") \
    .table("space_exploration.dim_planet")

audit_failures = []

# Audit 1: PK uniqueness
staged_count = staging_df.count()
unique_keys = staging_df.select("planet_key").distinct().count()
if staged_count != unique_keys:
    audit_failures.append(f"FAIL: Duplicate planet_keys")
else:
    print(f"Audit 1 - PK uniqueness: PASS")

# Audit 2: FK integrity to dim_system (published!)
published_system_keys = spark.read.table("space_exploration.dim_system") \
    .select("system_key").distinct()

orphaned_fk = staging_df \
    .filter(col("system_key").isNotNull()) \
    .select("system_key").distinct() \
    .join(published_system_keys, "system_key", "left_anti") \
    .count()

if orphaned_fk > 0:
    audit_failures.append(f"FAIL: {orphaned_fk} orphaned system_keys")
    print(f"Audit 2 - FK integrity: FAIL")
else:
    print("Audit 2 - FK integrity: PASS")

# Audit 3: NULL check
null_count = staging_df.filter(
    col("planet_key").isNull() | col("planet_id").isNull()
).count()
if null_count > 0:
    audit_failures.append(f"FAIL: {null_count} NULLs in critical fields")
else:
    print("Audit 3 - NULL check: PASS")

# ============================================
# PUBLISH PHASE
# ============================================
print("\n--- PUBLISH PHASE ---")

if len(audit_failures) > 0:
    print("\nAUDIT FAILED:")
    for f in audit_failures:
        print(f"  {f}")
    spark.sql("ALTER TABLE space_exploration.dim_planet DROP BRANCH staging")
    raise Exception(f"Audit failed")
else:
    print("\nALL AUDITS PASSED")
    spark.sql("""
        CALL system.fast_forward(
            table => 'space_exploration.dim_planet',
            branch => 'main',
            to => 'staging'
        )
    """)
    spark.sql("ALTER TABLE space_exploration.dim_planet DROP BRANCH staging")
    print(f"PUBLISHED: dim_planet ({staged_count:,} rows)")

job.commit()

Job 4: Build Fact Table (Incremental with Delete-Insert)

This is the critical job. It reads from published dimensions (main branch), uses a delete-then-insert pattern for the date partition, and includes a left anti-join as a defensive check for missing records.

build_fact_system_updates Glue Job

This job loads facts incrementally by date. For a given data_date, it deletes existing records and inserts new ones. It validates FK integrity against all published dimensions before merging to main.

# Glue Job: build_fact_system_updates
# Incremental load with delete-then-insert + left anti-join defensive coding

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from pyspark.sql.window import Window

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'DATA_DATE'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

data_date = args['DATA_DATE']
print("="*60)
print(f"Building Fact Table: System Updates (Incremental)")
print(f"Data Date: {data_date}")
print("="*60)

# ============================================
# LOAD PHASE
# ============================================
print("\n--- LOAD PHASE ---")

# Read from PSA for this data_date
source_df = spark.read.table("space_exploration.stg_updates") \
    .filter(col("data_date") == data_date)

source_count = source_df.count()
print(f"Source rows: {source_count:,}")

if source_count == 0:
    print("No data. Exiting.")
    job.commit()
    sys.exit(0)

# Read PUBLISHED dimensions (main branch)
# Fact tables MUST read from published dimensions, not staging branches
dim_system = spark.read.table("space_exploration.dim_system") \
    .filter(col("current_flag") == "Y") \
    .select("system_key", "system_id")

dim_date = spark.read.table("space_exploration.dim_date") \
    .select("date_key", "full_date", "year")

print(f"dim_system rows: {dim_system.count():,}")
print(f"dim_date rows: {dim_date.count():,}")

# Read existing fact table for incremental logic
try:
    existing_fact = spark.read.table("space_exploration.fact_system_updates")
    existing_count = existing_fact.count()
    print(f"Existing fact rows: {existing_count:,}")
    max_key = existing_fact.agg(max("update_key")).collect()[0][0] or 0
except:
    print("No existing fact data (first load)")
    existing_fact = None
    max_key = 0

# ============================================
# TRANSFORM PHASE
# ============================================
print("\n--- TRANSFORM PHASE ---")

# Add date column for joining
source_with_date = source_df.withColumn(
    "update_date",
    to_date("update_timestamp")
)

# Lookup surrogate keys from published dimensions
fact_transformed = source_with_date \
    .join(dim_system, source_with_date.system_id == dim_system.system_id, "left") \
    .join(dim_date, source_with_date.update_date == dim_date.full_date, "left") \
    .select(
        col("date_key"),
        dim_system.system_key,
        source_with_date.update_timestamp,
        col("status"),
        col("ships_present"),
        col("resource_level"),
        lit(data_date).cast("date").alias("data_date"),
        dim_date.year.alias("date_year")
    )

# Track orphaned facts
orphaned_systems = fact_transformed.filter(col("system_key").isNull()).count()
orphaned_dates = fact_transformed.filter(col("date_key").isNull()).count()

if orphaned_systems > 0:
    print(f"WARNING: {orphaned_systems} facts have no matching system")
if orphaned_dates > 0:
    print(f"WARNING: {orphaned_dates} facts have no matching date")

# ============================================
# INCREMENTAL LOGIC: Delete-then-Insert + Left Anti-Join
# ============================================
print("\n--- INCREMENTAL LOGIC ---")

# Extract the date partition we're loading
date_partition_year = int(data_date[:4])  # e.g., 2026

if existing_fact is not None:
    # Step 1: Get existing records for this data_date
    existing_for_date = existing_fact.filter(col("data_date") == data_date)
    existing_for_date_count = existing_for_date.count()
    print(f"Existing records for data_date={data_date}: {existing_for_date_count:,}")

    # Step 2: Records NOT in this date (keep as-is)
    records_to_keep = existing_fact.filter(col("data_date") != data_date)
    keep_count = records_to_keep.count()
    print(f"Records to keep (other dates): {keep_count:,}")

    # Step 3: New records from source (delete-then-insert for this date)
    # Generate new surrogate keys starting after max existing key
    window_spec = Window.orderBy("update_timestamp", "system_key")

    new_records = fact_transformed.select(
        (row_number().over(window_spec) + max_key).alias("update_key"),
        col("date_key"),
        col("system_key"),
        col("update_timestamp"),
        col("status"),
        col("ships_present"),
        col("resource_level"),
        current_timestamp().alias("load_timestamp"),
        lit("ELITE_DANGEROUS").alias("source_system"),
        col("data_date"),
        col("date_year")
    )

    new_count = new_records.count()
    print(f"New records for data_date={data_date}: {new_count:,}")

    # Step 4: Left anti-join defensive check
    # Find any records in destination that should exist but don't
    # (This catches edge cases where source has records not in our transform)
    # For this example, we're doing a full replace for the date, so this is informational
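
    # (Illustrative addition, not part of the original job: one concrete form of the
    # left anti-join defensive check. It surfaces rows from the previous load of this
    # data_date that are absent from the new batch; the match keys are an assumption.)
    vanished = existing_for_date.select("system_key", "update_timestamp").join(
        new_records.select("system_key", "update_timestamp"),
        on=["system_key", "update_timestamp"],
        how="left_anti"
    )
    print(f"Defensive check - previous rows missing from new load: {vanished.count():,}")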

    # Step 5: Combine: kept records + new records (effectively delete-insert)
    final_fact = records_to_keep.unionByName(new_records)

else:
    # First load: all records are new
    window_spec = Window.orderBy("update_timestamp", "system_key")

    final_fact = fact_transformed.select(
        row_number().over(window_spec).alias("update_key"),
        col("date_key"),
        col("system_key"),
        col("update_timestamp"),
        col("status"),
        col("ships_present"),
        col("resource_level"),
        current_timestamp().alias("load_timestamp"),
        lit("ELITE_DANGEROUS").alias("source_system"),
        col("data_date"),
        col("date_year")
    )

transform_count = final_fact.count()
print(f"Final fact rows: {transform_count:,}")

# ============================================
# WRITE PHASE
# ============================================
print("\n--- WRITE PHASE ---")

spark.sql("ALTER TABLE space_exploration.fact_system_updates CREATE BRANCH IF NOT EXISTS staging")

final_fact.writeTo("space_exploration.fact_system_updates") \
    .option("branch", "staging") \
    .overwritePartitions()

print("Written to staging branch")

# ============================================
# AUDIT PHASE
# ============================================
print("\n--- AUDIT PHASE ---")

staging_df = spark.read \
    .option("branch", "staging") \
    .table("space_exploration.fact_system_updates")

audit_failures = []

# Audit 1: Row count (source to target for this date)
staged_for_date = staging_df.filter(col("data_date") == data_date).count()
print(f"Audit 1 - Row count for {data_date}: Source={source_count:,}, Target={staged_for_date:,}")
if source_count != staged_for_date:
    # This might be OK if we filtered orphans, but flag for review
    print(f"  Note: Difference of {source_count - staged_for_date} (likely orphaned records)")

# Audit 2: PK uniqueness
staged_count = staging_df.count()
unique_keys = staging_df.select("update_key").distinct().count()
if staged_count != unique_keys:
    audit_failures.append(f"FAIL: Duplicate update_keys")
    print(f"Audit 2 - PK uniqueness: FAIL")
else:
    print(f"Audit 2 - PK uniqueness: PASS ({staged_count:,} unique)")

# Audit 3: FK integrity to dim_system
published_system_keys = spark.read.table("space_exploration.dim_system") \
    .select("system_key").distinct()

orphaned_fk_system = staging_df \
    .filter(col("system_key").isNotNull()) \
    .select("system_key").distinct() \
    .join(published_system_keys, "system_key", "left_anti") \
    .count()

if orphaned_fk_system > 0:
    audit_failures.append(f"FAIL: {orphaned_fk_system} orphaned system_keys")
    print(f"Audit 3 - FK to dim_system: FAIL")
else:
    print("Audit 3 - FK to dim_system: PASS")

# Audit 4: FK integrity to dim_date
published_date_keys = spark.read.table("space_exploration.dim_date") \
    .select("date_key").distinct()

orphaned_fk_date = staging_df \
    .filter(col("date_key").isNotNull()) \
    .select("date_key").distinct() \
    .join(published_date_keys, "date_key", "left_anti") \
    .count()

if orphaned_fk_date > 0:
    audit_failures.append(f"FAIL: {orphaned_fk_date} orphaned date_keys")
    print(f"Audit 4 - FK to dim_date: FAIL")
else:
    print("Audit 4 - FK to dim_date: PASS")

# Audit 5: No NULLs in mandatory FKs
# (We allow NULLs for orphaned records, but track them)
null_system = staging_df.filter(col("system_key").isNull()).count()
null_date = staging_df.filter(col("date_key").isNull()).count()
print(f"Audit 5 - NULL FKs: {null_system} null system_key, {null_date} null date_key")

# Audit 6: Value range checks
invalid_resources = staging_df.filter(
    (col("resource_level") < 0) | (col("resource_level") > 1)
).count()
if invalid_resources > 0:
    audit_failures.append(f"FAIL: {invalid_resources} invalid resource_level values")
    print(f"Audit 6 - Value ranges: FAIL")
else:
    print("Audit 6 - Value ranges: PASS")

# Audit 7: No future timestamps
future_ts = staging_df.filter(col("update_timestamp") > current_timestamp()).count()
if future_ts > 0:
    audit_failures.append(f"FAIL: {future_ts} future timestamps")
    print(f"Audit 7 - Timestamps: FAIL")
else:
    print("Audit 7 - Timestamps: PASS")

# ============================================
# PUBLISH PHASE
# ============================================
print("\n--- PUBLISH PHASE ---")

if len(audit_failures) > 0:
    print("\nAUDIT FAILED:")
    for f in audit_failures:
        print(f"  {f}")
    spark.sql("ALTER TABLE space_exploration.fact_system_updates DROP BRANCH staging")
    raise Exception(f"Audit failed with {len(audit_failures)} errors")
else:
    print("\nALL AUDITS PASSED")
    spark.sql("""
        CALL system.fast_forward(
            table => 'space_exploration.fact_system_updates',
            branch => 'main',
            to => 'staging'
        )
    """)
    spark.sql("ALTER TABLE space_exploration.fact_system_updates DROP BRANCH staging")
    print(f"PUBLISHED: fact_system_updates ({staged_count:,} rows)")

job.commit()

Step Functions Workflow


The Step Functions workflow orchestrates everything. Here’s the execution flow:

  1. Stage 1 (Parallel): dim_date and dim_system run simultaneously—neither depends on the other
  2. Stage 2: dim_planet runs after Stage 1 completes (needs dim_system’s surrogate keys)
  3. Stage 3: fact_system_updates runs last (needs all dimension surrogate keys)

If any job fails, the workflow stops and sends an SNS notification.

Step Functions JSON Definition
{
  "Comment": "Dimensional Modeling Pipeline - Per-Table WAP Pattern",
  "StartAt": "Parallel Dimensions Stage 1",
  "States": {
    "Parallel Dimensions Stage 1": {
      "Type": "Parallel",
      "Comment": "dim_date and dim_system have no dependencies",
      "Branches": [
        {
          "StartAt": "Build dim_date (WAP)",
          "States": {
            "Build dim_date (WAP)": {
              "Type": "Task",
              "Resource": "arn:aws:states:::glue:startJobRun.sync",
              "Parameters": {
                "JobName": "build_dim_date"
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "Build dim_system (WAP)",
          "States": {
            "Build dim_system (WAP)": {
              "Type": "Task",
              "Resource": "arn:aws:states:::glue:startJobRun.sync",
              "Parameters": {
                "JobName": "build_dim_system",
                "Arguments": {
                  "--DATA_DATE.$": "$.data_date"
                }
              },
              "End": true
            }
          }
        }
      ],
      "Next": "Build dim_planet (WAP)",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "Notify Failure",
          "ResultPath": "$.error"
        }
      ]
    },
    "Build dim_planet (WAP)": {
      "Type": "Task",
      "Comment": "Depends on dim_system being published",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "build_dim_planet",
        "Arguments": {
          "--DATA_DATE.$": "$.data_date"
        }
      },
      "Next": "Build fact_system_updates (WAP)",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "Notify Failure",
          "ResultPath": "$.error"
        }
      ]
    },
    "Build fact_system_updates (WAP)": {
      "Type": "Task",
      "Comment": "Depends on ALL dimensions being published",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "build_fact_system_updates",
        "Arguments": {
          "--DATA_DATE.$": "$.data_date"
        }
      },
      "Next": "Pipeline Succeeded",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "Notify Failure",
          "ResultPath": "$.error"
        }
      ]
    },
    "Notify Failure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:ap-southeast-2:ACCOUNT:pipeline-alerts",
        "Message.$": "States.Format('Pipeline failed: {}', $.error.Cause)",
        "Subject": "Data Pipeline Failed"
      },
      "Next": "Pipeline Failed"
    },
    "Pipeline Succeeded": {
      "Type": "Succeed"
    },
    "Pipeline Failed": {
      "Type": "Fail",
      "Error": "PipelineError",
      "Cause": "One or more jobs failed - check individual job logs"
    }
  }
}

Start the workflow with input:

{
  "data_date": "2026-01-17"
}

This data_date propagates to each job, enabling:

  • PSA partition reads (WHERE data_date = '2026-01-17')
  • Historical reprocessing (re-run with a past date)
  • Dependency triggers (S3 event on partition write triggers workflow)
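
If you’d rather start the workflow from code than from the console, a small boto3 sketch looks like this (the state machine name and ARN are placeholders):

import json
import boto3

# Start the Step Functions workflow with the data_date input shown above
sfn = boto3.client("stepfunctions")
sfn.start_execution(
    stateMachineArn="arn:aws:states:ap-southeast-2:ACCOUNT:stateMachine:dimensional-pipeline",
    input=json.dumps({"data_date": "2026-01-17"})
)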


Dependency Triggers with S3 Events


You can improve this further by using partition writes as triggers. Here’s how to set that up:

Option 1: S3 Event → EventBridge → Step Functions

When a file lands in landing/updates/, trigger the pipeline:

# EventBridge Rule
{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": { "name": ["space-data-pipeline-demo"] },
    "object": { "key": [{ "prefix": "landing/updates/" }] }
  }
}

Option 2: Glue Trigger on Partition

Configure Glue to watch for new partitions:

# Check if the staging partition exists before running the downstream job
import sys
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client('s3')
success_key = f"staging/stg_updates/data_date={data_date}/_SUCCESS"

try:
    # head_object raises ClientError (404) if the marker object is missing
    s3.head_object(Bucket='space-data-pipeline-demo', Key=success_key)
    print("Partition exists - proceeding")
except ClientError:
    print("Partition not ready - exiting")
    sys.exit(0)

This lets you build dependency chains based on partition availability.



The Moment of Truth: Querying


Now let’s query the properly partitioned dimensional model:

-- Query leverages date partition pruning
SELECT
    dt.year,
    dt.quarter,
    d.main_star_type,
    COUNT(*) as updates,
    SUM(f.ships_present) as total_ships,
    ROUND(AVG(f.resource_level), 3) as avg_resources
FROM
    space_exploration.fact_system_updates f

    LEFT JOIN space_exploration.dim_system d
    ON f.system_key = d.system_key
    AND d.current_flag = 'Y'  -- Only current dimension records!

    LEFT JOIN space_exploration.dim_date dt
    ON f.date_key = dt.date_key
WHERE
    dt.year = 2026  -- Partition pruning!
    AND dt.quarter = 1
GROUP BY
    dt.year, dt.quarter, d.main_star_type
ORDER BY
    total_ships DESC;

Notice d.current_flag = 'Y' — this is how you query SCD Type 2 dimensions for current state. For point-in-time analysis:

-- Point-in-time analysis: What was system status on Jan 15, 2026?
SELECT
    d.system_name,
    d.exploration_status,
    d.effective_date,
    d.expiration_date
FROM
    space_exploration.dim_system d
WHERE
    d.system_id = 'SOL'
    AND DATE '2026-01-15' BETWEEN d.effective_date AND d.expiration_date;


A Few Things Worth Remembering


WAP at each table, not just at the end. Each job should do its own Write-Audit-Publish cycle. Failures are isolated. Downstream jobs read from published (main) tables.

Landing is raw, Staging is persistent. Landing = 1:1 from source, latest only. Staging (PSA) = partitioned by data_date for historical reprocessing.

dim_date is a reference dimension. Load it once with a wide date range. Partition by year. Rarely needs updates.

SCD Type 2 for dimensions that change. Add effective_date, expiration_date, current_flag. Partition by effective year. Query with current_flag = 'Y' for current state.

Incremental facts with delete-then-insert. For a given date partition: delete existing, insert new. Use left anti-join as defensive check for missing records.

Partition strategically:

  • dim_date: by year
  • dim_system/planet: by effective_year (SCD2)
  • fact: by date_year

Use data_date for dependencies. When stg_updates/data_date=2026-01-17/ is written, trigger the downstream pipeline. This makes dependencies explicit and reprocessing easy.


That ten-minute query? Down to 1.2 seconds with partition pruning.

That conference room moment? Each job’s WAP cycle catches issues before they propagate.

These patterns work. They’ve saved me from embarrassment, late nights, and angry emails. They’ll do the same for you.