Requirement:
We need to build an end-to-end pipeline that captures changes in the source system and replicates them in real time to the target application. Attunity Replicate is used to replicate the data from source to stage, and Delta Lake is used to merge the data from stage to target.
Design Diagram:
(Diagram: Source System → Attunity Replicate → stage tables in Gen 2 Storage → Databricks / Delta Lake merge → target Delta table.)
Tools Used:
Attunity Replicate, Delta Lake, Databricks, and Azure Data Lake Storage Gen 2.
Solution:
- Attunity Replicate identifies changes in the source system and populates a FULL table and an INCREMENTAL (__ct) table.
- When a new row is inserted in the source, Attunity inserts the new record into the FULL table and writes UPDATE/DELETE records to the INCREMENTAL table, each tagged with a unique change handler (header__change_seq). A quick inspection query is sketched just after this list.
- Create a Delta Lake table and merge the changes between the FULL and INCREMENTAL tables.
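To see what Replicate actually lands in the stage, it can help to look at the change table's header columns. A minimal sketch, assuming the external stage tables defined in the next step already exist:
%scala
// Inspect the Replicate change table: header__operation holds the
// operation type and header__change_seq orders the changes.
spark.sql("""
  SELECT header__operation, header__change_seq, empid
  FROM delta_Lake_cdc.employee__ct
  ORDER BY header__change_seq
""").show(20, false)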
Create external tables on top of the Replicate stage tables.
%sql
-- Column list reconstructed from the queries later in this post; the types,
-- storage format, and locations are illustrative assumptions, so adjust them
-- to your Replicate target settings.
CREATE EXTERNAL TABLE delta_Lake_cdc.employee (
  EmpID INT,
  EmpName STRING,
  Designation STRING,
  Department STRING,
  JoiningDate STRING
)
STORED AS PARQUET
LOCATION 'abfss://containername@gen2.dfs.core.windows.net/delta_Lake_cdc/dbo/employee'
%sql
CREATE EXTERNAL TABLE delta_Lake_cdc.employee__ct (
  header__change_seq STRING,
  header__operation STRING,
  EmpID INT,
  EmpName STRING,
  Designation STRING,
  Department STRING,
  JoiningDate STRING
)
STORED AS PARQUET
LOCATION 'abfss://containername@gen2.dfs.core.windows.net/delta_Lake_cdc/dbo/employee__ct'
Read the FULL load table and populate the Delta table.
%scala
val df=spark.sql("select * from delta_Lake_cdc.employee")
df.createOrReplaceTempView("employee")
%sql
CREATE TABLE IF NOT EXISTS delta_Lake_cdc.employee_CDC
USING DELTA
LOCATION 'abfss://containername@gen2.dfs.core.windows.net/delta_Lake_cdc/dbo/employee_CDC'
AS SELECT * FROM employee
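Once the table is created, it is worth a quick sanity check; DESCRIBE DETAIL (a Delta Lake command available in Databricks) reports the format, location, and file count. A sketch:
%scala
// Confirm the new table is Delta-backed and points at the expected
// Gen 2 Storage location.
spark.sql("DESCRIBE DETAIL delta_Lake_cdc.employee_CDC")
  .select("format", "location", "numFiles")
  .show(1, false)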
Identify the changes between the FULL and INCREMENTAL tables using the query below and create the _CT dataset.
%scala
val df_emp_ct = spark.sql("""
  SELECT EmpID, EmpName, Designation, Department, JoiningDate
  FROM (
    SELECT t1.*
    FROM (
      SELECT header__operation, header__change_seq, empid, empname,
             designation, department, joiningdate
      FROM delta_Lake_cdc.employee__ct
      WHERE header__operation = 'UPDATE'
    ) t1
    JOIN (
      SELECT empid, MAX(header__change_seq) AS max_seq
      FROM (
        SELECT CAST(header__change_seq AS varchar(500)) AS header__change_seq, empid
        FROM delta_Lake_cdc.employee__ct
        WHERE header__operation = 'UPDATE'
      ) t2
      GROUP BY empid
    ) s
    ON t1.empid = s.empid AND t1.header__change_seq = s.max_seq
  ) t3
""")
df_emp_ct.createOrReplaceTempView("employee_ct")
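The same latest-change-per-key logic can also be written with a window function, which some may find easier to maintain than the nested joins. This is an equivalent sketch, not the original query:
%scala
// Keep only the most recent UPDATE per empid, ranked by
// header__change_seq descending (same result as the join above).
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val w = Window.partitionBy("empid").orderBy(col("header__change_seq").desc)

val df_emp_ct_windowed = spark.table("delta_Lake_cdc.employee__ct")
  .filter(col("header__operation") === "UPDATE")
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .select("EmpID", "EmpName", "Designation", "Department", "JoiningDate")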
Merge the changes into the Delta table. The first merge applies the FULL table (covering new inserts), and the second applies the latest change records from the _CT dataset (covering updates).
%sql
MERGE INTO delta_Lake_cdc.employee_CDC AS sink
USING employee AS source
ON sink.empid = source.empid
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
%sql
MERGE INTO delta_Lake_cdc.employee_CDC AS sink
USING employee_ct AS source
ON sink.empid = source.empid
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
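The same merge can also be expressed through the Delta Lake Scala API, which is handy if you want to keep the whole step in a single %scala cell. A sketch of the second merge:
%scala
// Programmatic equivalent of the MERGE INTO statement above,
// using the io.delta.tables API.
import io.delta.tables.DeltaTable

DeltaTable.forName(spark, "delta_Lake_cdc.employee_CDC").as("sink")
  .merge(spark.table("employee_ct").as("source"), "sink.empid = source.empid")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()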
Once all the merges are complete, run OPTIMIZE to improve query performance.
%sql
OPTIMIZE delta_Lake_cdc.employee_CDC
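Since both merges join on empid, co-locating data by that column can further reduce the files touched per merge; Databricks supports this through ZORDER (a sketch, Databricks-only):
%scala
// Z-order the Delta table by the merge key so empid lookups scan
// fewer files (OPTIMIZE ... ZORDER BY is a Databricks feature).
spark.sql("OPTIMIZE delta_Lake_cdc.employee_CDC ZORDER BY (empid)")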
Validate the changes in the Delta table.
%sql
select * from delta_Lake_cdc.employee_CDC order by empid
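Beyond eyeballing rows, a simple count reconciliation between stage and target can catch missed merges. A minimal sketch, assuming the FULL table is the source of truth for row count:
%scala
// Row counts in the FULL stage table and the Delta target should match
// once inserts and updates have been merged.
val stageCount  = spark.table("delta_Lake_cdc.employee").count()
val targetCount = spark.table("delta_Lake_cdc.employee_CDC").count()
assert(stageCount == targetCount, s"Count mismatch: stage=$stageCount, target=$targetCount")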
Limitation:
Delta-format tables cannot be read as plain Parquet tables, so we cannot expose them as external Hive tables for other engines; all consumers must read the Delta tables through Databricks.