This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 19f0a19006c2 [SPARK-54439][SQL] KeyGroupedPartitioning and join key
size mismatch
19f0a19006c2 is described below
commit 19f0a19006c20a448c1994b69007b22b51e45323
Author: Peter Toth <[email protected]>
AuthorDate: Fri Nov 21 14:30:48 2025 -0800
[SPARK-54439][SQL] KeyGroupedPartitioning and join key size mismatch
Fix `KeyGroupedShuffleSpec.createPartitioning()`: the clustering required on
the other side of the join might contain more clustering expressions than the
number of expressions in the shuffle spec's `KeyGroupedPartitioning`, so simply
zipping the two sequences together is not correct.
Fix a correctness issue due to wrong partitioning on the shuffle side.
Yes, this is a user-facing change: it fixes the query result.
Added new unit tests.
No.
Closes #53142 from
peter-toth/SPARK-54439-keygroupedpartitioning-and-join-key-size-mismatch.
Authored-by: Peter Toth <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 05602d57d9779855991c1447568c259aecdb546e)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/catalyst/plans/physical/partitioning.scala | 11 +++--
.../connector/KeyGroupedPartitioningSuite.scala | 52 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 4 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 038105f9bfdf..dc66b6f30e52 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -899,10 +899,13 @@ case class KeyGroupedShuffleSpec(
}
override def createPartitioning(clustering: Seq[Expression]): Partitioning =
{
- val newExpressions: Seq[Expression] =
clustering.zip(partitioning.expressions).map {
- case (c, e: TransformExpression) => TransformExpression(
- e.function, Seq(c), e.numBucketsOpt)
- case (c, _) => c
+ assert(clustering.size == distribution.clustering.size,
+ "Required distributions of join legs should be the same size.")
+
+ val newExpressions = partitioning.expressions.zip(keyPositions).map {
+ case (te: TransformExpression, positionSet) =>
+ te.copy(children = te.children.map(_ => clustering(positionSet.head)))
+ case (_, positionSet) => clustering(positionSet.head)
}
KeyGroupedPartitioning(newExpressions,
partitioning.numPartitions,
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index c73e8e16fbbb..44882f294491 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -2626,4 +2626,56 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
assert(scans.forall(_.inputRDD.partitions.length == 2))
}
}
+
+ test("SPARK-54439: KeyGroupedPartitioning and join key size mismatch") {
+ val items_partitions = Array(identity("id"))
+ createTable(items, itemsColumns, items_partitions)
+
+ sql(s"INSERT INTO testcat.ns.$items VALUES " +
+ "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+ "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+ "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+ createTable(purchases, purchasesColumns, Array.empty)
+ sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+ "(1, 42.0, cast('2020-01-01' as timestamp)), " +
+ "(3, 19.5, cast('2020-02-01' as timestamp))")
+
+ withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true") {
+ // `time` and `item_id` in the required `ClusteredDistribution` for
`purchases`, but `item` is
+ // storage partitioned only by `id`
+ val df = createJoinTestDF(Seq("arrive_time" -> "time", "id" ->
"item_id"))
+ val shuffles = collectShuffles(df.queryExecution.executedPlan)
+ assert(shuffles.size == 1, "only shuffle one side not report
partitioning")
+
+ checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0)))
+ }
+ }
+
+ test("SPARK-54439: KeyGroupedPartitioning with transform and join key size
mismatch") {
+ // Do not use `bucket()` in "one side partition" tests as its
implementation in
+ // `InMemoryBaseTable` conflicts with `BucketFunction`
+ val items_partitions = Array(years("arrive_time"))
+ createTable(items, itemsColumns, items_partitions)
+
+ sql(s"INSERT INTO testcat.ns.$items VALUES " +
+ "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+ "(1, 'bb', 10.0, cast('2021-01-01' as timestamp)), " +
+ "(4, 'cc', 15.5, cast('2021-02-01' as timestamp))")
+
+ createTable(purchases, purchasesColumns, Array.empty)
+ sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+ "(1, 42.0, cast('2020-01-01' as timestamp)), " +
+ "(3, 19.5, cast('2021-02-01' as timestamp))")
+
+ withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true") {
+ // `item_id` and `time` in the required `ClusteredDistribution` for
`purchases`, but `item` is
+ // storage partitioned only by `year(arrive_time)`
+ val df = createJoinTestDF(Seq("id" -> "item_id", "arrive_time" ->
"time"))
+ val shuffles = collectShuffles(df.queryExecution.executedPlan)
+ assert(shuffles.size == 1, "only shuffle one side not report
partitioning")
+
+ checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0)))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]