This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new bd29ca78905 [SPARK-41112][SQL] RuntimeFilter should apply ColumnPruning eagerly with in-subquery filter
bd29ca78905 is described below

commit bd29ca7890554ac8932be59097e6345505a36c4c
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Tue Nov 15 16:47:29 2022 +0800

    [SPARK-41112][SQL] RuntimeFilter should apply ColumnPruning eagerly with in-subquery filter

    ### What changes were proposed in this pull request?

    Apply ColumnPruning to the in-subquery runtime filter. Note that the bloom filter side was already fixed by https://github.com/apache/spark/pull/36047

    ### Why are the changes needed?

    The inferred in-subquery filter should apply ColumnPruning before reading the plan statistics and checking whether the plan can be broadcast. Otherwise, the final physical plan can differ from the expected one.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Added a test.

    Closes #38619 from ulysses-you/SPARK-41112.

    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala  | 2 +-
 .../test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala | 7 ++++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 62782f6051b..efcf607b589 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -99,7 +99,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
     require(filterApplicationSideExp.dataType == filterCreationSideExp.dataType)
     val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideExp)
     val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
-    val aggregate = Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan)
+    val aggregate = ColumnPruning(Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan))
     if (!canBroadcastBySize(aggregate, conf)) {
       // Skip the InSubquery filter if the size of `aggregate` is beyond broadcast join threshold,
       // i.e., the semi-join will be a shuffled join, which is not worthwhile.
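[Editor's note] A minimal sketch, not part of the commit, of what the one-line change above does: applying ColumnPruning eagerly inserts a Project under the freshly built Aggregate that keeps only the filter key, so the statistics consulted by canBroadcastBySize no longer count the creation side's unused columns. The attribute names and the LocalRelation schema are illustrative assumptions (a LocalRelation stands in for the real filterCreationSidePlan), and the snippet is meant to run inside Spark's own catalyst test sources, where ColumnPruning is accessible.

    import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
    import org.apache.spark.sql.catalyst.optimizer.ColumnPruning
    import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LocalRelation}
    import org.apache.spark.sql.types.{IntegerType, StringType}

    // Stand-in for the filter-creation-side plan: one join key plus two unrelated columns.
    val a = AttributeReference("a", IntegerType)()
    val b = AttributeReference("b", StringType)()
    val c = AttributeReference("c", StringType)()
    val relation = LocalRelation(a, b, c)

    // Mirror the rule: group by (and project) only the filter key expression.
    val alias = Alias(a, a.toString)()
    val unpruned = Aggregate(Seq(alias), Seq(alias), relation)

    // ColumnPruning rewrites the aggregate into
    //   Aggregate(Seq(alias), Seq(alias), Project(Seq(a), relation))
    // so columns b and c no longer sit under the aggregate; on a real scan this is
    // what keeps the size estimate small enough for the broadcast check.
    val pruned = ColumnPruning(unpruned)
    assert(!unpruned.fastEquals(pruned))
    // Same fixed-point check the new test asserts on the planned semi-join aggregate.
    assert(pruned.fastEquals(ColumnPruning(pruned)))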
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
index 0e016e19a62..fda442eeef0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.expressions.{Alias, BloomFilterMightContain, Literal}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, BloomFilterAggregate}
-import org.apache.spark.sql.catalyst.optimizer.MergeScalarSubqueries
+import org.apache.spark.sql.catalyst.optimizer.{ColumnPruning, MergeScalarSubqueries}
 import org.apache.spark.sql.catalyst.plans.LeftSemi
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan}
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, SubqueryExec}
@@ -257,6 +257,11 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
       val normalizedDisabled = normalizePlan(normalizeExprIds(planDisabled))
       ensureLeftSemiJoinExists(planEnabled)
       assert(normalizedEnabled != normalizedDisabled)
+      val agg = planEnabled.collect {
+        case Join(_, agg: Aggregate, LeftSemi, _, _) => agg
+      }
+      assert(agg.size == 1)
+      assert(agg.head.fastEquals(ColumnPruning(agg.head)))
     } else {
       comparePlans(planDisabled, planEnabled)
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org