[ 
https://issues.apache.org/jira/browse/SPARK-43777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726261#comment-17726261
 ] 

Kristopher Kane commented on SPARK-43777:
-----------------------------------------

Yes, a colleague pointed out last night that I had missed that in the window 
spec.  

> Coalescing partitions in AQE returns different results with row_number 
> windows. 
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-43777
>                 URL: https://issues.apache.org/jira/browse/SPARK-43777
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.3, 3.3.2
>         Environment: SBT based Spark project with unit tests running on 
> spark-testing-base. Tested with Spark 3.1.3 and 3.3.2
>            Reporter: Kristopher Kane
>            Priority: Major
>
> While updating our code base from 3.1 to 3.3, I had a test fail due to wrong 
> results.  With 3.1, we did not proactively turn on AQE in sbt based tests and 
> noticed the failure due to AQE enabled by default between 3.1 and 3.3
> An easily reproducible test: 
> {code:java}
>     val testDataDf = Seq(
>       (1, 1, 0, 0),
>       (1, 1, 0, 0),
>       (1, 1, 0, 0),
>       (1, 0, 0, 1),
>       (1, 0, 0, 1),
>       (2, 0, 0, 0),
>       (2, 0, 1, 0),
>       (2, 1, 0, 0),
>       (3, 0, 1, 0),
>       (3, 0, 1, 0),
>     ).toDF("id", "is_attribute1", "is_attribute2", "is_attribute3")    
> val placeWindowSpec = Window
>       .partitionBy("id")
>       .orderBy($"count".desc)    
> val resultDf: DataFrame = testDataDf
>       .select("id", "is_attribute1", "is_attribute2", "is_attribute3")
>       .withColumn(
>         "place",
>         when($"is_attribute1" === 1, "attribute1")
>           .when($"is_attribute2" === 1, "attribute2")
>           .when($"is_attribute3" === 1, "attribute3")
>           .otherwise("other")
>       )
>       .groupBy("id", "place")
>       .agg(
>         functions.count("*").as("count")
>       )
>       .withColumn(
>         "rank",
>         row_number().over(placeWindowSpec)
>       )    
> resultDf.orderBy("id", "place", "rank").show() {code}
>  
> Various results based on Spark version and AQE settings: 
> {code:java}
> Spark 3.1
> Without AQE
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   2|
> |  2|attribute2|    1|   1|
> |  2|     other|    1|   3|
> |  3|attribute2|    2|   1|
> +---+----------+-----+----+
> AQE with defaults
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   2|
> |  2|attribute2|    1|   1|
> |  2|     other|    1|   3|
> |  3|attribute2|    2|   1|
> +---+----------+-----+----+
> AQE with .set("spark.sql.adaptive.coalescePartitions.enabled", "false")
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   2|
> |  2|attribute2|    1|   1|
> |  2|     other|    1|   3|
> |  3|attribute2|    2|   1|
> +---+----------+-----+----+
> AQE with .set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", 
> "1") - Like Spark 3.3 with AQE defaults
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   3|
> |  2|attribute2|    1|   2|
> |  2|     other|    1|   1|
> |  3|attribute2|    2|   1|
> ----------------------------------------
> Spark 3.3.2
> ----------------------------------------
> AQE with defaults
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   3|
> |  2|attribute2|    1|   2|
> |  2|     other|    1|   1|
> |  3|attribute2|    2|   1|
> +---+----------+-----+----+
> --------------------------------------------
> AQE with .set("spark.sql.adaptive.coalescePartitions.enabled", "false") - 
> This matches Spark 3.1
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   2|
> |  2|attribute2|    1|   1|
> |  2|     other|    1|   3|
> |  3|attribute2|    2|   1|
> +---+----------+-----+----+ {code}
> As you can see, the 'rank' column of row_number(partition by, order by) 
> returns a different rank for id value 2's three attributes based on how AQE 
> coalesces partitions. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to