Sunday, March 29, 2020

Row-Level Security for the Databricks Metastore Database



Problem Statement:

The team decided to maintain a common metastore database for all the sources deployed in Azure Data Lake. Since the database is shared across all compute, a developer can create, drop, or modify tables in a database their team doesn't own. We need to ensure proper governance so that developers cannot make changes to databases owned by other teams.

Solution Diagram:


Understanding how metastore tables are used internally:

The metastore is the central repository. It stores metadata for tables (such as their schema and location) and partitions in a relational database, and provides client access to this information through the metastore service API.

dbo.DBS and dbo.TBLS are the two tables the metastore writes to whenever we create a table from any compute. DB_ID is the foreign key that links each row in TBLS back to its database in DBS.
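To see this relationship for yourself, you can read the two tables from a notebook over JDBC. A minimal sketch, assuming an Azure SQL metastore and placeholder connection details:

%scala
// Illustration only: inspect the metastore tables over JDBC to see the
// DB_ID relationship. Server name, database name, and credentials below
// are placeholders for your own metastore database.
val metastoreUrl = "jdbc:sqlserver://<server>.database.windows.net:1433;database=<metastore_db>"

def readMetastoreTable(table: String) =
  spark.read.format("jdbc")
    .option("url", metastoreUrl)
    .option("dbtable", table)
    .option("user", "<user>")
    .option("password", "<password>")
    .load()

val dbs  = readMetastoreTable("dbo.DBS").select("DB_ID", "NAME", "OWNER_NAME")
val tbls = readMetastoreTable("dbo.TBLS").select("DB_ID", "TBL_NAME", "OWNER")

// Every row in TBLS resolves to exactly one database in DBS via DB_ID
dbs.join(tbls, Seq("DB_ID")).show(false)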


What is Row-Level Security?


Row-Level Security enables you to use group membership or execution context to control access to rows in a database table. Row-Level Security (RLS) simplifies the design and coding of security in your application. RLS helps you implement restrictions on data row access.



RLS supports two types of security predicates:
·       Filter predicates silently filter the rows available to read operations (SELECT, UPDATE, and DELETE).
We’re not going to implement this predicate because we want everyone to be able to read other source systems’ tables.

·       Block predicates explicitly block write operations (AFTER INSERT, AFTER UPDATE, BEFORE UPDATE, BEFORE DELETE) that violate the predicate.
We’re going to implement this block predicate; it will prevent developers from creating tables in databases they don’t own.


What is Databricks Cluster Username?

By default, a Databricks cluster runs as the root user, and all session requests to the metastore are established as root.
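A quick way to confirm this from a notebook (a trivial check, not from the original post):

%scala
// On a standard cluster this should print "root", which is the identity
// the metastore sessions are established with.
println(System.getProperty("user.name"))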




Procedure to apply this security:


Step 1: Create a schema so the security objects stay separate from objects used for other purposes.

create schema security;
go
Step 2: Create a function that accepts a user name and a database ID as parameters and validates them against the DBS table. If the combination is valid it returns 1; otherwise it returns no rows.

create function [security].[fn_DBsecuritypredicate](@User as sysname, @db_id bigint)
RETURNS TABLE
with schemabinding
as
return select 1 as fn_DBsecuritypredicate_result
       from [dbo].[DBS]
       where @User = [OWNER_NAME] and @db_id = [DB_ID];
GO

Step 3: Create a security policy that blocks the insert into dbo.TBLS when the above condition is not satisfied.

CREATE SECURITY POLICY DB_CHECK
ADD BLOCK PREDICATE security.fn_DBsecuritypredicate(OWNER, DB_ID) ON [dbo].[TBLS] AFTER INSERT
WITH (STATE = ON);
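With the policy enabled, a CREATE TABLE issued against a database whose OWNER_NAME doesn't match the cluster user should fail when the metastore tries to insert into dbo.TBLS. A minimal sketch of what that looks like from a notebook (the database and table names here are made up for illustration):

%scala
// "finance_db" is a hypothetical database owned by another team; the
// DB_CHECK block predicate rejects the insert into dbo.TBLS, so the
// create fails and surfaces as a metastore exception.
try {
  spark.sql("CREATE TABLE finance_db.unauthorised_test (id INT)")
} catch {
  case e: Exception => println("Create blocked: " + e.getMessage)
}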





Conclusion:

After implementing the security policy, I'm able to prevent users from creating tables in databases they don't own.



Wednesday, March 25, 2020

Databricks Notebook with Parameters


Requirement:
Create a Databricks notebook that accepts parameters.
Screenshot:

The screenshot below shows a sample Databricks job with a parameter defined.


The screenshot below shows the Databricks notebook referenced by the job above.
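Since the screenshots don't include the code itself, here is a minimal sketch of a parameterised notebook cell (the widget name, default value, and path are only examples):

%scala
// Define a text widget; the value supplied in the job's "Parameters" box
// overrides this default at run time.
dbutils.widgets.text("load_date", "2020-03-25", "Load Date")

// Read the parameter and use it in the notebook logic.
val loadDate = dbutils.widgets.get("load_date")
val df = spark.read.parquet("abfss://data@gen2.dfs.core.windows.net/events/" + loadDate)
display(df)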





Copy Table Schema From One Metastore To Another


Requirement:

Migrate the table schema from the 2.1 metastore to the 3.2 metastore and copy the data from Azure Data Lake Gen1 to Gen2.

Component Involved:

Azure Databricks, Azure Data Lake

Language: Scala

Code Snippet:

%scala
// Notebook parameters: source folder, source temp-view name, destination table name
val path = dbutils.widgets.get("path")
val tname = dbutils.widgets.get("tname")
val des_tname = dbutils.widgets.get("des_tname")

%scala
// Read the source data from Data Lake Gen1 and expose it as a temporary view
val data = sqlContext.read.parquet("adl://adls.azuredatalakestore.net/data/" + path)
data.createOrReplaceTempView(tname)

%scala
// Build a CREATE TABLE AS SELECT statement that recreates the table on Gen2
val SQL_builder = StringBuilder.newBuilder
val tmp_qry = "show columns in " + tname
val df = spark.sql(tmp_qry)
val ins_query = "CREATE TABLE IF NOT EXISTS " + des_tname +
  " LOCATION '" + "abfss://data@adls.dfs.core.windows.net/" + path + "' AS select "

SQL_builder.append(ins_query)

// Append every column name, skipping the ones listed in remove_column_list
df.collect().foreach(row => {
  if (row.toString() == "[remove_column_list]") {}
  else { SQL_builder.append(row.toString().replace("[", "").replace("]", "") + ",") }
})

// Add a derived flag column and the FROM clause, then run the generated statement
SQL_builder.append("CASE WHEN modifycolumnname = '1' THEN 'Yes' ELSE 'No' end `hdisdeletedrecord` from ")
SQL_builder.append(tname)

spark.sql(SQL_builder.toString())
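As a quick optional sanity check (not part of the original notebook), you can describe the newly created table in the destination metastore:

%scala
// Verify the destination table exists and inspect its schema
spark.sql("DESCRIBE TABLE " + des_tname).show(false)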

Monday, March 23, 2020

Change Data Capture Using Attunity Replicate and Delta Lake


Requirement:

Build an end-to-end pipeline that captures changes in the source system and replicates them in real time to the target application. Attunity Replicate is used to replicate the data from source to stage, and Delta Lake is used to merge the data from stage to target.

Design Diagram:














Tools Used:

Attunity Replicate, Delta Lake, Databricks, and ADLS Gen2 storage.

Solution:

  1. Attunity Replicate identifies the changes from the source system and populates the FULL and INCREMENTAL tables.
  2. When a new row is inserted in the source, Attunity inserts the record into the FULL table; update and delete records land in the INCREMENTAL table with a unique change handler.
  3. Create a Delta Lake table and merge the changes between the FULL and INCREMENTAL tables.
Code:

Create external tables on top of the Attunity Replicate stage tables.

CREATE EXTERNAL TABLE delta_Lake_cdc.employee( # 
CREATE EXTERNAL TABLE delta_Lake_cdc.employee__ct( # 
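The DDL above is truncated; purely as an illustration (column list inferred from the merge queries below, and the location path is a placeholder), the FULL table definition would look something like the sketch below, with employee__ct additionally carrying the Attunity header__operation and header__change_seq columns:

%scala
// Illustrative sketch only -- columns inferred from the queries later in
// this post; adjust types and the Parquet location to match your stage data.
spark.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS delta_Lake_cdc.employee (
    EmpID INT, EmpName STRING, Designation STRING, Department STRING, JoiningDate STRING
  )
  STORED AS PARQUET
  LOCATION 'abfss://containername@gen2.dfs.core.windows.net/attunity/dbo/employee'
""")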


Read the FULL load table and populate the Delta table.

%scala
val df = spark.sql("select * from delta_Lake_cdc.employee")
df.createOrReplaceTempView("employee")

%sql
CREATE TABLE IF NOT EXISTS delta_Lake_cdc.employee_CDC
USING DELTA
LOCATION 'abfss://containername@gen2.dfs.core.windows.net/delta_Lake_cdc/dbo/employee_CDC'
AS SELECT * FROM employee


Identify the changes between the FULL and INCREMENTAL tables using the query below and create the _ct dataset.

%scala
// Pick the latest UPDATE per employee from the incremental (__ct) table
val df_emp_ct = spark.sql("""
  select EmpID, EmpName, Designation, Department, JoiningDate
  from (
    select t1.*
    from (select header__operation, header__change_seq, empid, empname, designation, department, joiningdate
          from delta_Lake_cdc.employee__ct
          where header__operation = 'UPDATE') t1
    join (select empid, max(header__change_seq) max_seq
          from (select cast(header__change_seq as varchar(500)) as header__change_seq, empid
                from delta_Lake_cdc.employee__ct
                where header__operation = 'UPDATE') t2
          group by empid) s
      on t1.empid = s.empid and t1.header__change_seq = s.max_seq
  ) t3
""")

df_emp_ct.createOrReplaceTempView("employee_ct")



Merge the changes into the Delta table, first from the FULL table (employee) and then from the incremental view (employee_ct).

%sql 
MERGE INTO delta_Lake_cdc.employee_CDC AS sink
USING employee AS source
 ON sink.empid = source.empid
WHEN MATCHED THEN
 UPDATE SET *
WHEN NOT MATCHED
 THEN INSERT * 

%sql 
MERGE INTO delta_Lake_cdc.employee_CDC AS sink
USING employee_ct AS source
 ON sink.empid = source.empid
WHEN MATCHED THEN
 UPDATE SET *
WHEN NOT MATCHED
 THEN INSERT * 


Once the merges are completed, run OPTIMIZE to improve query performance.

%sql
OPTIMIZE delta_Lake_cdc.employee_CDC

Validate the Changes in Delta Table

%sql
select * from delta_Lake_cdc.employee_CDC order by empid

Limitation:

Delta tables can't be read as plain Parquet-format tables, so we can't expose them as external tables in Hive; all consumers should read them through Databricks only.




Call Databricks Job Using REST API

The PowerShell script below helps call a Databricks job with parameters:  [Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]...