This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 103768bcb8b [SPARK-40105][SQL] Improve repartition in 
ReplaceCTERefWithRepartition
103768bcb8b is described below

commit 103768bcb8bb98ae1b55d449f4f7edf215f3a72c
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Wed Aug 17 17:46:19 2022 +0800

    [SPARK-40105][SQL] Improve repartition in ReplaceCTERefWithRepartition
    
    ### What changes were proposed in this pull request?
    
    - skip adding a repartition if the top-level node of the CTE is a rebalance
    - use RepartitionByExpression instead of Repartition so that AQE can 
coalesce the shuffle partitions
    
    ### Why are the changes needed?
    
    If the CTE cannot be inlined, ReplaceCTERefWithRepartition will add a 
repartition to force a shuffle so that the references can reuse the shuffle exchange.
    The added repartition should be optimized by AQE for better performance.
    
    If the user has specified a rebalance, ReplaceCTERefWithRepartition 
should skip adding a repartition.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no, only improve performance
    
    ### How was this patch tested?
    
    add test
    
    Closes #37537 from ulysses-you/cte.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../optimizer/ReplaceCTERefWithRepartition.scala   | 10 ++++++--
 .../org/apache/spark/sql/CTEInlineSuite.scala      | 30 +++++++++++++++++++---
 2 files changed, 35 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala
index 0190fa2a2ab..c01372c71ab 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala
@@ -41,6 +41,12 @@ object ReplaceCTERefWithRepartition extends 
Rule[LogicalPlan] {
       replaceWithRepartition(plan, mutable.HashMap.empty[Long, LogicalPlan])
   }
 
+  private def canSkipExtraRepartition(p: LogicalPlan): Boolean = p match {
+    case _: RepartitionOperation => true
+    case _: RebalancePartitions => true
+    case _ => false
+  }
+
   private def replaceWithRepartition(
       plan: LogicalPlan,
       cteMap: mutable.HashMap[Long, LogicalPlan]): LogicalPlan = plan match {
@@ -48,12 +54,12 @@ object ReplaceCTERefWithRepartition extends 
Rule[LogicalPlan] {
       cteDefs.foreach { cteDef =>
         val inlined = replaceWithRepartition(cteDef.child, cteMap)
         val withRepartition =
-          if (inlined.isInstanceOf[RepartitionOperation] || 
cteDef.underSubquery) {
+          if (canSkipExtraRepartition(inlined) || cteDef.underSubquery) {
             // If the CTE definition plan itself is a repartition operation or 
if it hosts a merged
             // scalar subquery, we do not need to add an extra repartition 
shuffle.
             inlined
           } else {
-            Repartition(conf.numShufflePartitions, shuffle = true, inlined)
+            RepartitionByExpression(Seq.empty, inlined, None)
           }
         cteMap.put(cteDef.id, withRepartition)
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index ee000bce1fc..26d165b460a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.expressions.{And, GreaterThan, LessThan, 
Literal, Or}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project, 
RepartitionOperation, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project, 
RebalancePartitions, RepartitionByExpression, RepartitionOperation, WithCTE}
 import org.apache.spark.sql.execution.adaptive._
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.internal.SQLConf
@@ -42,8 +42,13 @@ abstract class CTEInlineSuiteBase
            |select * from v except select * from v
          """.stripMargin)
       checkAnswer(df, Nil)
+
+      val r = df.queryExecution.optimizedPlan.find {
+        case RepartitionByExpression(p, _, None) => p.isEmpty
+        case _ => false
+      }
       assert(
-        
df.queryExecution.optimizedPlan.exists(_.isInstanceOf[RepartitionOperation]),
+        r.isDefined,
         "Non-deterministic With-CTE with multiple references should be not 
inlined.")
     }
   }
@@ -485,4 +490,23 @@ abstract class CTEInlineSuiteBase
 
 class CTEInlineSuiteAEOff extends CTEInlineSuiteBase with 
DisableAdaptiveExecutionSuite
 
-class CTEInlineSuiteAEOn extends CTEInlineSuiteBase with 
EnableAdaptiveExecutionSuite
+class CTEInlineSuiteAEOn extends CTEInlineSuiteBase with 
EnableAdaptiveExecutionSuite {
+  import testImplicits._
+
+  test("SPARK-40105: Improve repartition in ReplaceCTERefWithRepartition") {
+    withTempView("t") {
+      Seq((0, 1), (1, 2)).toDF("c1", "c2").createOrReplaceTempView("t")
+      val df = sql(
+        s"""with
+           |v as (
+           |  select /*+ rebalance(c1) */ c1, c2, rand() from t
+           |)
+           |select * from v except select * from v
+         """.stripMargin)
+      checkAnswer(df, Nil)
+
+      
assert(!df.queryExecution.optimizedPlan.exists(_.isInstanceOf[RepartitionOperation]))
+      
assert(df.queryExecution.optimizedPlan.exists(_.isInstanceOf[RebalancePartitions]))
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to