This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new eace7d370213 [SPARK-45592][SPARK-45282][SQL][3.4] Correctness issue in AQE with InMemoryTableScanExec
eace7d370213 is described below

commit eace7d370213f8498107cb14cd85f854a4d4d1df
Author: Emil Ejbyfeldt <eejbyfe...@liveintent.com>
AuthorDate: Sun Nov 12 13:54:01 2023 -0800

    [SPARK-45592][SPARK-45282][SQL][3.4] Correctness issue in AQE with InMemoryTableScanExec
    
    ### What changes were proposed in this pull request?
    
    This backports https://github.com/apache/spark/pull/43435 (SPARK-45592) to the 3.4 branch, where the issue was already reported as SPARK-45282 but required enabling some extra configuration to hit the bug.
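
    In outline, the fix makes a coalesced shuffle read advertise a dedicated partitioning that is only compatible with another read coalesced at exactly the same boundaries. A condensed, hypothetical sketch of that rule (simplified from the CoalescedHashShuffleSpec added in partitioning.scala below; `parentCompatible` stands in for the full HashShuffleSpec check, which this sketch does not model):

        // Two coalesced reads line up only when their boundaries match exactly,
        // on top of the usual hash-expression compatibility of their parents.
        case class CoalescedBoundary(startReducerIndex: Int, endReducerIndex: Int)

        case class SketchedCoalescedSpec(boundaries: Seq[CoalescedBoundary]) {
          def isCompatibleWith(
              other: SketchedCoalescedSpec, parentCompatible: Boolean): Boolean =
            boundaries == other.boundaries && parentCompatible
        }

        val a = SketchedCoalescedSpec(Seq(CoalescedBoundary(0, 5), CoalescedBoundary(5, 10)))
        val b = SketchedCoalescedSpec(Seq(CoalescedBoundary(0, 4), CoalescedBoundary(4, 10)))
        assert(a.isCompatibleWith(a, parentCompatible = true))   // aligned: no extra shuffle
        assert(!a.isCompatibleWith(b, parentCompatible = true))  // misaligned: must shuffle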
    
    ### Why are the changes needed?
    
    Fix a correctness issue: after AQE coalesced a shuffle read, the read still reported a plain HashPartitioning with the reduced partition count, so the other side of a join that was hash partitioned (or coalesced differently) could wrongly be treated as co-partitioned, a required shuffle could be skipped, and the query could return incorrect results.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it fixes a correctness issue.
    
    ### How was this patch tested?
    
    New tests based on the reproduction example in SPARK-45282.
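
    The symptom can be checked end-to-end by joining two cached datasets repartitioned on the same key and verifying the row count. A condensed, hypothetical sketch of that shape (the full test, including the exact SQL configs it sets, is in the DatasetSuite hunk below; it assumes a running SparkSession `spark` with AQE enabled):

        import org.apache.spark.sql.functions.col
        import spark.implicits._

        // Both sides are repartitioned on "key" and cached; before this fix an
        // AQE-coalesced read of one cached side could be treated as co-partitioned
        // with the other, letting the planner skip a required shuffle.
        val data = (1 to 1000000).toDS().map(_ => java.util.UUID.randomUUID().toString).persist()
        val left = data.map(k => (k, 1)).toDF("key", "value1").repartition(col("key")).persist()
        left.count()  // materialize the cache so the coalesced read kicks in
        val right = data.map(k => (k, k)).toDF("key", "value2").repartition(col("key")).persist()
        // Every key exists on both sides, so any count below 1000000 means lost rows.
        assert(left.join(right, "key").count() == 1000000)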
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43729 from eejbyfeldt/SPARK-45282.
    
    Authored-by: Emil Ejbyfeldt <eejbyfe...@liveintent.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/catalyst/plans/physical/partitioning.scala |  49 +++
 .../spark/sql/catalyst/DistributionSuite.scala     | 124 ++++---
 .../spark/sql/catalyst/ShuffleSpecSuite.scala      | 401 ++++++++++++---------
 .../execution/adaptive/AQEShuffleReadExec.scala    |  11 +-
 .../scala/org/apache/spark/sql/DatasetSuite.scala  |  29 ++
 .../WriteDistributionAndOrderingSuite.scala        |  53 ++-
 6 files changed, 401 insertions(+), 266 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index d2f9e9b5d5bf..1eefe65859bd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -306,6 +306,35 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
 
   override protected def withNewChildrenInternal(
    newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions = newChildren)
+
+}
+
+case class CoalescedBoundary(startReducerIndex: Int, endReducerIndex: Int)
+
+/**
+ * Represents a partitioning where partitions have been coalesced from a HashPartitioning into a
+ * smaller number of partitions.
+ */
+case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[CoalescedBoundary])
+  extends Expression with Partitioning with Unevaluable {
+
+  override def children: Seq[Expression] = from.expressions
+  override def nullable: Boolean = from.nullable
+  override def dataType: DataType = from.dataType
+
+  override def satisfies0(required: Distribution): Boolean = from.satisfies0(required)
+
+  override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
+    CoalescedHashShuffleSpec(from.createShuffleSpec(distribution), partitions)
+
+  override protected def withNewChildrenInternal(
+    newChildren: IndexedSeq[Expression]): CoalescedHashPartitioning =
+      copy(from = from.copy(expressions = newChildren))
+
+  override val numPartitions: Int = partitions.length
+
+  override def toString: String = from.toString
+  override def sql: String = from.sql
 }
 
 /**
@@ -661,6 +690,26 @@ case class HashShuffleSpec(
   override def numPartitions: Int = partitioning.numPartitions
 }
 
+case class CoalescedHashShuffleSpec(
+    from: ShuffleSpec,
+    partitions: Seq[CoalescedBoundary]) extends ShuffleSpec {
+
+  override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
+    case SinglePartitionShuffleSpec =>
+      numPartitions == 1
+    case CoalescedHashShuffleSpec(otherParent, otherPartitions) =>
+      partitions == otherPartitions && from.isCompatibleWith(otherParent)
+    case ShuffleSpecCollection(specs) =>
+      specs.exists(isCompatibleWith)
+    case _ =>
+      false
+  }
+
+  override def canCreatePartitioning: Boolean = false
+
+  override def numPartitions: Int = partitions.length
+}
+
 case class KeyGroupedShuffleSpec(
     partitioning: KeyGroupedPartitioning,
     distribution: ClusteredDistribution) extends ShuffleSpec {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
index a924a9ed02e5..7cb4d5f12325 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst
 import org.apache.spark.SparkFunSuite
 /* Implicit conversions */
 import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Literal, Murmur3Hash, Pmod}
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, Murmur3Hash, Pmod}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.types.IntegerType
 
@@ -146,63 +146,75 @@ class DistributionSuite extends SparkFunSuite {
       false)
   }
 
-  test("HashPartitioning is the output partitioning") {
-    // HashPartitioning can satisfy ClusteredDistribution iff its hash expressions are a subset of
-    // the required clustering expressions.
-    checkSatisfied(
-      HashPartitioning(Seq($"a", $"b", $"c"), 10),
-      ClusteredDistribution(Seq($"a", $"b", $"c")),
-      true)
-
-    checkSatisfied(
-      HashPartitioning(Seq($"b", $"c"), 10),
-      ClusteredDistribution(Seq($"a", $"b", $"c")),
-      true)
-
-    checkSatisfied(
-      HashPartitioning(Seq($"a", $"b", $"c"), 10),
-      ClusteredDistribution(Seq($"b", $"c")),
-      false)
-
-    checkSatisfied(
-      HashPartitioning(Seq($"a", $"b", $"c"), 10),
-      ClusteredDistribution(Seq($"d", $"e")),
-      false)
-
-    // When ClusteredDistribution.requireAllClusterKeys is set to true,
-    // HashPartitioning can only satisfy ClusteredDistribution iff its hash expressions are
-    // exactly same as the required clustering expressions.
-    checkSatisfied(
-      HashPartitioning(Seq($"a", $"b", $"c"), 10),
-      ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
-      true)
-
-    checkSatisfied(
-      HashPartitioning(Seq($"b", $"c"), 10),
-      ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
-      false)
-
-    checkSatisfied(
-      HashPartitioning(Seq($"b", $"a", $"c"), 10),
-      ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
-      false)
-
-    // HashPartitioning cannot satisfy OrderedDistribution
-    checkSatisfied(
-      HashPartitioning(Seq($"a", $"b", $"c"), 10),
-      OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
-      false)
+  private def testHashPartitioningLike(
+      partitioningName: String,
+      create: (Seq[Expression], Int) => Partitioning): Unit = {
+
+    test(s"$partitioningName is the output partitioning") {
+      // HashPartitioning can satisfy ClusteredDistribution iff its hash expressions are a subset of
+      // the required clustering expressions.
+      checkSatisfied(
+        create(Seq($"a", $"b", $"c"), 10),
+        ClusteredDistribution(Seq($"a", $"b", $"c")),
+        true)
+
+      checkSatisfied(
+        create(Seq($"b", $"c"), 10),
+        ClusteredDistribution(Seq($"a", $"b", $"c")),
+        true)
+
+      checkSatisfied(
+        create(Seq($"a", $"b", $"c"), 10),
+        ClusteredDistribution(Seq($"b", $"c")),
+        false)
+
+      checkSatisfied(
+        create(Seq($"a", $"b", $"c"), 10),
+        ClusteredDistribution(Seq($"d", $"e")),
+        false)
+
+      // When ClusteredDistribution.requireAllClusterKeys is set to true,
+      // HashPartitioning can only satisfy ClusteredDistribution iff its hash expressions are
+      // exactly same as the required clustering expressions.
+      checkSatisfied(
+        create(Seq($"a", $"b", $"c"), 10),
+        ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
+        true)
+
+      checkSatisfied(
+        create(Seq($"b", $"c"), 10),
+        ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
+        false)
+
+      checkSatisfied(
+        create(Seq($"b", $"a", $"c"), 10),
+        ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
+        false)
+
+      // HashPartitioning cannot satisfy OrderedDistribution
+      checkSatisfied(
+        create(Seq($"a", $"b", $"c"), 10),
+        OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
+        false)
+
+      checkSatisfied(
+        create(Seq($"a", $"b", $"c"), 1),
+        OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
+        false) // TODO: this can be relaxed.
+
+      checkSatisfied(
+        create(Seq($"b", $"c"), 10),
+        OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
+        false)
+    }
+  }
 
-    checkSatisfied(
-      HashPartitioning(Seq($"a", $"b", $"c"), 1),
-      OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
-      false) // TODO: this can be relaxed.
+  testHashPartitioningLike("HashPartitioning",
+    (expressions, numPartitions) => HashPartitioning(expressions, numPartitions))
 
-    checkSatisfied(
-      HashPartitioning(Seq($"b", $"c"), 10),
-      OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
-      false)
-  }
+  testHashPartitioningLike("CoalescedHashPartitioning", (expressions, 
numPartitions) =>
+      CoalescedHashPartitioning(
+        HashPartitioning(expressions, numPartitions), Seq(CoalescedBoundary(0, 
numPartitions))))
 
   test("RangePartitioning is the output partitioning") {
    // RangePartitioning can satisfy OrderedDistribution iff its ordering is a prefix
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala
index 51e768873226..6b069d1c9736 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala
@@ -62,211 +62,254 @@ class ShuffleSpecSuite extends SparkFunSuite with SQLHelper {
     }
   }
 
-  test("compatibility: HashShuffleSpec on both sides") {
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      expected = true
-    )
-
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a"), 10), 
ClusteredDistribution(Seq($"a", $"b"))),
-      HashShuffleSpec(HashPartitioning(Seq($"a"), 10), 
ClusteredDistribution(Seq($"a", $"b"))),
-      expected = true
-    )
+  private def testHashShuffleSpecLike(
+      shuffleSpecName: String,
+      create: (HashPartitioning, ClusteredDistribution) => ShuffleSpec): Unit = {
 
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"b"), 10), 
ClusteredDistribution(Seq($"a", $"b"))),
-      HashShuffleSpec(HashPartitioning(Seq($"d"), 10), 
ClusteredDistribution(Seq($"c", $"d"))),
-      expected = true
-    )
+    test(s"compatibility: $shuffleSpecName on both sides") {
+      checkCompatible(
+        create(HashPartitioning(Seq($"a", $"b"), 10),
+          ClusteredDistribution(Seq($"a", $"b"))),
+        create(HashPartitioning(Seq($"a", $"b"), 10),
+          ClusteredDistribution(Seq($"a", $"b"))),
+        expected = true
+      )
 
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"a", $"b"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      HashShuffleSpec(HashPartitioning(Seq($"c", $"c", $"d"), 10),
-        ClusteredDistribution(Seq($"c", $"d"))),
-      expected = true
-    )
+      checkCompatible(
+        create(HashPartitioning(Seq($"a"), 10), 
ClusteredDistribution(Seq($"a", $"b"))),
+        create(HashPartitioning(Seq($"a"), 10), 
ClusteredDistribution(Seq($"a", $"b"))),
+        expected = true
+      )
 
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
-        ClusteredDistribution(Seq($"a", $"b", $"b"))),
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"d"), 10),
-        ClusteredDistribution(Seq($"a", $"c", $"d"))),
-      expected = true
-    )
+      checkCompatible(
+        create(HashPartitioning(Seq($"b"), 10), 
ClusteredDistribution(Seq($"a", $"b"))),
+        create(HashPartitioning(Seq($"d"), 10), 
ClusteredDistribution(Seq($"c", $"d"))),
+        expected = true
+      )
 
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"a"), 10),
-        ClusteredDistribution(Seq($"a", $"b", $"b"))),
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"c", $"a"), 10),
-        ClusteredDistribution(Seq($"a", $"c", $"c"))),
-      expected = true
-    )
+      checkCompatible(
+        create(HashPartitioning(Seq($"a", $"a", $"b"), 10),
+          ClusteredDistribution(Seq($"a", $"b"))),
+        create(HashPartitioning(Seq($"c", $"c", $"d"), 10),
+          ClusteredDistribution(Seq($"c", $"d"))),
+        expected = true
+      )
 
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"a"), 10),
-        ClusteredDistribution(Seq($"a", $"b", $"b"))),
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"c", $"a"), 10),
-        ClusteredDistribution(Seq($"a", $"c", $"d"))),
-      expected = true
-    )
+      checkCompatible(
+        create(HashPartitioning(Seq($"a", $"b"), 10),
+          ClusteredDistribution(Seq($"a", $"b", $"b"))),
+        create(HashPartitioning(Seq($"a", $"d"), 10),
+          ClusteredDistribution(Seq($"a", $"c", $"d"))),
+        expected = true
+      )
 
-    // negative cases
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      HashShuffleSpec(HashPartitioning(Seq($"c"), 5),
-        ClusteredDistribution(Seq($"c", $"d"))),
-      expected = false
-    )
+      checkCompatible(
+        create(HashPartitioning(Seq($"a", $"b", $"a"), 10),
+          ClusteredDistribution(Seq($"a", $"b", $"b"))),
+        create(HashPartitioning(Seq($"a", $"c", $"a"), 10),
+          ClusteredDistribution(Seq($"a", $"c", $"c"))),
+        expected = true
+      )
 
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      HashShuffleSpec(HashPartitioning(Seq($"b"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      expected = false
-    )
+      checkCompatible(
+        create(HashPartitioning(Seq($"a", $"b", $"a"), 10),
+          ClusteredDistribution(Seq($"a", $"b", $"b"))),
+        create(HashPartitioning(Seq($"a", $"c", $"a"), 10),
+          ClusteredDistribution(Seq($"a", $"c", $"d"))),
+        expected = true
+      )
 
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      HashShuffleSpec(HashPartitioning(Seq($"b"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      expected = false
-    )
+      // negative cases
+      checkCompatible(
+        create(HashPartitioning(Seq($"a"), 10),
+          ClusteredDistribution(Seq($"a", $"b"))),
+        create(HashPartitioning(Seq($"c"), 5),
+          ClusteredDistribution(Seq($"c", $"d"))),
+        expected = false
+      )
 
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      HashShuffleSpec(HashPartitioning(Seq($"d"), 10),
-        ClusteredDistribution(Seq($"c", $"d"))),
-      expected = false
-    )
+      checkCompatible(
+        create(HashPartitioning(Seq($"a", $"b"), 10),
+          ClusteredDistribution(Seq($"a", $"b"))),
+        create(HashPartitioning(Seq($"b"), 10),
+          ClusteredDistribution(Seq($"a", $"b"))),
+        expected = false
+      )
 
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      HashShuffleSpec(HashPartitioning(Seq($"d"), 10),
-        ClusteredDistribution(Seq($"c", $"d"))),
-      expected = false
-    )
+      checkCompatible(
+        create(HashPartitioning(Seq($"a"), 10),
+          ClusteredDistribution(Seq($"a", $"b"))),
+        create(HashPartitioning(Seq($"b"), 10),
+          ClusteredDistribution(Seq($"a", $"b"))),
+        expected = false
+      )
 
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"a", $"b"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"a"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      expected = false
-    )
+      checkCompatible(
+        create(HashPartitioning(Seq($"a"), 10),
+          ClusteredDistribution(Seq($"a", $"b"))),
+        create(HashPartitioning(Seq($"d"), 10),
+          ClusteredDistribution(Seq($"c", $"d"))),
+        expected = false
+      )
 
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"a", $"b"), 10),
-        ClusteredDistribution(Seq($"a", $"b", $"b"))),
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"a"), 10),
-        ClusteredDistribution(Seq($"a", $"b", $"b"))),
-      expected = false
-    )
-  }
+      checkCompatible(
+        create(HashPartitioning(Seq($"a"), 10),
+          ClusteredDistribution(Seq($"a", $"b"))),
+        create(HashPartitioning(Seq($"d"), 10),
+          ClusteredDistribution(Seq($"c", $"d"))),
+        expected = false
+      )
 
-  test("compatibility: Only one side is HashShuffleSpec") {
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      SinglePartitionShuffleSpec,
-      expected = false
-    )
+      checkCompatible(
+        create(HashPartitioning(Seq($"a", $"a", $"b"), 10),
+          ClusteredDistribution(Seq($"a", $"b"))),
+        create(HashPartitioning(Seq($"a", $"b", $"a"), 10),
+          ClusteredDistribution(Seq($"a", $"b"))),
+        expected = false
+      )
 
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 1),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      SinglePartitionShuffleSpec,
-      expected = true
-    )
+      checkCompatible(
+        create(HashPartitioning(Seq($"a", $"a", $"b"), 10),
+          ClusteredDistribution(Seq($"a", $"b", $"b"))),
+        create(HashPartitioning(Seq($"a", $"b", $"a"), 10),
+          ClusteredDistribution(Seq($"a", $"b", $"b"))),
+        expected = false
+      )
+    }
 
-    checkCompatible(
-      SinglePartitionShuffleSpec,
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 1),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      expected = true
-    )
+    test(s"compatibility: Only one side is $shuffleSpecName") {
+      checkCompatible(
+        create(HashPartitioning(Seq($"a", $"b"), 10),
+          ClusteredDistribution(Seq($"a", $"b"))),
+        SinglePartitionShuffleSpec,
+        expected = false
+      )
 
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      RangeShuffleSpec(10, ClusteredDistribution(Seq($"a", $"b"))),
-      expected = false
-    )
+      checkCompatible(
+        create(HashPartitioning(Seq($"a", $"b"), 1),
+          ClusteredDistribution(Seq($"a", $"b"))),
+        SinglePartitionShuffleSpec,
+        expected = true
+      )
 
-    checkCompatible(
-      RangeShuffleSpec(10, ClusteredDistribution(Seq($"a", $"b"))),
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      expected = false
-    )
+      checkCompatible(
+        SinglePartitionShuffleSpec,
+        create(HashPartitioning(Seq($"a", $"b"), 1),
+          ClusteredDistribution(Seq($"a", $"b"))),
+        expected = true
+      )
 
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      ShuffleSpecCollection(Seq(
-        HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
-          ClusteredDistribution(Seq($"a", $"b"))))),
-      expected = true
-    )
+      checkCompatible(
+        create(HashPartitioning(Seq($"a", $"b"), 10),
+          ClusteredDistribution(Seq($"a", $"b"))),
+        RangeShuffleSpec(10, ClusteredDistribution(Seq($"a", $"b"))),
+        expected = false
+      )
 
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      ShuffleSpecCollection(Seq(
-        HashShuffleSpec(HashPartitioning(Seq($"a"), 10),
+      checkCompatible(
+        RangeShuffleSpec(10, ClusteredDistribution(Seq($"a", $"b"))),
+        create(HashPartitioning(Seq($"a", $"b"), 10),
           ClusteredDistribution(Seq($"a", $"b"))),
-        HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
-          ClusteredDistribution(Seq($"a", $"b"))))),
-      expected = true
-    )
+        expected = false
+      )
 
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      ShuffleSpecCollection(Seq(
-        HashShuffleSpec(HashPartitioning(Seq($"a"), 10),
+      checkCompatible(
+        create(HashPartitioning(Seq($"a", $"b"), 10),
           ClusteredDistribution(Seq($"a", $"b"))),
-        HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"c"), 10),
-          ClusteredDistribution(Seq($"a", $"b", $"c"))))),
-      expected = false
-    )
+        ShuffleSpecCollection(Seq(
+          create(HashPartitioning(Seq($"a", $"b"), 10),
+            ClusteredDistribution(Seq($"a", $"b"))))),
+        expected = true
+      )
 
-    checkCompatible(
-      ShuffleSpecCollection(Seq(
-        HashShuffleSpec(HashPartitioning(Seq($"b"), 10),
+      checkCompatible(
+        create(HashPartitioning(Seq($"a", $"b"), 10),
           ClusteredDistribution(Seq($"a", $"b"))),
-        HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
-          ClusteredDistribution(Seq($"a", $"b"))))),
-      ShuffleSpecCollection(Seq(
-        HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"c"), 10),
-          ClusteredDistribution(Seq($"a", $"b", $"c"))),
-        HashShuffleSpec(HashPartitioning(Seq($"d"), 10),
-          ClusteredDistribution(Seq($"c", $"d"))))),
-      expected = true
-    )
+        ShuffleSpecCollection(Seq(
+          create(HashPartitioning(Seq($"a"), 10),
+            ClusteredDistribution(Seq($"a", $"b"))),
+          create(HashPartitioning(Seq($"a", $"b"), 10),
+            ClusteredDistribution(Seq($"a", $"b"))))),
+        expected = true
+      )
 
-    checkCompatible(
-      ShuffleSpecCollection(Seq(
-        HashShuffleSpec(HashPartitioning(Seq($"b"), 10),
+      checkCompatible(
+        create(HashPartitioning(Seq($"a", $"b"), 10),
           ClusteredDistribution(Seq($"a", $"b"))),
-        HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
-          ClusteredDistribution(Seq($"a", $"b"))))),
-      ShuffleSpecCollection(Seq(
-        HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"c"), 10),
-          ClusteredDistribution(Seq($"a", $"b", $"c"))),
-        HashShuffleSpec(HashPartitioning(Seq($"c"), 10),
-          ClusteredDistribution(Seq($"c", $"d"))))),
-      expected = false
-    )
+        ShuffleSpecCollection(Seq(
+          create(HashPartitioning(Seq($"a"), 10),
+            ClusteredDistribution(Seq($"a", $"b"))),
+          create(HashPartitioning(Seq($"a", $"b", $"c"), 10),
+            ClusteredDistribution(Seq($"a", $"b", $"c"))))),
+        expected = false
+      )
+
+      checkCompatible(
+        ShuffleSpecCollection(Seq(
+          create(HashPartitioning(Seq($"b"), 10),
+            ClusteredDistribution(Seq($"a", $"b"))),
+          create(HashPartitioning(Seq($"a", $"b"), 10),
+            ClusteredDistribution(Seq($"a", $"b"))))),
+        ShuffleSpecCollection(Seq(
+          create(HashPartitioning(Seq($"a", $"b", $"c"), 10),
+            ClusteredDistribution(Seq($"a", $"b", $"c"))),
+          create(HashPartitioning(Seq($"d"), 10),
+            ClusteredDistribution(Seq($"c", $"d"))))),
+        expected = true
+      )
+
+      checkCompatible(
+        ShuffleSpecCollection(Seq(
+          create(HashPartitioning(Seq($"b"), 10),
+            ClusteredDistribution(Seq($"a", $"b"))),
+          create(HashPartitioning(Seq($"a", $"b"), 10),
+            ClusteredDistribution(Seq($"a", $"b"))))),
+        ShuffleSpecCollection(Seq(
+          create(HashPartitioning(Seq($"a", $"b", $"c"), 10),
+            ClusteredDistribution(Seq($"a", $"b", $"c"))),
+          create(HashPartitioning(Seq($"c"), 10),
+            ClusteredDistribution(Seq($"c", $"d"))))),
+        expected = false
+      )
+    }
+  }
+
+  testHashShuffleSpecLike("HashShuffleSpec",
+    (partitioning, distribution) => HashShuffleSpec(partitioning, distribution))
+  testHashShuffleSpecLike("CoalescedHashShuffleSpec",
+    (partitioning, distribution) => {
+      val partitions = if (partitioning.numPartitions == 1) {
+        Seq(CoalescedBoundary(0, 1))
+      } else {
+        Seq(CoalescedBoundary(0, 1), CoalescedBoundary(0, partitioning.numPartitions))
+      }
+      CoalescedHashShuffleSpec(HashShuffleSpec(partitioning, distribution), partitions)
+  })
+
+  test("compatibility: CoalescedHashShuffleSpec other specs") {
+      val hashShuffleSpec = HashShuffleSpec(
+        HashPartitioning(Seq($"a", $"b"), 10), ClusteredDistribution(Seq($"a", 
$"b")))
+      checkCompatible(
+        hashShuffleSpec,
+        CoalescedHashShuffleSpec(hashShuffleSpec, Seq(CoalescedBoundary(0, 
10))),
+        expected = false
+      )
+
+      checkCompatible(
+        CoalescedHashShuffleSpec(hashShuffleSpec,
+          Seq(CoalescedBoundary(0, 5), CoalescedBoundary(5, 10))),
+        CoalescedHashShuffleSpec(hashShuffleSpec,
+          Seq(CoalescedBoundary(0, 5), CoalescedBoundary(5, 10))),
+        expected = true
+      )
+
+      checkCompatible(
+        CoalescedHashShuffleSpec(hashShuffleSpec,
+          Seq(CoalescedBoundary(0, 4), CoalescedBoundary(4, 10))),
+        CoalescedHashShuffleSpec(hashShuffleSpec,
+          Seq(CoalescedBoundary(0, 5), CoalescedBoundary(5, 10))),
+        expected = false
+      )
   }
 
   test("compatibility: other specs") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
index 46ec91dcc0ab..6b39ac70a62e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
@@ -19,10 +19,11 @@ package org.apache.spark.sql.execution.adaptive
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{CoalescedBoundary, CoalescedHashPartitioning, HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeLike}
@@ -75,7 +76,13 @@ case class AQEShuffleReadExec private(
       // partitions is changed.
       child.outputPartitioning match {
         case h: HashPartitioning =>
-          CurrentOrigin.withOrigin(h.origin)(h.copy(numPartitions = partitionSpecs.length))
+          val partitions = partitionSpecs.map {
+            case CoalescedPartitionSpec(start, end, _) => CoalescedBoundary(start, end)
+            // Cannot happen due to isCoalescedRead
+            case unexpected =>
+              throw SparkException.internalError(s"Unexpected ShufflePartitionSpec: $unexpected")
+          }
+          CurrentOrigin.withOrigin(h.origin)(CoalescedHashPartitioning(h, partitions))
         case r: RangePartitioning =>
          CurrentOrigin.withOrigin(r.origin)(r.copy(numPartitions = partitionSpecs.length))
         // This can only happen for `REBALANCE_PARTITIONS_BY_NONE`, which uses
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index f8f6845afcaa..6a1aa25c6e21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
 
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.sql.{Date, Timestamp}
+import java.util.UUID
 
 import scala.util.Random
 
@@ -2461,6 +2462,34 @@ class DatasetSuite extends QueryTest
     )
     assert(result == expected)
   }
+
+  test("SPARK-45282: Coaleasced shuffle read is not compatible with hash 
partitioning") {
+
+    withSQLConf(
+        SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "33554432",
+        SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST.key -> "false",
+        SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
+      val data = (1 to 1000000).toDS().map(_ => UUID.randomUUID().toString).persist()
+
+      val left = data.map(k => (k, 1))
+      val right = data.map(k => (k, k))
+
+      val left1 = left
+        .toDF("key", "value1")
+        .repartition(col("key"))
+        .persist()
+      left1.count()
+
+      val right1 = right
+        .toDF("key", "value2")
+        .repartition(col("key"))
+        .persist()
+
+      val join = left1.join(right1, "key")
+
+      assert(join.count() == 1000000)
+    }
+  }
 }
 
 class DatasetLargeResultCollectingSuite extends QueryTest
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
index d4a6d2213652..52af05d0eeed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.{catalyst, AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.{ApplyFunctionExpression, Cast, Literal}
 import org.apache.spark.sql.catalyst.expressions.objects.Invoke
 import org.apache.spark.sql.catalyst.plans.physical
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, RangePartitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{CoalescedBoundary, CoalescedHashPartitioning, HashPartitioning, RangePartitioning, UnknownPartitioning}
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.functions._
 import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
@@ -257,11 +257,8 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase
       )
     )
     val writePartitioningExprs = Seq(attr("data"), attr("id"))
-    val writePartitioning = if (!coalesce) {
-      clusteredWritePartitioning(writePartitioningExprs, targetNumPartitions)
-    } else {
-      clusteredWritePartitioning(writePartitioningExprs, Some(1))
-    }
+    val writePartitioning = clusteredWritePartitioning(
+      writePartitioningExprs, targetNumPartitions, coalesce)
 
     checkWriteRequirements(
       tableDistribution,
@@ -366,11 +363,8 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase
       )
     )
     val writePartitioningExprs = Seq(attr("data"))
-    val writePartitioning = if (!coalesce) {
-      clusteredWritePartitioning(writePartitioningExprs, targetNumPartitions)
-    } else {
-      clusteredWritePartitioning(writePartitioningExprs, Some(1))
-    }
+    val writePartitioning = clusteredWritePartitioning(
+      writePartitioningExprs, targetNumPartitions, coalesce)
 
     checkWriteRequirements(
       tableDistribution,
@@ -848,11 +842,8 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase
       )
     )
     val writePartitioningExprs = Seq(attr("data"))
-    val writePartitioning = if (!coalesce) {
-      clusteredWritePartitioning(writePartitioningExprs, targetNumPartitions)
-    } else {
-      clusteredWritePartitioning(writePartitioningExprs, Some(1))
-    }
+    val writePartitioning = clusteredWritePartitioning(
+      writePartitioningExprs, targetNumPartitions, coalesce)
 
     checkWriteRequirements(
       tableDistribution,
@@ -932,11 +923,8 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase
       )
     )
     val writePartitioningExprs = Seq(attr("data"))
-    val writePartitioning = if (!coalesce) {
-      clusteredWritePartitioning(writePartitioningExprs, targetNumPartitions)
-    } else {
-      clusteredWritePartitioning(writePartitioningExprs, Some(1))
-    }
+    val writePartitioning = clusteredWritePartitioning(
+      writePartitioningExprs, targetNumPartitions, coalesce)
 
     checkWriteRequirements(
       tableDistribution,
@@ -1119,11 +1107,8 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase
     )
 
     val writePartitioningExprs = Seq(truncateExpr)
-    val writePartitioning = if (!coalesce) {
-      clusteredWritePartitioning(writePartitioningExprs, targetNumPartitions)
-    } else {
-      clusteredWritePartitioning(writePartitioningExprs, Some(1))
-    }
+    val writePartitioning = clusteredWritePartitioning(
+      writePartitioningExprs, targetNumPartitions, coalesce)
 
     checkWriteRequirements(
       tableDistribution,
@@ -1365,6 +1350,9 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase
       case p: physical.HashPartitioning =>
         val resolvedExprs = p.expressions.map(resolveAttrs(_, plan))
         p.copy(expressions = resolvedExprs)
+      case c: physical.CoalescedHashPartitioning =>
+        val resolvedExprs = c.from.expressions.map(resolveAttrs(_, plan))
+        c.copy(from = c.from.copy(expressions = resolvedExprs))
       case _: UnknownPartitioning =>
         // don't check partitioning if no particular one is expected
         actualPartitioning
@@ -1423,8 +1411,15 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase
 
   private def clusteredWritePartitioning(
       writePartitioningExprs: Seq[catalyst.expressions.Expression],
-      targetNumPartitions: Option[Int]): physical.Partitioning = {
-    HashPartitioning(writePartitioningExprs,
-      targetNumPartitions.getOrElse(conf.numShufflePartitions))
+      targetNumPartitions: Option[Int],
+      coalesce: Boolean): physical.Partitioning = {
+    val partitioning = HashPartitioning(writePartitioningExprs,
+        targetNumPartitions.getOrElse(conf.numShufflePartitions))
+    if (coalesce) {
+      CoalescedHashPartitioning(
+        partitioning, Seq(CoalescedBoundary(0, partitioning.numPartitions)))
+    } else {
+      partitioning
+    }
   }
 }

