Repository: spark Updated Branches: refs/heads/branch-2.0 e77a437d2 -> 62ab53658
[SPARK-17114][SQL] Fix aggregates grouped by literals with empty input ## What changes were proposed in this pull request? This PR fixes an issue with aggregates that have an empty input, and use a literals as their grouping keys. These aggregates are currently interpreted as aggregates **without** grouping keys, this triggers the ungrouped code path (which aways returns a single row). This PR fixes the `RemoveLiteralFromGroupExpressions` optimizer rule, which changes the semantics of the Aggregate by eliminating all literal grouping keys. ## How was this patch tested? Added tests to `SQLQueryTestSuite`. Author: Herman van Hovell <hvanhov...@databricks.com> Closes #15101 from hvanhovell/SPARK-17114-3. (cherry picked from commit d403562eb4b5b1d804909861d3e8b75d8f6323b9) Signed-off-by: Herman van Hovell <hvanhov...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62ab5365 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62ab5365 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62ab5365 Branch: refs/heads/branch-2.0 Commit: 62ab536588e19293a84004f547ebc316346b869e Parents: e77a437 Author: Herman van Hovell <hvanhov...@databricks.com> Authored: Thu Sep 15 20:24:15 2016 +0200 Committer: Herman van Hovell <hvanhov...@databricks.com> Committed: Thu Sep 15 20:24:29 2016 +0200 ---------------------------------------------------------------------- .../sql/catalyst/optimizer/Optimizer.scala | 11 ++++- .../optimizer/AggregateOptimizeSuite.scala | 10 +++- .../resources/sql-tests/inputs/group-by.sql | 17 +++++++ .../sql-tests/results/group-by.sql.out | 51 ++++++++++++++++++++ 4 files changed, 86 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/62ab5365/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index e743898..d824c2e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1439,9 +1439,16 @@ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { */ object RemoveLiteralFromGroupExpressions extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case a @ Aggregate(grouping, _, _) => + case a @ Aggregate(grouping, _, _) if grouping.nonEmpty => val newGrouping = grouping.filter(!_.foldable) - a.copy(groupingExpressions = newGrouping) + if (newGrouping.nonEmpty) { + a.copy(groupingExpressions = newGrouping) + } else { + // All grouping expressions are literals. We should not drop them all, because this can + // change the return semantics when the input of the Aggregate is empty (SPARK-17114). We + // instead replace this by single, easy to hash/sort, literal expression. + a.copy(groupingExpressions = Seq(Literal(0, IntegerType))) + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/62ab5365/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala index 4c26c18..aecf59a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor class AggregateOptimizeSuite extends PlanTest { - val conf = new SimpleCatalystConf(caseSensitiveAnalysis = false) + val conf = SimpleCatalystConf(caseSensitiveAnalysis = false, groupByOrdinal = false) val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf) val analyzer = new Analyzer(catalog, conf) @@ -49,6 +49,14 @@ class AggregateOptimizeSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("do not remove all grouping expressions if they are all literals") { + val query = testRelation.groupBy(Literal("1"), Literal(1) + Literal(2))(sum('b)) + val optimized = Optimize.execute(analyzer.execute(query)) + val correctAnswer = analyzer.execute(testRelation.groupBy(Literal(0))(sum('b))) + + comparePlans(optimized, correctAnswer) + } + test("Remove aliased literals") { val query = testRelation.select('a, Literal(1).as('y)).groupBy('a, 'y)(sum('b)) val optimized = Optimize.execute(analyzer.execute(query)) http://git-wip-us.apache.org/repos/asf/spark/blob/62ab5365/sql/core/src/test/resources/sql-tests/inputs/group-by.sql ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql new file mode 100644 index 0000000..6741703 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql @@ -0,0 +1,17 @@ +-- Temporary data. +create temporary view myview as values 128, 256 as v(int_col); + +-- group by should produce all input rows, +select int_col, count(*) from myview group by int_col; + +-- group by should produce a single row. +select 'foo', count(*) from myview group by 1; + +-- group-by should not produce any rows (whole stage code generation). +select 'foo' from myview where int_col == 0 group by 1; + +-- group-by should not produce any rows (hash aggregate). +select 'foo', approx_count_distinct(int_col) from myview where int_col == 0 group by 1; + +-- group-by should not produce any rows (sort aggregate). +select 'foo', max(struct(int_col)) from myview where int_col == 0 group by 1; http://git-wip-us.apache.org/repos/asf/spark/blob/62ab5365/sql/core/src/test/resources/sql-tests/results/group-by.sql.out ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out new file mode 100644 index 0000000..9127bd4 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out @@ -0,0 +1,51 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 6 + + +-- !query 0 +create temporary view myview as values 128, 256 as v(int_col) +-- !query 0 schema +struct<> +-- !query 0 output + + + +-- !query 1 +select int_col, count(*) from myview group by int_col +-- !query 1 schema +struct<int_col:int,count(1):bigint> +-- !query 1 output +128 1 +256 1 + + +-- !query 2 +select 'foo', count(*) from myview group by 1 +-- !query 2 schema +struct<foo:string,count(1):bigint> +-- !query 2 output +foo 2 + + +-- !query 3 +select 'foo' from myview where int_col == 0 group by 1 +-- !query 3 schema +struct<foo:string> +-- !query 3 output + + + +-- !query 4 +select 'foo', approx_count_distinct(int_col) from myview where int_col == 0 group by 1 +-- !query 4 schema +struct<foo:string,approx_count_distinct(int_col):bigint> +-- !query 4 output + + + +-- !query 5 +select 'foo', max(struct(int_col)) from myview where int_col == 0 group by 1 +-- !query 5 schema +struct<foo:string,max(struct(int_col)):struct<int_col:int>> +-- !query 5 output + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org