Tuesday, August 21, 2007

Programmatically Creating a Data Flow with a Merge Join Component

Overview

Microsoft SQL Server 2005 Integration Services (SSIS) is a robust ETL (Extract, Transform, and Load) tool for building data integration solutions and data warehouses.

There are two ways of creating packages in SSIS:

  • The first is Business Intelligence Development Studio (popularly called BIDS), an IDE whose drag-and-drop designer lets us create packages and the tasks and components inside them.
  • The second is to create packages programmatically in C# or VB.NET. This approach matters most when packages have to be generated by reading the metadata of some external application.
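
As a minimal sketch of the second approach, the following creates an empty package in code and saves it to disk. It assumes a reference to Microsoft.SqlServer.ManagedDTS.dll; the class name, package name, and output path are hypothetical and chosen only for illustration.

```csharp
using System;
using Microsoft.SqlServer.Dts.Runtime;

class EmptyPackageSketch
{
    static void Main()
    {
        // A new Package is an empty top-level container.
        Package package = new Package();
        package.Name = "EmptyPackage"; // hypothetical name

        // Hypothetical output path; change it to suit your machine.
        string path = @"C:\Temp\EmptyPackage.dtsx";

        // Application.SaveToXml writes the package out as a .dtsx file.
        Application app = new Application();
        app.SaveToXml(path, package, null);
        Console.WriteLine("Saved " + path);
    }
}
```

The saved file can then be opened in BIDS like any package built in the designer.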

Getting Started

In this post I discuss how to programmatically create a package containing a data flow that uses a Merge Join component. Everything here is based on my own experience. The .NET language used is C#.

Our aim is to create a package. Inside the package we put a Data Flow task, and inside the data flow we place two OLE DB Source components, both of which connect to a Merge Join component, which in turn connects to an OLE DB Destination component. In the BIDS designer, the result appears as two sources flowing into the merge join and on into the destination.

The C# program that creates this package follows. It loads a previously created Integration Services project containing an empty package. To build and run the program, reference the following .dll files:

  • Microsoft.SqlServer.ManagedDTS.dll
  • Microsoft.SqlServer.DTSPipelineWrap.dll
  • Microsoft.SqlServer.DTSRuntimeWrap.dll
  • Microsoft.SqlServer.PipelineHost.dll

The Program


using System;
using System.Collections.Generic;
using System.Text;
using System.Data.SqlClient;
using Microsoft.SqlServer.Dts.Runtime;
using dtrw = Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using System.Runtime.InteropServices;
namespace Microsoft.Samples.SqlServer.Dts
{
class Program
{
static void Main(string[] args)
{

/* Load a pre-created package. The Package object is the top-level container for all other objects in an SSIS project. As the top-level container, the package is created first; subsequent objects are added to it and then executed within the context of the package. Tasks and ConnectionManager objects will be added to this package. */

Package package;
string mypkg = @"C:\Documents and Settings\user\My Documents\Visual Studio 2005\Projects\Integration Services Project1\Integration Services Project1\Package.dtsx";
Application app = new Application();
package = app.LoadPackage(mypkg, null);

/* Add a Data Flow task; pipeline components will be added to it. The TaskHost used here is a wrapper around the Data Flow task, and the task itself is accessed through the TaskHost's InnerObject property. */

TaskHost task1 = (TaskHost)package.Executables.Add("DTS.Pipeline");
task1.Name = "Dataflow to using merge component";
task1.Description = "data flow task";
MainPipe dataflow = task1.InnerObject as MainPipe;

try
{

/* Add an OLE DB connection manager. The ConnectionManager class represents a physical connection to an external data source. */

ConnectionManager cm = package.Connections.Add("OLEDB");
cm.ConnectionString = "Data Source=computer name;Initial Catalog=AdventureWorks;Provider=SQLOLEDB.1;Integrated Security=SSPI;Persist Security Info=False;Auto Translate=False;";
cm.Name = "Oledb Connection Manager";
cm.Description = "connect to sqlserver 2005";

// create the First source


IDTSComponentMetaData90 source1 = dataflow.ComponentMetaDataCollection.New();
source1.ComponentClassID = "DTSAdapter.OleDbSource";

//Create the designtime instance of the component
CManagedComponentWrapper src1Designtime = source1.Instantiate();

/* Call the ProvideComponentProperties method. It initializes a newly created component by creating its custom properties and its input and output objects. */

/* Note: the CustomPropertyCollection of the component contains the IDTSCustomProperty90 objects specific to that component. Components populate their custom property collections only when ProvideComponentProperties is called. */

src1Designtime.ProvideComponentProperties();
source1.Name = "OLEDB source 1";
source1.Description = "source 1";
src1Designtime.SetComponentProperty("AccessMode", 0);
src1Designtime.SetComponentProperty("OpenRowset", "Production.Product");
Console.WriteLine("Source1 created and configured Successfully");



//create the second source


IDTSComponentMetaData90 source2 = dataflow.ComponentMetaDataCollection.New();
source2.ComponentClassID = "DTSAdapter.OleDbSource";
CManagedComponentWrapper src2Designtime = source2.Instantiate();
src2Designtime.ProvideComponentProperties();
source2.Name = "OLEDB Source 2";
src2Designtime.SetComponentProperty("AccessMode", 0);
src2Designtime.SetComponentProperty("OpenRowset", "Production.ProductSubcategory");
Console.WriteLine("Source2 created and configured Successfully");

/* Specify the connection manager for source1 and source2. The package's Connections collection holds the connection managers that have been added to the package and are available for use at run time. */

if (source1.RuntimeConnectionCollection.Count > 0)
{
source1.RuntimeConnectionCollection[0].ConnectionManagerID = cm.ID;
source1.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.ToConnectionManager90(cm);
}
if (source2.RuntimeConnectionCollection.Count > 0)
{
source2.RuntimeConnectionCollection[0].ConnectionManagerID = cm.ID;
source2.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.ToConnectionManager90(cm);
}

/* Call the ReinitializeMetaData method to initialize the columns in the outputs of a source component. Call AcquireConnections before ReinitializeMetaData so that the component can reach its external data source and populate its column metadata. Finally, call ReleaseConnections to release the connection. */

src1Designtime.AcquireConnections(null);
src1Designtime.ReinitializeMetaData();
src1Designtime.ReleaseConnections();

src2Designtime.AcquireConnections(null);
src2Designtime.ReinitializeMetaData();
src2Designtime.ReleaseConnections();

//create the Merge Transformation

IDTSComponentMetaData90 merge = dataflow.ComponentMetaDataCollection.New();
merge.ComponentClassID = "DTSTransform.MergeJoin";
CManagedComponentWrapper mergeDesigntime = merge.Instantiate();
mergeDesigntime.ProvideComponentProperties();
merge.Name = "Merge Source1 and source2";
Console.WriteLine("merge created ");
merge.InputCollection[0].ExternalMetadataColumnCollection.IsUsed = false;
merge.InputCollection[0].HasSideEffects = false;
merge.InputCollection[1].ExternalMetadataColumnCollection.IsUsed = false;
merge.InputCollection[1].HasSideEffects = false;

//create path from source1 to merge


IDTSPath90 pathSRC1merge = dataflow.PathCollection.New();
pathSRC1merge.AttachPathAndPropagateNotifications(source1.OutputCollection[0], merge.InputCollection[0]);


//create path from source2 to merge

IDTSPath90 pathSrc2merge = dataflow.PathCollection.New();
pathSrc2merge.AttachPathAndPropagateNotifications(source2.OutputCollection[0], merge.InputCollection[1]);

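/* Set IsSorted and SortKeyPosition for source1 and source2. SortKeyPosition must be set after ReinitializeMetaData has populated the output columns. In the standard AdventureWorks schema, column index 18 of Production.Product and column index 0 of Production.ProductSubcategory should both be ProductSubcategoryID, the join key. */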

source1.OutputCollection[0].IsSorted = true;
source1.OutputCollection[0].OutputColumnCollection[18].SortKeyPosition = 1;

source2.OutputCollection[0].IsSorted = true;
source2.OutputCollection[0].OutputColumnCollection[0].SortKeyPosition = 1;

IDTSInput90 mergeInput1 = merge.InputCollection[0];
IDTSVirtualInput90 vMergeInput1 = mergeInput1.GetVirtualInput();
foreach (IDTSVirtualInputColumn90 vColumn in vMergeInput1.VirtualInputColumnCollection)
{
// Skip the columns that should not pass through the merge join.
if ((vColumn.Name == "MakeFlag") || (vColumn.Name == "Color"))
{
continue;
}
mergeDesigntime.SetUsageType(
mergeInput1.ID, vMergeInput1, vColumn.LineageID,
DTSUsageType.UT_READONLY);
}
IDTSInput90 mergeInput2 = merge.InputCollection[1];
IDTSVirtualInput90 vMergeInput2 = mergeInput2.GetVirtualInput();
foreach (IDTSVirtualInputColumn90 vColumn in vMergeInput2.VirtualInputColumnCollection)
{
// Skip the columns that should not pass through the merge join.
if ((vColumn.Name == "Name") || (vColumn.Name == "ModifiedDate") || (vColumn.Name == "rowguid") || (vColumn.Name == "ProductSubcategoryID"))
{
continue;
}
mergeDesigntime.SetUsageType(
mergeInput2.ID, vMergeInput2, vColumn.LineageID, DTSUsageType.UT_READONLY);

}

//set the custom properties for merge

IDTSCustomProperty90 property1 = merge.CustomPropertyCollection[0];
property1.Value = 2;
IDTSCustomProperty90 property2 = merge.CustomPropertyCollection[1];
property2.Value = 1;
IDTSOutput90 ououy = merge.OutputCollection[0];
ououy.OutputColumnCollection[8].Name = "jsjsjs";
mergeDesigntime.AcquireConnections(null);
mergeDesigntime.ReinitializeMetaData();
mergeDesigntime.ReleaseConnections();

//create destination component

IDTSComponentMetaData90 dest = dataflow.ComponentMetaDataCollection.New();
dest.ComponentClassID = "DTSAdapter.OLEDBDestination";
CManagedComponentWrapper destDesigntime = dest.Instantiate();
destDesigntime.ProvideComponentProperties();
dest.Name = "OLEDB Destination";
destDesigntime.SetComponentProperty("AccessMode", 0);
destDesigntime.SetComponentProperty("OpenRowset", "dbo.merge1");

if (dest.RuntimeConnectionCollection.Count > 0)
{
dest.RuntimeConnectionCollection[0].ConnectionManagerID = cm.ID;
dest.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.ToConnectionManager90(cm);
}

//create path from merge to destination

IDTSPath90 pathMergeDest = dataflow.PathCollection.New();
pathMergeDest.AttachPathAndPropagateNotifications(merge.OutputCollection[0], dest.InputCollection[0]);
destDesigntime.AcquireConnections(null);
destDesigntime.ReinitializeMetaData();
destDesigntime.ReleaseConnections();
IDTSInput90 DestInput = dest.InputCollection[0];
IDTSVirtualInput90 vDestInput = DestInput.GetVirtualInput();
foreach (IDTSVirtualInputColumn90 vColumn in vDestInput.VirtualInputColumnCollection)
{
destDesigntime.SetUsageType(
DestInput.ID, vDestInput, vColumn.LineageID, DTSUsageType.UT_READWRITE);
}

//map external metadata to the inputcolumn


int index = 0;
foreach (IDTSInputColumn90 inputColumn in DestInput.InputColumnCollection)
{
Console.WriteLine(DestInput.ExternalMetadataColumnCollection.Count);
IDTSExternalMetadataColumn90 exMetaColumn = DestInput.ExternalMetadataColumnCollection[index];
index = index + 1;
exMetaColumn.CodePage = inputColumn.CodePage;
exMetaColumn.DataType = inputColumn.DataType;
exMetaColumn.Length = inputColumn.Length;
Console.WriteLine(exMetaColumn.ID);
destDesigntime.MapInputColumn(DestInput.ID, inputColumn.ID, exMetaColumn.ID);
if (DestInput.ExternalMetadataColumnCollection.Count < (index + 1)) { break; }
}

src1Designtime.AcquireConnections(null);
src1Designtime.ReinitializeMetaData();
src1Designtime.ReleaseConnections();

src2Designtime.AcquireConnections(null);
src2Designtime.ReinitializeMetaData();
src2Designtime.ReleaseConnections();

destDesigntime.AcquireConnections(null);
destDesigntime.ReinitializeMetaData();
destDesigntime.ReleaseConnections();

// Save the package back to the .dtsx file

app.SaveToXml(@"C:\Documents and Settings\user\My Documents\Visual Studio 2005\Projects\Integration Services Project1\Integration Services Project1\Package.dtsx", package, null);
Console.WriteLine("saved");
Console.Read();

}
catch (COMException cexp)
{
Console.WriteLine("Exception occurred : " + cexp.TargetSite + cexp);
Console.Read();
}
catch (System.ArgumentException argex)
{
Console.WriteLine(argex);
Console.Read();
}
catch (DtsRuntimeException runex)
{
Console.WriteLine("Exception---" + runex);
Console.Read();
}
}
}
}
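
The two column-selection loops in the program skip certain columns by name. One way to keep such skip lists readable is to hold the excluded names in a collection instead of chaining comparisons. The sketch below shows only that filtering step in plain C#, without the SSIS types; SelectColumns and the sample column lists are hypothetical and made up for illustration.

```csharp
using System;
using System.Collections.Generic;

static class ColumnFilterSketch
{
    // Hypothetical helper (not part of the SSIS API): returns the names
    // in 'available' that do not appear in 'excluded', preserving order.
    static List<string> SelectColumns(IEnumerable<string> available, ICollection<string> excluded)
    {
        List<string> selected = new List<string>();
        foreach (string name in available)
        {
            if (!excluded.Contains(name))
            {
                selected.Add(name);
            }
        }
        return selected;
    }

    static void Main()
    {
        // Sample data only: a shortened stand-in for the columns of source 1.
        string[] productColumns = new string[] { "ProductID", "Name", "MakeFlag", "Color", "ProductSubcategoryID" };
        List<string> skip = new List<string>(new string[] { "MakeFlag", "Color" });

        foreach (string name in SelectColumns(productColumns, skip))
        {
            Console.WriteLine(name);
        }
    }
}
```

With the sample data this prints ProductID, Name, and ProductSubcategoryID, one per line. In the real program the same test would sit inside the foreach over VirtualInputColumnCollection, with SetUsageType called for each column that is not in the skip list.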








3 comments:

Anonymous said...

Thanks for this article; it helped me design dynamic SSIS package generation.

Anonymous said...

Thanks for this article, but when I try to use it, I get the error "Method name expected" for lines such as if ((vColumn.Name == "MakeFlag") wherever they are used in the code.

Anonymous said...

Great explanation on creating SSIS components. I'm trying to create a repeatable process for creating the packages needed to build a 1-to-1 mapping between Oracle source tables and SQL Server destination tables. Do you have any examples of this that you wouldn't mind sharing? So far, everything I've seen either references AdventureWorks or lives exclusively within the SQL Server world (not heterogeneous databases).

Thanks!