Real-Time CDC Pipeline with AWS DMS and Snowflake
Data is everywhere, and it needs to be ingested to be useful. Ingestion can be done either in batches or in real time. In this article I will share a data pipeline design for capturing data changes, in other words Change Data Capture (CDC), from source to destination in near real time using AWS DMS and Snowflake.
I will share not only the approach through the design but also Terraform scripts that can easily be reused for your own use case. Later you will learn some important points related to DMS and SnowPipe.
Architecture
Components
Let's go over the components along with their related Terraform resources:
AWS DMS
DMS (Database Migration Service) is a service provided by AWS for moving data from point A to point B. It is a very powerful service and easy to use.
Assuming the required roles are already created through Terraform or the AWS Console.
A DMS setup consists of several components:
Replication Instance
Setting up the instance on which DMS will run gives control over the instance type and size according to data volume and latency needs. You can also go with a managed instance.
resource "aws_dms_replication_instance" "my_replication" {
engine_version = "3.4.7"
replication_instance_id = my-replication"
allocated_storage = "100"
apply_immediately = true
auto_minor_version_upgrade = true
availability_zone = "us-east-1a"
multi_az = false
publicly_accessible = false
replication_instance_class = "dms.c5.4xlarge"
vpc_security_group_ids = ["sg-123456"]
replication_subnet_group_id = aws_dms_replication_subnet_group.my_dms_subnet_group.id
}
# Create a subnet group using existing VPC subnets
resource "aws_dms_replication_subnet_group" "my_dms_subnet_group" {
  replication_subnet_group_description = "Subnet for DMS job"
  replication_subnet_group_id          = "my-dms-subnet-group"
  subnet_ids                           = ["subnet-123456"]
}
Source
The goal is to move data from the application database, which in our case is RDS.
resource "aws_dms_endpoint" "my_source" {
endpoint_type = "source"
endpoint_id = "my-source"
engine_name = "mysql"
server_name = "my_server"
database_name = "my_db"
port = my_port
username = "my_user"
password = var.PSWD
ssl_mode = "none"
}
Target
S3 is used as the target, which is ideal for analytical workflows: data can be consumed from S3 by Spark or loaded further into a warehouse [discussed later].
resource "aws_dms_endpoint" "my_target" {
endpoint_type = "target"
endpoint_id = "my-target"
engine_name = "s3"
s3_settings {
bucket_name = "my-bucket"
bucket_folder = "my-prefix"
cdc_path = "my-cdc-full-s3-path"
cdc_inserts_only = true
data_format = "parquet"
service_access_role_arn = "my-role"
parquet_timestamp_in_millisecond = true
timestamp_column_name = "Timestamp"
encryption_mode = "SSE_KMS"
server_side_encryption_kms_key_id = "my-kms-id"
}
}
Task
The task is what actually executes the data movement, using the configurations above, on the replication instance set up previously. The task is the core component from which you can monitor status, start, stop, or reload/resume.
The task must be stopped before applying changes.
resource "aws_dms_replication_task" "my_task" {
replication_instance_arn = aws_dms_replication_instance.my_replication.replication_instance_arn
replication_task_id = "my-task"
source_endpoint_arn = aws_dms_endpoint.my_source.endpoint_arn
table_mappings = file("my-table-mappings.json")
target_endpoint_arn = aws_dms_endpoint.my_target.endpoint_arn
migration_type = "full-load-and-cdc"
lifecycle {
ignore_changes = all
}
A basic example of my-table-mappings.json:
{
  "rules": [
    {
      "rule-type": "selection",
      "rule-id": "1",
      "rule-name": "1",
      "object-locator": {
        "schema-name": "my-schema",
        "table-name": "my-table"
      },
      "rule-action": "explicit"
    },
    {
      "rule-type": "transformation",
      "rule-id": "2",
      "rule-name": "2",
      "object-locator": {
        "schema-name": "my-schema",
        "table-name": "my-table"
      },
      "rule-target": "table",
      "rule-action": "rename",
      "value": "my-bucket/my-prefix"
    }
  ]
}
The above JSON will select just one table from the schema and apply the rename transformation, which ends up in S3. Some other common transformations are add-column, include-column, and remove-column. For more table-level configuration, check the AWS documentation.
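If you prefer to keep everything in Terraform, the mappings can also be inlined in the task with jsonencode instead of a separate JSON file. Below is a minimal sketch; the remove-column rule and the column name are illustrative additions, not part of the original setup.
locals {
  my_table_mappings = jsonencode({
    rules = [
      {
        # Same explicit selection rule as in the JSON file above.
        "rule-type"      = "selection"
        "rule-id"        = "1"
        "rule-name"      = "1"
        "object-locator" = { "schema-name" = "my-schema", "table-name" = "my-table" }
        "rule-action"    = "explicit"
      },
      {
        # Illustrative transformation: drop a column before it lands in S3.
        # "my-column" is a placeholder.
        "rule-type"      = "transformation"
        "rule-id"        = "3"
        "rule-name"      = "3"
        "rule-target"    = "column"
        "object-locator" = { "schema-name" = "my-schema", "table-name" = "my-table", "column-name" = "my-column" }
        "rule-action"    = "remove-column"
      }
    ]
  })
}
The task can then reference local.my_table_mappings in table_mappings instead of file("my-table-mappings.json").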
Snowflake
Snowflake is a modern data warehouse with a lot of powerful features, one of them being SnowPipe, which allows ingesting data from S3 into a Snowflake table.
Assuming the database, schema, and roles with permissions are already set up.
Table
Creating the table that the data will be loaded into.
resource "snowflake_table" "my_table" {
provider = snowflake.snowflake_provider
database = "my-db"
schema = "my-schema"
name = "my-table"
comment = "My table sourced from RDS through DMS"
column {
name = "ID"
type = "NUMBER(38,0)"
}
column {
name = "COLUMN_A"
type = "VARCHAR(1000)"
}
column {
name = "COLUMN_B"
type = "TIMESTAMP_TZ(9)"
}
}
Storage Integration
Snowflake requires an integration with the external source (S3); this integration allows Snowflake to access the source and ingest data into a stage whenever it lands.
resource "snowflake_storage_integration" "my_integration" {
provider = snowflake.snowflake_provider
name = "my-integration"
comment = "..."
type = "EXTERNAL_STAGE"
enabled = true
storage_provider = "S3"
storage_aws_role_arn = "my-aws-role-for-sf-with-s3-read-permissions"
storage_allowed_locations = [
"s3://my-bucket/my-prefix/",
]
}
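One step the Terraform above assumes is already in place: the AWS role referenced in storage_aws_role_arn must trust the IAM user and external ID that Snowflake generates for the integration. A quick way to retrieve them (the quoted name matches the integration created above):
desc storage integration "my-integration";
-- Copy STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID from the output
-- into the trust policy of the role used in storage_aws_role_arn.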
Stage
The stage works with the storage integration to move data from the external source into Snowflake.
resource "snowflake_stage" "my_stage" {
provider = snowflake.snowflake_terraform_svc
name = "my-stage"
url = "s3://my-bucket/my-prefix/"
database = "my-db"
schema = "my-schema"
storage_integration = snowflake_storage_integration.my_integration.name
}
SnowPipe
The last step on the Snowflake side is to set up a pipe, which loads data from S3 into the table through the stage in a continuous fashion; it is similar to running the COPY command manually.
resource "snowflake_pipe" "my_pipe" {
provider = snowflake.snowflake_terraform_svc
name = "my-pipe"
database = "my-db"
schema = "my-schema"
comment = "...."
copy_statement = "COPY INTO my-db.my-schema.my-table FROM @my-db.my-schema.my-stage/my-table/ FILE_FORMAT = (TYPE = 'PARQUET') MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE"
auto_ingest = true
}
FILE_FORMAT is set directly in the copy_statement instead of using a snowflake_file_format resource, as there is a bug reported here.
S3 Notification
In order for SnowPipe to listen for S3 drops, we need to set up an S3 event notification that targets the SQS queue exposed by the pipe (its notification_channel).
The S3 bucket notification must be created after the Snowflake integration and pipe have been set up, so if you are doing everything in one Terraform plan, you will have to apply again for this resource to take effect.
resource "aws_s3_bucket_notification" "my_bucket_notification" {
bucket = "my-bucket"
queue {
queue_arn = snowflake_pipe.my_pipe.notification_channel
events = ["s3:ObjectCreated:*"]
filter_prefix = "my-prefix/"
}
}
Alerting & Monitoring
Alerting is important; otherwise the pipeline might fail silently until someone manually pings to ask why the data is stale. Alerting is shown in the design, but for Terraform refer to this. It can be set up for each component in many ways; the easiest and fastest are:
For DMS, email notifications can be set up through Simple Notification Service (SNS), which any number of subscribers can subscribe to (a sketch follows after this list).
For Snowflake, dashboards are typically ideal for catching data-related issues like freshness; custom jobs or tools such as Great Expectations or Monte Carlo can also help.
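As referenced above, here is a minimal sketch of a DMS event subscription wired to an existing SNS topic; the topic ARN and names are placeholders, and the event categories are only an example (check the DMS documentation for the full list):
resource "aws_dms_event_subscription" "my_task_alerts" {
  name             = "my-dms-task-alerts"
  enabled          = true
  sns_topic_arn    = "my-sns-topic-arn" # existing SNS topic with email subscribers (assumption)
  source_type      = "replication-task"
  source_ids       = [aws_dms_replication_task.my_task.replication_task_id]
  event_categories = ["failure", "state change"]
}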
For monitoring and debugging:
For DMS, the console is a good way to monitor, while logs in CloudWatch can help with debugging.
For SnowPipe, below are some commands to help with debugging:
For finding the current pipe status and the files loading or in queue:
select system$pipe_status('my-pipe');
For pipe-related properties like the S3 notification channel and copy command:
describe pipe 'my-pipe';
For history on the table:
SELECT * FROM table(information_schema.copy_history(table_name=>'my-db.my-schema.my-table', start_time=> dateadd(months, -1, current_timestamp())))
ORDER BY last_load_time DESC
LIMIT 10;
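For errors on files that SnowPipe attempted to load (a sketch; the pipe name and time window are illustrative):
select * from table(information_schema.validate_pipe_load(
  pipe_name  => 'my-db.my-schema.my-pipe',
  start_time => dateadd(hours, -24, current_timestamp())));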
Points
Both DMS and SnowPipe have limitations and concerns that may make them unsuitable for certain types of workflows. Below are some important points:
SnowPipe could be slow depending on source file size, resulting in higher latencies.
SnowPipe could end up with duplicates if DMS full load is performed.
SnowPipe does not support upserts; it only supports inserts through the COPY command, meaning duplicates can occur.
DMS CDC depends on the RDS binary log; if a failed DMS task is not fixed before the log is rotated away, you may need another full load.
DMS is not suitable for custom business logic in transformations.
DMS can be configured on how often it should sink data: smaller files can lead to lower latency on SnowPipe but higher utilization on the DMS instance (see the sketch after this list).
Cross-account DMS, such as having a target in a different AWS account, can be challenging to set up, especially if KMS encryption is used.
Further detailed DMS limitations are covered in the AWS documentation.
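As mentioned in the list above, the flush behaviour of the S3 target can be tuned. Below is a minimal sketch, assuming an AWS provider version that exposes these s3_settings arguments; the values are illustrative, so check the AWS documentation for defaults and units.
resource "aws_dms_endpoint" "my_target_tuned" {
  endpoint_type = "target"
  endpoint_id   = "my-target-tuned" # hypothetical variant of the target endpoint above
  engine_name   = "s3"
  s3_settings {
    bucket_name             = "my-bucket"
    data_format             = "parquet"
    service_access_role_arn = "my-role"
    cdc_max_batch_interval  = 60    # flush CDC files at least every N seconds
    cdc_min_file_size       = 32000 # or once the batch reaches this size (units per AWS docs)
  }
}
Larger intervals and files mean fewer, bigger objects and less per-file SnowPipe overhead at the cost of latency; smaller ones lower latency but increase load on the DMS instance.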
Conclusion
DMS, in my opinion, is underrated; it can do a lot of work with a lot less effort. If you are looking to do CDC or move data from point A to point B internally, DMS is a good tool to consider before reaching for open source or third-party alternatives, especially if you are already on AWS.
DMS works great in scenarios where heavy transformation or business logic is not required. For advanced use cases, a messaging service combined with a stream processing engine and custom logic can give more flexibility, scalability, and better real-time performance, at the expense of setting up the infrastructure and tooling.