Skip to main content

Building a Data Platform: Part 4 - Ingesting data

Introduction

Having been through concepts, architecture and the team required to build a modern data platform it’s time to focus on the various stages of the data flow, beginning with ingestion. As in previous posts the focus will be on building the example architecture from part 2

Ingestion Workflow 

The aim of the ingestion stage is to have loaded data from a source into the data lake so that it can be queried. The amount of transformation should be kept to the bare minimum and should consist of: 
  1. Translating the data into a storage format that is efficient for data lake usage (e.g. Parquet files)
  2. Standardising field names (e.g. converting all fields names to lowercase and using underscores to separate words)
  3. Type conversion (e.g. if loading data from a CSV, convert any numeric fields to an appropriate numeric data type)
  4. Adding metadata to the records for lineage 
The metadata to include on each record can include:
  1. The date and time the data was ingested
  2. The source system
  3. The process that ingested it (could even include version number for full traceability) 
There are many more metadata fields that can be useful and it’s up to you to decide what works best in your environment. 

Any validation of data should be limited to the minimum required to enable these basic transformations. More advanced validation and data quality rules should be applied after the data has been loaded into the lake as a separate process. 

The diagram below shows example workflows for loading data from an API,  loading data from a CSV in S3, and loading data from a SFTP server. 


Note that most of the process remains the same, however, when loading files from a source it is often advisable to store the source file in an archive location so that if an error is discovered, the file can be reprocessed. For API ingestions it’s often necessary to keep a watermark record so that only new records are retrieved from the API. This watermark can be altered to enable re-processing of data if an issue is occurs. 

S3 Storage

Two buckets are required as a minimum: raw storage and archive. The raw storage bucket contains the raw data that is registered with the glue catalogue. The archive bucket is used to store source files for a period of time after they have been processed in case they are required to be re-run due to issues. It is advisable to set a lifecycle rule on the archive bucket to delete objects after a set period (e.g. 30 days) to prevent the archive growing indefinitely. 

The raw storage can either be a single bucket which uses S3 prefixes to separate datasets or there can be a bucket per dataset. The choice is a personal one but one advantage of a bucket per dataset is that you can apply different tags which is useful if you want to report on the storage and cost of each dataset. The downside however is that there is more effort to provision the buckets but this should be minimal using an IaC tool such as Terraform. Data in the bucket should be structured using a prefix such as the one below: /{information_classification}/{source}/{dataset}/ingestion_date={YYYY-MM-DD}

{information_classification} should be replaced with the classification of the data in accordance with your organisation’s standards. For example, companies often classify data along the lines of public (publicly available), internal (only for use by employees), confidential (limited to certain employees) and pii (personal identifiable information with very restricted access). There can be many other classifications but this is a reasonable start. 

{source} should be a friendly name that described where you got the data from (e.g. facebook, twitter, registration-api, etc). 

{dataset}  should be a friendly name for the dataset, for example if you’ve ingested the customer dataset from a CRM platform this should be customer. Note that you may want to have various additional sub-prefixes if there is more than one feed per dataset (e.g. customer, address, etc). 

The ingestion_date is used to define the partitioning scheme for the table. Partitioning is essential for reducing the cost of queries run by Athena or Redshift Spectrum. Using a partition key in a query provides a hint for the query engine to avoid reading certain S3 prefixes thus reducing the amount of data scanned (known as partition pruning). A date string of the format YYYY-MM-DD is used to make it easier to filter to records based on the date they were ingested. The reason for this is that a lot of queries will want to filter to a subset of data based on the time it was ingested (e.g. data transformations will only want to process and transform new data for loading into the warehouse). 

I strongly advise against using a partition schema along the lines of year=YY/month=MM/day=DD as this makes range queries very difficult for users. Although this can be fine for ingestion processes that can generate a query to load each day’s data in turn, it is very difficult for users to pick ranges of data, if you doubt this, try writing a SQL query to return all records for a dataset between 2021-11-15 and 2022-03-05. By contrast, using a date string plays well with query engines as they automatically can convert dates and strings for comparisons of this nature. Other fields can be used for partitioning however ingestion_date in my experience has been the most useful one as data consumers are usually interested in recent data and going back a given amount of time. 

Glue

AWS glue contains the data catalogue that defines the metadata used by Athena and Redshift Spectrum to query the data stored in files in S3. Glue can have data catalogues, databases and tables. I recommend one data catalogue per environment (e.g. dev, test, prod) and using databases to logically group one or more ingested data sources together. A table should be created for each distinct dataset/feed ingested. Ensure that the table is created with the appropriate partition key(s). 

There are two ways to define tables in Athena: via the API or using a crawler. I highly recommend avoiding crawlers as they can be expensive to run and create tables and partitions based on the structure of the files they crawl. It is likely to be cheaper, faster and grant you greater control if you create tables via the API. This can be done either via an IaC tool such as terraform or by calling the API via the ingestion process itself (e.g. using the boto3 library in Python).

When ingestion processes add new data to S3, the glue catalog needs to be informed that there are additional partitions. This is achieved most efficiently by calling the glue API to add a new partition (or using a SQL command via Athena to add the new partition). You should avoid using the MSCK REPAIR TABLE Athena command as it will scan all prefixes in the S3 bucket to recreate the partition metadata which means it will take longer for this command to run each day as new files are added. 

Note that libraries such as the AWS Data Wrangler for Python will automatically update partition information in Glue when you use their dataset functionality. When removing old data as part of your data retention process note that in addition to deleting the actual S3 files you will also need to remove the partitions for the table from Glue. This should also be done via the API or an Athena ALTER TABLE statement. 

Redshift Spectrum

Redshift Spectrum provides a mechanism for querying data stored in S3 and registered in the Glue catalogue by creating external schemas in a Redshift database. I recommend prefixing these schemas with an indication they are in the data lake (e.g. lake_). The reason for using Redshift Spectrum will become clearer in the future post about data transformation using DBT. 

Ingestion Processes

Although there are a variety of tools, languages and frameworks that can be used to ingest data into the lake I will focus on the use of Python. For most datasets, the AWS Data Wrangler library should be sufficient. For small datasets that can be processed using AWS Data Wrangler in a small amount of time (i.e. easily less than 15 minutes) a Lambda function is a simple solution and AWS Data Wrangler even comes with a lambda layer to make this easy to deploy. If dealing with datasets that either require more memory or more than 15 minutes to ingest, an ECS task is a good solution (you can also use AWS Batch which wraps ECS if you prefer). For larger datasets I would suggest using PySpark and in particular the Spark SQL API as it provides great performance and can handle very large datasets easily. 

Orchestration

As the example architecture is focussed on batch ingestion, there are two main ways that ingestion processes can be invoked:
  1. Time-based schedules
  2. Event triggers 
Time-based triggers are good for periodic extractions (i.e. daily, hourly, etc) which can’t be invoked by an event such as extracting data from an API or downloading files from an external SFTP server. These triggers can be created using cron schedule expressions in EventBridge. 

Event triggers can be used when the data source is aware of the data lake and can trigger some form of event. This can either be by writing a file to an S3 landing bucket the data lake owns or via some other form of notification such as sending an SNS notification to a topic owned by the data lake. 

Creating a landing bucket for data sources to load data is very useful as it enables the data to be ingested into the lake as soon as it becomes available by setting up S3 event notifications to trigger the load process as shown in the diagram below. 

Up Next 

The next part will discuss data transformation in detail and how DBT can be used for this purpose.

Comments

Popular posts from this blog

Building a Data Platform: Part 5 - Data Transformation (loading the warehouse)

Introduction  Once the data has been ingested into the data lake it's time to make it available for reporting by transforming the data into structures that are simple to query for a BI tool. This part of the series will describe the data modelling approach, the transformation tool (DBT) and how to transform the data ready for reporting. The example architecture diagram is shown below. Data Model There are a variety of data modelling approaches that can be used to model a data warehouse, popular approaches include: Dimensional Data Vault Third-Normal Form (3NF) A comparison of the different approaches is beyond the scope of this article, however for our example architecture we will be using dimensional modelling. Dimensional modelling has been the dominant data warehouse modelling approach in my personal experience and there are a good number of reasons to use it, the most important being ease-of-use for analysts and business users and performance. Dimensional modelling has two main...

Building a Data Platform: Part 1 - Concepts

Introduction This is the first part of a series about how to build a basic modern data platform that should be sufficient for most small-medium sized businesses. The series will focus on the key architecture elements, why they are needed and potential technologies that can be used. I'll only discuss batch processing initially, but I may cover real-time/streaming ingestion in a future post. This post will cover the conceptual elements of a modern data platform which can be used as a guide if you're starting out and help when considering which services or tools will meet your requirements. There isn't a "best" technology stack when building a data platform, it's all trade-offs. For example, ETL tools are easier to use than writing code but often hard to integrate within CI/CD pipelines. Conceptual Data Platform Conceptually, let's define a data platform as a collection of technologies that enables an organisation to use its data to the fullest. Generally the...

Building a Data Platform: Part 2 - Example Data Platform

Introduction In  part 1  we walked through a conceptual model of a basic data platform that covered off the key capabilities and requirements for each capability. In this part we'll discuss an example architecture that provides the minimum capabilities for a modern data platform. Conceptual to Logical architecture The diagram below shows a potential logical architecture for building our conceptual data platform. Before we dive into discussing the elements of it, note that the logical architecture is always going to depend on your situation and decisions will be affected by: The skills you have within your organisation Which cloud provider you use (assuming you're even in the cloud) Existing technology stack The appetite for change to the technical estate The key things to note are: All conceptual capabilities have a technology to provide them. Some technologies provide multiple conceptual capabilities (e.g. Redshift providing the transformed data storage and query engine) This...