[ https://issues.apache.org/jira/browse/SPARK-43777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kristopher Kane updated SPARK-43777:
------------------------------------
    Description: 
While updating our code base from Spark 3.1 to 3.3, I had a test fail due to
wrong results. With 3.1, we did not proactively turn on AQE in our sbt-based
tests, so the failure only surfaced once AQE became enabled by default (it is
on by default starting with Spark 3.2).

An easily reproducible test: 
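{code:java}
import org.apache.spark.sql.{DataFrame, functions}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, when}
// Assumes a SparkSession `val spark` is in scope (the import needs a stable identifier)
import spark.implicits._

val testDataDf = Seq(
  (1, 1, 0, 0),
  (1, 1, 0, 0),
  (1, 1, 0, 0),
  (1, 0, 0, 1),
  (1, 0, 0, 1),
  (2, 0, 0, 0),
  (2, 0, 1, 0),
  (2, 1, 0, 0),
  (3, 0, 1, 0),
  (3, 0, 1, 0)
).toDF("id", "is_attribute1", "is_attribute2", "is_attribute3")

// Number the (id, place) groups within each id by descending count
val placeWindowSpec = Window
  .partitionBy("id")
  .orderBy($"count".desc)

val resultDf: DataFrame = testDataDf
  .select("id", "is_attribute1", "is_attribute2", "is_attribute3")
  // Collapse the attribute flags into a single label (first match wins)
  .withColumn(
    "place",
    when($"is_attribute1" === 1, "attribute1")
      .when($"is_attribute2" === 1, "attribute2")
      .when($"is_attribute3" === 1, "attribute3")
      .otherwise("other")
  )
  .groupBy("id", "place")
  .agg(functions.count("*").as("count"))
  .withColumn("rank", row_number().over(placeWindowSpec))

resultDf.orderBy("id", "place", "rank").show()
{code}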
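The configurations compared in the results below are Spark SQL settings, so
besides passing them through .set(...) when building the session, they can be
switched on a live session between runs. A minimal sketch, assuming a
SparkSession named spark as in the test above; apply one case per run.

```scala
// Assumes an existing SparkSession `spark`; apply one case per run.

// "Without AQE" (the Spark 3.1 default)
spark.conf.set("spark.sql.adaptive.enabled", "false")

// "AQE with defaults" (the default starting with Spark 3.2)
spark.conf.set("spark.sql.adaptive.enabled", "true")

// AQE with partition coalescing disabled (output matched Spark 3.1 in this report)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")

// AQE coalescing from one initial shuffle partition (output matched Spark 3.3 defaults)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1")

// Restore a setting to its default before trying the next case
spark.conf.unset("spark.sql.adaptive.coalescePartitions.enabled")
```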
 

Various results based on Spark version and AQE settings: 
{code:java}
Spark 3.1
Without AQE
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   2|
|  2|attribute2|    1|   1|
|  2|     other|    1|   3|
|  3|attribute2|    2|   1|
+---+----------+-----+----+

AQE with defaults
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   2|
|  2|attribute2|    1|   1|
|  2|     other|    1|   3|
|  3|attribute2|    2|   1|
+---+----------+-----+----+

AQE with .set("spark.sql.adaptive.coalescePartitions.enabled", "false")
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   2|
|  2|attribute2|    1|   1|
|  2|     other|    1|   3|
|  3|attribute2|    2|   1|
+---+----------+-----+----+

AQE with .set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1") - This matches Spark 3.3 with AQE defaults
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   3|
|  2|attribute2|    1|   2|
|  2|     other|    1|   1|
|  3|attribute2|    2|   1|
+---+----------+-----+----+

----------------------------------------
Spark 3.3.2
----------------------------------------
AQE with defaults
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   3|
|  2|attribute2|    1|   2|
|  2|     other|    1|   1|
|  3|attribute2|    2|   1|
+---+----------+-----+----+
--------------------------------------------
AQE with .set("spark.sql.adaptive.coalescePartitions.enabled", "false") - This matches Spark 3.1
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   2|
|  2|attribute2|    1|   1|
|  2|     other|    1|   3|
|  3|attribute2|    2|   1|
+---+----------+-----+----+
{code}
As the tables show, the 'rank' column computed by row_number() over a window
partitioned by id and ordered by count descending assigns different ranks to
the three rows for id 2, depending on how AQE coalesces partitions.
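In every table above, the three rows for id 2 have count 1, so ordering the
window by count descending alone does not constrain their relative order. The
plain-Scala model below is a hedged sketch of that reasoning, not Spark's
implementation: it numbers rows per id after a stable sort, so rows that tie
keep their input order. The two input orders (orderA, orderB) are assumptions
chosen to reproduce the Spark 3.1 and Spark 3.3.2 default tables; they are not
traces of what AQE actually did. The byCountDescThenPlace ordering is a
hypothetical variant that adds place as a tiebreaker.

```scala
// Plain-Scala model of row_number() over (partition by id order by ...).
// This is NOT Spark's implementation; it only illustrates how ties behave.
object RowNumberTies {
  // One aggregated (id, place, count) group, as produced by the groupBy above
  final case class GroupRow(id: Int, place: String, count: Long)

  // The ordering used in the report: count descending only
  val byCountDesc: Ordering[GroupRow] = Ordering.by((r: GroupRow) => -r.count)

  // Hypothetical variant: count descending, then place ascending as a tiebreaker
  val byCountDescThenPlace: Ordering[GroupRow] =
    Ordering.by((r: GroupRow) => (-r.count, r.place))

  // Group rows by id, sort each group with `ord`, and number them from 1.
  // `sorted` is a stable sort, so rows that tie under `ord` keep their input order.
  def rowNumbers(rows: Seq[GroupRow], ord: Ordering[GroupRow]): Map[(Int, String), Int] =
    rows
      .groupBy(_.id)
      .values
      .flatMap { group =>
        group.sorted(ord).zipWithIndex.map { case (r, i) => (r.id, r.place) -> (i + 1) }
      }
      .toMap

  // The three id = 2 groups all have count 1. Two assumed arrival orders:
  // orderA reproduces the Spark 3.1 ranks, orderB the Spark 3.3.2 default ranks.
  val orderA: Seq[GroupRow] =
    Seq(GroupRow(2, "attribute2", 1), GroupRow(2, "attribute1", 1), GroupRow(2, "other", 1))
  val orderB: Seq[GroupRow] =
    Seq(GroupRow(2, "other", 1), GroupRow(2, "attribute2", 1), GroupRow(2, "attribute1", 1))

  def main(args: Array[String]): Unit = {
    // Ordering by count alone: the ranks depend on arrival order
    println(rowNumbers(orderA, byCountDesc) == rowNumbers(orderB, byCountDesc)) // false
    // With the tiebreaker: the same ranks for either arrival order
    println(rowNumbers(orderA, byCountDescThenPlace) == rowNumbers(orderB, byCountDescThenPlace)) // true
  }
}
```

If the model holds, the Spark counterpart would be ordering the window by
$"count".desc, $"place"; since (id, place) is unique after the groupBy, that
ordering leaves no ties within a partition. This is a suggestion to check
against this report, not a confirmed resolution.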

  was:
While updating our code base from Spark 3.1 to 3.3, I had a test fail due to
wrong results. With 3.1, we did not proactively turn on AQE in our sbt-based
tests, so the failure only surfaced once AQE became enabled by default (it is
on by default starting with Spark 3.2).

An easily reproducible test: 
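{code:java}
import org.apache.spark.sql.{DataFrame, functions}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, when}
// Assumes a SparkSession `val spark` is in scope (the import needs a stable identifier)
import spark.implicits._

val testDataDf = Seq(
  (1, 1, 0, 0),
  (1, 1, 0, 0),
  (1, 1, 0, 0),
  (1, 0, 0, 1),
  (1, 0, 0, 1),
  (2, 0, 0, 0),
  (2, 0, 1, 0),
  (2, 1, 0, 0),
  (3, 0, 1, 0),
  (3, 0, 1, 0)
).toDF("id", "is_attribute1", "is_attribute2", "is_attribute3")

// Number the (id, place) groups within each id by descending count
val placeWindowSpec = Window
  .partitionBy("id")
  .orderBy($"count".desc)

val resultDf: DataFrame = testDataDf
  .select("id", "is_attribute1", "is_attribute2", "is_attribute3")
  // Collapse the attribute flags into a single label (first match wins)
  .withColumn(
    "place",
    when($"is_attribute1" === 1, "attribute1")
      .when($"is_attribute2" === 1, "attribute2")
      .when($"is_attribute3" === 1, "attribute3")
      .otherwise("other")
  )
  .groupBy("id", "place")
  .agg(functions.count("*").as("count"))
  .withColumn("rank", row_number().over(placeWindowSpec))

resultDf.orderBy("id", "place", "rank").show()
{code}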
 

Various results based on Spark version and AQE settings: 
{code:java}
Spark 3.1
Without AQE
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   2|
|  2|attribute2|    1|   1|
|  2|     other|    1|   3|
|  3|attribute2|    2|   1|
+---+----------+-----+----+

AQE with defaults
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   2|
|  2|attribute2|    1|   1|
|  2|     other|    1|   3|
|  3|attribute2|    2|   1|
+---+----------+-----+----+

AQE with .set("spark.sql.adaptive.coalescePartitions.enabled", "false")
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   2|
|  2|attribute2|    1|   1|
|  2|     other|    1|   3|
|  3|attribute2|    2|   1|
+---+----------+-----+----+

AQE with .set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1") - This matches Spark 3.3 with AQE defaults
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   3|
|  2|attribute2|    1|   2|
|  2|     other|    1|   1|
|  3|attribute2|    2|   1|
+---+----------+-----+----+

----------------------------------------
Spark 3.3.2
----------------------------------------
AQE with defaults - Different than Spark 3.1
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   3|
|  2|attribute2|    1|   2|
|  2|     other|    1|   1|
|  3|attribute2|    2|   1|
+---+----------+-----+----+
--------------------------------------------
AQE with .set("spark.sql.adaptive.coalescePartitions.enabled", "false") - This matches Spark 3.1
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   2|
|  2|attribute2|    1|   1|
|  2|     other|    1|   3|
|  3|attribute2|    2|   1|
+---+----------+-----+----+
{code}
As the tables show, the 'rank' column computed by row_number() over a window
partitioned by id and ordered by count descending assigns different ranks to
the three rows for id 2, depending on how AQE coalesces partitions.


> Coalescing partitions in AQE returns different results with row_number 
> windows. 
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-43777
>                 URL: https://issues.apache.org/jira/browse/SPARK-43777
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.3, 3.3.2
>         Environment: SBT-based Spark project with unit tests running on 
> spark-testing-base. Tested with Spark 3.1.3 and 3.3.2
>            Reporter: Kristopher Kane
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
