This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 07f85c4c650 [SPARK-42101][SQL][FOLLOWUP] Improve TableCacheQueryStage with CoalesceShufflePartitions 07f85c4c650 is described below commit 07f85c4c650b52b8ff2741c71d3d4aa5cfee0820 Author: ulysses-you <ulyssesyo...@gmail.com> AuthorDate: Tue Mar 14 21:01:05 2023 +0800 [SPARK-42101][SQL][FOLLOWUP] Improve TableCacheQueryStage with CoalesceShufflePartitions ### What changes were proposed in this pull request? `CoalesceShufflePartitions` should make sure all leaves are `ExchangeQueryStageExec` to avoid collecting `TableCacheQueryStage`, since we cannot change the partition number of an IMR (`InMemoryRelation`). Add two tests to make sure `CoalesceShufflePartitions` works well with `TableCacheQueryStage`. Note that these two tests pass even without this PR, because `ValidateRequirements` reverts the wrong plan. ### Why are the changes needed? Avoid a potential issue. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added tests. Closes #40406 from ulysses-you/cache-aqe-followup.
Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../adaptive/CoalesceShufflePartitions.scala | 14 +++++--- .../adaptive/AdaptiveQueryExecSuite.scala | 37 ++++++++++++++++++++++ 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala index 5c005efb732..dfc7e23c82d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala @@ -124,7 +124,7 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe /** * Gather all coalesce-able groups such that the shuffle stages in each child of a Union operator * are in their independent groups if: - * 1) all leaf nodes of this child are shuffle stages; and + * 1) all leaf nodes of this child are exchange stages; and * 2) all these shuffle stages support coalescing. */ private def collectCoalesceGroups(plan: SparkPlan): Seq[Seq[ShuffleStageInfo]] = plan match { @@ -132,10 +132,14 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe Seq(collectShuffleStageInfos(r)) case unary: UnaryExecNode => collectCoalesceGroups(unary.child) case union: UnionExec => union.children.flatMap(collectCoalesceGroups) - // If not all leaf nodes are query stages, it's not safe to reduce the number of shuffle - // partitions, because we may break the assumption that all children of a spark plan have - // same number of output partitions. 
- case p if p.collectLeaves().forall(_.isInstanceOf[QueryStageExec]) => + // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of + // shuffle partitions, because we may break the assumption that all children of a spark plan + // have same number of output partitions. + // Note that, `BroadcastQueryStageExec` is a valid case: + // If a join has been optimized from shuffled join to broadcast join, then the one side is + // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the + // shuffle side as we do not expect broadcast exchange has same partition number. + case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) => val shuffleStages = collectShuffleStageInfos(p) // ShuffleExchanges introduced by repartition do not support partition number change. // We change the number of partitions only if all the ShuffleExchanges support it. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index d2fe588c9a5..8ed31e1968c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -2765,6 +2765,43 @@ class AdaptiveQueryExecSuite checkShuffleAndSort(firstAccess = false) } } + + test("SPARK-42101: Do not coalesce shuffle partition if other side is TableCacheQueryStage") { + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "3", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true", + SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { + withTempView("v1", "v2") { + Seq(1, 2).toDF("c1").repartition(3, $"c1").cache().createOrReplaceTempView("v1") + Seq(1, 2).toDF("c2").createOrReplaceTempView("v2") + + val df = spark.sql("SELECT * 
FROM v1 JOIN v2 ON v1.c1 = v2.c2") + df.collect() + val finalPlan = df.queryExecution.executedPlan + assert(collect(finalPlan) { + case q: ShuffleQueryStageExec => q + }.size == 1) + assert(collect(finalPlan) { + case r: AQEShuffleReadExec => r + }.isEmpty) + } + } + } + + test("SPARK-42101: Coalesce shuffle partition with union even if exists TableCacheQueryStage") { + withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true", + SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { + val cached = Seq(1).toDF("c").cache() + val df = Seq(2).toDF("c").repartition($"c").unionAll(cached) + df.collect() + assert(collect(df.queryExecution.executedPlan) { + case r @ AQEShuffleReadExec(_: ShuffleQueryStageExec, _) => r + }.size == 1) + assert(collect(df.queryExecution.executedPlan) { + case c: TableCacheQueryStageExec => c + }.size == 1) + } + } } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org