[ https://issues.apache.org/jira/browse/SPARK-43777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726261#comment-17726261 ]
Kristopher Kane commented on SPARK-43777:
-----------------------------------------

Yes, a colleague pointed out last night that I had missed that in the window spec.

> Coalescing partitions in AQE returns different results with row_number windows.
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-43777
>                 URL: https://issues.apache.org/jira/browse/SPARK-43777
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.3, 3.3.2
>         Environment: SBT-based Spark project with unit tests running on spark-testing-base. Tested with Spark 3.1.3 and 3.3.2.
>            Reporter: Kristopher Kane
>            Priority: Major
>
> While updating our code base from Spark 3.1 to 3.3, I had a test fail due to wrong results. With 3.1, we did not proactively turn on AQE in sbt-based tests, and we noticed the failure because AQE became enabled by default between 3.1 and 3.3.
> An easily reproducible test:
> {code:java}
> import org.apache.spark.sql.{DataFrame, functions}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions.{row_number, when}
> import spark.implicits._ // assumes a SparkSession value named `spark` is in scope
>
> val testDataDf = Seq(
>   (1, 1, 0, 0),
>   (1, 1, 0, 0),
>   (1, 1, 0, 0),
>   (1, 0, 0, 1),
>   (1, 0, 0, 1),
>   (2, 0, 0, 0),
>   (2, 0, 1, 0),
>   (2, 1, 0, 0),
>   (3, 0, 1, 0),
>   (3, 0, 1, 0)
> ).toDF("id", "is_attribute1", "is_attribute2", "is_attribute3")
>
> val placeWindowSpec = Window
>   .partitionBy("id")
>   .orderBy($"count".desc)
>
> val resultDf: DataFrame = testDataDf
>   .select("id", "is_attribute1", "is_attribute2", "is_attribute3")
>   .withColumn(
>     "place",
>     when($"is_attribute1" === 1, "attribute1")
>       .when($"is_attribute2" === 1, "attribute2")
>       .when($"is_attribute3" === 1, "attribute3")
>       .otherwise("other")
>   )
>   .groupBy("id", "place")
>   .agg(
>     functions.count("*").as("count")
>   )
>   .withColumn(
>     "rank",
>     row_number().over(placeWindowSpec)
>   )
>
> resultDf.orderBy("id", "place", "rank").show()
> {code}
>
> Various results based on Spark version and AQE settings:
> {code:java}
> Spark 3.1
> Without AQE
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   2|
> |  2|attribute2|    1|   1|
> |  2|     other|    1|   3|
> |  3|attribute2|    2|   1|
> +---+----------+-----+----+
>
> AQE with defaults
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   2|
> |  2|attribute2|    1|   1|
> |  2|     other|    1|   3|
> |  3|attribute2|    2|   1|
> +---+----------+-----+----+
>
> AQE with .set("spark.sql.adaptive.coalescePartitions.enabled", "false")
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   2|
> |  2|attribute2|    1|   1|
> |  2|     other|    1|   3|
> |  3|attribute2|    2|   1|
> +---+----------+-----+----+
>
> AQE with .set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1") (like Spark 3.3 with AQE defaults)
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   3|
> |  2|attribute2|    1|   2|
> |  2|     other|    1|   1|
> |  3|attribute2|    2|   1|
> +---+----------+-----+----+
>
> ----------------------------------------
> Spark 3.3.2
> ----------------------------------------
> AQE with defaults
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   3|
> |  2|attribute2|    1|   2|
> |  2|     other|    1|   1|
> |  3|attribute2|    2|   1|
> +---+----------+-----+----+
> --------------------------------------------
> AQE with .set("spark.sql.adaptive.coalescePartitions.enabled", "false") (this matches Spark 3.1)
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   2|
> |  2|attribute2|    1|   1|
> |  2|     other|    1|   3|
> |  3|attribute2|    2|   1|
> +---+----------+-----+----+
> {code}
> As you can see, the 'rank' column produced by row_number() over (partition by, order by) assigns different ranks to the three places of id 2 depending on how AQE coalesces partitions.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
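The follow-up comment does not say exactly what was missed in the window spec, so the following is an assumption: for id 2 all three places have count 1, so `orderBy($"count".desc)` leaves them tied, and row_number() may number tied rows in whatever order they reach the window operator, which partition coalescing can change. The Spark-free Scala sketch below models this. The object `RowNumberTies` and its helpers are hypothetical illustrations, not Spark APIs, and a stable sort stands in for the unspecified arrival order of tied rows. Two arrival orders of the same aggregated rows get different numbers when ordering by count alone, and identical numbers once `place` is added as a tie-breaker.

```scala
// Hypothetical, Spark-free model of row_number() OVER (PARTITION BY id ORDER BY ...).
// Assumption: a stable sort keeps tied rows in arrival order, standing in for the
// unspecified order in which tied rows reach Spark's window operator.
object RowNumberTies {
  final case class Row(id: Int, place: String, count: Long)

  // Number rows 1..n within each id after sorting them with `ord`.
  def rowNumber(rows: Seq[Row], ord: Ordering[Row]): Map[(Int, String), Int] =
    rows.groupBy(_.id).values.flatMap { group =>
      group.sorted(ord).zipWithIndex.map { case (r, i) => (r.id, r.place) -> (i + 1) }
    }.toMap

  // ORDER BY count DESC: rows with equal counts are ties.
  val byCountDesc: Ordering[Row] = Ordering.by((r: Row) => -r.count)

  // ORDER BY count DESC, place ASC: unique per id, so no ties remain.
  val byCountDescThenPlace: Ordering[Row] = Ordering.by((r: Row) => (-r.count, r.place))

  // The aggregated (id, place, count) rows from the report.
  val aggregated: Seq[Row] = Seq(
    Row(1, "attribute1", 3), Row(1, "attribute3", 2),
    Row(2, "attribute1", 1), Row(2, "attribute2", 1), Row(2, "other", 1),
    Row(3, "attribute2", 2)
  )

  def main(args: Array[String]): Unit = {
    val arrivalA = aggregated
    val arrivalB = aggregated.reverse // another arrival order, e.g. after different coalescing

    // prints false: id 2's tied rows are numbered differently
    println(rowNumber(arrivalA, byCountDesc) == rowNumber(arrivalB, byCountDesc))
    // prints true: the tie-breaker makes the numbering independent of arrival order
    println(rowNumber(arrivalA, byCountDescThenPlace) == rowNumber(arrivalB, byCountDescThenPlace))
  }
}
```

In Spark itself, the corresponding change would presumably be a second sort key on the window, for example `Window.partitionBy("id").orderBy($"count".desc, $"place")`; any column set that makes the order unique within each id should make row_number() deterministic regardless of AQE coalescing.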