Sunday, 15 August 2021

Dynamically read n CSV files and load the data into their respective tables

To load a CSV file, we take a Data Flow Task (DFT) with a Flat File source and load the data into the destination table (stage table). That is fine when we have one or two files: we can create a DFT for each and load the data into the table.

But suppose we have 100 different files. In this case, what can we do?

We could create a DFT for each file, do the mapping, and load the data into the database, but that is a very tedious task. And after creating that many DFTs, we would need to update the package again whenever a file is added or removed.

If you have this type of scenario, don't worry. Here we will see how to load n different types of files very easily.

See this demo:

We have n files, as shown below. Each file is named after its target table.

 

All the files have different structures. Here we assume the files don't have a header row; later we will see how to load these files when they do have a header.

   


Here we have the staging tables.

   


Let's see how we load them.

To load these files, first of all we need the following:

1. The table name and the file name must be the same. (If they are not, we would also need a config table that maps each file name to its table name.) Here the file names and the table names are the same.

2. We have a config table that stores each table's list of column names, joined with a separator. The files use a pipe (|) separator, so the columns are stored pipe-separated as well (see the sketch after this list).

3. If we add any new file, we need to add the column list for that file.

4. If we change a file's structure, we need to update its column list in the configuration table.

5. Here all the tables use the same data type for every column. (In the next post we will see different data types.)
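As a rough sketch, the config table could look like the following. Only the name tbl_columns_config and its two columns (Table_Name, Columns_name) come from the script task query further below; the data types, sizes, and the sample row are assumptions for illustration.

CREATE TABLE dbo.tbl_columns_config
(
    Table_Name   VARCHAR(500)  NOT NULL, -- matches the CSV file name without '.csv'
    Columns_name VARCHAR(4000) NOT NULL  -- pipe-separated column list
);

-- Hypothetical sample row for a file named tbl_Emp.csv
INSERT INTO dbo.tbl_columns_config (Table_Name, Columns_name)
VALUES ('tbl_Emp', 'id|name');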

Now we create the package.

A Foreach Loop container is used to enumerate the files.

We have some variables that are used in this package.

The file's full path is stored in the File_Name variable.

This Execute SQL Task is used to truncate the staging table. It takes File_Name as input, which holds the file name with its full path. From the full path we extract the file name, dynamically build the truncate statement, and execute it.

DECLARE @full_path VARCHAR(500) = ?,   -- mapped to the File_Name variable
        @table_nm  VARCHAR(500),
        @sql_str   NVARCHAR(500);

-- Take the text after the last '\' and strip the '.csv' extension
SET @table_nm = REPLACE(RIGHT(@full_path, CHARINDEX('\', REVERSE(@full_path)) - 1), '.csv', '');

-- Build and execute the truncate statement for that staging table
SET @sql_str = 'TRUNCATE TABLE ' + @table_nm;

EXEC (@sql_str);
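Because the table name is concatenated directly into the statement, a slightly safer variant (our suggestion, not part of the original package) is to wrap it in QUOTENAME:

SET @sql_str = 'TRUNCATE TABLE ' + QUOTENAME(@table_nm);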

Now, in the Script Task, we need to take the variables (the script reads User::File_Name and User::Target_Path).

We write the code below. (It uses FileInfo, DataTable, and SqlConnection, so the script needs using directives for System.IO, System.Data, and System.Data.SqlClient in addition to the template defaults.)

public void Main()
{
    // Read the file information from the package variables
    string File_Path = Dts.Variables["User::File_Name"].Value.ToString();
    FileInfo fi = new FileInfo(File_Path);
    string FileName = fi.Name;
    string table_Name = FileName.Replace(".csv", "");
    char FileDelimiter = '|';
    string targetPath = Dts.Variables["User::Target_Path"].Value.ToString();

    // Read the CSV file and store its data in a DataTable
    try
    {
        // Read the list of columns for this table from the config table,
        // create a DataTable, and add those columns to it.
        SqlConnection con = new SqlConnection("Data Source=BAGESH\\BAGESHDB;Initial Catalog=AdventureWorksDW2017_Stage;Integrated Security=true;");
        string sql = "select Table_Name, Columns_name from tbl_columns_config where Table_Name = '" + table_Name + "'";
        con.Open();
        SqlCommand cmd = new SqlCommand(sql, con);
        cmd.CommandType = CommandType.Text;
        SqlDataReader rd = cmd.ExecuteReader();

        // If no config row exists for this file, Columns stays null and
        // the catch block below reports the error.
        string Columns = null;
        while (rd.Read())
        {
            Columns = rd.GetString(1);
        }
        rd.Close();

        DataTable dt = new DataTable();
        string[] Columns_List = Columns.Split(FileDelimiter);
        foreach (string column in Columns_List)
        {
            dt.Columns.Add(column);
        }

        // Read the CSV file line by line and add the rows to the DataTable
        using (StreamReader reader = new StreamReader(File_Path))
        {
            // If the file has a header, skip it by calling
            // reader.ReadLine() once here, before the loop.
            while (!reader.EndOfStream)
            {
                string[] rows = reader.ReadLine().Split(FileDelimiter);
                DataRow dr = dt.NewRow();
                for (int i = 0; i < Columns_List.Length; i++)
                {
                    dr[i] = rows[i];
                }
                dt.Rows.Add(dr);
            }
        }
        con.Close();

        // Create a SqlBulkCopy, map the table columns to the DataTable
        // columns, and insert the records into the staging table.
        SqlBulkCopy obj = new SqlBulkCopy(con);
        con.Open();

        // Assign the destination table name
        obj.DestinationTableName = table_Name;
        foreach (string column in Columns_List)
        {
            obj.ColumnMappings.Add(column, column);
        }
        obj.WriteToServer(dt);
        con.Close();

        // After loading the file, move it to the archive directory
        string destFile = Path.Combine(targetPath, FileName);
        File.Move(File_Path, destFile);
    }
    // If any error occurs, fail the task with the error message
    catch (Exception ex)
    {
        Dts.Events.FireError(0, "Fire Error", "An error occurred: " + ex.Message, "", 0);
    }

    Dts.TaskResult = (int)ScriptResults.Success;
}

 

Compile the code and save it.

Before running this package.

Records in the table.

  

List of files in the source dir.   

Archive folder.  

Let’s run this package.

  

Package executed successfully.

See the records in the tables.   

Source folder.   

Destination folder.  


Data loaded successfully.

Now suppose that, a few days later, we add some other files.

We add both tables in the database.

Now we need to add these tables' column lists to the config table.
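A sketch of those inserts, reusing the config table from above (the table and column names in the VALUES rows are hypothetical; substitute the real ones from your files):

INSERT INTO dbo.tbl_columns_config (Table_Name, Columns_name)
VALUES ('tbl_Dept',  'dept_id|dept_name'),
       ('tbl_Sales', 'sales_id|amount');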

    

Now we run this package without making any changes to it.

Package executed successfully.

See the records in the table.

  

Data loaded successfully.

Let's modify a file (add columns to it).

First we need to add the columns to the table and to the config table.

We add two columns: City and sal.

  

Now we update the config table.
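A sketch of the two changes (the staging table name tbl_Emp and the VARCHAR(500) type are assumptions, consistent with the single data type used for all columns here; City and sal are the columns added above):

-- Add the new columns to the staging table
ALTER TABLE dbo.tbl_Emp ADD City VARCHAR(500) NULL, sal VARCHAR(500) NULL;

-- Append them to the stored pipe-separated column list
UPDATE dbo.tbl_columns_config
SET    Columns_name = Columns_name + '|City|sal'
WHERE  Table_Name = 'tbl_Emp';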

See the file.  

Let’s run this package.

   

Package executed successfully.

See the records in the table.

Data loaded successfully.

Thanks for reading! 🙂
