Introduction: Why Safe Data Pipelines Matter

In the world of data engineering, there’s a constant challenge we all face: how do we ensure our production data remains reliable and error-free when deploying updates? Anyone who’s experienced the cold sweat of a bad deployment affecting critical business data knows this pain all too well.

Enter the Write-Audit-Publish (WAP) pattern: a robust approach that can significantly reduce the risk of data pipeline failures. This pattern, which shares DNA with the well-known Blue-Green deployment strategy from software engineering, creates a safety net that can save your team countless hours of troubleshooting and emergency fixes.

In this article, we’ll explore how the WAP pattern works and why it’s valuable, and I’ll share a practical implementation using Apache Airflow that you can adapt for your own data pipelines.



Understanding the Write-Audit-Publish Pattern


The WAP pattern is elegantly simple in concept, yet powerful in practice. It involves three distinct stages:

  1. Write: Create or generate the new data in a staging environment, separate from production.
  2. Audit: Verify the data integrity, completeness, and correctness through automated tests.
  3. Publish: Only after successful validation, move the data to production.

Here’s a visual representation of how the pattern works:

[Figure: the Write → Audit → Publish flow, with the publish step gated on a successful audit]

The beauty of this approach is that your production data remains untouched until you’ve confirmed that the new data meets your quality standards. This drastically reduces the risk of introducing bad data into your production environment.



When to Use the WAP Pattern


The WAP pattern is particularly valuable when:

  • Your data pipelines feed critical business operations
  • You have complex transformations that could potentially introduce errors
  • You need to ensure downstream systems aren’t affected by bad data
  • You’re working with high-volume data where manual verification isn’t feasible
  • You want to implement continuous deployment for your data pipelines

While it adds some overhead to your pipeline process, the benefits in terms of reliability and reduced production incidents make it worthwhile for most data teams.



Implementing WAP in Apache Airflow


Let’s look at a practical implementation of the WAP pattern using Apache Airflow. The example below shows a reusable approach that can be applied to multiple tables in your data warehouse.

Core Components of the Implementation

Our Airflow DAG consists of several key components:

  1. A TaskGroup generator that creates the Write-Audit-Publish sequence for any given table
  2. Python functions handling each step of the process
  3. Proper error handling and logging throughout the pipeline

Here’s a simplified version of the core implementation:

import logging
from airflow import DAG
import pendulum
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
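
# NOTE: execute_query used throughout is a placeholder for a small helper you provide
# that runs a SQL statement against your warehouse (for example via an Airflow hook)
# and returns the result rows as a list of tuples.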

# Setup your Airflow DAG
local_tz = pendulum.timezone("Australia/Melbourne")

with DAG(
    dag_id="wap_pattern_example",
    description="Write-Audit-Publish Pattern Example",
    start_date=pendulum.datetime(year=2025, month=5, day=18, tz=local_tz),
    schedule_interval=None,
    catchup=False,
    is_paused_upon_creation=True,
    max_active_runs=1,
    tags=["WAP", "DataQuality"],
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=3),
        "depends_on_past": True,
        "wait_for_downstream": False
    }
) as dag:

    # Define start and end points
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    def write_table(output_table, production_table):
        """Create a copy of the production table in a staging environment"""
        # Your database-specific implementation here
        logging.info(f"Creating staging table {output_table}")
        # Example implementation:
        execute_query(f"DROP TABLE IF EXISTS {output_table}")
        execute_query(f"CREATE TABLE {output_table} AS SELECT * FROM {production_table}")

    def audit_table(output_table, production_table):
        """Run data quality checks on the staging table"""
        logging.info(f"Starting audit for tables: {production_table} and {output_table}")

        # Example check: Verify row counts match
        prod_count_query = f"SELECT COUNT(*) FROM {production_table}"
        output_count_query = f"SELECT COUNT(*) FROM {output_table}"

        prod_count_result = execute_query(prod_count_query)
        output_count_result = execute_query(output_count_query)

        # Extract row count values
        prod_count = prod_count_result[0][0]  # First row, first column
        output_count = output_count_result[0][0]  # First row, first column

        if prod_count != output_count:
            logging.error(f"AUDIT FAILED! Row count mismatch: {production_table}={prod_count}, {output_table}={output_count}")
            raise ValueError(f"AUDIT FAILED! Row count mismatch: {production_table}={prod_count}, {output_table}={output_count}")

        # Add additional data quality checks here
        # For example: null checks, data type validation, business rule validation

        logging.info(f"AUDIT PASSED: {production_table} ({prod_count} rows) matches {output_table} ({output_count} rows).")

    def publish_table(output_table, production_table):
        """Move data from staging to production with backup"""
        # Get current date in YYYYMMDD format
        current_date = datetime.now().strftime("%Y%m%d")

        # Extract the base table name from production_table
        table_name = production_table.split('"')[-2]

        # Create backup of current production table
        backup_table = f'"schema"."backup"."{table_name}_BKUP_{current_date}"'
        logging.info(f"Backing up {production_table} to {backup_table}")

        execute_query(f"DROP TABLE IF EXISTS {backup_table}")
        execute_query(f"CREATE TABLE {backup_table} AS SELECT * FROM {production_table}")

        # Move output table to production
        execute_query(f"DROP TABLE IF EXISTS {production_table}")
        execute_query(f"CREATE TABLE {production_table} AS SELECT * FROM {output_table}")
        execute_query(f"DROP TABLE IF EXISTS {output_table}")

        logging.info(f"Published {output_table} as {production_table}")

    def create_wap_task_group(table_name):
        """Creates a TaskGroup for the Write-Audit-Publish process for a given table."""

        OUTPUT_TABLE = f'"schema"."staging"."{table_name}_output"'
        PRODUCTION_TABLE = f'"schema"."production"."{table_name}"'

        with TaskGroup(group_id=f"{table_name}_wap") as wap_group:

            # Step 1: Write Task
            write_task = PythonOperator(
                task_id=f"write_{table_name}",
                python_callable=write_table,
                op_kwargs={
                    "output_table": OUTPUT_TABLE,
                    "production_table": PRODUCTION_TABLE
                }
            )

            # Step 2: Audit Task
            audit_task = PythonOperator(
                task_id=f"audit_{table_name}",
                python_callable=audit_table,
                op_kwargs={
                    "output_table": OUTPUT_TABLE,
                    "production_table": PRODUCTION_TABLE
                }
            )

            # Step 3: Publish Task
            publish_task = PythonOperator(
                task_id=f"publish_{table_name}",
                python_callable=publish_table,
                op_kwargs={
                    "output_table": OUTPUT_TABLE,
                    "production_table": PRODUCTION_TABLE
                }
            )

            # Define dependencies within the group
            write_task >> audit_task >> publish_task

        return wap_group

    # Create WAP Task Groups for multiple tables
    customer_wap = create_wap_task_group("dim_customer")
    orders_wap = create_wap_task_group("fct_orders")

    # Define overall DAG structure
    start >> [customer_wap, orders_wap] >> end

This implementation provides a reusable framework for applying the WAP pattern to any table in your data warehouse. You can extend the audit step with additional data quality checks specific to your business requirements.



Key Implementation Details


The Write Step

The write step creates a copy of the production data in a staging environment. This allows us to work with the data without affecting production. In our implementation, we’re creating a new table with the same structure and data as the production table.

def write_table(output_table, production_table):
    execute_query(f"DROP TABLE IF EXISTS {output_table}")
    execute_query(f"CREATE TABLE {output_table} AS SELECT * FROM {production_table}")

Depending on your specific requirements, you might:

  • Apply transformations to the data (see the sketch after this list)
  • Merge multiple data sources
  • Enrich the data with additional information
  • Run your dbt models to generate the output
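
For example, a write step that derives the staging table from a transformation rather than a straight copy might look like the sketch below. It reuses the execute_query placeholder from earlier; the function name, source table, and columns are illustrative.

def write_table_with_transform(output_table, source_table):
    """Build the staging table from a transformation instead of copying production"""
    execute_query(f"DROP TABLE IF EXISTS {output_table}")
    execute_query(f"""
        CREATE TABLE {output_table} AS
        SELECT
            customer_id,
            COUNT(*)        AS order_count,
            MAX(updated_at) AS last_updated_at
        FROM {source_table}
        GROUP BY customer_id
    """)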

The Audit Step

The audit step is where we verify the quality of our data before it goes to production. In our example, we’re doing a simple row count check, but in practice, you’d want to implement more comprehensive tests:

def audit_table(output_table, production_table):
    # Basic row count check
    prod_count_query = f"SELECT COUNT(*) FROM {production_table}"
    output_count_query = f"SELECT COUNT(*) FROM {output_table}"

    prod_count_result = execute_query(prod_count_query)
    output_count_result = execute_query(output_count_query)

    prod_count = prod_count_result[0][0]
    output_count = output_count_result[0][0]

    if prod_count != output_count:
        raise ValueError(f"AUDIT FAILED! Row count mismatch")

Here are some additional audit checks you might want to implement:

  • Column-level validation: Check data types, ranges, and constraints
  • Business rule validation: Ensure the data meets specific business requirements
  • Anomaly detection: Identify unusual patterns or outliers
  • Referential integrity: Verify relationships between tables
  • Completeness checks: Ensure all expected data is present
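
As a starting point, two of these checks could look like the following sketch. It uses the same execute_query placeholder as the rest of the pipeline; audit_nulls_and_duplicates and key_column are illustrative names.

def audit_nulls_and_duplicates(output_table, key_column):
    """Fail the audit if the key column contains NULLs or duplicate values"""
    # Null check: the key column should never be NULL
    null_count = execute_query(
        f"SELECT COUNT(*) FROM {output_table} WHERE {key_column} IS NULL"
    )[0][0]
    if null_count > 0:
        raise ValueError(f"AUDIT FAILED! {null_count} NULL values in {key_column} of {output_table}")

    # Duplicate check: every key should appear exactly once
    duplicate_count = execute_query(
        f"SELECT COUNT(*) - COUNT(DISTINCT {key_column}) FROM {output_table}"
    )[0][0]
    if duplicate_count > 0:
        raise ValueError(f"AUDIT FAILED! {duplicate_count} duplicate {key_column} values in {output_table}")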

If you’re using dbt, you can leverage its testing framework to run detailed tests on your models before promoting them to production.

The Publish Step

The publish step is where we move the validated data to production. A key aspect of our implementation is creating a backup of the current production data before replacing it:

def publish_table(output_table, production_table):
    # Backup current production
    current_date = datetime.now().strftime("%Y%m%d")
    table_name = production_table.split('"')[-2]  # bare table name from the quoted identifier
    backup_table = f'"schema"."backup"."{table_name}_BKUP_{current_date}"'

    execute_query(f"DROP TABLE IF EXISTS {backup_table}")
    execute_query(f"CREATE TABLE {backup_table} AS SELECT * FROM {production_table}")

    # Replace production with new data
    execute_query(f"DROP TABLE IF EXISTS {production_table}")
    execute_query(f"CREATE TABLE {production_table} AS SELECT * FROM {output_table}")

This backup provides an additional safety net, allowing you to quickly restore the previous state if issues are discovered after deployment.
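
If a problem does surface after deployment, the restore path is simply the mirror image of the publish step. Here is a minimal sketch, assuming the same execute_query placeholder; restore_from_backup is a hypothetical helper name.

def restore_from_backup(production_table, backup_table):
    """Roll production back to a previously published backup"""
    logging.info(f"Restoring {production_table} from {backup_table}")
    execute_query(f"DROP TABLE IF EXISTS {production_table}")
    execute_query(f"CREATE TABLE {production_table} AS SELECT * FROM {backup_table}")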



Advanced WAP Patterns


While the basic WAP pattern provides significant benefits, there are several advanced variations you can implement for additional robustness:

Incremental WAP

Instead of processing the entire dataset, you can implement an incremental approach that only processes new or changed data:

def write_incremental(output_table, production_table, incremental_key, last_run_date):
    """Create a staging table containing only rows that are new or changed since the last run"""
    execute_query(f"DROP TABLE IF EXISTS {output_table}")
    execute_query(f"""
        CREATE TABLE {output_table} AS
        SELECT * FROM {production_table}
        WHERE {incremental_key} > '{last_run_date}'
    """)

This approach is particularly valuable for large datasets where processing the entire dataset would be time-consuming and resource-intensive.
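
One way to supply last_run_date in Airflow is to let the scheduler template it in with the built-in {{ ds }} macro, since op_kwargs is a templated field on PythonOperator. The wiring below is a sketch that assumes the DAG and write_incremental function above; the task id and table names are illustrative.

write_orders_incremental = PythonOperator(
    task_id="write_fct_orders_incremental",
    python_callable=write_incremental,
    op_kwargs={
        "output_table": '"schema"."staging"."fct_orders_output"',
        "production_table": '"schema"."production"."fct_orders"',
        "incremental_key": "updated_at",
        "last_run_date": "{{ ds }}",  # rendered by Airflow to the run's logical date (YYYY-MM-DD)
    }
)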

Parallel WAP

For complex data pipelines that refresh multiple tables, you can implement a parallel WAP pattern in which several tables are processed simultaneously.

This approach can significantly reduce the overall execution time of your pipeline while maintaining the safety benefits of the WAP pattern.
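
With the create_wap_task_group helper from the earlier DAG, fanning out across many tables is mostly a loop. The sketch below assumes that DAG definition; the table names are illustrative, and how many groups actually run at once is governed by your Airflow concurrency settings and pools.

tables = ["dim_customer", "dim_product", "fct_orders", "fct_payments"]
wap_groups = [create_wap_task_group(table) for table in tables]

# Each group still enforces write >> audit >> publish internally,
# while the groups themselves run in parallel between start and end.
start >> wap_groups >> end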

Validation-Heavy WAP

For critical data where accuracy is paramount, you can implement a validation-heavy WAP pattern with multiple layers of checks:

  1. Schema validation: Ensure the table structure matches expectations
  2. Data type validation: Verify data types for each column
  3. Data quality validation: Check for nulls, duplicates, and other quality issues
  4. Business rule validation: Ensure business-specific rules are met
  5. Cross-table validation: Verify relationships between tables
  6. Historical trend validation: Compare with historical data to detect anomalies
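
As an example of the last layer, a simple trend check can compare the new row count against current production and fail on a large swing. This is a sketch using the execute_query placeholder; the function name and the 10% tolerance are illustrative.

def audit_historical_trend(output_table, production_table, tolerance=0.10):
    """Fail the audit if the row count moves by more than the tolerance versus production"""
    new_count = execute_query(f"SELECT COUNT(*) FROM {output_table}")[0][0]
    current_count = execute_query(f"SELECT COUNT(*) FROM {production_table}")[0][0]

    if current_count > 0:
        change = abs(new_count - current_count) / current_count
        if change > tolerance:
            raise ValueError(
                f"AUDIT FAILED! Row count changed by {change:.1%} "
                f"({current_count} -> {new_count}), above the {tolerance:.0%} tolerance"
            )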


Integration with Modern Data Stack Tools


The WAP pattern can be integrated with various tools in the modern data stack:

dbt Integration

If you’re using dbt for your transformations, you can implement the WAP pattern by:

  1. Using dbt to create your models in a staging schema
  2. Running dbt tests to validate the models
  3. Using a custom operation to move the validated models to production

import subprocess

def dbt_wap_process():
    # Build the dbt models against the staging target
    subprocess.run(["dbt", "run", "--target", "staging"], check=True)

    # Run dbt tests against the staging models
    test_result = subprocess.run(["dbt", "test", "--target", "staging"])

    # If all tests pass, rebuild the validated models against the production target
    if test_result.returncode == 0:
        subprocess.run(["dbt", "run", "--target", "production"], check=True)

Great Expectations Integration

If you’re using Great Expectations for data quality, you can integrate it into the audit step:

import great_expectations as ge

def audit_with_great_expectations(output_table):
    # Load the Great Expectations project context
    context = ge.data_context.DataContext()

    # Fetch the staging table as a batch tied to an expectation suite
    # (batch arguments depend on your Great Expectations version and datasource setup)
    batch = context.get_batch(
        {"datasource": "warehouse", "table": output_table},
        "my_expectation_suite"
    )

    # Run the suite and fail the audit task if any expectation is not met
    results = batch.validate()
    if not results["success"]:
        raise ValueError("Data quality validation failed")


Best Practices for Implementing WAP


Based on my experience implementing the WAP pattern, here are some best practices to consider:

1. Make Audits Comprehensive

Your audit step is the gatekeeper that prevents bad data from reaching production. Make it as comprehensive as possible:

  • Validate data structure (schema, data types)
  • Check data quality (nulls, duplicates, outliers)
  • Verify business rules and constraints
  • Compare with historical data for anomalies

2. Implement Proper Error Handling

Ensure your implementation has robust error handling and clear logging:

try:
    # Run audit checks
    audit_table(output_table, production_table)
except Exception as e:
    logging.error(f"Audit failed: {str(e)}")
    # Send alerts to appropriate channels
    send_alert(f"WAP process failed during audit: {str(e)}")
    raise
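
Here send_alert stands in for whatever alerting integration your team uses (Slack, PagerDuty, email, and so on); re-raising afterwards keeps the task marked as failed so Airflow’s retries and failure handling still apply.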

3. Create Meaningful Backups

Always create backups before publishing to production:

  • Include timestamps in backup table names
  • Consider keeping multiple historical backups
  • Implement a cleanup policy to manage backup storage (see the retention sketch below)
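
A retention policy can be as simple as dropping backups older than a cutoff. The sketch below reuses execute_query and the datetime imports from the DAG above; information_schema naming and identifier quoting vary between warehouses, so treat it as a starting point.

def cleanup_old_backups(table_name, keep_days=7):
    """Drop backup tables written by publish_table that are older than keep_days"""
    cutoff = (datetime.now() - timedelta(days=keep_days)).strftime("%Y%m%d")

    # List backup tables created for this base table
    rows = execute_query(f"""
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = 'backup'
          AND table_name LIKE '{table_name}_BKUP_%'
    """)

    for (backup_name,) in rows:
        # publish_table suffixes each backup with YYYYMMDD, so a string comparison works
        backup_date = backup_name.rsplit("_", 1)[-1]
        if backup_date < cutoff:
            execute_query(f'DROP TABLE "schema"."backup"."{backup_name}"')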

4. Monitor Pipeline Performance

Track the performance of your WAP pipeline to identify bottlenecks:

  • Monitor execution time for each step
  • Track resource usage (memory, CPU)
  • Identify slow-running queries or operations

5. Implement Alerting

Set up alerts to notify when issues occur:

  • Alert on audit failures
  • Alert on performance degradation
  • Alert on unexpected data patterns


Conclusion: Building Resilient Data Pipelines

The Write-Audit-Publish pattern provides a robust framework for building resilient data pipelines that protect your production data from errors. By implementing this pattern in Apache Airflow, you can create a systematic approach to data validation and deployment that scales across your data warehouse.

The investment in setting up a WAP pipeline pays dividends in reduced production incidents, improved data quality, and increased confidence in your data systems. As data continues to drive critical business decisions, implementing patterns like WAP becomes essential for maintaining trust in our data infrastructure.




Additional Resources