To load a CSV file, we take a Data Flow Task (DFT), use a Flat File Source, and load the data into the destination table (the stage table). That is fine if we have one or two files: we create a DFT for each and load the data into the table. But suppose we have 100 different files. What can we do then? We would need to create a DFT for each file, do the mapping, and load that data into the database, which is a very tedious task; and whenever a file is added or removed, we would need to update the package again. If you have this type of scenario, don't worry. Here we will see how to load any number of files with different structures very easily.
See this demo:
We have n files, as below. Each file is named after a table, and all the files have different structures. Here we assume the files don't have a header; later we will see how to load these files when they do have a header.
Here we have the staging tables. Let's see how we load them.
To load these files, first of all we need the following:
1. The table name and the file name must be the same. (If they are not, we would also need a config table that maps file names to table names.) Here the file names and table names are the same.
2. We have a config table that stores the list of column names for each table, joined with a separator. The files use a pipe (|) separator, so we store the columns pipe-separated as well (see the sketch after this list).
3. If we add any new file, we need to add the list of columns for that file.
4. If we change a file's structure, we need to update the column list in the config table.
5. Here all tables use the same data type. (In the next post we will see different data types.)
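As a concrete illustration, here is a minimal sketch of the config table, whose name and columns (tbl_columns_config, Table_Name, Columns_name) match the query used in the script task below, together with one hypothetical staging table and its config row:

-- Config table: one row per file/table, with the column list stored pipe-separated.
CREATE TABLE tbl_columns_config
(
    Table_Name   VARCHAR(200),
    Columns_name VARCHAR(4000)
);

-- Hypothetical staging table (illustrative name and columns); per point 5,
-- all columns share the same data type.
CREATE TABLE tbl_employee
(
    Emp_ID   VARCHAR(500),
    Emp_Name VARCHAR(500),
    Dept     VARCHAR(500)
);

-- Its column list, pipe-separated to match the file delimiter.
INSERT INTO tbl_columns_config (Table_Name, Columns_name)
VALUES ('tbl_employee', 'Emp_ID|Emp_Name|Dept');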
Now we create the package. A Foreach Loop container is used to enumerate the files.
We have a few variables that are used in this package. We store the full path of the current file in File_Name, and the archive directory in Target_Path.
This Execute SQL Task truncates the staging table before the load. It takes File_Name, which holds the file name with its full path, as input; from the full path it extracts the file name, dynamically builds the truncate statement, and executes it.
DECLARE @full_path VARCHAR(500) = ?;
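The full statement might look like the following sketch; the exact string functions used to pull the file name out of the path are an assumption:

DECLARE @full_path VARCHAR(500) = ?;  -- mapped to the File_Name variable

-- Extract the file name after the last backslash, then drop the .csv
-- extension to get the table name (this string handling is an assumption).
DECLARE @file_name  VARCHAR(200) =
    REVERSE(LEFT(REVERSE(@full_path), CHARINDEX('\', REVERSE(@full_path)) - 1));
DECLARE @table_name VARCHAR(200) = REPLACE(@file_name, '.csv', '');

-- Build and execute the truncate statement dynamically.
DECLARE @sql NVARCHAR(MAX) = N'TRUNCATE TABLE ' + QUOTENAME(@table_name);
EXEC sp_executesql @sql;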
Next, in a Script Task, we write the code below.
// Usings needed at the top of the ScriptMain file:
using System;
using System.IO;
using System.Data;
using System.Data.SqlClient;
using Microsoft.SqlServer.Dts.Runtime;

public void Main()
{
    // Reading the file information.
    string File_Path = Dts.Variables["User::File_Name"].Value.ToString();
    FileInfo fi = new FileInfo(File_Path);
    string FileName = fi.Name;
    string table_Name = FileName.Replace(".csv", "");
    char FileDelimiter = '|';
    string targetPath = Dts.Variables["User::Target_Path"].Value.ToString();

    // Reading the CSV file and storing that data into a DataTable.
    try
    {
        // Reading the list of columns for this table from the config table,
        // then creating the DataTable and adding those columns to it.
        SqlConnection con = new SqlConnection("Data Source=BAGESH\\BAGESHDB;" +
            "Initial Catalog=AdventureWorksDW2017_Stage;Integrated Security=true;");
        string sql = "select Table_Name,Columns_name from tbl_columns_config " +
                     "where Table_name = '" + table_Name + "'";
        con.Open();
        SqlCommand cmd = new SqlCommand(sql, con);
        cmd.CommandType = CommandType.Text;
        SqlDataReader rd = cmd.ExecuteReader();
        string Columns = null;
        while (rd.Read())
        {
            Columns = rd.GetString(1);
        }

        DataTable dt = new DataTable();
        string[] Columns_List = Columns.Split(FileDelimiter);
        foreach (string column in Columns_List)
        {
            dt.Columns.Add(column);
        }

        // Reading the CSV file and adding the data into the DataTable.
        using (StreamReader reader = new StreamReader(File_Path))
        {
            while (!reader.EndOfStream)
            {
                // If the file has a header, we need to skip that header line here.
                string[] rows = reader.ReadLine().Split(FileDelimiter);
                DataRow dr = dt.NewRow();
                for (int i = 0; i < Columns_List.Length; i++)
                {
                    dr[i] = rows[i];
                }
                dt.Rows.Add(dr);
            }
        }
        con.Close();

        // Creating the SqlBulkCopy command, mapping the table columns to the
        // DataTable columns, and inserting the records into the table.
        SqlBulkCopy obj = new SqlBulkCopy(con);
        con.Open();
        // Assign the destination table name.
        obj.DestinationTableName = table_Name;
        foreach (string column in Columns_List)
        {
            obj.ColumnMappings.Add(column, column);
        }
        obj.WriteToServer(dt); // This call actually inserts the rows.
        con.Close();

        // After loading the file, we move it to the archive directory.
        string destFile = Path.Combine(targetPath, FileName);
        File.Move(File_Path, destFile);
    }
    // If any error occurs, fail the task with the error message.
    catch (Exception ex)
    {
        Dts.Events.FireError(0, "Fire Error",
            "An error occurred: " + ex.Message.ToString(), "", 0);
    }
    Dts.TaskResult = (int)ScriptResults.Success;
}
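Note that the rows are actually sent to SQL Server only by the WriteToServer call, and because the column mappings are built from the config table, the stored column list must match the destination table's column names exactly, while its order must match the order of the fields in the file.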
Compile the code and save it.
Before running this package:
Records in the table.
List of files in the source dir.
Archive folder.
Let’s run this package.
Package executed successfully.
See the records in the tables.
Source folder.
Destination folder.
Data loaded successfully.
After a few days, suppose we add some other files. First we add both tables in the database. Then we need to add these tables' column lists to the config table, for example as below.
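For instance, if the two new files were tbl_product.csv and tbl_sales.csv (hypothetical names and columns, just for illustration), the new config rows might look like this:

-- Hypothetical table and column names for the two new files.
INSERT INTO tbl_columns_config (Table_Name, Columns_name)
VALUES ('tbl_product', 'Product_ID|Product_Name|Price'),
       ('tbl_sales',   'Sale_ID|Product_ID|Qty');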
Now we run this package without making any changes to it.
Package executed successfully.
See the records in the table.
Data loaded successfully.
Now let's modify a file (add columns to the file). First we need to add the columns to the table and to the config table. We add two columns, City and sal, and then update the config table, for example as below.
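A sketch of those two steps, assuming the modified table is the hypothetical tbl_employee from the earlier sketch:

-- Add the two new columns to the staging table.
ALTER TABLE tbl_employee ADD City VARCHAR(500), sal VARCHAR(500);

-- Append the new columns to the stored, pipe-separated column list.
UPDATE tbl_columns_config
SET    Columns_name = Columns_name + '|City|sal'
WHERE  Table_Name = 'tbl_employee';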
See the file.
Let’s run this package.
Package executed successfully.
See the records in the table.
Data loaded successfully.
Thanks for reading!