This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 6403a84b6854 [SPARK-46590][SQL] Fix coalesce failed with unexpected partition indeces 6403a84b6854 is described below commit 6403a84b6854214a4ed7d5c0c800e877e0748964 Author: jackylee-ch <lijunq...@baidu.com> AuthorDate: Tue Jan 23 16:10:37 2024 +0800 [SPARK-46590][SQL] Fix coalesce failed with unexpected partition indeces ### What changes were proposed in this pull request? As outlined in JIRA issue [SPARK-46590](https://issues.apache.org/jira/browse/SPARK-46590), when a broadcast join follows a union within the same stage, the [collectCoalesceGroups](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala#L144) method will indiscriminately traverse all sub-plans, aggregating them into a single group, which is not expected. ### Why are the changes needed? In fact, for a broadcast join, we do not expect the broadcast exchange to have the same partition number. Therefore, we can safely disregard the broadcast join and continue traversing the sub-plans. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Newly added unit tests. They would fail without this PR. ### Was this patch authored or co-authored using generative AI tooling? No Closes #44661 from jackylee-ch/fix_coalesce_problem_with_broadcastjoin_and_union. 
Authored-by: jackylee-ch <lijunq...@baidu.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit de0c4ad3947f1188f02aaa612df8278d1c7c3ce5) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../adaptive/CoalesceShufflePartitions.scala | 10 ++-- .../execution/adaptive/ShufflePartitionsUtil.scala | 6 ++- .../execution/CoalesceShufflePartitionsSuite.scala | 61 ++++++++++++++++++++++ .../sql/execution/ShufflePartitionsUtilSuite.scala | 31 +++++------ 4 files changed, 86 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala index 34399001c726..26e5ac649dbb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan, UnaryExecNode, UnionExec} import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, REBALANCE_PARTITIONS_BY_COL, REBALANCE_PARTITIONS_BY_NONE, REPARTITION_BY_COL, ShuffleExchangeLike, ShuffleOrigin} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, CartesianProductExec} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.Utils @@ -146,13 +147,16 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe Seq(collectShuffleStageInfos(r)) case unary: UnaryExecNode => collectCoalesceGroups(unary.child) case union: UnionExec => union.children.flatMap(collectCoalesceGroups) - // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of - // shuffle partitions, because we may 
break the assumption that all children of a spark plan - // have same number of output partitions. + case join: CartesianProductExec => join.children.flatMap(collectCoalesceGroups) // Note that, `BroadcastQueryStageExec` is a valid case: // If a join has been optimized from shuffled join to broadcast join, then the one side is // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the // shuffle side as we do not expect broadcast exchange has same partition number. + case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups) + case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups) + // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of + // shuffle partitions, because we may break the assumption that all children of a spark plan + // have same number of output partitions. case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) => val shuffleStages = collectShuffleStageInfos(p) // ShuffleExchanges introduced by repartition do not support partition number change. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala index dbed66683b01..9370b3d8d1d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala @@ -128,8 +128,10 @@ object ShufflePartitionsUtil extends Logging { // There should be no unexpected partition specs and the start indices should be identical // across all different shuffles. 
- assert(partitionIndicesSeq.distinct.length == 1 && partitionIndicesSeq.head.forall(_ >= 0), - s"Invalid shuffle partition specs: $inputPartitionSpecs") + if (partitionIndicesSeq.distinct.length > 1 || partitionIndicesSeq.head.exists(_ < 0)) { + logWarning(s"Could not apply partition coalescing because of unexpected partition indices.") + return Seq.empty + } // The indices may look like [0, 1, 2, 2, 2, 3, 4, 4, 5], and the repeated `2` and `4` mean // skewed partitions. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala index 24a98dd83f33..e11191da6a95 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala @@ -310,6 +310,67 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite { } } + test("SPARK-46590 adaptive query execution works correctly with broadcast join and union") { + val test: SparkSession => Unit = { spark: SparkSession => + import spark.implicits._ + spark.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1KB") + spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key, "10KB") + spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR, 2.0) + val df00 = spark.range(0, 1000, 2) + .selectExpr("id as key", "id as value") + .union(Seq.fill(100000)((600, 600)).toDF("key", "value")) + val df01 = spark.range(0, 1000, 3) + .selectExpr("id as key", "id as value") + val df10 = spark.range(0, 1000, 5) + .selectExpr("id as key", "id as value") + .union(Seq.fill(500000)((600, 600)).toDF("key", "value")) + val df11 = spark.range(0, 1000, 7) + .selectExpr("id as key", "id as value") + val df20 = spark.range(0, 10).selectExpr("id as key", "id as value") + + df20.join(df00.join(df01, Array("key", "value"), "left_outer") + .union(df10.join(df11, Array("key", 
"value"), "left_outer"))) + .write + .format("noop") + .mode("overwrite") + .save() + } + withSparkSession(test, 12000, None) + } + + test("SPARK-46590 adaptive query execution works correctly with cartesian join and union") { + val test: SparkSession => Unit = { spark: SparkSession => + import spark.implicits._ + spark.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1") + spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key, "100B") + spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR, 2.0) + val df00 = spark.range(0, 10, 2) + .selectExpr("id as key", "id as value") + .union(Seq.fill(1000)((600, 600)).toDF("key", "value")) + val df01 = spark.range(0, 10, 3) + .selectExpr("id as key", "id as value") + val df10 = spark.range(0, 10, 5) + .selectExpr("id as key", "id as value") + .union(Seq.fill(5000)((600, 600)).toDF("key", "value")) + val df11 = spark.range(0, 10, 7) + .selectExpr("id as key", "id as value") + val df20 = spark.range(0, 10) + .selectExpr("id as key", "id as value") + .union(Seq.fill(1000)((11, 11)).toDF("key", "value")) + val df21 = spark.range(0, 10) + .selectExpr("id as key", "id as value") + + df20.join(df21.hint("shuffle_hash"), Array("key", "value"), "left_outer") + .join(df00.join(df01.hint("shuffle_hash"), Array("key", "value"), "left_outer") + .union(df10.join(df11.hint("shuffle_hash"), Array("key", "value"), "left_outer"))) + .write + .format("noop") + .mode("overwrite") + .save() + } + withSparkSession(test, 100, None) + } + test("SPARK-24705 adaptive query execution works correctly when exchange reuse enabled") { val test: SparkSession => Unit = { spark: SparkSession => spark.sql("SET spark.sql.exchange.reuse=true") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala index da05373125d3..f8b796436847 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala @@ -567,14 +567,13 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite with LocalSparkContext { } { - // Assertion error if shuffle partition specs contain `CoalescedShuffleSpec` that has - // `end` - `start` > 1. + // If shuffle partition specs contain `CoalescedShuffleSpec` that has + // `end` - `start` > 1, return empty result. val bytesByPartitionId1 = Array[Long](10, 10, 10, 10, 10) val bytesByPartitionId2 = Array[Long](10, 10, 10, 10, 10) val specs1 = Seq(CoalescedPartitionSpec(0, 1), CoalescedPartitionSpec(1, 5)) val specs2 = specs1 - intercept[AssertionError] { - ShufflePartitionsUtil.coalescePartitions( + val coalesced = ShufflePartitionsUtil.coalescePartitions( Array( Some(new MapOutputStatistics(0, bytesByPartitionId1)), Some(new MapOutputStatistics(1, bytesByPartitionId2))), @@ -582,17 +581,16 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite with LocalSparkContext { Some(specs1), Some(specs2)), targetSize, 1, 0) - } + assert(coalesced.isEmpty) } { - // Assertion error if shuffle partition specs contain `PartialMapperShuffleSpec`. + // If shuffle partition specs contain `PartialMapperShuffleSpec`, return empty result. 
val bytesByPartitionId1 = Array[Long](10, 10, 10, 10, 10) val bytesByPartitionId2 = Array[Long](10, 10, 10, 10, 10) val specs1 = Seq(CoalescedPartitionSpec(0, 1), PartialMapperPartitionSpec(1, 0, 1)) val specs2 = specs1 - intercept[AssertionError] { - ShufflePartitionsUtil.coalescePartitions( + val coalesced = ShufflePartitionsUtil.coalescePartitions( Array( Some(new MapOutputStatistics(0, bytesByPartitionId1)), Some(new MapOutputStatistics(1, bytesByPartitionId2))), @@ -600,18 +598,17 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite with LocalSparkContext { Some(specs1), Some(specs2)), targetSize, 1, 0) - } + assert(coalesced.isEmpty) } { - // Assertion error if partition specs of different shuffles have different lengths. + // If partition specs of different shuffles have different lengths, return empty result. val bytesByPartitionId1 = Array[Long](10, 10, 10, 10, 10) val bytesByPartitionId2 = Array[Long](10, 10, 10, 10, 10) val specs1 = Seq.tabulate(4)(i => CoalescedPartitionSpec(i, i + 1)) ++ Seq.tabulate(2)(i => PartialReducerPartitionSpec(4, i, i + 1, 10L)) val specs2 = Seq.tabulate(5)(i => CoalescedPartitionSpec(i, i + 1)) - intercept[AssertionError] { - ShufflePartitionsUtil.coalescePartitions( + val coalesced = ShufflePartitionsUtil.coalescePartitions( Array( Some(new MapOutputStatistics(0, bytesByPartitionId1)), Some(new MapOutputStatistics(1, bytesByPartitionId2))), @@ -619,11 +616,12 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite with LocalSparkContext { Some(specs1), Some(specs2)), targetSize, 1, 0) - } + assert(coalesced.isEmpty) } { - // Assertion error if start indices of partition specs are not identical among all shuffles. + // If start indices of partition specs are not identical among all shuffles, + // return empty result. 
val bytesByPartitionId1 = Array[Long](10, 10, 10, 10, 10) val bytesByPartitionId2 = Array[Long](10, 10, 10, 10, 10) val specs1 = Seq.tabulate(4)(i => CoalescedPartitionSpec(i, i + 1)) ++ @@ -631,8 +629,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite with LocalSparkContext { val specs2 = Seq.tabulate(2)(i => CoalescedPartitionSpec(i, i + 1)) ++ Seq.tabulate(2)(i => PartialReducerPartitionSpec(2, i, i + 1, 10L)) ++ Seq.tabulate(2)(i => CoalescedPartitionSpec(i + 3, i + 4)) - intercept[AssertionError] { - ShufflePartitionsUtil.coalescePartitions( + val coalesced = ShufflePartitionsUtil.coalescePartitions( Array( Some(new MapOutputStatistics(0, bytesByPartitionId1)), Some(new MapOutputStatistics(1, bytesByPartitionId2))), @@ -640,7 +637,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite with LocalSparkContext { Some(specs1), Some(specs2)), targetSize, 1, 0) - } + assert(coalesced.isEmpty) } { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org