This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new eace7d370213 [SPARK-45592][SPARK-45282][SQL][3.4] Correctness issue in AQE with InMemoryTableScanExec eace7d370213 is described below commit eace7d370213f8498107cb14cd85f854a4d4d1df Author: Emil Ejbyfeldt <eejbyfe...@liveintent.com> AuthorDate: Sun Nov 12 13:54:01 2023 -0800 [SPARK-45592][SPARK-45282][SQL][3.4] Correctness issue in AQE with InMemoryTableScanExec ### What changes were proposed in this pull request? This backports https://github.com/apache/spark/pull/43435 SPARK-45592 to the 3.4 branch. This is because it was already reported there as SPARK-45282 but it required enabling some extra configuration to hit the bug. ### Why are the changes needed? Fix correctness issue. ### Does this PR introduce _any_ user-facing change? Yes, fixing correctness issue. ### How was this patch tested? New tests based on the reproduction example in SPARK-45282 ### Was this patch authored or co-authored using generative AI tooling? No Closes #43729 from eejbyfeldt/SPARK-45282. Authored-by: Emil Ejbyfeldt <eejbyfe...@liveintent.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../sql/catalyst/plans/physical/partitioning.scala | 49 +++ .../spark/sql/catalyst/DistributionSuite.scala | 124 ++++--- .../spark/sql/catalyst/ShuffleSpecSuite.scala | 401 ++++++++++++--------- .../execution/adaptive/AQEShuffleReadExec.scala | 11 +- .../scala/org/apache/spark/sql/DatasetSuite.scala | 29 ++ .../WriteDistributionAndOrderingSuite.scala | 53 ++- 6 files changed, 401 insertions(+), 266 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index d2f9e9b5d5bf..1eefe65859bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -306,6 +306,35 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) override protected def withNewChildrenInternal( newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions = newChildren) + +} + +case class CoalescedBoundary(startReducerIndex: Int, endReducerIndex: Int) + +/** + * Represents a partitioning where partitions have been coalesced from a HashPartitioning into a + * fewer number of partitions. 
+ */ +case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[CoalescedBoundary]) + extends Expression with Partitioning with Unevaluable { + + override def children: Seq[Expression] = from.expressions + override def nullable: Boolean = from.nullable + override def dataType: DataType = from.dataType + + override def satisfies0(required: Distribution): Boolean = from.satisfies0(required) + + override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = + CoalescedHashShuffleSpec(from.createShuffleSpec(distribution), partitions) + + override protected def withNewChildrenInternal( + newChildren: IndexedSeq[Expression]): CoalescedHashPartitioning = + copy(from = from.copy(expressions = newChildren)) + + override val numPartitions: Int = partitions.length + + override def toString: String = from.toString + override def sql: String = from.sql } /** @@ -661,6 +690,26 @@ case class HashShuffleSpec( override def numPartitions: Int = partitioning.numPartitions } +case class CoalescedHashShuffleSpec( + from: ShuffleSpec, + partitions: Seq[CoalescedBoundary]) extends ShuffleSpec { + + override def isCompatibleWith(other: ShuffleSpec): Boolean = other match { + case SinglePartitionShuffleSpec => + numPartitions == 1 + case CoalescedHashShuffleSpec(otherParent, otherPartitions) => + partitions == otherPartitions && from.isCompatibleWith(otherParent) + case ShuffleSpecCollection(specs) => + specs.exists(isCompatibleWith) + case _ => + false + } + + override def canCreatePartitioning: Boolean = false + + override def numPartitions: Int = partitions.length +} + case class KeyGroupedShuffleSpec( partitioning: KeyGroupedPartitioning, distribution: ClusteredDistribution) extends ShuffleSpec { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala index a924a9ed02e5..7cb4d5f12325 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst import org.apache.spark.SparkFunSuite /* Implicit conversions */ import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.expressions.{Literal, Murmur3Hash, Pmod} +import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, Murmur3Hash, Pmod} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.types.IntegerType @@ -146,63 +146,75 @@ class DistributionSuite extends SparkFunSuite { false) } - test("HashPartitioning is the output partitioning") { - // HashPartitioning can satisfy ClusteredDistribution iff its hash expressions are a subset of - // the required clustering expressions. - checkSatisfied( - HashPartitioning(Seq($"a", $"b", $"c"), 10), - ClusteredDistribution(Seq($"a", $"b", $"c")), - true) - - checkSatisfied( - HashPartitioning(Seq($"b", $"c"), 10), - ClusteredDistribution(Seq($"a", $"b", $"c")), - true) - - checkSatisfied( - HashPartitioning(Seq($"a", $"b", $"c"), 10), - ClusteredDistribution(Seq($"b", $"c")), - false) - - checkSatisfied( - HashPartitioning(Seq($"a", $"b", $"c"), 10), - ClusteredDistribution(Seq($"d", $"e")), - false) - - // When ClusteredDistribution.requireAllClusterKeys is set to true, - // HashPartitioning can only satisfy ClusteredDistribution iff its hash expressions are - // exactly same as the required clustering expressions. 
- checkSatisfied( - HashPartitioning(Seq($"a", $"b", $"c"), 10), - ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true), - true) - - checkSatisfied( - HashPartitioning(Seq($"b", $"c"), 10), - ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true), - false) - - checkSatisfied( - HashPartitioning(Seq($"b", $"a", $"c"), 10), - ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true), - false) - - // HashPartitioning cannot satisfy OrderedDistribution - checkSatisfied( - HashPartitioning(Seq($"a", $"b", $"c"), 10), - OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)), - false) + private def testHashPartitioningLike( + partitioningName: String, + create: (Seq[Expression], Int) => Partitioning): Unit = { + + test(s"$partitioningName is the output partitioning") { + // HashPartitioning can satisfy ClusteredDistribution iff its hash expressions are a subset of + // the required clustering expressions. + checkSatisfied( + create(Seq($"a", $"b", $"c"), 10), + ClusteredDistribution(Seq($"a", $"b", $"c")), + true) + + checkSatisfied( + create(Seq($"b", $"c"), 10), + ClusteredDistribution(Seq($"a", $"b", $"c")), + true) + + checkSatisfied( + create(Seq($"a", $"b", $"c"), 10), + ClusteredDistribution(Seq($"b", $"c")), + false) + + checkSatisfied( + create(Seq($"a", $"b", $"c"), 10), + ClusteredDistribution(Seq($"d", $"e")), + false) + + // When ClusteredDistribution.requireAllClusterKeys is set to true, + // HashPartitioning can only satisfy ClusteredDistribution iff its hash expressions are + // exactly same as the required clustering expressions. + checkSatisfied( + create(Seq($"a", $"b", $"c"), 10), + ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true), + true) + + checkSatisfied( + create(Seq($"b", $"c"), 10), + ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true), + false) + + checkSatisfied( + create(Seq($"b", $"a", $"c"), 10), + ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true), + false) + + // HashPartitioning cannot satisfy OrderedDistribution + checkSatisfied( + create(Seq($"a", $"b", $"c"), 10), + OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)), + false) + + checkSatisfied( + create(Seq($"a", $"b", $"c"), 1), + OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)), + false) // TODO: this can be relaxed. + + checkSatisfied( + create(Seq($"b", $"c"), 10), + OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)), + false) + } + } - checkSatisfied( - HashPartitioning(Seq($"a", $"b", $"c"), 1), - OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)), - false) // TODO: this can be relaxed. 
+ testHashPartitioningLike("HashPartitioning", + (expressions, numPartitions) => HashPartitioning(expressions, numPartitions)) - checkSatisfied( - HashPartitioning(Seq($"b", $"c"), 10), - OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)), - false) - } + testHashPartitioningLike("CoalescedHashPartitioning", (expressions, numPartitions) => + CoalescedHashPartitioning( + HashPartitioning(expressions, numPartitions), Seq(CoalescedBoundary(0, numPartitions)))) test("RangePartitioning is the output partitioning") { // RangePartitioning can satisfy OrderedDistribution iff its ordering is a prefix diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala index 51e768873226..6b069d1c9736 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala @@ -62,211 +62,254 @@ class ShuffleSpecSuite extends SparkFunSuite with SQLHelper { } } - test("compatibility: HashShuffleSpec on both sides") { - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - expected = true - ) - - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a"), 10), ClusteredDistribution(Seq($"a", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"a"), 10), ClusteredDistribution(Seq($"a", $"b"))), - expected = true - ) + private def testHashShuffleSpecLike( + shuffleSpecName: String, + create: (HashPartitioning, ClusteredDistribution) => ShuffleSpec): Unit = { - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"b"), 10), ClusteredDistribution(Seq($"a", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"d"), 10), ClusteredDistribution(Seq($"c", $"d"))), - expected = true - ) + test(s"compatibility: $shuffleSpecName on both sides") { + checkCompatible( + create(HashPartitioning(Seq($"a", $"b"), 10), + ClusteredDistribution(Seq($"a", $"b"))), + create(HashPartitioning(Seq($"a", $"b"), 10), + ClusteredDistribution(Seq($"a", $"b"))), + expected = true + ) - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a", $"a", $"b"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"c", $"c", $"d"), 10), - ClusteredDistribution(Seq($"c", $"d"))), - expected = true - ) + checkCompatible( + create(HashPartitioning(Seq($"a"), 10), ClusteredDistribution(Seq($"a", $"b"))), + create(HashPartitioning(Seq($"a"), 10), ClusteredDistribution(Seq($"a", $"b"))), + expected = true + ) - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), - ClusteredDistribution(Seq($"a", $"b", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"a", $"d"), 10), - ClusteredDistribution(Seq($"a", $"c", $"d"))), - expected = true - ) + checkCompatible( + create(HashPartitioning(Seq($"b"), 10), ClusteredDistribution(Seq($"a", $"b"))), + create(HashPartitioning(Seq($"d"), 10), ClusteredDistribution(Seq($"c", $"d"))), + expected = true + ) - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"a"), 10), - ClusteredDistribution(Seq($"a", $"b", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"a", $"c", $"a"), 10), - ClusteredDistribution(Seq($"a", $"c", $"c"))), - expected = true - ) + checkCompatible( + create(HashPartitioning(Seq($"a", $"a", $"b"), 10), + ClusteredDistribution(Seq($"a", 
$"b"))), + create(HashPartitioning(Seq($"c", $"c", $"d"), 10), + ClusteredDistribution(Seq($"c", $"d"))), + expected = true + ) - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"a"), 10), - ClusteredDistribution(Seq($"a", $"b", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"a", $"c", $"a"), 10), - ClusteredDistribution(Seq($"a", $"c", $"d"))), - expected = true - ) + checkCompatible( + create(HashPartitioning(Seq($"a", $"b"), 10), + ClusteredDistribution(Seq($"a", $"b", $"b"))), + create(HashPartitioning(Seq($"a", $"d"), 10), + ClusteredDistribution(Seq($"a", $"c", $"d"))), + expected = true + ) - // negative cases - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"c"), 5), - ClusteredDistribution(Seq($"c", $"d"))), - expected = false - ) + checkCompatible( + create(HashPartitioning(Seq($"a", $"b", $"a"), 10), + ClusteredDistribution(Seq($"a", $"b", $"b"))), + create(HashPartitioning(Seq($"a", $"c", $"a"), 10), + ClusteredDistribution(Seq($"a", $"c", $"c"))), + expected = true + ) - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"b"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - expected = false - ) + checkCompatible( + create(HashPartitioning(Seq($"a", $"b", $"a"), 10), + ClusteredDistribution(Seq($"a", $"b", $"b"))), + create(HashPartitioning(Seq($"a", $"c", $"a"), 10), + ClusteredDistribution(Seq($"a", $"c", $"d"))), + expected = true + ) - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"b"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - expected = false - ) + // negative cases + checkCompatible( + create(HashPartitioning(Seq($"a"), 10), + ClusteredDistribution(Seq($"a", $"b"))), + create(HashPartitioning(Seq($"c"), 5), + ClusteredDistribution(Seq($"c", $"d"))), + expected = false + ) - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"d"), 10), - ClusteredDistribution(Seq($"c", $"d"))), - expected = false - ) + checkCompatible( + create(HashPartitioning(Seq($"a", $"b"), 10), + ClusteredDistribution(Seq($"a", $"b"))), + create(HashPartitioning(Seq($"b"), 10), + ClusteredDistribution(Seq($"a", $"b"))), + expected = false + ) - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"d"), 10), - ClusteredDistribution(Seq($"c", $"d"))), - expected = false - ) + checkCompatible( + create(HashPartitioning(Seq($"a"), 10), + ClusteredDistribution(Seq($"a", $"b"))), + create(HashPartitioning(Seq($"b"), 10), + ClusteredDistribution(Seq($"a", $"b"))), + expected = false + ) - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a", $"a", $"b"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"a"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - expected = false - ) + checkCompatible( + create(HashPartitioning(Seq($"a"), 10), + ClusteredDistribution(Seq($"a", $"b"))), + create(HashPartitioning(Seq($"d"), 10), + ClusteredDistribution(Seq($"c", $"d"))), + expected = false + ) - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a", $"a", $"b"), 10), - ClusteredDistribution(Seq($"a", $"b", $"b"))), 
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"a"), 10), - ClusteredDistribution(Seq($"a", $"b", $"b"))), - expected = false - ) - } + checkCompatible( + create(HashPartitioning(Seq($"a"), 10), + ClusteredDistribution(Seq($"a", $"b"))), + create(HashPartitioning(Seq($"d"), 10), + ClusteredDistribution(Seq($"c", $"d"))), + expected = false + ) - test("compatibility: Only one side is HashShuffleSpec") { - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - SinglePartitionShuffleSpec, - expected = false - ) + checkCompatible( + create(HashPartitioning(Seq($"a", $"a", $"b"), 10), + ClusteredDistribution(Seq($"a", $"b"))), + create(HashPartitioning(Seq($"a", $"b", $"a"), 10), + ClusteredDistribution(Seq($"a", $"b"))), + expected = false + ) - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 1), - ClusteredDistribution(Seq($"a", $"b"))), - SinglePartitionShuffleSpec, - expected = true - ) + checkCompatible( + create(HashPartitioning(Seq($"a", $"a", $"b"), 10), + ClusteredDistribution(Seq($"a", $"b", $"b"))), + create(HashPartitioning(Seq($"a", $"b", $"a"), 10), + ClusteredDistribution(Seq($"a", $"b", $"b"))), + expected = false + ) + } - checkCompatible( - SinglePartitionShuffleSpec, - HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 1), - ClusteredDistribution(Seq($"a", $"b"))), - expected = true - ) + test(s"compatibility: Only one side is $shuffleSpecName") { + checkCompatible( + create(HashPartitioning(Seq($"a", $"b"), 10), + ClusteredDistribution(Seq($"a", $"b"))), + SinglePartitionShuffleSpec, + expected = false + ) - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - RangeShuffleSpec(10, ClusteredDistribution(Seq($"a", $"b"))), - expected = false - ) + checkCompatible( + create(HashPartitioning(Seq($"a", $"b"), 1), + ClusteredDistribution(Seq($"a", $"b"))), + SinglePartitionShuffleSpec, + expected = true + ) - checkCompatible( - RangeShuffleSpec(10, ClusteredDistribution(Seq($"a", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - expected = false - ) + checkCompatible( + SinglePartitionShuffleSpec, + create(HashPartitioning(Seq($"a", $"b"), 1), + ClusteredDistribution(Seq($"a", $"b"))), + expected = true + ) - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - ShuffleSpecCollection(Seq( - HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), - ClusteredDistribution(Seq($"a", $"b"))))), - expected = true - ) + checkCompatible( + create(HashPartitioning(Seq($"a", $"b"), 10), + ClusteredDistribution(Seq($"a", $"b"))), + RangeShuffleSpec(10, ClusteredDistribution(Seq($"a", $"b"))), + expected = false + ) - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - ShuffleSpecCollection(Seq( - HashShuffleSpec(HashPartitioning(Seq($"a"), 10), + checkCompatible( + RangeShuffleSpec(10, ClusteredDistribution(Seq($"a", $"b"))), + create(HashPartitioning(Seq($"a", $"b"), 10), ClusteredDistribution(Seq($"a", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), - ClusteredDistribution(Seq($"a", $"b"))))), - expected = true - ) + expected = false + ) - checkCompatible( - HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), - ClusteredDistribution(Seq($"a", $"b"))), - ShuffleSpecCollection(Seq( - HashShuffleSpec(HashPartitioning(Seq($"a"), 
10), + checkCompatible( + create(HashPartitioning(Seq($"a", $"b"), 10), ClusteredDistribution(Seq($"a", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"c"), 10), - ClusteredDistribution(Seq($"a", $"b", $"c"))))), - expected = false - ) + ShuffleSpecCollection(Seq( + create(HashPartitioning(Seq($"a", $"b"), 10), + ClusteredDistribution(Seq($"a", $"b"))))), + expected = true + ) - checkCompatible( - ShuffleSpecCollection(Seq( - HashShuffleSpec(HashPartitioning(Seq($"b"), 10), + checkCompatible( + create(HashPartitioning(Seq($"a", $"b"), 10), ClusteredDistribution(Seq($"a", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), - ClusteredDistribution(Seq($"a", $"b"))))), - ShuffleSpecCollection(Seq( - HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"c"), 10), - ClusteredDistribution(Seq($"a", $"b", $"c"))), - HashShuffleSpec(HashPartitioning(Seq($"d"), 10), - ClusteredDistribution(Seq($"c", $"d"))))), - expected = true - ) + ShuffleSpecCollection(Seq( + create(HashPartitioning(Seq($"a"), 10), + ClusteredDistribution(Seq($"a", $"b"))), + create(HashPartitioning(Seq($"a", $"b"), 10), + ClusteredDistribution(Seq($"a", $"b"))))), + expected = true + ) - checkCompatible( - ShuffleSpecCollection(Seq( - HashShuffleSpec(HashPartitioning(Seq($"b"), 10), + checkCompatible( + create(HashPartitioning(Seq($"a", $"b"), 10), ClusteredDistribution(Seq($"a", $"b"))), - HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), - ClusteredDistribution(Seq($"a", $"b"))))), - ShuffleSpecCollection(Seq( - HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"c"), 10), - ClusteredDistribution(Seq($"a", $"b", $"c"))), - HashShuffleSpec(HashPartitioning(Seq($"c"), 10), - ClusteredDistribution(Seq($"c", $"d"))))), - expected = false - ) + ShuffleSpecCollection(Seq( + create(HashPartitioning(Seq($"a"), 10), + ClusteredDistribution(Seq($"a", $"b"))), + create(HashPartitioning(Seq($"a", $"b", $"c"), 10), + ClusteredDistribution(Seq($"a", $"b", $"c"))))), + expected = false + ) + + checkCompatible( + ShuffleSpecCollection(Seq( + create(HashPartitioning(Seq($"b"), 10), + ClusteredDistribution(Seq($"a", $"b"))), + create(HashPartitioning(Seq($"a", $"b"), 10), + ClusteredDistribution(Seq($"a", $"b"))))), + ShuffleSpecCollection(Seq( + create(HashPartitioning(Seq($"a", $"b", $"c"), 10), + ClusteredDistribution(Seq($"a", $"b", $"c"))), + create(HashPartitioning(Seq($"d"), 10), + ClusteredDistribution(Seq($"c", $"d"))))), + expected = true + ) + + checkCompatible( + ShuffleSpecCollection(Seq( + create(HashPartitioning(Seq($"b"), 10), + ClusteredDistribution(Seq($"a", $"b"))), + create(HashPartitioning(Seq($"a", $"b"), 10), + ClusteredDistribution(Seq($"a", $"b"))))), + ShuffleSpecCollection(Seq( + create(HashPartitioning(Seq($"a", $"b", $"c"), 10), + ClusteredDistribution(Seq($"a", $"b", $"c"))), + create(HashPartitioning(Seq($"c"), 10), + ClusteredDistribution(Seq($"c", $"d"))))), + expected = false + ) + } + } + + testHashShuffleSpecLike("HashShuffleSpec", + (partitioning, distribution) => HashShuffleSpec(partitioning, distribution)) + testHashShuffleSpecLike("CoalescedHashShuffleSpec", + (partitioning, distribution) => { + val partitions = if (partitioning.numPartitions == 1) { + Seq(CoalescedBoundary(0, 1)) + } else { + Seq(CoalescedBoundary(0, 1), CoalescedBoundary(0, partitioning.numPartitions)) + } + CoalescedHashShuffleSpec(HashShuffleSpec(partitioning, distribution), partitions) + }) + + test("compatibility: CoalescedHashShuffleSpec other specs") { + val hashShuffleSpec = HashShuffleSpec( + 
HashPartitioning(Seq($"a", $"b"), 10), ClusteredDistribution(Seq($"a", $"b"))) + checkCompatible( + hashShuffleSpec, + CoalescedHashShuffleSpec(hashShuffleSpec, Seq(CoalescedBoundary(0, 10))), + expected = false + ) + + checkCompatible( + CoalescedHashShuffleSpec(hashShuffleSpec, + Seq(CoalescedBoundary(0, 5), CoalescedBoundary(5, 10))), + CoalescedHashShuffleSpec(hashShuffleSpec, + Seq(CoalescedBoundary(0, 5), CoalescedBoundary(5, 10))), + expected = true + ) + + checkCompatible( + CoalescedHashShuffleSpec(hashShuffleSpec, + Seq(CoalescedBoundary(0, 4), CoalescedBoundary(4, 10))), + CoalescedHashShuffleSpec(hashShuffleSpec, + Seq(CoalescedBoundary(0, 5), CoalescedBoundary(5, 10))), + expected = false + ) } test("compatibility: other specs") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala index 46ec91dcc0ab..6b39ac70a62e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala @@ -19,10 +19,11 @@ package org.apache.spark.sql.execution.adaptive import scala.collection.mutable.ArrayBuffer +import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition, UnknownPartitioning} +import org.apache.spark.sql.catalyst.plans.physical.{CoalescedBoundary, CoalescedHashPartitioning, HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition, UnknownPartitioning} import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeLike} @@ -75,7 +76,13 @@ case class AQEShuffleReadExec private( // partitions is changed. 
child.outputPartitioning match { case h: HashPartitioning => - CurrentOrigin.withOrigin(h.origin)(h.copy(numPartitions = partitionSpecs.length)) + val partitions = partitionSpecs.map { + case CoalescedPartitionSpec(start, end, _) => CoalescedBoundary(start, end) + // Can not happend due to isCoalescedRead + case unexpected => + throw SparkException.internalError(s"Unexpected ShufflePartitionSpec: $unexpected") + } + CurrentOrigin.withOrigin(h.origin)(CoalescedHashPartitioning(h, partitions)) case r: RangePartitioning => CurrentOrigin.withOrigin(r.origin)(r.copy(numPartitions = partitionSpecs.length)) // This can only happen for `REBALANCE_PARTITIONS_BY_NONE`, which uses diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index f8f6845afcaa..6a1aa25c6e21 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.sql.{Date, Timestamp} +import java.util.UUID import scala.util.Random @@ -2461,6 +2462,34 @@ class DatasetSuite extends QueryTest ) assert(result == expected) } + + test("SPARK-45282: Coaleasced shuffle read is not compatible with hash partitioning") { + + withSQLConf( + SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "33554432", + SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST.key -> "false", + SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") { + val data = (1 to 1000000).toDS().map(i => UUID.randomUUID().toString).persist() + + val left = data.map(k => (k, 1)) + val right = data.map(k => (k, k)) + + val left1 = left + .toDF("key", "value1") + .repartition(col("key")) + .persist() + left1.count() + + val right1 = right + .toDF("key", "value2") + .repartition(col("key")) + .persist() + + val join = left1.join(right1, "key") + + assert(join.count() == 1000000) + } + } } class DatasetLargeResultCollectingSuite extends QueryTest diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala index d4a6d2213652..52af05d0eeed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.{catalyst, AnalysisException, DataFrame, Row} import org.apache.spark.sql.catalyst.expressions.{ApplyFunctionExpression, Cast, Literal} import org.apache.spark.sql.catalyst.expressions.objects.Invoke import org.apache.spark.sql.catalyst.plans.physical -import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, RangePartitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.plans.physical.{CoalescedBoundary, CoalescedHashPartitioning, HashPartitioning, RangePartitioning, UnknownPartitioning} import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.functions._ import org.apache.spark.sql.connector.distributions.{Distribution, Distributions} @@ -257,11 +257,8 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase ) ) val writePartitioningExprs = Seq(attr("data"), attr("id")) - val writePartitioning = if (!coalesce) { - clusteredWritePartitioning(writePartitioningExprs, 
targetNumPartitions) - } else { - clusteredWritePartitioning(writePartitioningExprs, Some(1)) - } + val writePartitioning = clusteredWritePartitioning( + writePartitioningExprs, targetNumPartitions, coalesce) checkWriteRequirements( tableDistribution, @@ -366,11 +363,8 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase ) ) val writePartitioningExprs = Seq(attr("data")) - val writePartitioning = if (!coalesce) { - clusteredWritePartitioning(writePartitioningExprs, targetNumPartitions) - } else { - clusteredWritePartitioning(writePartitioningExprs, Some(1)) - } + val writePartitioning = clusteredWritePartitioning( + writePartitioningExprs, targetNumPartitions, coalesce) checkWriteRequirements( tableDistribution, @@ -848,11 +842,8 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase ) ) val writePartitioningExprs = Seq(attr("data")) - val writePartitioning = if (!coalesce) { - clusteredWritePartitioning(writePartitioningExprs, targetNumPartitions) - } else { - clusteredWritePartitioning(writePartitioningExprs, Some(1)) - } + val writePartitioning = clusteredWritePartitioning( + writePartitioningExprs, targetNumPartitions, coalesce) checkWriteRequirements( tableDistribution, @@ -932,11 +923,8 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase ) ) val writePartitioningExprs = Seq(attr("data")) - val writePartitioning = if (!coalesce) { - clusteredWritePartitioning(writePartitioningExprs, targetNumPartitions) - } else { - clusteredWritePartitioning(writePartitioningExprs, Some(1)) - } + val writePartitioning = clusteredWritePartitioning( + writePartitioningExprs, targetNumPartitions, coalesce) checkWriteRequirements( tableDistribution, @@ -1119,11 +1107,8 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase ) val writePartitioningExprs = Seq(truncateExpr) - val writePartitioning = if (!coalesce) { - clusteredWritePartitioning(writePartitioningExprs, targetNumPartitions) - } else { - clusteredWritePartitioning(writePartitioningExprs, Some(1)) - } + val writePartitioning = clusteredWritePartitioning( + writePartitioningExprs, targetNumPartitions, coalesce) checkWriteRequirements( tableDistribution, @@ -1365,6 +1350,9 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase case p: physical.HashPartitioning => val resolvedExprs = p.expressions.map(resolveAttrs(_, plan)) p.copy(expressions = resolvedExprs) + case c: physical.CoalescedHashPartitioning => + val resolvedExprs = c.from.expressions.map(resolveAttrs(_, plan)) + c.copy(from = c.from.copy(expressions = resolvedExprs)) case _: UnknownPartitioning => // don't check partitioning if no particular one is expected actualPartitioning @@ -1423,8 +1411,15 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase private def clusteredWritePartitioning( writePartitioningExprs: Seq[catalyst.expressions.Expression], - targetNumPartitions: Option[Int]): physical.Partitioning = { - HashPartitioning(writePartitioningExprs, - targetNumPartitions.getOrElse(conf.numShufflePartitions)) + targetNumPartitions: Option[Int], + coalesce: Boolean): physical.Partitioning = { + val partitioning = HashPartitioning(writePartitioningExprs, + targetNumPartitions.getOrElse(conf.numShufflePartitions)) + if (coalesce) { + CoalescedHashPartitioning( + partitioning, Seq(CoalescedBoundary(0, partitioning.numPartitions))) + } else { + partitioning + } } } 
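
For readers tracing the correctness issue itself: before this change, an AQE-coalesced shuffle read reported a plain HashPartitioning with the reduced partition count (the old h.copy(numPartitions = partitionSpecs.length) in AQEShuffleReadExec above), even though each coalesced partition is a union of original reducer partitions rather than the result of hashing modulo the new count. Another operator that genuinely is hash-partitioned into that smaller number of partitions can then be treated as co-partitioned and joined partition-by-partition without a shuffle, producing wrong results. The following plain-Scala sketch (no Spark dependencies; the key set, hash function, and boundary layout are made up purely for illustration) shows why partition indexes stop lining up once one side is coalesced:

object CoalescedMismatchDemo extends App {
  // Ten reducer partitions produced by the original hash partitioning.
  val numReducers = 10
  val keys = (1 to 1000).map(i => s"key-$i")

  // Reducer id a key hashes to under the original partitioning.
  def reducerId(key: String): Int = Math.floorMod(key.hashCode, numReducers)

  // One side is coalesced by AQE into two partitions covering reducers [0, 5) and [5, 10).
  val boundaries = Seq((0, 5), (5, 10))
  def coalescedId(key: String): Int =
    boundaries.indexWhere { case (start, end) =>
      val r = reducerId(key)
      start <= r && r < end
    }

  // A partition-wise join that assumes both sides still share the original
  // HashPartitioning would pair un-coalesced partition i with coalesced partition i.
  val aligned = keys.count(k => reducerId(k) == coalescedId(k))
  println(s"$aligned of ${keys.size} keys end up in a matching partition index")
}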
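
The fix makes that mismatch visible to the planner: a coalesced read now reports CoalescedHashPartitioning, and its CoalescedHashShuffleSpec is only compatible with another coalesced spec that has identical boundaries on top of compatible parents, never with a plain HashShuffleSpec. A condensed, self-contained model of that rule, using stand-in classes (HashSpec and CoalescedHashSpec are simplified placeholders, not Spark's actual implementations):

case class CoalescedBoundary(startReducerIndex: Int, endReducerIndex: Int)

sealed trait Spec {
  def isCompatibleWith(other: Spec): Boolean
}

// Stand-in for HashShuffleSpec: here, compatible only with an identical hash spec.
case class HashSpec(exprs: Seq[String], numPartitions: Int) extends Spec {
  def isCompatibleWith(other: Spec): Boolean = other match {
    case HashSpec(otherExprs, otherNum) => otherExprs == exprs && otherNum == numPartitions
    case _ => false
  }
}

// Stand-in for CoalescedHashShuffleSpec: requires the same coalesced boundaries and a
// compatible parent spec; never compatible with an un-coalesced HashSpec. (The real spec
// also handles SinglePartitionShuffleSpec and ShuffleSpecCollection, as in the patch.)
case class CoalescedHashSpec(from: Spec, partitions: Seq[CoalescedBoundary]) extends Spec {
  def isCompatibleWith(other: Spec): Boolean = other match {
    case CoalescedHashSpec(otherFrom, otherPartitions) =>
      partitions == otherPartitions && from.isCompatibleWith(otherFrom)
    case _ => false
  }
}

object SpecRuleDemo extends App {
  val base = HashSpec(Seq("key"), 10)
  val coalesced = CoalescedHashSpec(base, Seq(CoalescedBoundary(0, 4), CoalescedBoundary(4, 10)))
  println(base.isCompatibleWith(coalesced))      // false: a plain hash spec never matches a coalesced one
  println(coalesced.isCompatibleWith(coalesced)) // true: same boundaries, compatible parents
}

With this rule in place the planner can no longer treat the cached and coalesced sides of the join in the new DatasetSuite test as interchangeable layouts, which is what restores the expected count of 1000000.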
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org