Repository: spark Updated Branches: refs/heads/branch-1.6 1678bff7f -> d1654864a
[SPARK-14495][SQL][1.6] fix resolution failure of having clause with distinct aggregate function #### Symptom: In the latest **branch 1.6**, when a `DISTINCT` aggregation function is used in the `HAVING` clause, Analyzer throws `AnalysisException` with a message like following: ``` resolved attribute(s) gid#558,id#559 missing from date#554,id#555 in operator !Expand [List(date#554, null, 0, if ((gid#558 = 1)) id#559 else null),List(date#554, id#555, 1, null)], [date#554,id#561,gid#560,if ((gid = 1)) id else null#562]; ``` #### Root cause: The problem is that the distinct aggregate in having condition are resolved by the rule `DistinctAggregationRewriter` twice, which messes up the resulted `EXPAND` operator. In a `ResolveAggregateFunctions` rule, when resolving ```Filter(havingCondition, _: Aggregate)```, the `havingCondition` is resolved as an `Aggregate` in a nested loop of analyzer rule execution (by invoking `RuleExecutor.execute`). At this nested level of analysis, the rule `DistinctAggregationRewriter` rewrites this distinct aggregate clause to an expanded two-layer aggregation, where the `aggregateExpresssions` of the final `Aggregate` contains the resolved `gid` and the aggregate expression attributes (In the above case, they are `gid#558, id#559`). After completion of the nested analyzer rule execution, the resulted `aggregateExpressions` in the `havingCondition` is pushed down into the underlying `Aggregate` operator. The `DistinctAggregationRewriter` rule is executed again. The `projections` field of `EXPAND` operator is populated with the `aggregateExpressions` of the `havingCondition` mentioned above. However, the attributes (In the above case, they are `gid#558, id#559`) in the projection list of `EXPAND` operator can not be found in the underlying relation. #### Solution: This PR retrofits part of [#11579](https://github.com/apache/spark/pull/11579) that moves the `DistinctAggregationRewriter` to the beginning of Optimizer, so that it guarantees that the rewrite only happens after all the aggregate functions are resolved first. Thus, it avoids resolution failure. #### How is the PR change tested New [test cases ](https://github.com/xwu0226/spark/blob/f73428f94746d6d074baf6702589545bdbd11cad/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala#L927-L988) are added to drive `DistinctAggregationRewriter` rewrites for multi-distinct aggregations , involving having clause. A following up PR will be submitted to add these test cases to master(2.0) branch. Author: xin Wu <xi...@us.ibm.com> Closes #12974 from xwu0226/SPARK-14495_review. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1654864 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1654864 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1654864 Branch: refs/heads/branch-1.6 Commit: d1654864a60503a5e495a1261f55ceb89f916984 Parents: 1678bff Author: xin Wu <xi...@us.ibm.com> Authored: Wed May 11 16:30:45 2016 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Wed May 11 16:30:45 2016 +0800 ---------------------------------------------------------------------- .../spark/sql/catalyst/analysis/Analyzer.scala | 1 - .../analysis/DistinctAggregationRewriter.scala | 5 +---- .../sql/catalyst/optimizer/Optimizer.scala | 20 ++++++++++++++++---- .../expressions/ExpressionEvalHelper.scala | 4 ++-- .../expressions/MathFunctionsSuite.scala | 4 ++-- .../scala/org/apache/spark/sql/SQLContext.scala | 2 +- .../spark/sql/execution/PlannerSuite.scala | 8 +++++--- .../hive/execution/AggregationQuerySuite.scala | 16 ++++++++++++++++ 8 files changed, 43 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d1654864/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index bc62c7f..04f62d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -80,7 +80,6 @@ class Analyzer( ExtractWindowExpressions :: GlobalAggregates :: ResolveAggregateFunctions :: - DistinctAggregationRewriter(conf) :: HiveTypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), Batch("Nondeterministic", Once, http://git-wip-us.apache.org/repos/asf/spark/blob/d1654864/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala index 9c78f6d..47d6d36 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala @@ -102,11 +102,8 @@ import org.apache.spark.sql.types.IntegerType */ case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case p if !p.resolved => p - // We need to wait until this Aggregate operator is resolved. + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case a: Aggregate => rewrite(a) - case p => p } def rewrite(a: Aggregate): Aggregate = { http://git-wip-us.apache.org/repos/asf/spark/blob/d1654864/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 682b860..676e0b7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.catalyst.optimizer import scala.collection.immutable.HashSet -import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries} +import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} +import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubQueries} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.Inner @@ -30,14 +31,13 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.types._ -abstract class Optimizer extends RuleExecutor[LogicalPlan] - -object DefaultOptimizer extends Optimizer { +abstract class Optimizer(conf: CatalystConf) extends RuleExecutor[LogicalPlan] { val batches = // SubQueries are only needed for analysis and can be removed before execution. Batch("Remove SubQueries", FixedPoint(100), EliminateSubQueries) :: Batch("Aggregate", FixedPoint(100), + DistinctAggregationRewriter(conf), ReplaceDistinctWithAggregate, RemoveLiteralFromGroupExpressions) :: Batch("Operator Optimizations", FixedPoint(100), @@ -68,6 +68,18 @@ object DefaultOptimizer extends Optimizer { Batch("LocalRelation", FixedPoint(100), ConvertToLocalRelation) :: Nil } +case class DefaultOptimizer(conf: CatalystConf) extends Optimizer(conf) + +/** + * An optimizer used in test code. + * + * To ensure extendability, we leave the standard rules in the abstract optimizer rules, while + * specific rules go to the subclasses + */ +object SimpleTestOptimizer extends SimpleTestOptimizer + +class SimpleTestOptimizer extends Optimizer( + new SimpleCatalystConf(caseSensitiveAnalysis = true)) /** * Pushes operations down into a Sample. http://git-wip-us.apache.org/repos/asf/spark/blob/d1654864/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index 465f7d0..074785e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -24,7 +24,7 @@ import org.scalatest.prop.GeneratorDrivenPropertyChecks import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions.codegen._ -import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer +import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project} import org.apache.spark.sql.types.DataType @@ -189,7 +189,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks { expected: Any, inputRow: InternalRow = EmptyRow): Unit = { val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation) - val optimizedPlan = DefaultOptimizer.execute(plan) + val optimizedPlan = SimpleTestOptimizer.execute(plan) checkEvaluationWithoutCodegen(optimizedPlan.expressions.head, expected, inputRow) } http://git-wip-us.apache.org/repos/asf/spark/blob/d1654864/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala index aacc56f..90f9096 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateProjection, GenerateMutableProjection} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer +import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project} import org.apache.spark.sql.types._ @@ -150,7 +150,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { expression: Expression, inputRow: InternalRow = EmptyRow): Unit = { val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation) - val optimizedPlan = DefaultOptimizer.execute(plan) + val optimizedPlan = SimpleTestOptimizer.execute(plan) checkNaNWithoutCodegen(optimizedPlan.expressions.head, inputRow) } http://git-wip-us.apache.org/repos/asf/spark/blob/d1654864/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 47fd7fc..8a07cee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -202,7 +202,7 @@ class SQLContext private[sql]( } @transient - protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer + protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer(conf) @transient protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_)) http://git-wip-us.apache.org/repos/asf/spark/blob/d1654864/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 2fb439f..7d7c39c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -44,10 +44,12 @@ class PlannerSuite extends SharedSQLContext { fail(s"Could query play aggregation query $query. Is it an aggregation query?")) val aggregations = planned.collect { case n if n.nodeName contains "Aggregate" => n } - // For the new aggregation code path, there will be four aggregate operator for - // distinct aggregations. + // For the new aggregation code path, there will be three aggregate operator for + // distinct aggregations. There used to be four aggregate operators because single + // distinct aggregate used to trigger DistinctAggregationRewriter rewrite. Now the + // the rewrite only happens when there are multiple distinct aggregations. assert( - aggregations.size == 2 || aggregations.size == 4, + aggregations.size == 2 || aggregations.size == 3, s"The plan of query $query does not have partial aggregations.") } http://git-wip-us.apache.org/repos/asf/spark/blob/d1654864/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 64bff82..d21227a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -930,6 +930,22 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(11) :: Nil) } } + + test("SPARK-14495: distinct aggregate in having clause") { + checkAnswer( + sqlContext.sql( + """ + |select key, count(distinct value1), count(distinct value2) + |from agg2 group by key + |having count(distinct value1) > 0 + """.stripMargin), + Seq( + Row(null, 3, 3), + Row(1, 2, 3), + Row(2, 2, 1) + ) + ) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org