This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 19f0a19006c2 [SPARK-54439][SQL] KeyGroupedPartitioning and join key 
size mismatch
19f0a19006c2 is described below

commit 19f0a19006c20a448c1994b69007b22b51e45323
Author: Peter Toth <[email protected]>
AuthorDate: Fri Nov 21 14:30:48 2025 -0800

    [SPARK-54439][SQL] KeyGroupedPartitioning and join key size mismatch
    
    Fix `KeyGroupedShuffleSpec.createPartitioning()`: the clustering required on 
the other side of the join might contain more clustering expressions than the 
number of expressions in the shuffle spec's `KeyGroupedPartitioning`, so simply 
zipping them is not correct.
    
    Fix a correctness issue due to wrong partitioning on the shuffle side.
    
    Yes, it fixes incorrect results for affected queries.
    
    Added new UT.
    
    No.
    
    Closes #53142 from 
peter-toth/SPARK-54439-keygroupedpartitioning-and-join-key-size-mismatch.
    
    Authored-by: Peter Toth <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 05602d57d9779855991c1447568c259aecdb546e)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../sql/catalyst/plans/physical/partitioning.scala | 11 +++--
 .../connector/KeyGroupedPartitioningSuite.scala    | 52 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 038105f9bfdf..dc66b6f30e52 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -899,10 +899,13 @@ case class KeyGroupedShuffleSpec(
       }
 
   override def createPartitioning(clustering: Seq[Expression]): Partitioning = 
{
-    val newExpressions: Seq[Expression] = 
clustering.zip(partitioning.expressions).map {
-      case (c, e: TransformExpression) => TransformExpression(
-        e.function, Seq(c), e.numBucketsOpt)
-      case (c, _) => c
+    assert(clustering.size == distribution.clustering.size,
+      "Required distributions of join legs should be the same size.")
+
+    val newExpressions = partitioning.expressions.zip(keyPositions).map {
+      case (te: TransformExpression, positionSet) =>
+        te.copy(children = te.children.map(_ => clustering(positionSet.head)))
+      case (_, positionSet) => clustering(positionSet.head)
     }
     KeyGroupedPartitioning(newExpressions,
       partitioning.numPartitions,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index c73e8e16fbbb..44882f294491 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -2626,4 +2626,56 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase {
       assert(scans.forall(_.inputRDD.partitions.length == 2))
     }
   }
+
+  test("SPARK-54439: KeyGroupedPartitioning and join key size mismatch") {
+    val items_partitions = Array(identity("id"))
+    createTable(items, itemsColumns, items_partitions)
+
+    sql(s"INSERT INTO testcat.ns.$items VALUES " +
+      "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+      "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+      "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+    createTable(purchases, purchasesColumns, Array.empty)
+    sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+      "(1, 42.0, cast('2020-01-01' as timestamp)), " +
+      "(3, 19.5, cast('2020-02-01' as timestamp))")
+
+    withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true") {
+      // `time` and `item_id` in the required `ClusteredDistribution` for 
`purchases`, but `item` is
+      // storage partitioned only by `id`
+      val df = createJoinTestDF(Seq("arrive_time" -> "time", "id" -> 
"item_id"))
+      val shuffles = collectShuffles(df.queryExecution.executedPlan)
+      assert(shuffles.size == 1, "only shuffle one side not report 
partitioning")
+
+      checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0)))
+    }
+  }
+
+  test("SPARK-54439: KeyGroupedPartitioning with transform and join key size 
mismatch") {
+    // Do not use `bucket()` in "one side partition" tests as its 
implementation in
+    // `InMemoryBaseTable` conflicts with `BucketFunction`
+    val items_partitions = Array(years("arrive_time"))
+    createTable(items, itemsColumns, items_partitions)
+
+    sql(s"INSERT INTO testcat.ns.$items VALUES " +
+      "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+      "(1, 'bb', 10.0, cast('2021-01-01' as timestamp)), " +
+      "(4, 'cc', 15.5, cast('2021-02-01' as timestamp))")
+
+    createTable(purchases, purchasesColumns, Array.empty)
+    sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+      "(1, 42.0, cast('2020-01-01' as timestamp)), " +
+      "(3, 19.5, cast('2021-02-01' as timestamp))")
+
+    withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true") {
+      // `item_id` and `time` in the required `ClusteredDistribution` for 
`purchases`, but `item` is
+      // storage partitioned only by `year(arrive_time)`
+      val df = createJoinTestDF(Seq("id" -> "item_id", "arrive_time" -> 
"time"))
+      val shuffles = collectShuffles(df.queryExecution.executedPlan)
+      assert(shuffles.size == 1, "only shuffle one side not report 
partitioning")
+
+      checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0)))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to