This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 865a3ded2ea [SPARK-38959][SQL][FOLLOWUP] Optimizer batch `PartitionPruning` should optimize subqueries
865a3ded2ea is described below

commit 865a3ded2ea1ca86be93df58205882bc509b98cd
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Thu Nov 10 13:45:49 2022 +0800

    [SPARK-38959][SQL][FOLLOWUP] Optimizer batch `PartitionPruning` should optimize subqueries
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up to https://github.com/apache/spark/pull/36304 to simplify `RowLevelOperationRuntimeGroupFiltering`. It does 3 things:
    1. Run `OptimizeSubqueries` in the batch `PartitionPruning`, so that `RowLevelOperationRuntimeGroupFiltering` does not need to invoke it manually.
    2. Skip DPP subqueries in `OptimizeSubqueries`, to avoid the issue fixed by https://github.com/apache/spark/pull/33664.
    3. Make `RowLevelOperationRuntimeGroupFiltering` create `InSubquery` instead of `DynamicPruningSubquery`, so that `OptimizeSubqueries` can optimize it later. This also avoids the unnecessary planning overhead of `DynamicPruningSubquery`: there is no join here, so the filter can only run as a subquery anyway (see the sketch after this list).
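    
    A minimal sketch of the new condition shape, assuming hypothetical inputs and a hypothetical helper name; the classes and the `Aggregate`/`InSubquery` construction are exactly what the diff below adds:
    
    ```scala
    import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, InSubquery, ListQuery}
    import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
    
    // Hypothetical helper (the real logic lives in
    // RowLevelOperationRuntimeGroupFiltering, see the diff below).
    def groupFilterCondition(
        pruningKeys: Seq[Attribute],    // scan columns to prune on
        buildKeys: Seq[Attribute],      // matching columns on the build side
        matchingRowsPlan: LogicalPlan   // rows matching the operation's condition
      ): Expression = {
      // Deduplicate the build side: SELECT DISTINCT buildKeys FROM matchingRowsPlan.
      val buildQuery = Aggregate(buildKeys, buildKeys, matchingRowsPlan)
      // A plain IN subquery that the later OptimizeSubqueries run can optimize,
      // instead of one DynamicPruningSubquery per pruning key.
      DynamicPruningExpression(
        InSubquery(pruningKeys, ListQuery(buildQuery, childOutputs = buildQuery.output)))
    }
    ```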
    
    ### Why are the changes needed?
    
    Code simplification.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #38557 from cloud-fan/help.
    
    Lead-authored-by: Wenchen Fan <wenc...@databricks.com>
    Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala       |  3 +++
 .../apache/spark/sql/execution/SparkOptimizer.scala    |  3 ++-
 .../RowLevelOperationRuntimeGroupFiltering.scala       | 18 +++++++-----------
 3 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index afbf7302727..2bef03d633a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -320,6 +320,9 @@ abstract class Optimizer(catalogManager: CatalogManager)
     }
     def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressionsWithPruning(
       _.containsPattern(PLAN_EXPRESSION), ruleId) {
+      // Do not optimize DPP subquery, as it was created from an optimized plan and we should not
+      // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
+      case d: DynamicPruningSubquery => d
       case s: SubqueryExpression =>
        val Subquery(newPlan, _) = Optimizer.this.execute(Subquery.fromExpression(s))
        // At this point we have an optimized subquery plan that we are going to attach
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 017d1f937c3..9624bf1fa9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -51,7 +51,8 @@ class SparkOptimizer(
     Batch("Optimize Metadata Only Query", Once, 
OptimizeMetadataOnlyQuery(catalog)) :+
     Batch("PartitionPruning", Once,
       PartitionPruning,
-      RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+
+      RowLevelOperationRuntimeGroupFiltering,
+      OptimizeSubqueries) :+
     Batch("InjectRuntimeFilter", FixedPoint(1),
       InjectRuntimeFilter) :+
     Batch("MergeScalarSubqueries", Once,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
index d9dad43532e..bb5edc94fa5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.execution.dynamicpruning
 
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, InSubquery, ListQuery, PredicateHelper, V2ExpressionUtils}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
@@ -37,8 +37,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, Dat
  *
  * Note this rule only applies to group-based row-level operations.
  */
-case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
-  extends Rule[LogicalPlan] with PredicateHelper {
+object RowLevelOperationRuntimeGroupFiltering extends Rule[LogicalPlan] with PredicateHelper {
 
   import DataSourceV2Implicits._
 
@@ -65,8 +64,7 @@ case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[Logic
           Filter(dynamicPruningCond, r)
       }
 
-      // optimize subqueries to rewrite them as joins and trigger job planning
-      replaceData.copy(query = optimizeSubqueries(newQuery))
+      replaceData.copy(query = newQuery)
   }
 
   private def buildMatchingRowsPlan(
@@ -88,10 +86,8 @@ case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[Logic
       buildKeys: Seq[Attribute],
       pruningKeys: Seq[Attribute]): Expression = {
 
-    val buildQuery = Project(buildKeys, matchingRowsPlan)
-    val dynamicPruningSubqueries = pruningKeys.zipWithIndex.map { case (key, index) =>
-      DynamicPruningSubquery(key, buildQuery, buildKeys, index, onlyInBroadcast = false)
-    }
-    dynamicPruningSubqueries.reduce(And)
+    val buildQuery = Aggregate(buildKeys, buildKeys, matchingRowsPlan)
+    DynamicPruningExpression(
+      InSubquery(pruningKeys, ListQuery(buildQuery, childOutputs = buildQuery.output)))
   }
 }
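
For reference, the `PartitionPruning` batch in `SparkOptimizer` now reads as below (an excerpt reconstructed from the hunk above, not standalone code): `RowLevelOperationRuntimeGroupFiltering` injects a plain `InSubquery`, `OptimizeSubqueries` runs last in the same batch and optimizes it, and the new guard in `OptimizeSubqueries` leaves the `DynamicPruningSubquery` expressions created by `PartitionPruning` untouched:

    Batch("PartitionPruning", Once,
      PartitionPruning,
      RowLevelOperationRuntimeGroupFiltering,
      OptimizeSubqueries) :+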

