Embracing Defensive Engineering: A Proactive Approach to Data Pipeline Integrity
Introduction
Have you ever had a data pipeline fall apart due to unexpected errors? In the ever-evolving landscape of data, surprises lurk around every corner. Defensive engineering, a methodology focused on preempting and mitigating data anomalies, plays a crucial role in building reliable data pipelines. It’s not just about fixing problems as they arise; it’s about anticipating potential issues and addressing them before they wreak havoc.
Below I’ll explore the various facets of defensive engineering, from the basics of handling nulls and type mismatches to the more complex challenges of ensuring data integrity and handling late-arriving data. Whether you’re a seasoned data engineer or just starting out, understanding these principles is key to creating data pipelines that are not just functional, but also robust and secure in the face of unpredictable data challenges.
Handling Unknown Attributes
The absence of information can be just as troublesome as incorrect information. It normally triggers investigation: “is this correct?” or “is there an issue with the data?”
Unknown values can introduce challenges such as:
- Ambiguity: They can disrupt calculations, requiring special handling in aggregations and other operations.
- Filtering: Their behaviour during filtering might be unexpected, requiring explicit choices based on your data context.
- Joins: Incorrect results can occur when joining datasets if nulls are not accounted for thoughtfully.
Typically, a systematic approach to unknown attributes and how they are handled is a good starting point.
Establish standard representations for the different types of unknown data, such as “Not Applicable”, “Not Provided”, “Unknown” and “Error”. This gives everyone a predefined set of rules across the data, so people understand what is happening without further investigation, and it means joins, filters and other operations behave in an expected way on these values.
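As a sketch of what that standard mapping might look like (the raw values, column and table names below are illustrative assumptions, not from a real schema):
SELECT
    CASE
        WHEN column_name IS NULL THEN 'Unknown'
        WHEN trim(column_name) = '' THEN 'Not Provided'
        WHEN lower(column_name) IN ('n/a', 'not applicable') THEN 'Not Applicable'
        WHEN lower(column_name) IN ('err', 'error') THEN 'Error'
        ELSE column_name
    END AS column_name
FROM
    table_name;
Centralising this mapping in one place helps keep the labels consistent across the pipeline.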
Tolerating Unknown Attributes
One way of handling this is with COALESCE or IFNULL transformation logic. If the value isn’t known, we can transform it like below.
SELECT COALESCE(column_name, 'Unknown') AS column_name
FROM table_name;
This query will replace any null values in column_name with the word ‘Unknown’.
We could also enhance this to handle blank strings, either folding them into “Unknown” or distinguishing them as “Not Provided”.
SELECT COALESCE(NULLIF(trim(column_name),''), 'Unknown') AS column_name
FROM table_name;
or
SELECT
    CASE
        WHEN trim(column_name) = '' THEN 'Not Provided'
        ELSE COALESCE(column_name, 'Unknown')
    END AS column_name
FROM table_name;
One important element here is consistency. If you start having unknown, Unknown and UNKNOWN value types, you’re still going to have the same issues we are trying to avoid. Utilising functions or macros will help.
{% macro clean_column(column_name, table_name) %}
SELECT
    CASE
        WHEN trim({{ column_name }}) = '' THEN 'Not Provided'
        ELSE COALESCE({{ column_name }}, 'Unknown')
    END AS {{ column_name }}
FROM
    {{ table_name }}
{% endmacro %}
Then within the model’s code it would be:
{{ clean_column('your_column_name', 'your_table_name') }}
Not Tolerating Unknown Attributes
In the scenario where unknown attributes are not tolerated, we can implement a data validation check or enforce a data contract that alerts you when nulls occur, allowing steps to be taken to investigate and address the underlying cause.
# schema.yml
version: 2

models:
  - name: your_model
    columns:
      - name: column_name
        tests:
          - not_null
This DBT test will fail if there are any null values in column_name, which can be used to trigger alerts in your data pipeline workflow.
Remember, the key to effectively handling nulls lies in a thorough understanding of your data and its context. There’s no one-size-fits-all solution; the best approach depends on the specific needs and goals of your project or analysis, coupled with the need for consistent handling of unknown attributes to maintain data integrity over time.
Datatype Mismatch Handling
Data pipelines are vulnerable to the disruptive effects of data type mismatches: inconsistencies where, over time, a column or attribute changes in type or content. The most common sources are schema changes, or errors introduced within code changes.
However, we can prevent this with proactive type mismatch handling.
Some of the strategies for handling this are:
Explicit Casting: Utilise functions like CAST and CONVERT to transform the data into the required type before proceeding. This is a great way to enhance your code as a living document: not only does the source table declare a data type, but during your transform you are also converting it to that contract, or pre-agreed data type. So if the source table changes outside of your control, you can still see what it was intended to be in the transformation. Additionally, it might auto-resolve the issue in simple cases, such as converting a string to an integer.
SELECT CAST(column_name AS INTEGER) FROM table_name;
Error Handling with Grace: Employ TRY…CATCH blocks to gracefully handle data type mismatch errors. This allows you to log the issue or attempt a conversion, giving you the ability to investigate at a later stage. In DBT you could run a generic test like the one below:
{% macro test_valid_email(model, column_name) %}
select *
from {{ model }}
where not regexp_like({{ column_name }}, '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$')
{% endmacro %}
models:
  - name: your_model_name
    columns:
      - name: email_column
        tests:
          - valid_email
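For the “attempt a conversion” part, many warehouses also offer a safe cast that returns NULL rather than raising an error. A minimal sketch, assuming a Snowflake or SQL Server style TRY_CAST (BigQuery calls this SAFE_CAST), with illustrative table and column names:
SELECT
    column_name
    , TRY_CAST(column_name AS INTEGER) AS column_name_int
FROM
    table_name
WHERE
    TRY_CAST(column_name AS INTEGER) IS NULL
    AND column_name IS NOT NULL; -- rows that failed conversion, kept for later investigation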
Data Validation and Contracts: Proactively prevent type mismatches by establishing data validation rules and data contracts within your code. Tools like DBT and Great Expectations allow you to test that incoming data conforms to what is expected.
version: 2

models:
  - name: your_model_name
    config:
      contract:
        enforced: true
    columns:
      - name: column_name
        data_type: integer
An alternative is to cast explicitly within your models:
select
column_name::integer as casted_column_name
from
{{ ref('your_source_model') }}
The method you implement will really depend on your specific pipeline requirements and goals. Some data might need a more enforced approach, whereas some pipelines might be fine with softer enforcement.
Regardless of which way you choose, it is really important to profile your data before you build your data pipelines, so you understand how it behaves and know which way to pursue.
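As a rough idea of what that profiling could look like, a minimal sketch with placeholder column and table names:
SELECT
    COUNT(*) AS row_count
    , COUNT(DISTINCT column_name) AS distinct_values
    , SUM(CASE WHEN column_name IS NULL THEN 1 ELSE 0 END) AS null_count
    , SUM(CASE WHEN trim(column_name) = '' THEN 1 ELSE 0 END) AS blank_count
    , MIN(column_name) AS min_value
    , MAX(column_name) AS max_value
FROM
    table_name;
Even a simple summary like this tells you how often unknowns appear and whether the column behaves the way the schema suggests.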
Handling Reprocessed Data
In data pipelines, errors or late data sometimes occur, and we might need to rerun a portion of code. A naive rerun could introduce duplicate records, compromising the integrity of the data.
There are steps we can take to mitigate this, so let’s have a look.
Common reasons for a Re-run could be:
Upstream Errors: If a data source contained incorrect data, upon correction you may need to reload the affected data.
Pipeline Failures: Unexpected interruptions in your data pipeline might require restarting a load from a previous point.
Backfilling: When incorporating previously unavailable data or applying new transformation logic, reruns might be necessary.
My go-to logic for handling this is typically the following.
INSERT INTO target_table
SELECT
    src.LOAD_DATETIME
    , src.BUSINESS_KEY
    , src.DATA_PAYLOAD
FROM
(
    SELECT
        LOAD_DATETIME
        , CAST(BUSINESS_KEY AS NVARCHAR(100)) AS BUSINESS_KEY
        , DATA_PAYLOAD
    FROM
        source_table
) src
LEFT JOIN target_table AS tgt
    ON tgt.BUSINESS_KEY = src.BUSINESS_KEY
    AND tgt.LOAD_DATETIME = src.LOAD_DATETIME
WHERE
    tgt.BUSINESS_KEY IS NULL /* doesn't exist in target table */
Using the key field together with the load datetime, we ensure we are only inserting records that haven’t been inserted already.
There are alternatives like MERGE statements, which give us more traceability on what has occurred, but can also incur more performance overhead for providing this operation.
MERGE INTO target_table AS tgt
USING source_table AS src
    ON tgt.BUSINESS_KEY = src.BUSINESS_KEY
    AND tgt.LOAD_DATETIME = src.LOAD_DATETIME
WHEN MATCHED THEN
    -- Update logic if the record needs modification
    UPDATE SET DATA_PAYLOAD = src.DATA_PAYLOAD
WHEN NOT MATCHED THEN
    -- Insert logic for new records
    INSERT (LOAD_DATETIME, BUSINESS_KEY, DATA_PAYLOAD)
    VALUES (src.LOAD_DATETIME, src.BUSINESS_KEY, src.DATA_PAYLOAD);
In addition to these methods, you can also create a hash value (a unique fingerprint) based on the data payload, or a combination of columns (not including the key), and compare it against the hash of the incoming record. This has the benefit of comparing a single value or column, rather than the overhead of scanning and comparing multiple columns.
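A minimal sketch of the idea, assuming an MD5 function and illustrative payload columns (your database may expose HASH, SHA2 or similar instead):
SELECT
    BUSINESS_KEY
    , LOAD_DATETIME
    , MD5(CONCAT(COALESCE(attribute_1, ''), '|', COALESCE(attribute_2, ''))) AS payload_hash
FROM
    source_table;
The incoming payload_hash can then be compared against the hash stored on the target table in the join condition, instead of comparing every payload column individually.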
Handling Division in SQL
Databases mirror mathematical principles; in both, dividing by zero is undefined. That means your database doesn’t have a proper value to represent the result, leading to errors. Often, your pipeline will halt with a clear message, but in some cases, you might get unpredictable consequences depending on how your database deals with undefined operations.
The Power of NULLIF: The NULLIF function effectively transforms “division by zero” situations into NULL values. NULLs can then be filtered, handled differently in reporting, or visually marked as non-applicable.
SELECT
order_total / NULLIF(quantity, 0) AS price_per_item
FROM
sales_data;
Custom Logic with CASE: CASE statements provide greater flexibility when you need more than simple null replacement. Define specific actions when the divisor is zero.
SELECT
    CASE
        WHEN units_sold = 0 THEN 'N/A'
        WHEN units_sold IS NULL THEN NULL
        ELSE revenue / units_sold
    END AS revenue_per_unit
FROM
    product_performance;
If you’re using Python or stored procedures, you can use more advanced TRY…CATCH blocks to manage these errors.
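As a rough stored-procedure style sketch, assuming SQL Server’s TRY…CATCH and illustrative table names:
BEGIN TRY
    INSERT INTO revenue_per_unit_report (product_id, revenue_per_unit)
    SELECT product_id, revenue / units_sold
    FROM product_performance;
END TRY
BEGIN CATCH
    -- Log the failure for later investigation rather than losing it silently
    INSERT INTO pipeline_error_log (error_message, logged_at)
    VALUES (ERROR_MESSAGE(), SYSDATETIME());
END CATCH;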
Back in DBT, you could wrap this division handling in a macro like the one below.
{% macro handle_division(dividend_col, divisor_col, zero_replacement="'N/A'") %}
CASE
    WHEN {{ divisor_col }} = 0 THEN {{ zero_replacement }}
    ELSE {{ dividend_col }} / {{ divisor_col }}
END
{% endmacro %}
Then this code could be used within your model.
SELECT
    {{ handle_division('order_total', 'quantity') }} AS price_per_item
FROM
    orders
Late Data? No Problem!
There are a few different definitions of late data. We could have data that is delayed: we expected it to arrive in a certain timeframe, but it arrived two hours later. This type of data isn’t normally an issue for defensive coding, but we would need some form of SLA monitoring to ensure our customers are happy.
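A minimal sketch of such a freshness check, assuming an incoming_sales_data table with a load_datetime column and a two-hour SLA (both assumptions):
SELECT
    MAX(load_datetime) AS latest_load
    , CASE
        WHEN MAX(load_datetime) < CURRENT_TIMESTAMP - INTERVAL '2' HOUR THEN 'SLA breached'
        ELSE 'OK'
      END AS sla_status
FROM
    incoming_sales_data;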
What if we received data out of order: we received Monday’s data and Wednesday’s data, but Tuesday’s data arrived late?
If we know this could be a common scenario and we are dealing with Type 2 data, we could consider a dynamic effective_to with a window function. For example, the initial load below only takes rows that arrived within an on-time threshold, and the view that follows derives effective_to dynamically so that late-arriving versions slot into history correctly.
INSERT INTO daily_sales_report (data_date, sales_revenue, order_count)
SELECT
data_date
, SUM(order_amount)
, COUNT(*)
FROM
incoming_sales_data
WHERE
-- For the initial report on today's sales
data_date = CURRENT_DATE
-- On-time threshold
AND load_datetime >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 3 HOUR)
GROUP BY
data_date;
The view below uses a LEAD window function to peek at the effective_from of the next version of the record and derive effective_to:
CREATE VIEW scd2_current_view AS
SELECT
    product_id
    , effective_from
    , LEAD(effective_from) OVER (PARTITION BY product_id ORDER BY effective_from) - INTERVAL '1' DAY AS effective_to
    -- , other columns from your table
FROM
    source_table;
This isn’t a solution for every database, but it can be handy where there are materialised or cached views over the table, to enable this sort of behaviour.
If you do go with this approach, you need to be clear with your consumers about the behaviour they can expect from the data.
There are other, more complex approaches, such as re-creating a version of history and comparing it against what you have. This might not be a recommended approach for your daily pipelines, but it could be an alternative for a weekly “clean up” or “verification” that the history is in line.
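A rough sketch of that verification idea, assuming a rebuilt_history table or CTE produced by replaying the source data (both names are illustrative), using standard EXCEPT (MINUS in some databases):
SELECT product_id, effective_from, effective_to FROM rebuilt_history
EXCEPT
SELECT product_id, effective_from, effective_to FROM target_history;
Any rows returned show where the stored history has drifted from what a full replay would produce.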
Whichever method you implement, the key here is to drive down incidents delaying your data reaching your customers.
Having these measures in place can safeguard the integrity of your data and give you a proactive approach to enforcing it.
I’d be keen to hear about your experiences: how do you protect your data?