Kristopher Kane created SPARK-43777:
---------------------------------------

             Summary: Coalescing partitions in AQE returns different results 
with row_number windows. 
                 Key: SPARK-43777
                 URL: https://issues.apache.org/jira/browse/SPARK-43777
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.3.2, 3.1.3
         Environment: SBT based Spark project with unit tests running on 
spark-testing-base. Tested with Spark 3.1.3 and 3.3.2
            Reporter: Kristopher Kane


While updating our code base from Spark 3.1 to 3.3, a test failed because of 
wrong results.  With 3.1, we did not proactively turn on AQE in our sbt-based 
tests, so we only noticed the failure once AQE became enabled by default 
between 3.1 and 3.3.

An easily reproducible test: 


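{code:java}
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions
    import org.apache.spark.sql.functions.{row_number, when}
    // Assumes a SparkSession named `spark` is in scope (e.g. from the test base class).
    import spark.implicits._

    val testDataDf = Seq(
      (1, 1, 0, 0),
      (1, 1, 0, 0),
      (1, 1, 0, 0),
      (1, 0, 0, 1),
      (1, 0, 0, 1),
      (2, 0, 0, 0),
      (2, 0, 1, 0),
      (2, 1, 0, 0),
      (3, 0, 1, 0),
      (3, 0, 1, 0),
    ).toDF("id", "is_attribute1", "is_attribute2", "is_attribute3")

    // Rank the places within each id by descending count.
    val placeWindowSpec = Window
      .partitionBy("id")
      .orderBy($"count".desc)

    val resultDf: DataFrame = testDataDf
      .select("id", "is_attribute1", "is_attribute2", "is_attribute3")
      // Map the first attribute flag that is set to a place label.
      .withColumn(
        "place",
        when($"is_attribute1" === 1, "attribute1")
          .when($"is_attribute2" === 1, "attribute2")
          .when($"is_attribute3" === 1, "attribute3")
          .otherwise("other")
      )
      .groupBy("id", "place")
      .agg(
        functions.count("*").as("count")
      )
      .withColumn(
        "rank",
        row_number().over(placeWindowSpec)
      )

    resultDf.orderBy("id", "place", "rank").show()
{code}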
 

Various results based on Spark version and AQE settings: 


{code:java}
Spark 3.1
Without AQE
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   2|
|  2|attribute2|    1|   1|
|  2|     other|    1|   3|
|  3|attribute2|    2|   1|
+---+----------+-----+----+

AQE with defaults
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   2|
|  2|attribute2|    1|   1|
|  2|     other|    1|   3|
|  3|attribute2|    2|   1|
+---+----------+-----+----+

AQE with .set("spark.sql.adaptive.coalescePartitions.enabled", "false")
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   2|
|  2|attribute2|    1|   1|
|  2|     other|    1|   3|
|  3|attribute2|    2|   1|
+---+----------+-----+----+

AQE with .set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1") - Like Spark 3.3 with AQE defaults
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   3|
|  2|attribute2|    1|   2|
|  2|     other|    1|   1|
|  3|attribute2|    2|   1|
+---+----------+-----+----+

----------------------------------------
Spark 3.3.2
----------------------------------------
AQE with defaults - Different than Spark 3.1
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   3|
|  2|attribute2|    1|   2|
|  2|     other|    1|   1|
|  3|attribute2|    2|   1|
+---+----------+-----+----+
--------------------------------------------
AQE with .set("spark.sql.adaptive.coalescePartitions.enabled", "false") - This matches Spark 3.1
+---+----------+-----+----+
| id|     place|count|rank|
+---+----------+-----+----+
|  1|attribute1|    3|   1|
|  1|attribute3|    2|   2|
|  2|attribute1|    1|   2|
|  2|attribute2|    1|   1|
|  2|     other|    1|   3|
|  3|attribute2|    2|   1|
+---+----------+-----+----+
{code}
As you can see, the 'rank' column produced by row_number() over (partition by 
id order by count desc) assigns different ranks to the three places of id 2, 
depending on whether and how AQE coalesces partitions.
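
One possible reading of the outputs above, offered here as an assumption rather than a confirmed diagnosis: the three groups for id 2 all have count = 1, so an ordering on count alone does not say which of them comes first, and the assigned row numbers may then follow whatever order the rows arrive in. The sketch below reuses the reproduction's data and adds "place" as a secondary sort key in the window so that every row has a distinct position. The object name DeterministicRankSketch, the helper rankedPlaces, and the local[2] master are made up for illustration, and the code needs spark-sql on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, row_number, when}

// Hypothetical sketch, not part of the original report.
object DeterministicRankSketch {

  // Same aggregation as the reproduction, but the window ordering uses
  // "place" as a tie-breaker (an assumption of this sketch).
  def rankedPlaces(spark: SparkSession): Seq[(Int, String, Long, Int)] = {
    import spark.implicits._

    val testDataDf = Seq(
      (1, 1, 0, 0), (1, 1, 0, 0), (1, 1, 0, 0), (1, 0, 0, 1), (1, 0, 0, 1),
      (2, 0, 0, 0), (2, 0, 1, 0), (2, 1, 0, 0),
      (3, 0, 1, 0), (3, 0, 1, 0)
    ).toDF("id", "is_attribute1", "is_attribute2", "is_attribute3")

    // Ties on count are broken by place, so the order is fully specified.
    val placeWindowSpec = Window
      .partitionBy("id")
      .orderBy(col("count").desc, col("place").asc)

    testDataDf
      .withColumn(
        "place",
        when(col("is_attribute1") === 1, "attribute1")
          .when(col("is_attribute2") === 1, "attribute2")
          .when(col("is_attribute3") === 1, "attribute3")
          .otherwise("other")
      )
      .groupBy("id", "place")
      .agg(count("*").as("count"))
      .withColumn("rank", row_number().over(placeWindowSpec))
      .orderBy("id", "place")
      .collect()
      .map(r => (r.getInt(0), r.getString(1), r.getLong(2), r.getInt(3)))
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-43777-sketch")
      .getOrCreate()
    try {
      // Run once with partition coalescing on and once with it off.
      for (coalesce <- Seq("true", "false")) {
        spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", coalesce)
        println(s"spark.sql.adaptive.coalescePartitions.enabled=$coalesce")
        rankedPlaces(spark).foreach(println)
      }
    } finally {
      spark.stop()
    }
  }
}
```

With the tie-breaker in place, id 2 should rank attribute1, attribute2, other as 1, 2, 3 under both settings. That ordering differs from every output listed above, because it is imposed by the added sort key rather than by row arrival order.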


