Repository: spark
Updated Branches:
  refs/heads/branch-2.2 41f705a57 -> 7c30ae39f


[SPARK-22983] Don't push filters beneath aggregates with empty grouping expressions

## What changes were proposed in this pull request?

The following SQL query should return zero rows, but in Spark it actually 
returns one row:

```
SELECT 1 from (
  SELECT 1 AS z,
  MIN(a.x)
  FROM (select 1 as x) a
  WHERE false
) b
where b.z != b.z
```

The problem stems from the `PushDownPredicate` rule: when this rule encounters 
a filter on top of an Aggregate operator, e.g. `Filter(Agg(...))`, it removes 
the original filter and adds a new filter onto the Aggregate's child, e.g. 
`Agg(Filter(...))`. This is sometimes okay, but the case above is a 
counterexample: because there is no explicit `GROUP BY`, the query implicitly 
computes a global aggregate over the entire table, which produces exactly one 
row even when its input is empty. The original filter was therefore not acting 
like a `HAVING` clause that prunes groups: once pushed down, it no longer 
reduces the cardinality of the Aggregate's output, leading to the wrong answer.
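
For concreteness, here is a minimal, self-contained reproduction sketch (not 
part of this patch; the local-mode `SparkSession` setup is purely illustrative) 
that runs the query above and shows why the filter must stay on top of the 
global aggregate:

```
// Sketch only: on an unpatched branch-2.2 build this reproduces the wrong answer.
import org.apache.spark.sql.SparkSession

object Spark22983Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("SPARK-22983").getOrCreate()

    // A global aggregate (no GROUP BY) emits exactly one row even when its input is
    // empty, so the filter sitting above it is the only operator that can bring the
    // row count back to zero.
    val result = spark.sql(
      """SELECT 1 FROM (
        |  SELECT 1 AS z, MIN(a.x)
        |  FROM (SELECT 1 AS x) a
        |  WHERE false
        |) b
        |WHERE b.z != b.z""".stripMargin)

    // Expected: 0. With the buggy rule the plan becomes Agg(Filter(...)): the pushed
    // filter has no effect on the already-empty input, the Aggregate still emits its
    // single row, and this prints 1 instead.
    println(result.count())

    spark.stop()
  }
}
```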

In 2016 I fixed a similar problem involving invalid pushdowns of 
data-independent filters (filters which reference no columns of the filtered 
relation). There was additional discussion after that fix was merged which 
pointed out that the patch was incomplete (see #15289), but it looks like I 
either misunderstood the comment or forgot to follow up on the additional 
points raised there.

This patch fixes the problem by never pushing down filters when the Aggregate 
has no grouping expressions. Since there are no grouping keys, the Aggregate's 
only output columns are aggregate results, and filters defined over aggregate 
results can never be pushed down anyway, so this change won't cause us to miss 
any legitimate pushdown opportunities.
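
To make the distinction concrete, the following spark-shell style sketch (the 
table and column names here are mine, for illustration only) shows the kind of 
pushdown that the new guard deliberately leaves enabled: a predicate over a 
grouping key on top of a grouped aggregate.

```
// Sketch only: assumes an existing local SparkSession named `spark` (e.g. in spark-shell).
spark.range(10).selectExpr("id % 3 AS k", "id AS v").createOrReplaceTempView("t")

// The HAVING predicate references only the grouping key `k`, so the optimizer may still
// rewrite Filter(Agg(...)) into Agg(Filter(...)): pruning a group's input rows removes
// that whole group without changing any surviving group's aggregate result.
spark.sql("SELECT k, COUNT(*) AS cnt FROM t GROUP BY k HAVING k > 0").explain(true)
```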

## How was this patch tested?

New regression tests in `SQLQueryTestSuite` and `FilterPushdownSuite`.

Author: Josh Rosen <joshro...@databricks.com>

Closes #20180 from JoshRosen/SPARK-22983-dont-push-filters-beneath-aggs-with-empty-grouping-expressions.

(cherry picked from commit 2c73d2a948bdde798aaf0f87c18846281deb05fd)
Signed-off-by: gatorsmile <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c30ae39
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c30ae39
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c30ae39

Branch: refs/heads/branch-2.2
Commit: 7c30ae39f57ef0c42173b52aa405027b44e0ad9f
Parents: 41f705a
Author: Josh Rosen <joshro...@databricks.com>
Authored: Mon Jan 8 16:04:03 2018 +0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Mon Jan 8 16:05:04 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/optimizer/Optimizer.scala    |  3 ++-
 .../catalyst/optimizer/FilterPushdownSuite.scala    | 13 +++++++++++++
 .../test/resources/sql-tests/inputs/group-by.sql    |  9 +++++++++
 .../resources/sql-tests/results/group-by.sql.out    | 16 +++++++++++++++-
 4 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7c30ae39/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 82bd759..fe66821 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -754,7 +754,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
 
     case filter @ Filter(condition, aggregate: Aggregate)
-      if aggregate.aggregateExpressions.forall(_.deterministic) =>
+      if aggregate.aggregateExpressions.forall(_.deterministic)
+        && aggregate.groupingExpressions.nonEmpty =>
       // Find all the aliased expressions in the aggregate list that don't include any actual
       // AggregateExpression, and create a map from the alias to the expression
       val aliasMap = AttributeMap(aggregate.aggregateExpressions.collect {

http://git-wip-us.apache.org/repos/asf/spark/blob/7c30ae39/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index d4d281e..4d41354 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -797,6 +797,19 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("aggregate: don't push filters if the aggregate has no grouping 
expressions") {
+    val originalQuery = LocalRelation.apply(testRelation.output, Seq.empty)
+      .select('a, 'b)
+      .groupBy()(count(1))
+      .where(false)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val correctAnswer = originalQuery.analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("broadcast hint") {
     val originalQuery = ResolvedHint(testRelation)
       .where('a === 2L && 'b + Rand(10).as("rnd") === 3)

http://git-wip-us.apache.org/repos/asf/spark/blob/7c30ae39/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
index 1e13845..c5070b7 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
@@ -60,3 +60,12 @@ SELECT a, COUNT(1) FROM testData WHERE false GROUP BY a;
 -- Aggregate with empty input and empty GroupBy expressions.
 SELECT COUNT(1) FROM testData WHERE false;
 SELECT 1 FROM (SELECT COUNT(1) FROM testData WHERE false) t;
+
+-- Aggregate with empty GroupBy expressions and filter on top
+SELECT 1 from (
+  SELECT 1 AS z,
+  MIN(a.x)
+  FROM (select 1 as x) a
+  WHERE false
+) b
+where b.z != b.z

http://git-wip-us.apache.org/repos/asf/spark/blob/7c30ae39/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
index 42e8230..ed66c03 100644
--- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 25
+-- Number of queries: 26
 
 
 -- !query 0
@@ -227,3 +227,17 @@ SELECT 1 FROM (SELECT COUNT(1) FROM testData WHERE false) t
 struct<1:int>
 -- !query 24 output
 1
+
+
+-- !query 25
+SELECT 1 from (
+  SELECT 1 AS z,
+  MIN(a.x)
+  FROM (select 1 as x) a
+  WHERE false
+) b
+where b.z != b.z
+-- !query 25 schema
+struct<1:int>
+-- !query 25 output
+

