cloud-fan commented on a change in pull request #31258:
URL: https://github.com/apache/spark/pull/31258#discussion_r570002134



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -156,7 +157,7 @@ case class AdaptiveSparkPlanExec(
 
   override def output: Seq[Attribute] = inputPlan.output
 
-  override def doCanonicalize(): SparkPlan = inputPlan.canonicalized
+  override def doCanonicalize(): SparkPlan = executedPlan.canonicalized

Review comment:
       `executedPlan` keeps changing. How about using `initialPlan`?
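
A minimal sketch of the concern, assuming the canonical form is used as a lookup key (as the `stageCache: TrieMap[SparkPlan, QueryStageExec]` elsewhere in this PR suggests). It is a toy model, not Spark code: `ToyAdaptivePlan`, `StableKeySketch`, and `canon` are hypothetical names, and plans are plain strings.

```scala
import scala.collection.concurrent.TrieMap

// Toy stand-in for an adaptive plan (hypothetical, not a Spark class).
final class ToyAdaptivePlan(val initialPlan: String) {
  // Re-assigned whenever the plan is re-optimized.
  @volatile var executedPlan: String = initialPlan
}

object StableKeySketch {
  // Hypothetical normalization standing in for canonicalization.
  def canon(plan: String): String = plan.trim.toLowerCase

  def main(args: Array[String]): Unit = {
    val p = new ToyAdaptivePlan("Scan T1")

    // Two caches, populated before any re-optimization happens.
    val byExecuted = TrieMap(canon(p.executedPlan) -> 1)
    val byInitial = TrieMap(canon(p.initialPlan) -> 1)

    // Re-optimization replaces the executed plan.
    p.executedPlan = "BroadcastStage(Scan T1)"

    // The key derived from the mutable field no longer finds the entry.
    assert(byExecuted.get(canon(p.executedPlan)).isEmpty)
    // The key derived from the immutable field still does.
    assert(byInitial.get(canon(p.initialPlan)).contains(1))
  }
}
```

In this toy, a key taken from the field that never changes stays valid for the lifetime of the plan, which is the property the comment asks for.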

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.concurrent.TrieMap
+
+import org.apache.spark.sql.catalyst.expressions.{BindReferences, DynamicPruningExpression, Literal}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
+import org.apache.spark.sql.execution.joins.{HashedRelationBroadcastMode, HashJoin}
+
+/**
+ * A rule to insert dynamic pruning predicates in order to reuse the results of broadcast.
+ */
+case class PlanAdaptiveDynamicPruningFilters(
+    stageCache: TrieMap[SparkPlan, QueryStageExec]) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.dynamicPartitionPruningEnabled) {
+      return plan
+    }
+
+    plan transformAllExpressions {
+      case DynamicPruningExpression(InSubqueryExec(
+          value, SubqueryAdaptiveBroadcastExec(name, index, buildKeys,
+          adaptivePlan: AdaptiveSparkPlanExec), exprId, _)) =>
+        val packedKeys = BindReferences.bindReferences(
+          HashJoin.rewriteKeyExpr(buildKeys), adaptivePlan.executedPlan.output)
+        val mode = HashedRelationBroadcastMode(packedKeys)
+        // plan a broadcast exchange of the build side of the join
+        val exchange = BroadcastExchangeExec(mode, adaptivePlan.executedPlan)

Review comment:
       Now we can pass `adaptivePlan` directly.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
##########
@@ -149,7 +149,8 @@ abstract class QueryStageExec extends LeafExecNode {
  */

Review comment:
       Let's add parameter docs:
   ```
   /**
    * ...
    * @param id              the query stage id.
    * @param plan            the underlying plan.
    * @param _canonicalized  the canonicalized plan before applying query stage optimizer rules.
    */
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
##########
@@ -197,7 +199,8 @@ case class ShuffleQueryStageExec(
  */
 case class BroadcastQueryStageExec(

Review comment:
       Ditto: let's add parameter docs here as well.

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
##########
@@ -250,7 +259,9 @@ abstract class DynamicPartitionPruningSuiteBase
   /**
    * Test the result of a simple join on mock-up tables
    */
-  test("simple inner join triggers DPP with mock-up tables") {
+  test("simple inner join triggers DPP with mock-up tables",
+    DisableAdaptiveExecution("The adaptive DPP filters will be inserted only when" +

Review comment:
       Let's simplify the reason: `DPP in AQE must reuse broadcast`

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
##########
@@ -467,6 +482,7 @@ abstract class DynamicPartitionPruningSuiteBase
           """.stripMargin)
 
         checkPartitionPruningPredicate(df, true, false)
+

Review comment:
       unnecessary change

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
##########
@@ -1265,7 +1302,7 @@ abstract class DynamicPartitionPruningSuiteBase
           case se: ReusedExchangeExec => se
         }
         assert(reuseExchangeNodes.size == 1, "Expected plan to contain 1 ReusedExchangeExec " +
-          s"nodes. Found ${reuseExchangeNodes.size}")
+            s"nodes. Found ${reuseExchangeNodes.size}")

Review comment:
       unnecessary change

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
##########
@@ -146,10 +146,15 @@ abstract class QueryStageExec extends LeafExecNode {
 
 /**
  * A shuffle query stage whose child is a [[ShuffleExchangeLike]] or [[ReusedExchangeExec]].
+ *
+ * @param id the query stage id.
+ * @param plan the underlying plan.
+ * @param _canonicalized the canonicalized plan before applying query stage optimizer rules.
  */
 case class ShuffleQueryStageExec(
     override val id: Int,
-    override val plan: SparkPlan) extends QueryStageExec {
+    override val plan: SparkPlan,
+    _canonicalized: SparkPlan) extends QueryStageExec {

Review comment:
       We forgot to add `override def doCanonicalize(): SparkPlan = _canonicalized`.
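
The pattern being requested can be sketched outside Spark as follows. This is a toy model under stated assumptions: `Plan`, `Scan`, `Stage`, and `CanonicalizeSketch` are hypothetical stand-ins, and only the names `doCanonicalize`, `canonicalized`, and `_canonicalized` are taken from the diff above.

```scala
// Toy model, not Spark's real classes.
trait Plan {
  // Subclasses decide how their canonical form is computed.
  protected def doCanonicalize(): Plan
  // Computed on first access and cached afterwards.
  lazy val canonicalized: Plan = doCanonicalize()
}

// A leaf whose canonical form lower-cases the table name.
case class Scan(table: String) extends Plan {
  protected def doCanonicalize(): Plan = Scan(table.toLowerCase)
}

// A stage that receives its canonical form as a constructor argument.
case class Stage(id: Int, plan: Plan, _canonicalized: Plan) extends Plan {
  // Without this override the extra parameter would never be consulted.
  override protected def doCanonicalize(): Plan = _canonicalized
}

object CanonicalizeSketch {
  def main(args: Array[String]): Unit = {
    val before = Scan("T1")
    // Stands in for the plan after query stage optimizer rules ran.
    val optimized = Scan("T1_pruned")
    val stage = Stage(0, optimized, before.canonicalized)
    // The stage reports the canonical form captured before optimization.
    assert(stage.canonicalized == Scan("t1"))
  }
}
```

The constructor argument only takes effect once `doCanonicalize()` returns it, which is why the missing override matters.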



