This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6766c39b458a [SPARK-46707][SQL][FOLLOWUP] Push down throwable predicate through aggregates 6766c39b458a is described below commit 6766c39b458ad7abacd1a5b11c896efabf36f95c Author: zml1206 <zhuml1...@gmail.com> AuthorDate: Tue May 14 15:53:43 2024 +0800 [SPARK-46707][SQL][FOLLOWUP] Push down throwable predicate through aggregates ### What changes were proposed in this pull request? Push down throwable predicate through aggregates and add a unit test for "can't push down nondeterministic filter through aggregate". ### Why are the changes needed? If we can push down a filter through Aggregate, it means the filter only references the grouping keys. The Aggregate operator can't reduce grouping keys so the filter won't see any new data after pushing down. So pushing down a throwable filter through an aggregate does not affect the exceptions thrown. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44975 from zml1206/SPARK-46707-FOLLOWUP. 
Authored-by: zml1206 <zhuml1...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 8 ++++++-- .../sql/catalyst/optimizer/FilterPushdownSuite.scala | 19 ++++++++++++++++--- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index dfc1e17c2a29..4ee6d9027a9c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1768,6 +1768,10 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe val aliasMap = getAliasMap(project) project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild)) + // We can push down deterministic predicate through Aggregate, including throwable predicate. + // If we can push down a filter through Aggregate, it means the filter only references the + // grouping keys or constants. The Aggregate operator can't reduce distinct values of grouping + // keys so the filter won't see any new data after push down. case filter @ Filter(condition, aggregate: Aggregate) if aggregate.aggregateExpressions.forall(_.deterministic) && aggregate.groupingExpressions.nonEmpty => @@ -1777,8 +1781,8 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe // attributes produced by the aggregate operator's child operator. 
val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => val replaced = replaceAlias(cond, aliasMap) - cond.deterministic && !cond.throwable && - cond.references.nonEmpty && replaced.references.subsetOf(aggregate.child.outputSet) + cond.deterministic && cond.references.nonEmpty && + replaced.references.subsetOf(aggregate.child.outputSet) } if (pushDown.nonEmpty) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 03e65412d166..5027222be6b8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -219,6 +219,17 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("Can't push down nondeterministic filter through aggregate") { + val originalQuery = testRelation + .groupBy($"a")($"a", count($"b") as "c") + .where(Rand(10) > $"a") + .analyze + + val optimized = Optimize.execute(originalQuery) + + comparePlans(optimized, originalQuery) + } + test("filters: combines filters") { val originalQuery = testRelation .select($"a") @@ -1483,14 +1494,16 @@ class FilterPushdownSuite extends PlanTest { test("SPARK-46707: push down predicate with sequence (without step) through aggregates") { val x = testRelation.subquery("x") - // do not push down when sequence has step param + // Always push down sequence as it's deterministic val queryWithStep = x.groupBy($"x.a", $"x.b")($"x.a", $"x.b") .where(IsNotNull(Sequence($"x.a", $"x.b", Some(Literal(1))))) .analyze val optimizedQueryWithStep = Optimize.execute(queryWithStep) - comparePlans(optimizedQueryWithStep, queryWithStep) + val correctAnswerWithStep = x.where(IsNotNull(Sequence($"x.a", $"x.b", Some(Literal(1))))) + .groupBy($"x.a", $"x.b")($"x.a", $"x.b") + 
.analyze + comparePlans(optimizedQueryWithStep, correctAnswerWithStep) - // push down when sequence does not have step param val queryWithoutStep = x.groupBy($"x.a", $"x.b")($"x.a", $"x.b") .where(IsNotNull(Sequence($"x.a", $"x.b", None))) .analyze --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org