Repository: spark
Updated Branches:
  refs/heads/master 3d2131ab4 -> a90c5cd82


[SPARK-20686][SQL] PropagateEmptyRelation incorrectly handles aggregate without grouping

## What changes were proposed in this pull request?

The query

```
SELECT 1 FROM (SELECT COUNT(*) WHERE FALSE) t1
```

should return a single row of output: the subquery is an aggregate without a 
group-by, which always produces exactly one row. However, Spark incorrectly 
returns zero rows.

This is caused by SPARK-16208 / #13906, a patch which added an optimizer rule 
to propagate EmptyRelation through operators. The logic for handling aggregates 
is wrong: it checks whether the aggregate expressions are non-empty when 
deciding whether the output should be empty, whereas it should check the 
grouping expressions instead:

An aggregate with non-empty grouping expressions will return one output row per 
group. If the input to the grouped aggregate is empty then all groups will be 
empty and thus the output will be empty. It doesn't matter whether the 
aggregation's output columns include aggregate expressions, since that won't 
affect the number of output rows.

If the grouping expressions are empty, however, then the aggregate will always 
produce a single output row and thus we cannot propagate the EmptyRelation.

The current implementation is incorrect and also misses an optimization 
opportunity by not propagating EmptyRelation in the case where a grouped 
aggregate has aggregate expressions (in other words, `SELECT COUNT(*) FROM 
emptyRelation GROUP BY x` would _not_ be optimized to `EmptyRelation` in the 
old code, even though it safely could be).

This patch resolves this issue by modifying `PropagateEmptyRelation` to 
consider only the presence/absence of grouping expressions, not the aggregate 
functions themselves, when deciding whether to propagate EmptyRelation.

## How was this patch tested?

- Added end-to-end regression tests in `SQLQueryTestSuite`'s `group-by.sql` file.
- Updated unit tests in `PropagateEmptyRelationSuite`.

Author: Josh Rosen <joshro...@databricks.com>

Closes #17929 from JoshRosen/fix-PropagateEmptyRelation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a90c5cd8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a90c5cd8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a90c5cd8

Branch: refs/heads/master
Commit: a90c5cd8226146a58362732171b92cb99a7bc4c7
Parents: 3d2131a
Author: Josh Rosen <joshro...@databricks.com>
Authored: Wed May 10 14:36:36 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed May 10 14:36:36 2017 +0800

----------------------------------------------------------------------
 .../optimizer/PropagateEmptyRelation.scala      | 16 ++++++------
 .../optimizer/PropagateEmptyRelationSuite.scala |  8 +++---
 .../resources/sql-tests/inputs/group-by.sql     |  7 ++++++
 .../sql-tests/results/group-by.sql.out          | 26 +++++++++++++++++++-
 4 files changed, 44 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a90c5cd8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index 7400a01..987cd74 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -30,7 +29,7 @@ import org.apache.spark.sql.catalyst.rules._
  *    - Join with one or two empty children (including Intersect/Except).
  * 2. Unary-node Logical Plans
  *    - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
- *    - Aggregate with all empty children and without AggregateFunction expressions like COUNT.
+ *    - Aggregate with all empty children and at least one grouping expression.
 *    - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
  */
 object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
@@ -39,10 +38,6 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
-  private def containsAggregateExpression(e: Expression): Boolean = {
-    e.collectFirst { case _: AggregateFunction => () }.isDefined
-  }
-
   private def empty(plan: LogicalPlan) = LocalRelation(plan.output, data = Seq.empty)
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
@@ -68,8 +63,13 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
       case _: LocalLimit => empty(p)
       case _: Repartition => empty(p)
       case _: RepartitionByExpression => empty(p)
-      // AggregateExpressions like COUNT(*) return their results like 0.
-      case Aggregate(_, ae, _) if !ae.exists(containsAggregateExpression) => empty(p)
+      // An aggregate with non-empty group expression will return one output row per group when the
+      // input to the aggregate is not empty. If the input to the aggregate is empty then all groups
+      // will be empty and thus the output will be empty.
+      //
+      // If the grouping expressions are empty, however, then the aggregate will always produce a
+      // single output row and thus we cannot propagate the EmptyRelation.
+      case Aggregate(ge, _, _) if ge.nonEmpty => empty(p)
       // Generators like Hive-style UDTF may return their records within `close`.
       case Generate(_: Explode, _, _, _, _, _) => empty(p)
       case _ => p

http://git-wip-us.apache.org/repos/asf/spark/blob/a90c5cd8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index c261a60..38dff47 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -142,7 +142,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
     comparePlans(optimized, correctAnswer.analyze)
   }
 
-  test("propagate empty relation through Aggregate without aggregate function") {
+  test("propagate empty relation through Aggregate with grouping expressions") {
     val query = testRelation1
       .where(false)
       .groupBy('a)('a, ('a + 1).as('x))
@@ -153,13 +153,13 @@ class PropagateEmptyRelationSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
-  test("don't propagate empty relation through Aggregate with aggregate function") {
+  test("don't propagate empty relation through Aggregate without grouping expressions") {
     val query = testRelation1
       .where(false)
-      .groupBy('a)(count('a))
+      .groupBy()()
 
     val optimized = Optimize.execute(query.analyze)
-    val correctAnswer = LocalRelation('a.int).groupBy('a)(count('a)).analyze
+    val correctAnswer = LocalRelation('a.int).groupBy()().analyze
 
     comparePlans(optimized, correctAnswer)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a90c5cd8/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
index a7994f3..1e13845 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
@@ -53,3 +53,10 @@ set spark.sql.groupByAliases=false;
 
 -- Check analysis exceptions
 SELECT a AS k, COUNT(b) FROM testData GROUP BY k;
+
+-- Aggregate with empty input and non-empty GroupBy expressions.
+SELECT a, COUNT(1) FROM testData WHERE false GROUP BY a;
+
+-- Aggregate with empty input and empty GroupBy expressions.
+SELECT COUNT(1) FROM testData WHERE false;
+SELECT 1 FROM (SELECT COUNT(1) FROM testData WHERE false) t;

http://git-wip-us.apache.org/repos/asf/spark/blob/a90c5cd8/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
index 6bf9dff..42e8230 100644
--- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 22
+-- Number of queries: 25
 
 
 -- !query 0
@@ -203,3 +203,27 @@ struct<>
 -- !query 21 output
 org.apache.spark.sql.AnalysisException
 cannot resolve '`k`' given input columns: [a, b]; line 1 pos 47
+
+
+-- !query 22
+SELECT a, COUNT(1) FROM testData WHERE false GROUP BY a
+-- !query 22 schema
+struct<a:int,count(1):bigint>
+-- !query 22 output
+
+
+
+-- !query 23
+SELECT COUNT(1) FROM testData WHERE false
+-- !query 23 schema
+struct<count(1):bigint>
+-- !query 23 output
+0
+
+
+-- !query 24
+SELECT 1 FROM (SELECT COUNT(1) FROM testData WHERE false) t
+-- !query 24 schema
+struct<1:int>
+-- !query 24 output
+1

