Repository: spark
Updated Branches:
  refs/heads/master 7dfad4b13 -> 37eb9184f


[SPARK-17712][SQL] Fix invalid pushdown of data-independent filters beneath 
aggregates

## What changes were proposed in this pull request?

This patch fixes a minor correctness issue impacting the pushdown of filters 
beneath aggregates. Specifically, if a filter condition references no grouping 
or aggregate columns (e.g. `WHERE false`) then it would be incorrectly pushed 
beneath an aggregate.

Intuitively, the only case where you can push a filter beneath an aggregate is 
when that filter is deterministic and is defined over the grouping columns / 
expressions, since in that case the filter is acting to exclude entire groups 
from the query (like a `HAVING` clause). The existing code would only push 
deterministic filters beneath aggregates when all of the filter's references 
were grouping columns, but this logic missed the case where a filter has no 
references. For example, `WHERE false` is deterministic but is independent of 
the actual data.
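
As a rough illustration of why the placement of a data-independent filter can matter, the sketch below models a global aggregate (one with no grouping columns) using plain Scala collections. The choice of a global aggregate is an assumption made for this example, and `GlobalAggregateDemo` and `countAll` are hypothetical names, not part of Spark.

```scala
// Hypothetical stand-ins written for illustration; nothing here is Spark API.
object GlobalAggregateDemo {
  // Models a global COUNT(*): it always emits exactly one row,
  // even when its input is empty.
  def countAll(rows: Seq[Int]): Seq[Long] = Seq(rows.size.toLong)

  def main(args: Array[String]): Unit = {
    val rows = Seq(1, 2, 3)

    // Filter kept above the aggregate: the single output row is discarded.
    val filterAbove = countAll(rows).filter(_ => false)

    // Filter pushed beneath the aggregate: the input is emptied, yet the
    // aggregate still emits one row holding a count of 0.
    val filterBelow = countAll(rows.filter(_ => false))

    println(s"filter above: $filterAbove, filter below: $filterBelow")
  }
}
```

In this model the two plans return different row counts, so moving the filter changes the query result.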

This patch fixes this minor bug by adding a new check to ensure that we don't 
push filters beneath aggregates when those filters don't reference any columns.
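
The old check passed for filters without references because the empty set is a subset of every set. A simplified model of the before and after conditions is sketched below; `Cond`, `oldCheck`, and `newCheck` are hypothetical names, and the model ignores the alias replacement that the real rule performs.

```scala
// Simplified, hypothetical model of the pushdown condition; not Spark code.
object PushdownCheckSketch {
  // A predicate reduced to the set of column names it references.
  final case class Cond(references: Set[String])

  // Before the patch: only the subset test.
  def oldCheck(cond: Cond, childOutput: Set[String]): Boolean =
    cond.references.subsetOf(childOutput)

  // After the patch: additionally require at least one reference.
  def newCheck(cond: Cond, childOutput: Set[String]): Boolean =
    cond.references.nonEmpty && cond.references.subsetOf(childOutput)

  def main(args: Array[String]): Unit = {
    val childOutput = Set("a", "b")
    val whereFalse = Cond(Set.empty) // models `WHERE false`
    val onGroupingColumn = Cond(Set("a")) // models a filter on a grouping column

    println(oldCheck(whereFalse, childOutput)) // vacuously true
    println(newCheck(whereFalse, childOutput)) // rejected by the nonEmpty guard
    println(newCheck(onGroupingColumn, childOutput)) // still eligible for pushdown
  }
}
```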

## How was this patch tested?

New regression test in FilterPushdownSuite.

Author: Josh Rosen <joshro...@databricks.com>

Closes #15289 from JoshRosen/SPARK-17712.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37eb9184
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37eb9184
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37eb9184

Branch: refs/heads/master
Commit: 37eb9184f1e9f1c07142c66936671f4711ef407d
Parents: 7dfad4b
Author: Josh Rosen <joshro...@databricks.com>
Authored: Wed Sep 28 19:03:05 2016 -0700
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Wed Sep 28 19:03:05 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  2 +-
 .../catalyst/optimizer/FilterPushdownSuite.scala   | 17 +++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/37eb9184/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0df16b7..4952ba3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -710,7 +710,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
 
       val (pushDown, rest) = candidates.partition { cond =>
         val replaced = replaceAlias(cond, aliasMap)
-        replaced.references.subsetOf(aggregate.child.outputSet)
+        cond.references.nonEmpty && replaced.references.subsetOf(aggregate.child.outputSet)
       }
 
       val stayUp = rest ++ containingNonDeterministic

http://git-wip-us.apache.org/repos/asf/spark/blob/37eb9184/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 55836f9..019f132 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -687,6 +687,23 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("SPARK-17712: aggregate: don't push down filters that are data-independent") {
+    val originalQuery = LocalRelation.apply(testRelation.output, Seq.empty)
+      .select('a, 'b)
+      .groupBy('a)(count('a))
+      .where(false)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val correctAnswer = testRelation
+      .select('a, 'b)
+      .groupBy('a)(count('a))
+      .where(false)
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("broadcast hint") {
     val originalQuery = BroadcastHint(testRelation)
       .where('a === 2L && 'b + Rand(10).as("rnd") === 3)


