This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 103768bcb8b [SPARK-40105][SQL] Improve repartition in ReplaceCTERefWithRepartition 103768bcb8b is described below commit 103768bcb8bb98ae1b55d449f4f7edf215f3a72c Author: ulysses-you <ulyssesyo...@gmail.com> AuthorDate: Wed Aug 17 17:46:19 2022 +0800 [SPARK-40105][SQL] Improve repartition in ReplaceCTERefWithRepartition ### What changes were proposed in this pull request? - Skip adding a repartition if the top-level node of the CTE is a rebalance. - Use RepartitionByExpression instead of Repartition so that AQE can coalesce the shuffle partitions. ### Why are the changes needed? If a CTE cannot be inlined, ReplaceCTERefWithRepartition adds a repartition to force a shuffle, so that the references to the CTE can reuse the shuffle exchange. This added repartition should be optimizable by AQE for better performance. If the user has already specified a rebalance, ReplaceCTERefWithRepartition should skip adding a repartition. ### Does this PR introduce _any_ user-facing change? No, it only improves performance. ### How was this patch tested? Added a test. Closes #37537 from ulysses-you/cte. 
Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../optimizer/ReplaceCTERefWithRepartition.scala | 10 ++++++-- .../org/apache/spark/sql/CTEInlineSuite.scala | 30 +++++++++++++++++++--- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala index 0190fa2a2ab..c01372c71ab 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala @@ -41,6 +41,12 @@ object ReplaceCTERefWithRepartition extends Rule[LogicalPlan] { replaceWithRepartition(plan, mutable.HashMap.empty[Long, LogicalPlan]) } + private def canSkipExtraRepartition(p: LogicalPlan): Boolean = p match { + case _: RepartitionOperation => true + case _: RebalancePartitions => true + case _ => false + } + private def replaceWithRepartition( plan: LogicalPlan, cteMap: mutable.HashMap[Long, LogicalPlan]): LogicalPlan = plan match { @@ -48,12 +54,12 @@ object ReplaceCTERefWithRepartition extends Rule[LogicalPlan] { cteDefs.foreach { cteDef => val inlined = replaceWithRepartition(cteDef.child, cteMap) val withRepartition = - if (inlined.isInstanceOf[RepartitionOperation] || cteDef.underSubquery) { + if (canSkipExtraRepartition(inlined) || cteDef.underSubquery) { // If the CTE definition plan itself is a repartition operation or if it hosts a merged // scalar subquery, we do not need to add an extra repartition shuffle. 
inlined } else { - Repartition(conf.numShufflePartitions, shuffle = true, inlined) + RepartitionByExpression(Seq.empty, inlined, None) } cteMap.put(cteDef.id, withRepartition) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala index ee000bce1fc..26d165b460a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.expressions.{And, GreaterThan, LessThan, Literal, Or} -import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project, RepartitionOperation, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project, RebalancePartitions, RepartitionByExpression, RepartitionOperation, WithCTE} import org.apache.spark.sql.execution.adaptive._ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.internal.SQLConf @@ -42,8 +42,13 @@ abstract class CTEInlineSuiteBase |select * from v except select * from v """.stripMargin) checkAnswer(df, Nil) + + val r = df.queryExecution.optimizedPlan.find { + case RepartitionByExpression(p, _, None) => p.isEmpty + case _ => false + } assert( - df.queryExecution.optimizedPlan.exists(_.isInstanceOf[RepartitionOperation]), + r.isDefined, "Non-deterministic With-CTE with multiple references should be not inlined.") } } @@ -485,4 +490,23 @@ abstract class CTEInlineSuiteBase class CTEInlineSuiteAEOff extends CTEInlineSuiteBase with DisableAdaptiveExecutionSuite -class CTEInlineSuiteAEOn extends CTEInlineSuiteBase with EnableAdaptiveExecutionSuite +class CTEInlineSuiteAEOn extends CTEInlineSuiteBase with EnableAdaptiveExecutionSuite { + import testImplicits._ + + test("SPARK-40105: Improve repartition in ReplaceCTERefWithRepartition") { + withTempView("t") { + Seq((0, 1), (1, 2)).toDF("c1", 
"c2").createOrReplaceTempView("t") + val df = sql( + s"""with + |v as ( + | select /*+ rebalance(c1) */ c1, c2, rand() from t + |) + |select * from v except select * from v + """.stripMargin) + checkAnswer(df, Nil) + + assert(!df.queryExecution.optimizedPlan.exists(_.isInstanceOf[RepartitionOperation])) + assert(df.queryExecution.optimizedPlan.exists(_.isInstanceOf[RebalancePartitions])) + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org