Kristopher Kane created SPARK-43777:
---------------------------------------
Summary: Coalescing partitions in AQE returns different results with row_number windows.
Key: SPARK-43777
URL: https://issues.apache.org/jira/browse/SPARK-43777
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.3.2, 3.1.3
Environment: SBT-based Spark project with unit tests running on spark-testing-base. Tested with Spark 3.1.3 and 3.3.2.
Reporter: Kristopher Kane

While updating our code base from Spark 3.1 to 3.3, I had a test fail because it returned wrong results. With 3.1 we had not explicitly enabled AQE in our sbt-based tests, so we only noticed the failure once AQE became enabled by default (starting with Spark 3.2.0).

A test that reproduces the issue:

{code:java}
import org.apache.spark.sql.{DataFrame, functions}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, when}
// Assumes a SparkSession val named `spark` is in scope (e.g. from spark-testing-base)
import spark.implicits._

val testDataDf = Seq(
  (1, 1, 0, 0),
  (1, 1, 0, 0),
  (1, 1, 0, 0),
  (1, 0, 0, 1),
  (1, 0, 0, 1),
  (2, 0, 0, 0),
  (2, 0, 1, 0),
  (2, 1, 0, 0),
  (3, 0, 1, 0),
  (3, 0, 1, 0)
).toDF("id", "is_attribute1", "is_attribute2", "is_attribute3")

// Number the (id, place) groups within each id by descending count
val placeWindowSpec = Window
  .partitionBy("id")
  .orderBy($"count".desc)

val resultDf: DataFrame = testDataDf
  .select("id", "is_attribute1", "is_attribute2", "is_attribute3")
  // Label each row with the first attribute flag that is set
  .withColumn(
    "place",
    when($"is_attribute1" === 1, "attribute1")
      .when($"is_attribute2" === 1, "attribute2")
      .when($"is_attribute3" === 1, "attribute3")
      .otherwise("other")
  )
  .groupBy("id", "place")
  .agg(
    functions.count("*").as("count")
  )
  .withColumn(
    "rank",
    row_number().over(placeWindowSpec)
  )

resultDf.orderBy("id", "place", "rank").show()
{code}

Results for different Spark versions and AQE settings:

{code:java}
----------------------------------------
Spark 3.1
----------------------------------------
Without AQE
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   2|
|  2|attribute2|    1|   1|
|  2|     other|    1|   3|
|  3|attribute2|    2|   1|
+---+----------+-----+----+

AQE with defaults
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   2|
|  2|attribute2|    1|   1|
|  2|     other|    1|   3|
|  3|attribute2|    2|   1|
+---+----------+-----+----+

AQE with .set("spark.sql.adaptive.coalescePartitions.enabled", "false")
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   2|
|  2|attribute2|    1|   1|
|  2|     other|    1|   3|
|  3|attribute2|    2|   1|
+---+----------+-----+----+

AQE with .set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1") - Like Spark 3.3 with AQE defaults
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   3|
|  2|attribute2|    1|   2|
|  2|     other|    1|   1|
|  3|attribute2|    2|   1|
+---+----------+-----+----+

----------------------------------------
Spark 3.3.2
----------------------------------------
AQE with defaults - Different from Spark 3.1
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   3|
|  2|attribute2|    1|   2|
|  2|     other|    1|   1|
|  3|attribute2|    2|   1|
+---+----------+-----+----+

--------------------------------------------
AQE with .set("spark.sql.adaptive.coalescePartitions.enabled", "false") - This matches Spark 3.1
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   2|
|  2|attribute2|    1|   1|
|  2|     other|    1|   3|
|  3|attribute2|    2|   1|
+---+----------+-----+----+
{code}

As you can see, the 'rank' column, computed with row_number() over a window partitioned by id and ordered by count descending, assigns different ranks to the three rows for id 2 depending on how AQE coalesces partitions.
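All three id 2 groups have count = 1, so they tie on the window's only ordering key. For tied rows, row_number() is not guaranteed to number them the same way across different physical plans; they are numbered in whatever order they reach the window operator. The sketch below illustrates this without Spark. It is a hypothetical, plain-Scala model of row_number() over PARTITION BY id ORDER BY count DESC, not Spark's implementation. The names RowNumberTies, Agg, rowNumber, rowsA and rowsB are made up for illustration, and the two arrival orders are arbitrary stand-ins for different partition layouts, chosen so that the id 2 ranks mirror the two result tables above.

```scala
// Hypothetical, Spark-free model of row_number() OVER (PARTITION BY id ORDER BY ...).
// It only illustrates how ties are numbered; it is not Spark's implementation.
object RowNumberTies {
  // One aggregated row: the output of groupBy("id", "place") with a count
  final case class Agg(id: Int, place: String, count: Long)

  // Numbers rows 1, 2, 3, ... within each id after sorting with `ord`.
  // Seq.sorted is stable, so rows that tie under `ord` keep their input order;
  // this stands in for "ties are numbered in arrival order".
  def rowNumber(rows: Seq[Agg], ord: Ordering[Agg]): Map[(Int, String), Int] =
    rows.groupBy(_.id).values.flatMap { group =>
      group.sorted(ord).zipWithIndex.map { case (r, i) => (r.id, r.place) -> (i + 1) }
    }.toMap

  // ORDER BY count DESC: the only key, so equal counts tie
  val byCountDesc: Ordering[Agg] = Ordering.by[Agg, Long](_.count).reverse

  // ORDER BY count DESC, place ASC: an assumed tie-breaker that makes the order total per id
  val byCountDescThenPlace: Ordering[Agg] =
    Ordering.by[Agg, (Long, String)](r => (-r.count, r.place))

  // The same aggregated rows in two arbitrary arrival orders
  // (hypothetical stand-ins for two different partition layouts)
  val rowsA: Seq[Agg] = Seq(
    Agg(1, "attribute1", 3), Agg(1, "attribute3", 2),
    Agg(2, "attribute2", 1), Agg(2, "attribute1", 1), Agg(2, "other", 1),
    Agg(3, "attribute2", 2))
  val rowsB: Seq[Agg] = Seq(
    Agg(3, "attribute2", 2),
    Agg(2, "other", 1), Agg(2, "attribute2", 1), Agg(2, "attribute1", 1),
    Agg(1, "attribute3", 2), Agg(1, "attribute1", 3))

  def main(args: Array[String]): Unit =
    for ((label, ord) <- Seq("count DESC" -> byCountDesc, "count DESC, place ASC" -> byCountDescThenPlace)) {
      val same = rowNumber(rowsA, ord) == rowNumber(rowsB, ord)
      println(s"ORDER BY $label: identical ranks for both arrival orders = $same")
    }
}
```

Running main should report false for ORDER BY count DESC and true once place is added as a secondary key. In the Spark reproduction, the analogous change, assuming that breaking ties by place is acceptable for the use case, would be Window.partitionBy("id").orderBy($"count".desc, $"place").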