This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6766c39b458a [SPARK-46707][SQL][FOLLOWUP] Push down throwable 
predicate through aggregates
6766c39b458a is described below

commit 6766c39b458ad7abacd1a5b11c896efabf36f95c
Author: zml1206 <zhuml1...@gmail.com>
AuthorDate: Tue May 14 15:53:43 2024 +0800

    [SPARK-46707][SQL][FOLLOWUP] Push down throwable predicate through 
aggregates
    
    ### What changes were proposed in this pull request?
    Push down throwable predicate through aggregates and add a unit test for "can't 
push down nondeterministic filter through aggregate".
    
    ### Why are the changes needed?
    If we can push down a filter through an Aggregate, it means the filter only 
references the grouping keys. The Aggregate operator can't reduce the grouping keys, 
so the filter won't see any new data after being pushed down. Therefore, pushing down 
a throwable filter through an aggregate does not affect which exceptions are thrown.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    UT
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44975 from zml1206/SPARK-46707-FOLLOWUP.
    
    Authored-by: zml1206 <zhuml1...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala      |  8 ++++++--
 .../sql/catalyst/optimizer/FilterPushdownSuite.scala  | 19 ++++++++++++++++---
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index dfc1e17c2a29..4ee6d9027a9c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1768,6 +1768,10 @@ object PushPredicateThroughNonJoin extends 
Rule[LogicalPlan] with PredicateHelpe
       val aliasMap = getAliasMap(project)
       project.copy(child = Filter(replaceAlias(condition, aliasMap), 
grandChild))
 
+    // We can push down deterministic predicate through Aggregate, including 
throwable predicate.
+    // If we can push down a filter through Aggregate, it means the filter 
only references the
+    // grouping keys or constants. The Aggregate operator can't reduce 
distinct values of grouping
+    // keys so the filter won't see any new data after push down.
     case filter @ Filter(condition, aggregate: Aggregate)
       if aggregate.aggregateExpressions.forall(_.deterministic)
         && aggregate.groupingExpressions.nonEmpty =>
@@ -1777,8 +1781,8 @@ object PushPredicateThroughNonJoin extends 
Rule[LogicalPlan] with PredicateHelpe
       // attributes produced by the aggregate operator's child operator.
       val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition 
{ cond =>
         val replaced = replaceAlias(cond, aliasMap)
-        cond.deterministic && !cond.throwable &&
-          cond.references.nonEmpty && 
replaced.references.subsetOf(aggregate.child.outputSet)
+        cond.deterministic && cond.references.nonEmpty &&
+          replaced.references.subsetOf(aggregate.child.outputSet)
       }
 
       if (pushDown.nonEmpty) {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 03e65412d166..5027222be6b8 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -219,6 +219,17 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("Can't push down nondeterministic filter through aggregate") {
+    val originalQuery = testRelation
+      .groupBy($"a")($"a", count($"b") as "c")
+      .where(Rand(10) > $"a")
+      .analyze
+
+    val optimized = Optimize.execute(originalQuery)
+
+    comparePlans(optimized, originalQuery)
+  }
+
   test("filters: combines filters") {
     val originalQuery = testRelation
       .select($"a")
@@ -1483,14 +1494,16 @@ class FilterPushdownSuite extends PlanTest {
   test("SPARK-46707: push down predicate with sequence (without step) through 
aggregates") {
     val x = testRelation.subquery("x")
 
-    // do not push down when sequence has step param
+    // Always push down sequence as it's deterministic
     val queryWithStep = x.groupBy($"x.a", $"x.b")($"x.a", $"x.b")
       .where(IsNotNull(Sequence($"x.a", $"x.b", Some(Literal(1)))))
       .analyze
     val optimizedQueryWithStep = Optimize.execute(queryWithStep)
-    comparePlans(optimizedQueryWithStep, queryWithStep)
+    val correctAnswerWithStep = x.where(IsNotNull(Sequence($"x.a", $"x.b", 
Some(Literal(1)))))
+      .groupBy($"x.a", $"x.b")($"x.a", $"x.b")
+      .analyze
+    comparePlans(optimizedQueryWithStep, correctAnswerWithStep)
 
-    // push down when sequence does not have step param
     val queryWithoutStep = x.groupBy($"x.a", $"x.b")($"x.a", $"x.b")
       .where(IsNotNull(Sequence($"x.a", $"x.b", None)))
       .analyze


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to