[ https://issues.apache.org/jira/browse/SPARK-48361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849095#comment-17849095 ]
Bruce Robbins commented on SPARK-48361:
---------------------------------------

I can take a look at the root cause, unless you are already looking at that, in which case I will hold off.

> Correctness: CSV corrupt record filter with aggregate ignored
> -------------------------------------------------------------
>
>                 Key: SPARK-48361
>                 URL: https://issues.apache.org/jira/browse/SPARK-48361
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.1
>         Environment: Using spark shell 3.5.1 on M1 Mac
>            Reporter: Ted Chester Jenks
>            Priority: Major
>
> While using the corrupt record column in CSV parsing for some data-cleaning logic, I came across a correctness bug.
>
> The following repro can be run with spark-shell 3.5.1.
>
> *Create test.csv with the following content:*
> {code:java}
> test,1,2,three
> four,5,6,seven
> 8,9
> ten,11,12,thirteen {code}
>
> *In spark-shell:*
> {code:java}
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions._
>
> // define a STRING, DOUBLE, DOUBLE, STRING schema for the data
> val schema = StructType(List(
>   StructField("column1", StringType, true),
>   StructField("column2", DoubleType, true),
>   StructField("column3", DoubleType, true),
>   StructField("column4", StringType, true)))
>
> // add a column for corrupt records to the schema
> val schemaWithCorrupt = StructType(schema.fields :+ StructField("_corrupt_record", StringType, true))
>
> // read the CSV with the schema, headers, permissive parsing, and the corrupt record column
> val df = spark.read
>   .option("header", "true")
>   .option("mode", "PERMISSIVE")
>   .option("columnNameOfCorruptRecord", "_corrupt_record")
>   .schema(schemaWithCorrupt)
>   .csv("test.csv")
>
> // define a UDF to count the commas in the corrupt record column
> val countCommas = udf((s: String) => if (s != null) s.count(_ == ',') else -1)
>
> // add a true/false column for whether the number of commas is not 3
> val dfWithJagged = df.withColumn("__is_jagged",
>   when(col("_corrupt_record").isNull, false)
>     .otherwise(countCommas(col("_corrupt_record")) =!= 3))
>
> dfWithJagged.show(){code}
> *Returns:*
> {code:java}
> +-------+-------+-------+--------+---------------+-----------+
> |column1|column2|column3| column4|_corrupt_record|__is_jagged|
> +-------+-------+-------+--------+---------------+-----------+
> |   four|    5.0|    6.0|   seven|           NULL|      false|
> |      8|    9.0|   NULL|    NULL|            8,9|       true|
> |    ten|   11.0|   12.0|thirteen|           NULL|      false|
> +-------+-------+-------+--------+---------------+-----------+ {code}
> So far so good...
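[Editorial aside, not part of the report: the comma-count logic itself can be cross-checked without the UDF or the corrupt record column, by reading the file as raw text and counting delimiters with built-in functions. A minimal sketch against the same test.csv; the split/size formulation is an assumed equivalent of the UDF, not anything from the ticket:]

{code:java}
// Hypothetical cross-check (not from the report): read raw lines and
// count commas per line with built-in functions instead of a UDF.
import org.apache.spark.sql.functions._

val raw = spark.read.text("test.csv")            // one row per line, in column "value"
val withCommas = raw.withColumn("n_commas",
  size(split(col("value"), ",")) - 1)            // commas = fields - 1
withCommas.filter(col("n_commas") =!= 3).show()  // only the jagged "8,9" line should appear
{code}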
> *BUT*
>
> *If we add an aggregate before we show:*
> {code:java}
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions._
>
> val schema = StructType(List(
>   StructField("column1", StringType, true),
>   StructField("column2", DoubleType, true),
>   StructField("column3", DoubleType, true),
>   StructField("column4", StringType, true)))
> val schemaWithCorrupt = StructType(schema.fields :+ StructField("_corrupt_record", StringType, true))
> val df = spark.read
>   .option("header", "true")
>   .option("mode", "PERMISSIVE")
>   .option("columnNameOfCorruptRecord", "_corrupt_record")
>   .schema(schemaWithCorrupt)
>   .csv("test.csv")
> val countCommas = udf((s: String) => if (s != null) s.count(_ == ',') else -1)
> val dfWithJagged = df.withColumn("__is_jagged",
>   when(col("_corrupt_record").isNull, false)
>     .otherwise(countCommas(col("_corrupt_record")) =!= 3))
> val dfDropped = dfWithJagged.filter(col("__is_jagged") =!= true)
> val groupedSum = dfDropped.groupBy("column1").agg(sum("column2").alias("sum_column2"))
> groupedSum.show(){code}
> *We get:*
> {code:java}
> +-------+-----------+
> |column1|sum_column2|
> +-------+-----------+
> |      8|        9.0|
> |   four|        5.0|
> |    ten|       11.0|
> +-------+-----------+ {code}
>
> *Which is not correct.*
>
> With the addition of the aggregate, the filter down to rows with exactly 3 commas in the corrupt record column is ignored: the jagged "8,9" row should have been dropped, yet it still appears in the result. This does not happen with any other operators I have tried - just aggregates so far.
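[Editorial note: a possible workaround, under the assumption that the filter on _corrupt_record is being lost when the plan is rewritten around the aggregate. The Spark SQL CSV documentation generally advises caching or saving the parsed result before running queries that reference the corrupt record column, since such filters can otherwise interact badly with the scan. A sketch of that workaround, untested against this exact repro:]

{code:java}
// Hedged workaround sketch: materialize the parsed rows via cache()
// before filtering on _corrupt_record, per the Spark SQL docs'
// general advice for queries over the corrupt record column.
val dfCached = df.cache()
val dfWithJagged2 = dfCached.withColumn("__is_jagged",
  when(col("_corrupt_record").isNull, false)
    .otherwise(countCommas(col("_corrupt_record")) =!= 3))
val dfDropped2 = dfWithJagged2.filter(col("__is_jagged") =!= true)
dfDropped2.groupBy("column1").agg(sum("column2").alias("sum_column2")).show()
// If the workaround holds, the "8" group should no longer appear.
{code}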