Wednesday, 21 August 2024

Load CSV Files that have different number of columns using SSIS

Recently we got one requirement to load the csv file data into the table using SSIS. Loading CSV file data in SSIS is very easy task when we are getting the all column. But the issue when we are not sure that when we will get the correct file. Client was agree that he will share the csv files that may have different number of column. 

Let’s see the example.

For this demo we are taking pre poll survey (Opinion poll) csv file. In this CSV file few columns fixed but some of the column where may be we will get the data or may not get the data. For this we have created the csv file which is having first six columns are fixed I means for each records we will get these value and for the remaining ten columns are not fixed. For one record we may get remaining 2 column or 4 or 6 or 8 or all column. We are getting in the sequence. See the below file.


In this file Sno 1002 & 1008 having all column. Sno 1005& 1009 missing 4 columns. Sno 1001, 1004 & 1006 missing 6 columns. Sno 1007 missing 8 columns and Sno 1010 missing 10 column.

See this file in note pad.

Here we do not have the comma for the missing column.

To load this file we can use C# or SQL Scrubber script. In this example we will use SQL Scrubber.

First of all we need to load the CSV file in to the table in one column. Creating a stage table to load the row data.

CREATE TABLE stg_RowData(

                id int IDENTITY(1,1) NOT NULL,

                RowData varchar(max) NULL

                )

Table created successfully.

Now opening the SSDT and creating a package.

Creating Oledb and flat file connection. 

Taking Execute SQL task to truncate the stage table.

Now taking for each loop container to load more than one csv files. Inside the container we are taking the Dataflow task.

Taking source as Flat file and destination as oledb destination.

Doing the mapping.

Let’s load the data.


Data loaded successfully into the stage table and row data loaded successfully.

Now we need to scrub these data and load it into the staging table. Below is the staging and main table.

CREATE TABLE Stg_Survey(         

                 SNo                                                       INT NOT NULL,

                 Voter_Name                    VARCHAR(100) NULL,

                 Voter_Add                                        VARCHAR(100) NULL,

                 Constituency_Name     VARCHAR(50) NULL,

                 Constituency_State VARCHAR(50) NULL,

                 Pre_Poll_date                  VARCHAR(50)NULL,

                 Question_1                                       VARCHAR(200) NULL,

                 Answer_1                                          VARCHAR(200) NULL,

                 Question_2                                       VARCHAR(200) NULL,

                 Answer_2                                          VARCHAR(200) NULL,

                 Question_3                                       VARCHAR(200) NULL,

                 Answer_3                                          VARCHAR(200) NULL,

                 Question_4                                       VARCHAR(200) NULL,

                 Answer_4                                          VARCHAR(200) NULL,

                 Question_5                                       VARCHAR(200) NULL,

                 Answer_5                                          VARCHAR(200) NULL     

)

CREATE TABLE Survey(  

                 SNo                                                       INT NOT NULL,

                 Voter_Name                    VARCHAR(100) NULL,

                 Voter_Add                                        VARCHAR(100) NULL,

                 Constituency_Name     VARCHAR(50) NULL,

                 Constituency_State VARCHAR(50) NULL,

                 Pre_Poll_date                  DATE NULL,

                 Question_1                                       VARCHAR(200) NULL,

                 Answer_1                                          VARCHAR(200) NULL,

                 Question_2                                       VARCHAR(200) NULL,

                 Answer_2                                          VARCHAR(200) NULL,

                 Question_3                                       VARCHAR(200) NULL,

                 Answer_3                                          VARCHAR(200) NULL,

                 Question_4                                       VARCHAR(200) NULL,

                 Answer_4                                          VARCHAR(200) NULL,

                 Question_5                                       VARCHAR(200) NULL,

                 Answer_5                                          VARCHAR(200) NULL     

)

Table created successfully.

In the row staging data we have stored into comma separated. We need to split these rows into columns. To split the row we are using user defined function and giving the column sequence number for each columns.

Read here: DelimitedSplit8K function to split the delimited text

https://bageshkumarbagi-msbi.blogspot.com/2024/08/delimitedsplit8k-function-to-split.html

See below

Sno 1001 has 10 columns and Sno 1002 has 16 columns.

Now we are writing the sp to load the data into stage table and from stage to main table.

Below SP is used to load the data.

CREATE PROCEDURE Usp_load_survey_data

AS

  BEGIN

SET DATEFORMAT DMY;

      CREATE TABLE #temp_rowdata

        (

           sno         BIGINT,

           item        VARCHAR(256) NULL,

           colsequence INT

        );

 

      INSERT INTO #temp_rowdata

                  (sno,

                   item,

                   colsequence)

      SELECT LEFT(t.rowdata, Charindex(',', t.rowdata) - 1)

             AS

             sno,

             s.item,

             Row_number()

               OVER (

                 partition BY LEFT(t.rowdata, Charindex(',', t.rowdata) - 1)

                 ORDER BY LEFT(t.rowdata, Charindex(',', t.rowdata) - 1) ASC )

             AS

             ColSequence

      FROM   stg_rowdata AS t

             CROSS apply dbo.Delimitedsplit8k(t.rowdata, ',') AS s;

 

      INSERT INTO stg_survey

                  (sno,

                   voter_name,

                   voter_add,

                   constituency_name,

                   constituency_state,

                   pre_poll_date,

                   question_1,

                   answer_1,

                   question_2,

                   answer_2,

                   question_3,

                   answer_3,

                   question_4,

                   answer_4,

                   question_5,

                   answer_5)

      SELECT RowData.item    AS sno,

             RowData2.item   AS Voter_Name,

             RowData3.item   AS Voter_Add,

             RowData4.item   AS Constituency_Name,

             RowData5.item   AS Constituency_State,

             RowData6.item   AS Pre_Poll_date,

             Question_1.item AS Question_1,

             Answer_1.item   AS Answer_1,

             Question_2.item AS Question_2,

             Answer_2.item   AS Answer_2,

             Question_3.item AS Question_3,

             Answer_3.item   AS Answer_3,

             Question_4.item AS Question_4,

             Answer_4.item   AS Answer_4,

             Question_5.item AS Question_5,

             Answer_5.item   AS Answer_5

      FROM   #temp_rowdata AS RowData

             INNER JOIN (SELECT sno,

                                item,

                                colsequence

                         FROM   #temp_rowdata) AS RowData2

                     ON RowData2.sno = RowData.sno

                        AND RowData2.colsequence = 2

             INNER JOIN (SELECT sno,

                                item,

                                colsequence

                         FROM   #temp_rowdata) AS RowData3

                     ON RowData3.sno = RowData.sno

                        AND RowData3.colsequence = 3

             INNER JOIN (SELECT sno,

                                item,

                                colsequence

                         FROM   #temp_rowdata) AS RowData4

                     ON RowData4.sno = RowData.sno

                        AND RowData4.colsequence = 4

             INNER JOIN (SELECT sno,

                                item,

                                colsequence

                         FROM   #temp_rowdata) AS RowData5

                     ON RowData5.sno = RowData.sno

                        AND RowData5.colsequence = 5

             INNER JOIN (SELECT sno,

                                item,

                                colsequence

                         FROM   #temp_rowdata) AS RowData6

                     ON RowData6.sno = RowData.sno

                        AND RowData6.colsequence = 6

             OUTER apply (SELECT sno,

                                 item,

                                 colsequence

                          FROM   #temp_rowdata

                          WHERE  colsequence = 7

                                 AND sno = RowData.sno) AS Question_1

             OUTER apply (SELECT sno,

                                 item,

                                 colsequence

                          FROM   #temp_rowdata

                          WHERE  colsequence = 8

                                 AND sno = RowData.sno) AS Answer_1

             OUTER apply (SELECT sno,

                                 item,

                                 colsequence

                          FROM   #temp_rowdata

                          WHERE  colsequence = 9

                                 AND sno = RowData.sno) AS Question_2

             OUTER apply (SELECT sno,

                                 item,

                                 colsequence

                          FROM   #temp_rowdata

                          WHERE  colsequence = 10

                                 AND sno = RowData.sno) AS Answer_2

             OUTER apply (SELECT sno,

                                 item,

                                 colsequence

                          FROM   #temp_rowdata

                          WHERE  colsequence = 11

                                 AND sno = RowData.sno) AS Question_3

             OUTER apply (SELECT sno,

                                 item,

                                 colsequence

                          FROM   #temp_rowdata

                          WHERE  colsequence = 12

                                 AND sno = RowData.sno) AS Answer_3

             OUTER apply (SELECT sno,

                                 item,

                                 colsequence

                          FROM   #temp_rowdata

                          WHERE  colsequence = 13

                                 AND sno = RowData.sno) AS Question_4

             OUTER apply (SELECT sno,

                                 item,

                                 colsequence

                          FROM   #temp_rowdata

                          WHERE  colsequence = 14

                                 AND sno = RowData.sno) AS Answer_4

             OUTER apply (SELECT sno,

                                 item,

                                 colsequence

                          FROM   #temp_rowdata

                          WHERE  colsequence = 15

                                 AND sno = RowData.sno) AS Question_5

             OUTER apply (SELECT sno,

                                 item,

                                 colsequence

                          FROM   #temp_rowdata

                          WHERE  colsequence = 16

                                 AND sno = RowData.sno) AS Answer_5

      WHERE  RowData.colsequence = 1;

 

      --update the existing records in the main table.

      UPDATE sur

      SET    Voter_Name = stg.voter_name,

             Voter_Add = stg.voter_add,

             Constituency_Name = stg.constituency_name,

             Constituency_State = stg.constituency_state,

             Pre_Poll_date = stg.pre_poll_date,

             Question_1 = stg.question_1,

             Answer_1 = stg.answer_1,

             Question_2 = stg.question_2,

             Answer_2 = stg.answer_2,

             Question_3 = stg.question_3,

             Answer_3 = stg.answer_3,

             Question_4 = stg.question_4,

             Answer_4 = stg.answer_4,

             Question_5 = stg.question_5,

             Answer_5 = stg.answer_5

      FROM   survey sur

             JOIN stg_survey stg

               ON stg.sno = sur.sno

      WHERE  Isnull(sur.voter_name, '') <> Isnull(stg.voter_name, '')

              OR Isnull(sur.voter_add, '') <> Isnull(stg.voter_add, '')

              OR Isnull(sur.constituency_name, '') <>

                 Isnull(stg.constituency_name, '')

              OR Isnull(sur.constituency_state, '') <>

                 Isnull(stg.constituency_state, '')

              OR Isnull(sur.pre_poll_date, '') <> Isnull(stg.pre_poll_date, '')

              OR Isnull(sur.question_1, '') <> Isnull(stg.question_1, '')

              OR Isnull(sur.answer_1, '') <> Isnull(stg.answer_1, '')

              OR Isnull(sur.question_2, '') <> Isnull(stg.question_2, '')

              OR Isnull(sur.answer_2, '') <> Isnull(stg.answer_2, '')

              OR Isnull(sur.question_3, '') <> Isnull(stg.question_3, '')

              OR Isnull(sur.answer_3, '') <> Isnull(stg.answer_3, '')

              OR Isnull(sur.question_4, '') <> Isnull(stg.question_4, '')

              OR Isnull(sur.answer_4, '') <> Isnull(stg.answer_4, '')

              OR Isnull(sur.question_5, '') <> Isnull(stg.question_5, '')

              OR Isnull(sur.answer_5, '') <> Isnull(stg.answer_5, '');

 

      --inserting the records

      INSERT INTO survey

                  (sno,

                   voter_name,

                   voter_add,

                   constituency_name,

                   constituency_state,

                   pre_poll_date,

                   question_1,

                   answer_1,

                   question_2,

                   answer_2,

                   question_3,

                   answer_3,

                   question_4,

                   answer_4,

                   question_5,

                   answer_5)

      SELECT stg.sno,

             stg.voter_name,

             stg.voter_add,

             stg.constituency_name,

             stg.constituency_state,

             stg.pre_poll_date,

             stg.question_1,

             stg.answer_1,

             stg.question_2,

             stg.answer_2,

             stg.question_3,

             stg.answer_3,

             stg.question_4,

             stg.answer_4,

             stg.question_5,

             stg.answer_5

      FROM   stg_survey stg

             LEFT JOIN survey sur

                    ON sur.sno = stg.sno

      WHERE  sur.sno IS NULL;

 

      DROP TABLE #temp_rowdata;

  END;

go

 Now we can call this SP to load the data. 

Now package is ready to run.

Running this package.

Package executed successfully.

See the data into the main table.

Data loaded successfully.

No comments:

Post a Comment

If you have any doubt, please let me know.

Popular Posts