[ 
https://issues.apache.org/jira/browse/SPARK-48361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849747#comment-17849747
 ] 

Bruce Robbins commented on SPARK-48361:
---------------------------------------

After looking at this, I see that this is arguably documented behavior 
(although still somewhat surprising).

The documentation for the {{mode}} option says the following:
{quote}
Note that Spark tries to parse only required columns in CSV under column 
pruning. Therefore, corrupt records can be different based on required set of 
fields. This behavior can be controlled by 
spark.sql.csv.parser.columnPruning.enabled (enabled by default).
{quote}
And, indeed, if you turn off CSV column pruning, your issue goes away:
{noformat}
scala> groupedSum.show()
+-------+-----------+
|column1|sum_column2|
+-------+-----------+
|      8|        9.0|
|   four|        5.0|
|    ten|       11.0|
+-------+-----------+


scala> sql("set spark.sql.csv.parser.columnPruning.enabled=false")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> groupedSum.show()
+-------+-----------+
|column1|sum_column2|
+-------+-----------+
|   four|        5.0|
|    ten|       11.0|
+-------+-----------+


scala> 
{noformat}
The grouping operation only needs a subset of the columns (column1, column2, 
and _corrupt_record for the filter), so the rest of the columns are pruned. 
Because only a part of the input record is parsed, the parser never discovers 
that the record is corrupted, so {{_corrupt_record}} is null.

It's still a little weird though, because if you include, say, {{column4}} as a 
grouping column, {{_corrupt_record}} still remains null. It seems the code 
wants to set {{_corrupt_record}} only if it's parsing the entire input record.

> Correctness: CSV corrupt record filter with aggregate ignored
> -------------------------------------------------------------
>
>                 Key: SPARK-48361
>                 URL: https://issues.apache.org/jira/browse/SPARK-48361
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.1
>         Environment: Using spark shell 3.5.1 on M1 Mac
>            Reporter: Ted Chester Jenks
>            Priority: Major
>
> Using corrupt record in CSV parsing for some data cleaning logic, I came 
> across a correctness bug.
>  
> The following repro can be ran with spark-shell 3.5.1.
> *Create test.csv with the following content:*
> {code:java}
> test,1,2,three
> four,5,6,seven
> 8,9
> ten,11,12,thirteen {code}
>  
>  
> *In spark-shell:*
> {code:java}
> import org.apache.spark.sql.types._ 
> import org.apache.spark.sql.functions._
>  
> # define a STRING, DOUBLE, DOUBLE, STRING schema for the data
> val schema = StructType(List(StructField("column1", StringType, true), 
> StructField("column2", DoubleType, true), StructField("column3", DoubleType, 
> true), StructField("column4", StringType, true)))
>  
> # add a column for corrupt records to the schema
> val schemaWithCorrupt = StructType(schema.fields :+ 
> StructField("_corrupt_record", StringType, true)) 
>  
> # read the CSV with the schema, headers, permissive parsing, and the corrupt 
> record column
> val df = spark.read.option("header", "true").option("mode", 
> "PERMISSIVE").option("columnNameOfCorruptRecord", 
> "_corrupt_record").schema(schemaWithCorrupt).csv("test.csv") 
>  
> # define a UDF to count the commas in the corrupt record column
> val countCommas = udf((s: String) => if (s != null) s.count(_ == ',') else 
> -1) 
>  
> # add a true/false column for whether the number of commas is 3
> val dfWithJagged = df.withColumn("__is_jagged", 
> when(col("_corrupt_record").isNull, 
> false).otherwise(countCommas(col("_corrupt_record")) =!= 3))
> dfWithJagged.show(){code}
> *Returns:*
> {code:java}
> +-------+-------+-------+--------+---------------+-----------+
> |column1|column2|column3| column4|_corrupt_record|__is_jagged|
> +-------+-------+-------+--------+---------------+-----------+
> |   four|    5.0|    6.0|   seven|           NULL|      false|
> |      8|    9.0|   NULL|    NULL|            8,9|       true|
> |    ten|   11.0|   12.0|thirteen|           NULL|      false|
> +-------+-------+-------+--------+---------------+-----------+ {code}
> So far so good...
>  
> *BUT*
>  
> *If we add an aggregate before we show:*
> {code:java}
> import org.apache.spark.sql.types._ 
> import org.apache.spark.sql.functions._
> val schema = StructType(List(StructField("column1", StringType, true), 
> StructField("column2", DoubleType, true), StructField("column3", DoubleType, 
> true), StructField("column4", StringType, true)))
> val schemaWithCorrupt = StructType(schema.fields :+ 
> StructField("_corrupt_record", StringType, true)) 
> val df = spark.read.option("header", "true").option("mode", 
> "PERMISSIVE").option("columnNameOfCorruptRecord", 
> "_corrupt_record").schema(schemaWithCorrupt).csv("test.csv") 
> val countCommas = udf((s: String) => if (s != null) s.count(_ == ',') else 
> -1) 
> val dfWithJagged = df.withColumn("__is_jagged", 
> when(col("_corrupt_record").isNull, 
> false).otherwise(countCommas(col("_corrupt_record")) =!= 3))
> val dfDropped = dfWithJagged.filter(col("__is_jagged") =!= true)
> val groupedSum = 
> dfDropped.groupBy("column1").agg(sum("column2").alias("sum_column2"))
> groupedSum.show(){code}
> *We get:*
> {code:java}
> +-------+-----------+
> |column1|sum_column2|
> +-------+-----------+
> |      8|        9.0|
> |   four|        5.0|
> |    ten|       11.0|
> +-------+-----------+ {code}
>  
> *Which is not correct*
>  
> With the addition of the aggregate, the filter down to rows with 3 commas in 
> the corrupt record column is ignored. This does not happed with any other 
> operators I have tried - just aggregates so far.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to