This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 865a3ded2ea [SPARK-38959][SQL][FOLLOWUP] Optimizer batch `PartitionPruning` should optimize subqueries
865a3ded2ea is described below

commit 865a3ded2ea1ca86be93df58205882bc509b98cd
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Thu Nov 10 13:45:49 2022 +0800

    [SPARK-38959][SQL][FOLLOWUP] Optimizer batch `PartitionPruning` should optimize subqueries

    ### What changes were proposed in this pull request?

    This is a follow-up to https://github.com/apache/spark/pull/36304 to simplify
    `RowLevelOperationRuntimeGroupFiltering`. It does three things:
    1. Run `OptimizeSubqueries` in the batch `PartitionPruning`, so that
       `RowLevelOperationRuntimeGroupFiltering` no longer needs to invoke it manually.
    2. Skip DPP subqueries in `OptimizeSubqueries`, to avoid the issue fixed by
       https://github.com/apache/spark/pull/33664
    3. `RowLevelOperationRuntimeGroupFiltering` now creates `InSubquery` instead of
       `DynamicPruningSubquery`, so that `OptimizeSubqueries` can optimize it later.
       This also avoids the unnecessary planning overhead of `DynamicPruningSubquery`:
       since there is no join here, the filter can only run as a subquery anyway.

    ### Why are the changes needed?

    Code simplification.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing tests.

    Closes #38557 from cloud-fan/help.

    Lead-authored-by: Wenchen Fan <wenc...@databricks.com>
    Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala      |  3 +++
 .../apache/spark/sql/execution/SparkOptimizer.scala   |  3 ++-
 .../RowLevelOperationRuntimeGroupFiltering.scala      | 18 +++++++-----------
 3 files changed, 12 insertions(+), 12 deletions(-)
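Before the diffs, a minimal self-contained sketch of the idea behind change 2. The
`Expr` hierarchy and `optimizePlan` below are simplified stand-ins, not Spark's actual
`SubqueryExpression` classes; the point is only that the rewrite returns DPP subqueries
unchanged, so their already-optimized plans are not rewritten a second time and
broadcast/subquery reuse stays intact:

    // Toy expression tree standing in for Spark's subquery expressions.
    sealed trait Expr
    case class DynamicPruningSubq(plan: String) extends Expr // built from an optimized plan
    case class Subq(plan: String) extends Expr               // any other subquery
    case class Node(children: Seq[Expr]) extends Expr

    // Hypothetical stand-in for running the whole optimizer on a subquery plan.
    def optimizePlan(plan: String): String = s"optimized($plan)"

    def optimizeSubqueries(e: Expr): Expr = e match {
      case d: DynamicPruningSubq => d                        // skip: do not optimize again
      case Subq(plan)            => Subq(optimizePlan(plan)) // optimize every other subquery
      case Node(children)        => Node(children.map(optimizeSubqueries))
    }

    // Only the ordinary subquery is rewritten:
    // Node(List(DynamicPruningSubq(dpp), Subq(optimized(q))))
    println(optimizeSubqueries(Node(Seq(DynamicPruningSubq("dpp"), Subq("q")))))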
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index afbf7302727..2bef03d633a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -320,6 +320,9 @@ abstract class Optimizer(catalogManager: CatalogManager)
     }
     def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressionsWithPruning(
       _.containsPattern(PLAN_EXPRESSION), ruleId) {
+      // Do not optimize the DPP subquery, as it was created from an already optimized plan and
+      // re-optimizing it would waste time and could break broadcast/subquery reuse.
+      case d: DynamicPruningSubquery => d
       case s: SubqueryExpression =>
         val Subquery(newPlan, _) = Optimizer.this.execute(Subquery.fromExpression(s))
         // At this point we have an optimized subquery plan that we are going to attach
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 017d1f937c3..9624bf1fa9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -51,7 +51,8 @@ class SparkOptimizer(
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
     Batch("PartitionPruning", Once,
       PartitionPruning,
-      RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+
+      RowLevelOperationRuntimeGroupFiltering,
+      OptimizeSubqueries) :+
     Batch("InjectRuntimeFilter", FixedPoint(1),
       InjectRuntimeFilter) :+
     Batch("MergeScalarSubqueries", Once,
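A simplified model of why the `SparkOptimizer` change above suffices. The `Batch`,
`Rule`, and rule values below are toy stand-ins rather than Spark's real classes: once
`OptimizeSubqueries` runs as the last rule of the `PartitionPruning` batch, it sees
whatever subqueries the earlier rules in the batch just created, so
`RowLevelOperationRuntimeGroupFiltering` no longer needs an `optimizeSubqueries`
constructor argument:

    // Toy model of an optimizer batch: rules applied in order over a plan.
    type Plan = String
    type Rule = Plan => Plan

    case class Batch(name: String, rules: Rule*) {
      def run(plan: Plan): Plan = rules.foldLeft(plan)((p, rule) => rule(p))
    }

    // Hypothetical stand-ins for the three rules in the batch.
    val partitionPruning: Rule   = p => s"PartitionPruning($p)"
    val groupFiltering: Rule     = p => s"GroupFiltering($p)" // emits InSubquery filters
    val optimizeSubqueries: Rule = p => s"OptimizeSubqueries($p)"

    val batch = Batch("PartitionPruning", partitionPruning, groupFiltering, optimizeSubqueries)

    // OptimizeSubqueries runs last, over the filters the earlier rules produced:
    // OptimizeSubqueries(GroupFiltering(PartitionPruning(scan)))
    println(batch.run("scan"))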
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
index d9dad43532e..bb5edc94fa5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.execution.dynamicpruning
 
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, InSubquery, ListQuery, PredicateHelper, V2ExpressionUtils}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
@@ -37,8 +37,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, Dat
  *
  * Note this rule only applies to group-based row-level operations.
  */
-case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
-  extends Rule[LogicalPlan] with PredicateHelper {
+object RowLevelOperationRuntimeGroupFiltering extends Rule[LogicalPlan] with PredicateHelper {
 
   import DataSourceV2Implicits._
 
@@ -65,8 +64,7 @@ case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[Logic
       Filter(dynamicPruningCond, r)
     }
 
-    // optimize subqueries to rewrite them as joins and trigger job planning
-    replaceData.copy(query = optimizeSubqueries(newQuery))
+    replaceData.copy(query = newQuery)
   }
 
   private def buildMatchingRowsPlan(
@@ -88,10 +86,8 @@ case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[Logic
       buildKeys: Seq[Attribute],
       pruningKeys: Seq[Attribute]): Expression = {
 
-    val buildQuery = Project(buildKeys, matchingRowsPlan)
-    val dynamicPruningSubqueries = pruningKeys.zipWithIndex.map { case (key, index) =>
-      DynamicPruningSubquery(key, buildQuery, buildKeys, index, onlyInBroadcast = false)
-    }
-    dynamicPruningSubqueries.reduce(And)
+    val buildQuery = Aggregate(buildKeys, buildKeys, matchingRowsPlan)
+    DynamicPruningExpression(
+      InSubquery(pruningKeys, ListQuery(buildQuery, childOutputs = buildQuery.output)))
   }
 }
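To make change 3 concrete, a small self-contained contrast of the two filter shapes,
using toy case classes rather than Spark's expression types. The old code built one
`DynamicPruningSubquery` per pruning key and combined them with `And`; the new code
builds a single multi-column IN subquery whose build side is deduplicated up front:

    // Toy condition tree; stand-ins for Spark's predicate expressions.
    sealed trait Cond
    case class DynPruning(pruningKey: String, buildKeyIndex: Int) extends Cond
    case class And(left: Cond, right: Cond) extends Cond
    case class InSubq(keys: Seq[String], distinctBuildKeys: Seq[String]) extends Cond

    // Old shape: one DPP subquery per pruning key, combined with And.
    def oldFilter(pruningKeys: Seq[String]): Cond =
      pruningKeys.zipWithIndex
        .map { case (key, index) => DynPruning(key, index): Cond }
        .reduce(And(_, _))

    // New shape: a single IN subquery over all keys at once.
    def newFilter(pruningKeys: Seq[String], buildKeys: Seq[String]): Cond =
      InSubq(pruningKeys, buildKeys)

    // And(DynPruning(p1,0),DynPruning(p2,1))
    println(oldFilter(Seq("p1", "p2")))
    // InSubq(List(p1, p2),List(b1, b2))
    println(newFilter(Seq("p1", "p2"), Seq("b1", "b2")))

Deduplicating the build keys keeps the IN list small; in the real rule above, that is
the job of `Aggregate(buildKeys, buildKeys, matchingRowsPlan)`, which acts as a
`SELECT DISTINCT` over the build keys.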