Aim: To use an SSIS Script Task to import csv files where the metadata is different for each file.
Recently I had a task to do a one-off import of data from a large number of csv files where the metadata was different for each file. For example, the number of columns, column names & data types differed for each file.
Thanks to TechBrothersIT, I found a script that I could amend to suit my requirements: http://www.techbrothersit.com/2015/01/how-to-create-tables-for-multiple-pipe.html. The amended script generates T-SQL statements to create the relevant tables & import all the data by looping through each row of the file. An example of the type of data which can be imported is the bulk data found on the Scottish Census website: http://www.scotlandscensus.gov.uk/ods-web/data-warehouse.html.
Create the SSIS package:
Launch SSDT & create (or open) an SSIS project. In a new package, add an ADO.NET connection manager for the destination database & rename it to ADO_Dest_DB (this is referenced in the script, therefore the script will need to be amended if the name of the connection manager is altered).
Create the following variables to pass information to the Script Task:
- loggingLevel: indicates how much information is to be logged (useful for debugging)
- 0 = no logging
- 1 = minimal logging (file details)
- 2 = moderate logging (file details & table creation T-SQL statements)
- 3 = full logging (file details, table creation & data insert T-SQL statements)
- executeSQL: indicates whether the generated T-SQL statements should be executed against the destination database
- 0 = don’t execute (useful for debugging when used with loggingLevel > 0)
- 1 = execute
- ParentSourceDirectory: the path of the files to be imported excluding the folder that contains them
- ChildSourceDirectory: the folder containing the files (don’t include backslashes). This will be the prefix for the table name so that associated tables can be easily identified as part of the same group. The entire path of the files is generated using ParentSourceDirectory & ChildSourceDirectory.
Drag a Script Task onto the Control Flow & edit the task to add the newly created variables to the ReadOnlyVariables field.
Edit the script & take note of the script identifier. Paste the following script into ScriptMain.cs & replace ********************************** with the script identifier. Add a reference to Microsoft.VisualBasic & exit the Script Task Editor.
#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using System.IO;
using System.Data.SqlClient;
using Microsoft.VisualBasic.FileIO;
#endregion
namespace **********************************
{
[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{
    /// <summary>
    /// For each csv file in the configured source directory, generates and (optionally)
    /// executes T-SQL to DROP/CREATE a matching table and INSERT every data row.
    /// Controlled by the SSIS variables loggingLevel, executeSQL, ParentSourceDirectory
    /// and ChildSourceDirectory, and the ADO.NET connection manager "ADO_Dest_DB".
    /// Sets Dts.TaskResult to Success, or to Failure on any exception (after logging it).
    /// </summary>
    public void Main()
    {
        // 0 = no logging, 1 = minimal (file), 2 = moderate (table creation), 3 = full (all queries)
        int loggingLevel = (int)Dts.Variables["User::loggingLevel"].Value;
        // 0 = don't execute SQL statements on the server (debugging), 1 = execute them
        int executeSQL = (int)Dts.Variables["User::executeSQL"].Value;
        byte[] emptyBytes = new byte[0]; // Dts.Log requires a data-bytes argument; we have none

        try
        {
            // The ADO.NET connection manager must already exist in the package's
            // Connection Managers and point at the destination SQL Server database.
            SqlConnection myADONETConnection =
                (SqlConnection)Dts.Connections["ADO_Dest_DB"].AcquireConnection(Dts.Transaction);

            string sourceDirectoryParent = Dts.Variables["User::ParentSourceDirectory"].Value.ToString();
            string sourceDirectoryChild = Dts.Variables["User::ChildSourceDirectory"].Value.ToString();
            string sourceDirectory = sourceDirectoryParent + sourceDirectoryChild;
            string[] fileEntries = Directory.GetFiles(sourceDirectory);

            if (loggingLevel >= 1)
            {
                Dts.Log("Source Directory: " + sourceDirectory, 0, emptyBytes);
            }

            // The child folder name (minus spaces) prefixes every table so that tables
            // imported together are easily identified as part of the same group.
            string tablePrefix = sourceDirectoryChild.Replace(" ", "");

            foreach (string fileName in fileEntries)
            {
                if (loggingLevel >= 1)
                {
                    Dts.Log("File (start): " + fileName, 0, emptyBytes);
                }

                // e.g. "Census_Table1" for ...\Census\Table1.csv
                string tableName = tablePrefix + "_" + Path.GetFileNameWithoutExtension(fileName);

                // A single parser reads the header and then the data rows; 'using'
                // guarantees the file handle is released even if a SQL statement throws.
                using (TextFieldParser parser = new TextFieldParser(fileName))
                {
                    parser.HasFieldsEnclosedInQuotes = true;
                    parser.SetDelimiters(",");

                    // First line = header row. A completely empty file yields null;
                    // skip it rather than crash on the missing header.
                    string[] headerFields = parser.ReadFields();
                    if (headerFields == null)
                    {
                        if (loggingLevel >= 1)
                        {
                            Dts.Log("File skipped (empty): " + fileName, 0, emptyBytes);
                        }
                        continue;
                    }

                    // Sanitise column names: give blank headers a place-holder name
                    // (e.g. Col_0), truncate to SQL Server's 128-character identifier
                    // limit, and double ']' so a header cannot break out of its
                    // bracketed identifier in the generated T-SQL.
                    string[] columnNames = new string[headerFields.Length];
                    for (int i = 0; i < headerFields.Length; i++)
                    {
                        string name = headerFields[i];
                        if (name.Length == 0)
                        {
                            name = "Col_" + i.ToString();
                        }
                        else if (name.Length > 128)
                        {
                            name = name.Substring(0, 128);
                        }
                        columnNames[i] = name.Replace("]", "]]");
                    }

                    // DROP & CREATE the table for the current file. Every column is
                    // VARCHAR(255) because the metadata differs per file and the real
                    // types are not known in advance.
                    string createSql =
                        " IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[" + tableName
                        + "]') AND type in (N'U')) DROP TABLE [dbo].[" + tableName
                        + "]; CREATE TABLE [dbo].[" + tableName + "] (["
                        + string.Join("] VARCHAR(255),[", columnNames) + "] VARCHAR(255));";

                    if (loggingLevel >= 2)
                    {
                        Dts.Log("Table creation (start): " + createSql, 0, emptyBytes);
                    }
                    if (executeSQL == 1)
                    {
                        using (SqlCommand createCommand = new SqlCommand(createSql, myADONETConnection))
                        {
                            createCommand.ExecuteNonQuery();
                        }
                    }
                    if (loggingLevel >= 2)
                    {
                        Dts.Log("Table creation (end)", 0, emptyBytes);
                    }

                    // Reuse the sanitised header names for the INSERT column list so the
                    // two statements can never disagree about a column's name.
                    string insertColumnList = "[" + string.Join("],[", columnNames) + "]";

                    // Remaining lines are data rows: build one INSERT per row and
                    // (optionally) execute it against the destination database.
                    string[] fields;
                    while ((fields = parser.ReadFields()) != null)
                    {
                        // Double single quotes so apostrophes in the data are preserved
                        // (and cannot inject into the statement) instead of being stripped.
                        string[] escapedValues = new string[fields.Length];
                        for (int i = 0; i < fields.Length; i++)
                        {
                            escapedValues[i] = fields[i].Replace("'", "''");
                        }
                        string insertSql = "INSERT INTO [dbo].[" + tableName + "] (" + insertColumnList
                            + ") VALUES('" + string.Join("','", escapedValues) + "');";

                        if (loggingLevel == 3)
                        {
                            Dts.Log("Table population (start): " + insertSql, 0, emptyBytes);
                        }
                        if (executeSQL == 1)
                        {
                            using (SqlCommand insertCommand = new SqlCommand(insertSql, myADONETConnection))
                            {
                                insertCommand.ExecuteNonQuery();
                            }
                        }
                        if (loggingLevel == 3)
                        {
                            Dts.Log("Table population (end)", 0, emptyBytes);
                        }
                    }
                }

                if (loggingLevel >= 1)
                {
                    Dts.Log("File (end)", 0, emptyBytes);
                }
            }
            Dts.TaskResult = (int)ScriptResults.Success;
        }
        catch (Exception e)
        {
            // Record the failure in the package log before signalling Failure to SSIS.
            Dts.Log(string.Format("Script Task encountered an unexpected error. Message: {0}.", e.Message), 0, emptyBytes);
            Dts.TaskResult = (int)ScriptResults.Failure;
        }
    }

    #region ScriptResults declaration
    // Maps the SSIS execution results onto values Main can assign to Dts.TaskResult.
    enum ScriptResults
    {
        Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
        Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
    };
    #endregion
}
}
Back in the Control Flow, enable logging by right clicking the background and set logging to a csv file (DataImport_log.csv) for the Script Task. This will automatically create a new connection manager for the log file.
In the Details tab, tick ScriptTaskLogEntry.
Test the import script:
Set the appropriate values for the loggingLevel & executeSQL variables.
Click the “Play” button to start debugging & run the script.
Check the destination database to confirm that the tables have been created & contain the imported data.
If any errors occur, check the details in the log file (DataImport_log.csv) to confirm that the T-SQL has been generated correctly.
Be First to Comment