[ 
https://issues.apache.org/jira/browse/SPARK-30149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joby Joje updated SPARK-30149:
------------------------------
    Description: 
When reading a CSV file with a defined schema, I am able to load the files and
process them using the code below. The schema is defined to strictly follow the
data types so that values are recorded with full precision.
{code:python}
from pyspark.sql.types import DecimalType, StringType, StructField, StructType
# BAD_RECORD is the column named by columnNameOfCorruptRecord in the read below.
source_schema = StructType([
    StructField("NAME", StringType(), True),
    StructField("AGE", StringType(), True),
    StructField("GENDER", StringType(), True),
    StructField("PROFESSION", StringType(), True),
    StructField("SALARY", DecimalType(38, 14), True),
    StructField("BAD_RECORD", StringType(), True)]){code}
{code:python}
from pyspark.sql.functions import input_file_name, monotonically_increasing_id

# sparksession is an existing SparkSession; in_file_list holds the input paths.
df_raw_file = sparksession.read \
    .format("csv") \
    .option("delimiter", '\t') \
    .option("header", "false") \
    .option("inferSchema", "true") \
    .option("columnNameOfCorruptRecord", "BAD_RECORD") \
    .schema(source_schema) \
    .load(in_file_list) \
    .withColumn("LINE_NUMBER", monotonically_increasing_id()) \
    .withColumn("SOURCE_FILE_NAME", input_file_name()){code}
As per the Spark documentation, the mode is {{PERMISSIVE}} by default if it is
not set, and a record with fewer or more tokens than the schema is not a
corrupted record for CSV. When Spark meets a record having fewer tokens than the
length of the schema, it sets {{null}} for the missing fields. When the record
has more tokens than the length of the schema, it drops the extra tokens.
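
The documented rule quoted above can be illustrated without Spark. The following is a minimal plain-Python sketch of that rule only, not Spark's parser; the helper name fit_tokens_to_schema, the FIELDS list, and the sample lines are hypothetical and introduced purely for illustration.

```python
# Plain-Python sketch of the documented PERMISSIVE token-count rule.
# This is NOT Spark's implementation; fit_tokens_to_schema is a hypothetical helper.
from typing import List, Optional

# Data columns of the schema above. BAD_RECORD is left out because it is
# filled by the reader, not read from the file.
FIELDS = ["NAME", "AGE", "GENDER", "PROFESSION", "SALARY"]


def fit_tokens_to_schema(line: str, delimiter: str = "\t") -> List[Optional[str]]:
    """Pad missing trailing fields with None and drop any extra tokens."""
    tokens: List[Optional[str]] = list(line.split(delimiter))
    if len(tokens) < len(FIELDS):
        tokens += [None] * (len(FIELDS) - len(tokens))
    return tokens[:len(FIELDS)]


# Hypothetical sample lines: one with a missing SALARY, one with an extra token.
short_row = fit_tokens_to_schema("JOHN\t27\tMALE\tCEO")
long_row = fit_tokens_to_schema("JOHN\t27\tMALE\tCEO\t300.12\tEXTRA")
print(short_row)
print(long_row)
```

Under this reading of the documentation, a four-token row would end up with a null SALARY and would not be flagged as corrupt.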
{code:java}
FILE SCHEMA TEST.CSV
root
|-- NAME: string (nullable = true)
|-- AGE: string (nullable = true)
|-- GENDER: string (nullable = true)
|-- PROFESSION: string (nullable = true)
|-- SALARY: decimal(38,14) (nullable = true)
|-- BAD_RECORD: string (nullable = true)
|-- LINE_NUMBER: long (nullable = false)
|-- SOURCE_FILE_NAME: string (nullable = false)

OUTPUT THE FILE TEST.CSV
+------+----+------+----------+------------------+---------------------------------+-----------+-----------------+
|NAME  |AGE |GENDER|PROFESSION|SALARY            |BAD_RECORD                       |LINE_NUMBER|SOURCE_FILE_NAME |
+------+----+------+----------+------------------+---------------------------------+-----------+-----------------+
|null  |null|null  |null      |null              |NAME  AGE     GENDER  PROFESSION      SALARY|0          |Test.CSV         |
|JOHN  |27  |MALE  |CEO       |300.12314234500000|null                             |1          |Test.CSV         |
|JUSTIN|67  |MALE  |CTO       |123.23453543450000|null                             |2          |Test.CSV         |
|SARAH |45  |FEMALE|CS        |null              |null                             |3          |Test.CSV         |
|SEAN  |66  |MALE  |CA        |null              |SEAN  66      MALE    CA                  |4          |Test.CSV         |
|PHIL  |34  |MALE  |null      |234.98698600000000|null                             |5          |Test.CSV         |
|null  |null|null  |null      |null              |JILL  25                      BOARD                    |6          |Test.CSV         |
|JACK  |30  |MALE  |BOARD     |null              |JACK  30      MALE    BOARD               |7          |Test.CSV         |
+------+----+------+----------+------------------+---------------------------------+-----------+-----------------+{code}
 

TEST1.CSV does not have the SALARY column, so that column should have been set
to NULL and the BAD_RECORD column should be NULL for those rows. That does not
happen; instead, the rows are treated as CORRUPT.

Also, when Spark meets a corrupted record, it should put the malformed string
into the field configured by {{columnNameOfCorruptRecord}} and set the other
fields to {{null}}. This is not happening either: I see it only for the JILL row.
{code:java}
FILE SCHEMA TEST1.CSV
root
|-- NAME: string (nullable = true)
|-- AGE: string (nullable = true)
|-- GENDER: string (nullable = true)
|-- PROFESSION: string (nullable = true)
|-- SALARY: decimal(38,14) (nullable = true)
|-- BAD_RECORD: string (nullable = true)
|-- LINE_NUMBER: long (nullable = false)
|-- SOURCE_FILE_NAME: string (nullable = false)

OUTPUT THE FILE TEST1.CSV
+------+----+------+----------+------+--------------------------+-----------+-----------------+
|NAME  |AGE |GENDER|PROFESSION|SALARY|BAD_RECORD                |LINE_NUMBER|SOURCE_FILE_NAME |
+------+----+------+----------+------+--------------------------+-----------+-----------------+
|NAME  |AGE |GENDER|PROFESSION|null  |NAME AGE GENDER PROFESSION|0          |Test1.CSV        |
|JOHN  |27  |MALE  |CEO       |null  |JOHN 27 MALE CEO          |1          |Test1.CSV        |
|JUSTIN|67  |MALE  |CTO       |null  |JUSTIN 67 MALE CTO        |2          |Test1.CSV        |
|SARAH |45  |FEMALE|CS        |null  |SARAH 45 FEMALE CS        |3          |Test1.CSV        |
|SEAN  |66  |MALE  |CA        |null  |SEAN 66 MALE CA           |4          |Test1.CSV        |
|PHIL  |34  |MALE  |null      |null  |PHIL 34 MALE              |5          |Test1.CSV        |
|null  |null|null  |null      |null  |JILL 25 BOARD             |6          |Test1.CSV        |
|JACK  |30  |MALE  |BOARD     |null  |JACK 30 MALE BOARD        |7          |Test1.CSV        |
+------+----+------+----------+------+--------------------------+-----------+-----------------+{code}
The code, output, and source files used for the test are attached.
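
The behaviour expected in this report can be summarised in a small plain-Python sketch. It models the reporter's reading of the documentation, not what Spark 2.4.0 actually does. The function expected_row, the FIELDS list, and the sample lines (including the non-numeric salary value) are hypothetical, and treating an empty token as null is an assumption made for the sketch.

```python
# Sketch of the EXPECTED per-line outcome described in this report.
# Hypothetical helper; it does not reproduce Spark's parser.
from decimal import Decimal, InvalidOperation
from typing import Dict, Optional

FIELDS = ["NAME", "AGE", "GENDER", "PROFESSION", "SALARY"]


def expected_row(line: str, delimiter: str = "\t") -> Dict[str, Optional[object]]:
    tokens = line.split(delimiter)
    # Fewer tokens: pad with None. More tokens: drop the extras.
    padded = (tokens + [None] * len(FIELDS))[:len(FIELDS)]
    row: Dict[str, Optional[object]] = dict(zip(FIELDS, padded))
    row["BAD_RECORD"] = None

    salary = row["SALARY"]
    if salary in (None, ""):
        # Assumption: a missing or empty SALARY is simply null, not corrupt.
        row["SALARY"] = None
        return row
    try:
        row["SALARY"] = Decimal(str(salary))
    except InvalidOperation:
        # Malformed value: keep the raw line and null every other field.
        row = {name: None for name in FIELDS}
        row["BAD_RECORD"] = line
    return row


good = expected_row("JOHN\t27\tMALE\tCEO\t300.123142345")
short = expected_row("JOHN\t27\tMALE\tCEO")
bad = expected_row("SEAN\t66\tMALE\tCA\tnot-a-number")
print(good, short, bad, sep="\n")
```

In this model only the third line populates BAD_RECORD, which is the outcome the report expects for the rows of TEST1.CSV that merely lack a SALARY value.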

> Schema Definition Spark Read
> ----------------------------
>
>                 Key: SPARK-30149
>                 URL: https://issues.apache.org/jira/browse/SPARK-30149
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 2.4.0
>            Reporter: Joby Joje
>            Priority: Blocker
>         Attachments: Output.txt, Schema.py, Test.SDL, Test1.SDL
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
