Manual Unit Testing
What is a unit test?
Unit tests exist in all manner of technology implementations, from productionised code to data warehouses, although the latter is often less practised, perhaps due to the relative youth of the industry. For the purposes of this document, we will focus on implementing unit testing within data warehouses.
The prime aim of unit testing is to record the success or failure of expected outputs, not only within the data warehouse but across the entire data transformation pipeline. This can be separated into three distinct stages:
- Input - Has the Extract and Load of data from data source into data lake occurred as expected?
- Transform - Have the transformation scripts behaved as expected?
- Output - Do the end tables produce the expected results?
Unit tests were designed to follow the principles of being specific, automated, fast and independent. For data warehousing, our unit tests should therefore be:
- Specific: The unit test should have a clear success/failure output, so that debugging becomes much easier.
- Automated: The unit test should be able to run on a schedule, so a log can be formed of previous results.
- Fast: The unit tests themselves need to be quick, so the entire data pipeline is not hindered by slow-running tests. This also ties back to the tests being specific, so that debugging can be rapid.
- Independent: The unit test should be able to run on its own, at any time and in any order.
Various sources will have their own way of stating this, but they generally align on the concept.
Why do we do this?
The expected output of a unit test was briefly touched upon in the previous section. The main interest here is the test type and the test output. If we are testing whether a primary key in a table is unique, then the result should be either true or false. Other tests that act more like a record have a null result, to prevent skew when looking at the total true/false counts. This simple set of outputs is a quick way of checking whether a table has been built correctly and that the transform logic outputs as expected.
Therefore, well-placed unit tests give the end-user visibility of exactly which expected outputs failed, where and when, enabling rapid debugging. This also lets the analyst monitor the warehouse build as they go along.
What about the logs in the Kleene application?
While we do have the logs in the Kleene app, we cannot currently surface them in a BI tool to monitor the results. It also helps to have the logs in a relational-database format, which makes pointing a dashboard at the test log table much easier.
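As a minimal sketch, a dashboard could point at a query like the one below. The seven-day window and the output column names are illustrative assumptions:
-- Sketch: pass/fail summary per table over the last seven days.
-- The null "record" style results fall outside both the passed and failed counts.
SELECT
_test_table
, SUM(CASE WHEN _test_result THEN 1 ELSE 0 END) AS _tests_passed
, SUM(CASE WHEN NOT _test_result THEN 1 ELSE 0 END) AS _tests_failed
, count(*) AS _tests_run
FROM
utility.unit_test_log
WHERE
_test_timestamp >= DATEADD(day, -7, current_timestamp::timestamp)
GROUP BY
_test_table
ORDER BY
_tests_failed DESC;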
The tests we implement as standard at Kleene are as follows:
- Transform start time
- Primary key is unique
- Data has been recently extracted
- Transform has run successfully *
- Nulls are not present in particular fields
- Static data checks
- Numeric checks
Tests six and seven are not mandatory everywhere, but should be used where possible to maximise pipeline health.
* For the Transform has run successfully unit test (4.), we are checking that the unit test finishes inserting within a time frame of when the main query in the transform finished running, where that time is captured by adding the _last_run_timestamp field to the main query, not to the unit test query.
An example is given below:
CREATE TABLE staging.customers_entity as
SELECT
scs.customer_id
, scs.customer_first_name
, scs.customer_last_name
, scs.customer_created_at
...
, current_timestamp as _last_run_timestamp
FROM
staging.shopify_customers_subentity scs
left join ...
Be careful not to use the _last_run_timestamp from the previous transform step; always create a new _last_run_timestamp field in the transform being tested.
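As a quick sketch of the difference, using the example transform above:
CREATE TABLE staging.customers_entity as
SELECT
scs.customer_id
-- , scs._last_run_timestamp -- incorrect: carries over the previous step's timestamp
, current_timestamp as _last_run_timestamp -- correct: stamps this transform's run
FROM
staging.shopify_customers_subentity scs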
How to implement
To house all the results from our specific, automated, fast and independent unit tests in the data warehouse, we need to create a table in its own schema. Once created, unit tests insert their metadata into this table.
How to create the initial table
Copy and paste the SQL below into a transform, save and then run. You can run this directly in the SQL console, but I would advise against this as there will be no record saved in the app of how the initial unit test log table was created.
CREATE SCHEMA IF NOT EXISTS utility;
CREATE TABLE IF NOT EXISTS utility.unit_test_log
(
_test_timestamp datetime -- test run timestamp
, _test_type varchar -- test type/name
, _test_description varchar -- description of what the test is doing
, _test_environment varchar -- database in the data warehouse
, _test_schema varchar -- schema in the data warehouse
, _test_table varchar -- table the unit test is focused on
, _test_result boolean -- true/false for the test
, _test_row_count int -- the row count of the table as of the test
, _test_id varchar -- unique identifier for the unit test result
)
The reason we create the table like this, instead of creating the table as a select statement, is to avoid the data warehouse inferring the data type of each string field as a varchar sized to the number of characters in the select statement for that field. For instance:
CREATE TABLE utility.unit_test_log as
SELECT
current_timestamp as _test_timestamp -- timestamp_tz(9)
, 'unique primary key' as _test_type -- varchar(18)
, 'checks the guinea_pig_id primary key is unique' as _test_description -- varchar(46)
, 'prod' as _test_environment -- varchar(4)
, 'staging' as _test_schema -- varchar(7)
, 'staging.guinea_pig_subentity' as _test_table -- varchar(28)
, count(distinct guinea_pig_id) = count(*) as _test_result -- boolean
, count(*) as _test_row_count -- number(18,0)
, md5(_test_timestamp||'_'||_test_type||'_'||_test_schema||'_'||_test_table) as _test_id -- whatever the max varchar is for the query
FROM
staging.guinea_pig_subentity
Next to each field is a comment showing the data type inherited when creating the table with a select statement. As you can see, the free text fields now have an upper character limit, which means inserting longer text into these fields will cause errors, since the data types will not match! Thus, we must create the table using the initial method, where we can define the fields with their desired data types.
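To illustrate the failure mode, suppose the table had been created with the select statement above, so _test_type was inferred as varchar(18). A later insert of a longer test type would then fail; the exact error text varies by warehouse:
-- _test_type was inferred as varchar(18) from 'unique primary key',
-- so this 34-character value is too long and the insert errors out:
INSERT INTO utility.unit_test_log (_test_type)
VALUES ('nulls are not present in the field');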
Once the table structure is created, all that remains is populating the unit test log with data. This is not complicated at all, as the unit tests are simply SQL INSERT INTO statements; the template for each of the standard tests is shown below.
Examples to Copy and Paste
Unit Testing Workflow:
To make implementing the unit tests easier, please copy and paste the following workflow into each transform, then amend where appropriate.
- Copy and paste the full suite of unit tests below into a transform, then insert the main transform query between the dashed lines.
- Add the field current_timestamp as _last_run_timestamp to the main query in the transform. It is recommended to put this at the bottom of the query, out of the way.
- Use find and replace to replace all instances of the fields / tables below:
  - table name: replace _schema_._table_name with [schema_name].[table_name]
  - schema name: replace _schema_ with [schema_name]
  - unique primary key: replace _field_name with [primary_key]
  - static data check field: replace date with _date_field and _field_name with [some field]
  - numeric test field: replace _numeric_field with [some numeric field] and change the numeric test as needed (e.g. >0 or <0)
- If your database name is different to prod then you need to change this too. The database name is included in the path to the input table because it makes it easier for the DAG to recognise dependencies on an Amazon Redshift data warehouse when the transforms follow a [database_name].[schema_name].[table_name] format.
------------------------------------------------------------------------------------------------------------------
-----------------------------------------------/* SET VARIABLES */----------------------------------------------
------------------------------------------------------------------------------------------------------------------
-- UNIT TEST DB, SCHEMA AND TABLE DETAILS
SET (_ENVIRONMENT) = ('DATABASE_NAME');
SET (_SCHEMA) = ('SCHEMA_NAME');
SET (_TABLE_1) = ('SCHEMA_NAME.TABLE_NAME_1');
-- UNIT TEST 4 VARIABLES:
SET (_PRIMARY_KEY) = ('UNIQUE_GRANULARITY_FIELD_NAME');
-- UNIT TEST 5 VARIABLES:
SET (_FIELD_NAME_NULL) = ('FIELD_NAME_THAT_SHOULD_NEVER_BE_NULL');
-- UNIT TEST 6 VARIABLES:
SET (_MONTH) = ('MANUAL_DATE_GIVEN_FOR_ACCEPTANCE_TEST_CRITERIA');
SET (_DATE_FIELD) = ('FIELD_NAME_THAT_CONSTRAINS_ACCEPTANCE_TEST_CRITERIA');
SET (_FIELD_NAME_AGG) = ('FIELD_NAME_TO_AGGREGATE');
SET (_STATIC_FIELD_VALUE) = (NUMERIC_VALUE_FOR_ACCEPTANCE_TEST);
-- UNIT TEST 7 VARIABLES:
SET (_NUMERIC_FIELD) = (NUMERIC_FIELD);
SET (_NUMERIC_VALUE) = (NUMERIC_VALUE_FOR_ACCEPTANCE_TEST);
-- UNIT TEST 8 VARIABLES:
SET (_FOREIGN_KEY) = ('T1.FIELD_NAME_FROM_TABLE_1'); -- Important to prefix the field name with "t1."
SET (_PRIMARY_FIELD) = ('T2.FIELD_NAME_FROM_TABLE_2'); -- Important to prefix the field name with "t2."
SET (_TABLE_2) = ('SCHEMA_NAME.TABLE_NAME_2');
------------------------------------------------------------------------------------------------------------------
------------------------------------/* 1. TRANSFORM COMMENCEMENT UNIT TEST */------------------------------------
------------------------------------------------------------------------------------------------------------------
INSERT INTO
PROD.UTILITY.UNIT_TEST_LOG
SELECT
current_timestamp AS _test_timestamp
, 'transform start time' AS _test_type
, 'checks the transform has started and records the timestamp' AS _test_description
, $_ENVIRONMENT AS _test_environment
, $_SCHEMA AS _test_schema
, $_TABLE_1 AS _test_table
, null AS _test_result
, count(*) AS _test_row_count
, md5(_test_timestamp||'_'||_test_type||'_'||_test_schema||'_'||_test_table) AS _test_id
;
------------------------------------------------------------------------------------------------------------------
------------------------------------------------------------------------------------------------------------------
------/* INSERT YOUR TRANSFORM IN PLACE OF THIS ROW; BETWEEN THE TWO LOTS OF DOUBLE COMMENTED OUT LINES */------
;
------------------------------------------------------------------------------------------------------------------
------------------------------------/* 2. TRANSFORM RUN SUCCESS UNIT TEST */-------------------------------------
------------------------------------------------------------------------------------------------------------------
INSERT INTO
PROD.UTILITY.UNIT_TEST_LOG
SELECT
current_timestamp AS _test_timestamp
, 'transform end time' AS _test_type
, 'checks the transform has run successfully' AS _test_description
, $_ENVIRONMENT AS _test_environment
, $_SCHEMA AS _test_schema
, $_TABLE_1 AS _test_table
, CASE
WHEN DATEADD(minute, -1, CURRENT_TIMESTAMP::timestamp) <= MAX(_LAST_RUN_TIMESTAMP)
and MAX(_LAST_RUN_TIMESTAMP) <= DATEADD(minute, 1, CURRENT_TIMESTAMP::timestamp)
THEN TRUE
ELSE FALSE
END AS _test_result
, count(*) AS _test_row_count
, md5(_test_timestamp||'_'||_test_type||'_'||_test_schema||'_'||_test_table) AS _test_id
FROM
TABLE($_TABLE_1);
------------------------------------------------------------------------------------------------------------------
------------------------------------/* 3. RECENT DATA EXTRACTION UNIT TEST */-------------------------------------
------------------------------------------------------------------------------------------------------------------
INSERT INTO
PROD.UTILITY.UNIT_TEST_LOG
SELECT
current_timestamp as _test_timestamp
, 'data extraction is recent' as _test_type
, 'checks for recently extracted data via _kleene_extract_date' as _test_description
, $_ENVIRONMENT AS _test_environment
, $_SCHEMA AS _test_schema
, $_TABLE_1 AS _test_table
, max(_kleene_extract_date) >= DATEADD(day, -1, current_timestamp::timestamp) as _test_result
, count(*) as _test_row_count
, md5(_test_timestamp||'_'||_test_type||'_'||_test_schema||'_'||_test_table) as _test_id
FROM
TABLE($_TABLE_1);
------------------------------------------------------------------------------------------------------------------
--------------------------------------/* 4. UNIQUE PRIMARY KEY UNIT TEST */---------------------------------------
------------------------------------------------------------------------------------------------------------------
INSERT INTO
PROD.UTILITY.UNIT_TEST_LOG
SELECT
current_timestamp AS _test_timestamp
, 'unique primary key' AS _test_type
, 'checks the _field_name primary key is unique' AS _test_description
, $_ENVIRONMENT AS _test_environment
, $_SCHEMA AS _test_schema
, $_TABLE_1 AS _test_table
, count(DISTINCT IDENTIFIER($_PRIMARY_KEY)) = count(*) AS _test_result
, count(*) AS _test_row_count
, md5(_test_timestamp||'_'||_test_type||'_'||_test_schema||'_'||_test_table) AS _test_id
FROM
TABLE($_TABLE_1);
------------------------------------------------------------------------------------------------------------------
--------------------------------------/* 5. NO NULLS IN A FIELD UNIT TEST */--------------------------------------
------------------------------------------------------------------------------------------------------------------
INSERT INTO
PROD.UTILITY.UNIT_TEST_LOG
SELECT
current_timestamp AS _test_timestamp
, 'nulls are not present in the field' AS _test_type
, 'checks there are no nulls in the field' AS _test_description
, $_ENVIRONMENT AS _test_environment
, $_SCHEMA AS _test_schema
, $_TABLE_1 AS _test_table
, SUM(CASE WHEN IDENTIFIER($_FIELD_NAME_NULL) is null THEN 1 ELSE 0 END) = 0 AS _test_result
, count(*) AS _test_row_count
, md5(_test_timestamp||'_'||_test_type||'_'||_test_schema||'_'||_test_table) AS _test_id
FROM
TABLE($_TABLE_1);
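The variable block above also declares variables for unit tests 6, 7 and 8, whose templates are not included in this suite. As a sketch only (not the standard template), a foreign key test consistent with those variables and the t1/t2 prefixes noted in their comments might look like this:
------------------------------------------------------------------------------------------------------------------
----------------------------------/* 8. FOREIGN KEY UNIT TEST (SKETCH ONLY) */----------------------------------
------------------------------------------------------------------------------------------------------------------
INSERT INTO
PROD.UTILITY.UNIT_TEST_LOG
SELECT
current_timestamp AS _test_timestamp
, 'foreign key present' AS _test_type
, 'checks every foreign key in table 1 exists in table 2' AS _test_description
, $_ENVIRONMENT AS _test_environment
, $_SCHEMA AS _test_schema
, $_TABLE_1 AS _test_table
-- a null primary field after the left join means the foreign key has no match
, SUM(CASE WHEN IDENTIFIER($_PRIMARY_FIELD) IS NULL THEN 1 ELSE 0 END) = 0 AS _test_result
, count(*) AS _test_row_count
, md5(_test_timestamp||'_'||_test_type||'_'||_test_schema||'_'||_test_table) AS _test_id
FROM
TABLE($_TABLE_1) t1
LEFT JOIN TABLE($_TABLE_2) t2
ON IDENTIFIER($_FOREIGN_KEY) = IDENTIFIER($_PRIMARY_FIELD);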
The examples below are designed to be copied and pasted: highlight the placeholder text and replace it with what you need. Placeholder text examples are _field_name and _table_name, so the fields that begin with an underscore '_' and end with 'name' are the fields to replace.
As we are copying and pasting, the unit tests themselves must not be changed structurally. The only text that should change is the aforementioned placeholders, such as _field_name and _table_name.
The semicolon delimiter sits at the beginning of each unit test so that a test can be copied and pasted after any SQL and still run independently. If the delimiter sat at the end instead, a prior SQL query followed by a unit test with no delimiter separating the queries could ruin the transform.
Below, we go into more detail on the setup for each unit test, where required.
1) Transform start time (needs to go at the top of the transform).
You may see that there is no FROM <table> clause in the unit test below. This is not a mistake. We omit the FROM clause since the unit test only records when the transform started and which transform it is in, so we drop the dependency entirely. In addition, this means we can implement unit tests in a transform and run it successfully on the first try, rather than creating the transform, running it, then adding in unit tests afterwards.
;
INSERT INTO utility.unit_test_log
SELECT
current_timestamp as _test_timestamp
, 'transform start time' as _test_type
, 'checks the transform has started and records the timestamp' as _test_description
, 'prod' as _test_environment
, '_schema_' as _test_schema
, '_schema_._table_name' as _test_table
, null as _test_result
, count(*) as _test_row_count
, md5(_test_timestamp||'_'||_test_type||'_'||_test_schema||'_'||_test_table) as _test_id
2) Primary key is unique.
This takes a count of the distinct values in _field_name and compares it with a count of the total rows. If the primary key is unique, then the number of rows matches the number of distinct values in _field_name.
;
INSERT INTO utility.unit_test_log
SELECT
current_timestamp as _test_timestamp
, 'unique primary key' as _test_type
, 'checks the _field_name primary key is unique' as _test_description
, 'prod' as _test_environment
, '_schema_' as _test_schema
, '_schema_._table_name' as _test_table
, count(distinct _field_name) = count(*) as _test_result
, count(*) as _test_row_count
, md5(_test_timestamp||'_'||_test_type||'_'||_test_schema||'_'||_test_table) as _test_id
FROM
prod._schema_._table_name
3) Data has been recently extracted.
Using the _kleene_extract_date (or another field such as updated_at), the max value of this field is taken and compared to the current timestamp of when the unit test is run. If the extraction date is within some number of days of the current timestamp (this will vary depending on when new data is set to be ingested), then the data is classed as "recently extracted"; otherwise, data is not actively flowing into the warehouse.
;
INSERT INTO utility.unit_test_log
SELECT
current_timestamp as _test_timestamp
, 'data extraction is recent' as _test_type
, 'checks for recently extracted data via _kleene_extract_date' as _test_description
, 'prod' as _test_environment
, '_schema_' as _test_schema
, '_schema_._table_name' as _test_table
, max(_kleene_extract_date) >= DATEADD(day, -1, current_timestamp::timestamp) as _test_result
, count(*) as _test_row_count
, md5(_test_timestamp||'_'||_test_type||'_'||_test_schema||'_'||_test_table) as _test_id
FROM
prod._schema_._table_name
4) Transform has run successfully
Remember, the _last_run_timestamp must exist in the main query (details given previously) for this unit test to work! When the main query has finished, the timestamp is logged in the table before this unit test begins. Once this unit test begins, it compares the current timestamp against the _last_run_timestamp from the main query and checks whether they are within some margin of error of each other in time. If they are not, then the main query may have failed, so a mismatch in timestamps results in a false result. This test is important because it materialises transforms running and succeeding/failing into the unit test log.
;
INSERT INTO utility.unit_test_log
SELECT
current_timestamp as _test_timestamp
, 'transform success' as _test_type
, 'checks the transform has run successfully' as _test_description
, 'prod' as _test_environment
, '_schema_' as _test_schema
, '_schema_._table_name' as _test_table
, CASE
WHEN DATEADD(minute, -1, CURRENT_TIMESTAMP::timestamp) <= MAX(_last_run_timestamp)
and MAX(_last_run_timestamp) <= DATEADD(minute, 1, CURRENT_TIMESTAMP::timestamp)
THEN TRUE
ELSE FALSE
END as _test_result
, count(*) as _test_row_count
, md5(_test_timestamp||'_'||_test_type||'_'||_test_schema||'_'||_test_table) as _test_id
FROM
prod._schema_._table_name
5) Nulls are not present in particular fields
;
INSERT INTO utility.unit_test_log
SELECT
current_timestamp as _test_timestamp
, 'nulls are not present in the field' as _test_type
, 'checks there are no nulls in the field _field_name' as _test_description
, 'prod' as _test_environment
, '_schema_' as _test_schema
, '_schema_._table_name' as _test_table
, SUM(CASE WHEN _field_name is null THEN 1 ELSE 0 END) = 0 as _test_result
, count(*) as _test_row_count
, md5(_test_timestamp||'_'||_test_type||'_'||_test_schema||'_'||_test_table) as _test_id
FROM
prod._schema_._table_name
6) Static data checks (example, change the result as appropriate)
;
INSERT INTO utility.unit_test_log
SELECT
current_timestamp as _test_timestamp
, 'static check' as _test_type
, 'checks static/historic data has not changed' as _test_description
, 'prod' as _test_environment
, '_schema_' as _test_schema
, '_schema_._table_name' as _test_table
, SUM(CASE WHEN date_trunc(month, date) = _date_field
THEN _field_name END) = 3000000 as _test_result
, count(*) as _test_row_count
, md5(_test_timestamp||'_'||_test_type||'_'||_test_schema||'_'||_test_table) as _test_id
FROM
prod._schema_._table_name
7) Numeric checks (example, change the result as appropriate)
;
INSERT INTO utility.unit_test_log
SELECT
current_timestamp as _test_timestamp
, 'numeric check' as _test_type
, 'checks the number of basket items is greater than zero' as _test_description
, 'prod' as _test_environment
, '_schema_' as _test_schema
, '_schema_._table_name' as _test_table
, MIN(total_basket_items) > 0 as _test_result
, count(*) as _test_row_count
, md5(_test_timestamp||'_'||_test_type||'_'||_test_schema||'_'||_test_table) as _test_id
FROM
prod._schema_._table_name
Implementing these in the Kleene app requires correct formatting, followed by appending the test to the appropriate transform. For example, the unit test below is inserted into the orders domain, which is usually called dw.orders_domain:
;
INSERT INTO utility.unit_test_log
SELECT
current_timestamp as _test_timestamp
, 'unique primary key' as _test_type
, 'checks the order_id primary key is unique' as _test_description
, 'prod' as _test_environment
, 'dw' as _test_schema
, 'dw.orders_domain' as _test_table
, count(distinct order_id) = count(*) as _test_result
, count(*) as _test_row_count
, md5(_test_timestamp||'_'||_test_type||'_'||_test_schema||'_'||_test_table) as _test_id
FROM
prod.dw.orders_domain
When unit tests should be implemented during a build
Unit tests should be deployed whenever a new transform is built. This saves time by identifying bugs as early as possible.
Custom unit tests
Yes, you can create custom unit tests, but try to follow the standard outlined above!
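For example, a sketch of a custom row count check that keeps the standard structure (the table and the 1000-row threshold are illustrative):
;
INSERT INTO utility.unit_test_log
SELECT
current_timestamp as _test_timestamp
, 'row count check' as _test_type
, 'checks the table contains at least 1000 rows' as _test_description
, 'prod' as _test_environment
, 'dw' as _test_schema
, 'dw.orders_domain' as _test_table
, count(*) >= 1000 as _test_result
, count(*) as _test_row_count
, md5(_test_timestamp||'_'||_test_type||'_'||_test_schema||'_'||_test_table) as _test_id
FROM
prod.dw.orders_domain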