In the last post I covered creating a wrapper function for use within an SSIS package and staging tables in the data warehouse. This post will deal with creating the SSIS package itself.
As I mentioned in my first post about Change Data Capture (CDC), SQL Server Books Online has a very good step-by-step how-to on setting up CDC and the related SSIS package to perform incremental loads to a data warehouse. My series of posts is meant to make that setup and implementation a little easier by somewhat automating the script and object generation process.
I’ve created a sample dtsx file that you can download. Below I will take you through the various control flow items and how to generate the SQL and .NET (VB or C#) scripts needed for each. However, I don’t intend to explain in detail how to create the package. Please see the MSDN or BOL help files for a detailed, step-by-step guide.
The package comprises the following steps broken out into
two main sections:
1. Control Flow items that are run for the entire CDC incremental load:
a. Truncating any existing staging tables (only if using the ‘all with merge’ option in the wrapper functions).
b. Calculating the data retrieval interval – essentially, setting the start and end dates and times of the data you want to load. In my example I’m grabbing four hours’ worth of data, from between six and two hours ago.
c. Checking whether the data is ready – due to latency and other transaction-related lags or delays, there is a chance that not all the data will be ready for loading, so this package has a built-in loop to check for the data a certain number of times before giving up. This is also why I’ve added the two-hour padding to the retrieval interval above.
2. Control Flow items that need to be created for each capture instance. These can also be set up as independent packages launched by the package steps above; however, I’m sticking with doing everything within one SSIS package.
a. Dynamically preparing the query to retrieve data from each capture instance.
b. Retrieving the data and loading it by either:
i. Applying the insert, update and delete changes (if ‘all with merge’ IS NOT being used), or
ii. Applying the delete and merge changes (if ‘all with merge’ IS being used).
The following variables will need to be added at the package scope level. The names need not be exactly as stated below, but these are what I used in my sample package:
a. DataReady (Int32)
b. DelaySeconds (Int32)
c. ExtractStartTime (DateTime)
d. ExtractEndTime (DateTime)
e. IntervalID (Int32)
f. SqlDataQuery (String)
g. TimeoutCeiling (Int32)
h. TimeoutCount (Int32)
Section 1
1. Truncate staging tables
a. Create an ‘Execute SQL Task’ Control Flow item in the SSIS package.
b. Run the following script on the source database and add the resulting truncate table statements into the Execute SQL Task:
SELECT 'TRUNCATE TABLE stg' + t.name
FROM sys.tables AS t
WHERE is_tracked_by_cdc = 1
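For a tracked table named JobCandidate (a hypothetical example), this returns a statement like:

TRUNCATE TABLE stgJobCandidate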
2. Calculating the interval for the incremental load – create an ‘Execute SQL Task’ with the ResultSet set to Single row. Map the result set columns ExtractStartTime and ExtractEndTime to the variables User::ExtractStartTime and User::ExtractEndTime respectively. The CHAR(15) truncation below cuts the style-113 string (‘dd mon yyyy hh:mi:ss:mmm’) just after the hour, so appending ‘00:00’ rounds each boundary down to the whole hour:
SELECT CAST(CONVERT(CHAR(15), DATEADD(hh, -6, GETDATE()), 113) + '00:00' AS DATETIME) AS ExtractStartTime,
       CAST(CONVERT(CHAR(15), DATEADD(hh, -2, GETDATE()), 113) + '00:00' AS DATETIME) AS ExtractEndTime
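For example, if the package ran at 2012-06-15 14:37 (an illustrative time, not taken from the sample package), the query would return:

ExtractStartTime        ExtractEndTime
----------------------- -----------------------
2012-06-15 08:00:00.000 2012-06-15 12:00:00.000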
3. Check whether the data is ready
a. Add a For Loop container to the package, and inside this container add:
i. An Execute SQL Task: set the ResultSet to Single row. Map the result set columns DataReady and TimeoutCount to the variables User::DataReady and User::TimeoutCount respectively, then add the following query:
DECLARE @DataReady int, @TimeoutCount int

IF NOT EXISTS (SELECT tran_end_time FROM cdc.lsn_time_mapping
               WHERE tran_end_time > ?)   --@ExtractEndTime
    SELECT @DataReady = 0                 -- no transactions logged past the end time yet: not ready
ELSE
    IF ? = 0                              --@IntervalID
        SELECT @DataReady = 3             -- first extraction interval
    ELSE
        IF NOT EXISTS (SELECT tran_end_time FROM cdc.lsn_time_mapping
                       WHERE tran_end_time <= ?)   --@ExtractStartTime
            SELECT @DataReady = 1         -- no transactions at or before the start time
        ELSE
            SELECT @DataReady = 2         -- data is ready for the full interval

SELECT @TimeoutCount = ?                  --@TimeoutCount
IF (@DataReady = 0)
    SELECT @TimeoutCount = @TimeoutCount + 1
ELSE
    SELECT @TimeoutCount = 0

IF (@TimeoutCount > ?)                    --@TimeoutCeiling
    SELECT @DataReady = 5                 -- give up: timeout ceiling exceeded

SELECT @DataReady AS DataReady, @TimeoutCount AS TimeoutCount
Map the input parameters as follows:

Variable name          | Parameter name
User::ExtractEndTime   | 0
User::IntervalID       | 1
User::ExtractStartTime | 2
User::TimeoutCount     | 3
User::TimeoutCeiling   | 4
ii. Add two Script Tasks: one to delay if the above query establishes that the data is not ready and, in order to avoid an infinite loop, one to quit the package and raise/log an error. These can be written in T-SQL, VB or C#. T-SQL is the least efficient of the three (and if using T-SQL, replace each Script Task with an Execute SQL Task). My example uses C# for the delay and VB.NET for the exit (I’m not proficient in either, so I used what I best understood).
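If you do go the T-SQL route, a minimal sketch of the two Execute SQL Tasks might look like this (assuming User::DelaySeconds is mapped to parameter 0 of the first task; adjust to your own conventions):

-- Delay task: wait for the configured number of seconds
DECLARE @DelaySeconds int, @Delay char(8)
SELECT @DelaySeconds = ?                                                  -- User::DelaySeconds
SELECT @Delay = CONVERT(char(8), DATEADD(second, @DelaySeconds, 0), 108)  -- 'hh:mm:ss'
WAITFOR DELAY @Delay

-- Exit task: fail the task (and hence the package) with a logged error
RAISERROR('CDC data was still not ready when the timeout ceiling was reached.', 16, 1)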
Section 2
1. Create the SQL statement to execute the wrapper function for each table (or, more precisely, each capture instance). Once again, this can be written either as a Script Task in C#/VB or as an Execute SQL Task in T-SQL; my example uses C#. Run the following script on the source database to return a C# script for each capture instance. I’ve included the parameter @function_prefix in order to identify the wrapper functions in the database by their name prefix. When I created my custom wrapper functions in the last post, I named them using the prefix ‘fn_wrp_’, but use whatever fits your naming conventions best.
Specify the following as the ReadOnlyVariables: User::DataReady, User::ExtractEndTime, User::ExtractStartTime (the generated script reads all three). Specify User::SqlDataQuery as the ReadWriteVariable.
DECLARE @function_prefix varchar(10) -- the pattern to search for in the CDC wrapper functions
SET @function_prefix = 'fn_wrp_'

SELECT name AS fn_name, 'int dataReady;
System.DateTime extractStartTime;
System.DateTime extractEndTime;
string sqlDataQuery;

dataReady = (int)Dts.Variables["DataReady"].Value;
extractStartTime = (System.DateTime)Dts.Variables["ExtractStartTime"].Value;
extractEndTime = (System.DateTime)Dts.Variables["ExtractEndTime"].Value;

if (dataReady == 2)
{
    sqlDataQuery = "SELECT * FROM dbo.' + name + '(''" + string.Format("{0:yyyy-MM-dd HH:mm:ss}", extractStartTime) + "'', ''" + string.Format("{0:yyyy-MM-dd HH:mm:ss}", extractEndTime) + "'', ''all with merge'')";
}
else
{
    sqlDataQuery = "SELECT * FROM dbo.' + name + '(null, ''" + string.Format("{0:yyyy-MM-dd HH:mm:ss}", extractEndTime) + "'', ''all with merge'')";
}

Dts.Variables["SqlDataQuery"].Value = sqlDataQuery;
Dts.TaskResult = (int)ScriptResults.Success;' AS [.NET Script]
FROM sys.objects
WHERE name LIKE '%' + @function_prefix + '%'
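For a wrapper function named fn_wrp_HumanResources_JobCandidate (a hypothetical name following the ‘fn_wrp_’ convention), the resulting Script Task would, when the data is ready, set User::SqlDataQuery to something like:

SELECT * FROM dbo.fn_wrp_HumanResources_JobCandidate('2012-06-15 08:00:00', '2012-06-15 12:00:00', 'all with merge')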
2. Add the Data Flow Task that uses the prepared query above to load the data into the data warehouse. This Data Flow Task contains several items:
a. OLE DB Source item – defined with the Data Access Mode set to ‘SQL command from variable’ and using the variable populated by the Script Task created in the previous step. Select the variable name User::SqlDataQuery, then select the columns you want in the result set.
b. Conditional Split item – based on the CDC operation, point to the appropriate OLE DB destination or command. Since my example uses the ‘all with merge’ @row_filter_option, I have set two conditions:
i. Inserts or updates: CDC_OPERATION == "M"
ii. Deletes: CDC_OPERATION == "D"
c. OLE DB Destination items
i. If using the ‘all with merge’ @row_filter_option:
1. OLE DB Command item for the delete – run the following query on the source database to get delete statements for all the tables tracked by CDC (it assumes the data warehouse tables are named the same as the source tables):
SELECT 'DELETE FROM dbo.' + t.name + ' WHERE ' + c.name + ' = ?'
FROM sys.tables AS t
JOIN sys.columns AS c ON c.object_id = t.object_id
WHERE t.is_tracked_by_cdc = 1 AND c.is_identity = 1
2. OLE DB Destination item for the insert into the staging tables – as this is a standard destination, you will need to create it manually.
ii. If using the ‘all’ or ‘all with mask’ @row_filter_option:
1. OLE DB Command items for the delete – see above.
2. OLE DB Command items for the update.
3. OLE DB Destination item for the insert – as this is a standard destination, you will need to create it manually.
3. Lastly, add an Execute SQL Task item to perform the MERGE. This step looks at a staging table and inserts rows that don’t yet exist in the data warehouse table, or updates them if they do. I’ve created a script to aid in the creation of the MERGE statement. The resulting SQL statements NEED to be reviewed before implementing; I do not guarantee the results. Run the following query on the source database (it assumes the data warehouse tables are named the same as the source tables):
SELECT t.name, 'MERGE dbo.' + ISNULL(t.name, '') + ' AS target
USING (SELECT ' + SUBSTRING((SELECT ', ' + ISNULL(c.name, '')
                             FROM sys.columns AS c
                             WHERE t.object_id = c.object_id
                             FOR XML PATH('')), 2, 65536) + '
       FROM dbo.stg' + ISNULL(t.name, '') + ') AS source (' +
            SUBSTRING((SELECT ', ' + ISNULL(c.name, '')
                       FROM sys.columns AS c
                       WHERE t.object_id = c.object_id
                       FOR XML PATH('')), 2, 65536) + ')
ON (target.' + (SELECT ISNULL(c.name, '')
                FROM sys.columns AS c
                WHERE t.object_id = c.object_id AND c.is_identity = 1) + ' = source.' +
               (SELECT ISNULL(c.name, '')
                FROM sys.columns AS c
                WHERE t.object_id = c.object_id AND c.is_identity = 1) + ')
WHEN MATCHED THEN
    UPDATE SET ' + SUBSTRING((SELECT ', ' + ISNULL(c.name, '') + ' = source.' + ISNULL(c.name, '')
                              FROM sys.columns AS c
                              WHERE t.object_id = c.object_id AND c.is_identity != 1
                              FOR XML PATH('')), 2, 65536) + '
WHEN NOT MATCHED THEN
    INSERT (' + SUBSTRING((SELECT ', ' + ISNULL(c.name, '')
                           FROM sys.columns AS c
                           WHERE t.object_id = c.object_id
                           FOR XML PATH('')), 2, 65536) + ')
    VALUES (' + SUBSTRING((SELECT ', source.' + ISNULL(c.name, '')
                           FROM sys.columns AS c
                           WHERE t.object_id = c.object_id
                           FOR XML PATH('')), 2, 65536) + ');'
FROM sys.tables AS t
WHERE t.is_tracked_by_cdc = 1
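As a sanity check, for a hypothetical staging table stgJobCandidate whose target table JobCandidate has an identity key JobCandidateID plus columns Resume and ModifiedDate, the script would emit something along these lines:

MERGE dbo.JobCandidate AS target
USING (SELECT JobCandidateID, Resume, ModifiedDate
       FROM dbo.stgJobCandidate) AS source (JobCandidateID, Resume, ModifiedDate)
ON (target.JobCandidateID = source.JobCandidateID)
WHEN MATCHED THEN
    UPDATE SET Resume = source.Resume, ModifiedDate = source.ModifiedDate
WHEN NOT MATCHED THEN
    INSERT (JobCandidateID, Resume, ModifiedDate)
    VALUES (source.JobCandidateID, source.Resume, source.ModifiedDate);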
I’ve now outlined the main steps in creating an SSIS package to incrementally extract data from an OLTP database into a data warehouse using CDC-tracked tables. The above is a very basic solution; in a real-life scenario the data may be dealt with rather differently. For instance, many organisations do not want to delete data from the data warehouse. Any delete CDC operations might instead be handled as an update in the DW, marking a row as having been deleted but keeping the data for reporting or auditing reasons (see the sketch below). In some cases the base data warehouse tables may not even update rows, but instead insert a new row for each change or delete against a particular primary key. There may also be instances where some of the transforms are performed during the extract phase using the wrapper functions.
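As a minimal sketch of that soft-delete variant, the OLE DB Command generated for deletes could be swapped for an update along these lines (IsDeleted is a hypothetical flag column you would add to the warehouse table):

UPDATE dbo.JobCandidate   -- JobCandidate is a hypothetical example table
SET IsDeleted = 1         -- hypothetical flag; the row is kept for reporting/auditing
WHERE JobCandidateID = ?  -- parameter mapped from the key column, as in the generated delete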
All my queries above can be modified to suit these and other scenarios. The main goal of this and the previous few posts, and I hope I’ve achieved it, was to provide an overview of CDC and one of its uses, incrementally extracting data with an SSIS package, as well as to provide some shortcuts to make the implementation a bit easier.