[ 
https://issues.apache.org/jira/browse/SPARK-48361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848692#comment-17848692
 ] 

Bruce Robbins commented on SPARK-48361:
---------------------------------------

Sorry for being dense. What would the correct answer be?

> Correctness: CSV corrupt record filter with aggregate ignored
> -------------------------------------------------------------
>
>                 Key: SPARK-48361
>                 URL: https://issues.apache.org/jira/browse/SPARK-48361
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.1
>         Environment: Using spark shell 3.5.1 on M1 Mac
>            Reporter: Ted Chester Jenks
>            Priority: Major
>
> Using corrupt record in CSV parsing for some data cleaning logic, I came 
> across a correctness bug.
>  
> The following repro can be ran with spark-shell 3.5.1.
> *Create test.csv with the following content:*
> {code:java}
> test,1,2,three
> four,5,6,seven
> 8,9
> ten,11,12,thirteen {code}
>  
>  
> *In spark-shell:*
> {code:java}
> import org.apache.spark.sql.types._ 
> import org.apache.spark.sql.functions._
>  
> # define a STRING, DOUBLE, DOUBLE, STRING schema for the data
> val schema = StructType(List(StructField("column1", StringType, true), 
> StructField("column2", DoubleType, true), StructField("column3", DoubleType, 
> true), StructField("column4", StringType, true)))
>  
> # add a column for corrupt records to the schema
> val schemaWithCorrupt = StructType(schema.fields :+ 
> StructField("_corrupt_record", StringType, true)) 
>  
> # read the CSV with the schema, headers, permissive parsing, and the corrupt 
> record column
> val df = spark.read.option("header", "true").option("mode", 
> "PERMISSIVE").option("columnNameOfCorruptRecord", 
> "_corrupt_record").schema(schemaWithCorrupt).csv("test.csv") 
>  
> # define a UDF to count the commas in the corrupt record column
> val countCommas = udf((s: String) => if (s != null) s.count(_ == ',') else 
> -1) 
>  
> # add a true/false column for whether the number of commas is 3
> val dfWithJagged = df.withColumn("__is_jagged", 
> when(col("_corrupt_record").isNull, 
> false).otherwise(countCommas(col("_corrupt_record")) =!= 3))
> dfWithJagged.show(){code}
> *Returns:*
> {code:java}
> +-------+-------+-------+--------+---------------+-----------+
> |column1|column2|column3| column4|_corrupt_record|__is_jagged|
> +-------+-------+-------+--------+---------------+-----------+
> |   four|    5.0|    6.0|   seven|           NULL|      false|
> |      8|    9.0|   NULL|    NULL|            8,9|       true|
> |    ten|   11.0|   12.0|thirteen|           NULL|      false|
> +-------+-------+-------+--------+---------------+-----------+ {code}
> So far so good...
>  
> *BUT*
>  
> *If we add an aggregate before we show:*
> {code:java}
> import org.apache.spark.sql.types._ 
> import org.apache.spark.sql.functions._
>  
> # define a STRING, DOUBLE, DOUBLE, STRING schema for the data
> val schema = StructType(List(StructField("column1", StringType, true), 
> StructField("column2", DoubleType, true), StructField("column3", DoubleType, 
> true), StructField("column4", StringType, true)))
>  
> # add a column for corrupt records to the schema
> val schemaWithCorrupt = StructType(schema.fields :+ 
> StructField("_corrupt_record", StringType, true)) 
>  
> # read the CSV with the schema, headers, permissive parsing, and the corrupt 
> record column
> val df = spark.read.option("header", "true").option("mode", 
> "PERMISSIVE").option("columnNameOfCorruptRecord", 
> "_corrupt_record").schema(schemaWithCorrupt).csv("test.csv") 
>  
> # define a UDF to count the commas in the corrupt record column
> val countCommas = udf((s: String) => if (s != null) s.count(_ == ',') else 
> -1) 
>  
> # add a true/false column for whether the number of commas is 3
> val dfWithJagged = df.withColumn("__is_jagged", 
> when(col("_corrupt_record").isNull, 
> false).otherwise(countCommas(col("_corrupt_record")) =!= 3))
>   
> # sum up column1
> val groupedSum = 
> dfWithJagged.groupBy("column1").agg(sum("column2").alias("sum_column2"))
> groupedSum.show(){code}
> *We get:*
> {code:java}
> +-------+-----------+
> |column1|sum_column2|
> +-------+-----------+
> |      8|        9.0|
> |   four|        5.0|
> |    ten|       11.0|
> +-------+-----------+ {code}
>  
> *Which is not correct*
>  
> With the addition of the aggregate, the filter down to rows with 3 commas in 
> the corrupt record column is ignored. This does not happed with any other 
> operators I have tried - just aggregates so far.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to