Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1678bff7f -> d1654864a


[SPARK-14495][SQL][1.6] fix resolution failure of having clause with distinct aggregate function

#### Symptom:
In the latest **branch 1.6**, when a `DISTINCT` aggregate function is used in 
the `HAVING` clause, the Analyzer throws an `AnalysisException` with a message 
like the following:
```
resolved attribute(s) gid#558,id#559 missing from date#554,id#555 in operator 
!Expand [List(date#554, null, 0, if ((gid#558 = 1)) id#559 else 
null),List(date#554, id#555, 1, null)], [date#554,id#561,gid#560,if ((gid = 1)) 
id else null#562];
```
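For reference, a minimal reproduction sketch (the table and column names here are hypothetical; the query shape mirrors the failing plan above, a grouping column plus a `DISTINCT` count repeated in the `HAVING` clause):
```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc) // assumes an existing SparkContext `sc`

sqlContext.range(0, 10)
  .selectExpr("cast(id % 3 as string) as dt", "id")
  .registerTempTable("t")

// On branch-1.6 before this fix, analyzing this query throws the
// "resolved attribute(s) ... missing" AnalysisException shown above.
sqlContext.sql(
  """
    |SELECT dt, COUNT(DISTINCT id)
    |FROM t
    |GROUP BY dt
    |HAVING COUNT(DISTINCT id) > 0
  """.stripMargin).collect()
```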
#### Root cause:
The problem is that a distinct aggregate in the `HAVING` condition is rewritten 
by the rule `DistinctAggregationRewriter` twice, which corrupts the resulting 
`Expand` operator.

In the `ResolveAggregateFunctions` rule, when resolving 
```Filter(havingCondition, _: Aggregate)```, the `havingCondition` is resolved 
as an `Aggregate` in a nested round of analyzer rule execution (by invoking 
`RuleExecutor.execute`). At this nested level of analysis, the rule 
`DistinctAggregationRewriter` rewrites the distinct aggregate clause into an 
expanded two-layer aggregation, where the `aggregateExpressions` of the final 
`Aggregate` contain the resolved `gid` and the aggregate expression attributes 
(in the case above, `gid#558` and `id#559`).

After the nested analyzer execution completes, the resulting 
`aggregateExpressions` of the `havingCondition` are pushed down into the 
underlying `Aggregate` operator, and the `DistinctAggregationRewriter` rule is 
executed a second time. The `projections` field of the new `Expand` operator is 
populated from the `aggregateExpressions` of the `havingCondition` mentioned 
above, but the attributes in that projection list (in the case above, `gid#558` 
and `id#559`) cannot be found in the underlying relation.
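
Schematically (a simplified sketch of the plan shape, not exact plan output), the first rewrite turns the distinct aggregate into a three-layer plan:
```
Aggregate(date, count(if (gid = 1) id else null))         -- refers to gid/id produced below
 +- Aggregate(date, gid, id, ...)                         -- de-duplicating aggregate
    +- Expand([date, null, 0, ...], [date, id, 1, null])  -- tags each input row with gid
       +- relation(date, id)
```
When the pushed-down `aggregateExpressions` are rewritten a second time, the projections of the newly built `Expand` still reference the first rewrite's output attributes (`gid#558`, `id#559`), but its child is the original relation, which only provides `date#554` and `id#555`, hence the unresolved-attribute failure.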

#### Solution:
This PR retrofits the part of [#11579](https://github.com/apache/spark/pull/11579) 
that moves `DistinctAggregationRewriter` to the beginning of the Optimizer, 
guaranteeing that the rewrite happens only after all the aggregate functions 
have been resolved. This avoids the resolution failure.
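
As a usage sketch (reusing the hypothetical table `t` from the reproduction above), the effect of the move can be observed with `explain`: the `Expand` produced by the rewrite now appears only in the optimized plan, and only when there are multiple distinct aggregations:
```scala
// explain(true) prints both the analyzed and the optimized plan; after this
// change, the Expand built by DistinctAggregationRewriter shows up only in
// the optimized plan.
sqlContext.sql(
  """
    |SELECT dt, COUNT(DISTINCT id), COUNT(DISTINCT id % 2)
    |FROM t
    |GROUP BY dt
    |HAVING COUNT(DISTINCT id) > 0
  """.stripMargin).explain(true)
```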

#### How was this change tested?
New [test cases](https://github.com/xwu0226/spark/blob/f73428f94746d6d074baf6702589545bdbd11cad/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala#L927-L988) 
are added to exercise the `DistinctAggregationRewriter` rewrite for 
multi-distinct aggregations involving the `HAVING` clause.

A follow-up PR will be submitted to add these test cases to the master (2.0) 
branch.

Author: xin Wu <xi...@us.ibm.com>

Closes #12974 from xwu0226/SPARK-14495_review.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1654864
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1654864
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1654864

Branch: refs/heads/branch-1.6
Commit: d1654864a60503a5e495a1261f55ceb89f916984
Parents: 1678bff
Author: xin Wu <xi...@us.ibm.com>
Authored: Wed May 11 16:30:45 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed May 11 16:30:45 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  1 -
 .../analysis/DistinctAggregationRewriter.scala  |  5 +----
 .../sql/catalyst/optimizer/Optimizer.scala      | 20 ++++++++++++++++----
 .../expressions/ExpressionEvalHelper.scala      |  4 ++--
 .../expressions/MathFunctionsSuite.scala        |  4 ++--
 .../scala/org/apache/spark/sql/SQLContext.scala |  2 +-
 .../spark/sql/execution/PlannerSuite.scala      |  8 +++++---
 .../hive/execution/AggregationQuerySuite.scala  | 16 ++++++++++++++++
 8 files changed, 43 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d1654864/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index bc62c7f..04f62d7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -80,7 +80,6 @@ class Analyzer(
       ExtractWindowExpressions ::
       GlobalAggregates ::
       ResolveAggregateFunctions ::
-      DistinctAggregationRewriter(conf) ::
       HiveTypeCoercion.typeCoercionRules ++
       extendedResolutionRules : _*),
     Batch("Nondeterministic", Once,

http://git-wip-us.apache.org/repos/asf/spark/blob/d1654864/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala
index 9c78f6d..47d6d36 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala
@@ -102,11 +102,8 @@ import org.apache.spark.sql.types.IntegerType
  */
 case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalPlan] {
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case p if !p.resolved => p
-    // We need to wait until this Aggregate operator is resolved.
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp  {
     case a: Aggregate => rewrite(a)
-    case p => p
   }
 
   def rewrite(a: Aggregate): Aggregate = {

http://git-wip-us.apache.org/repos/asf/spark/blob/d1654864/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 682b860..676e0b7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import scala.collection.immutable.HashSet
-import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries}
+import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
+import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubQueries}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.Inner
@@ -30,14 +31,13 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.types._
 
-abstract class Optimizer extends RuleExecutor[LogicalPlan]
-
-object DefaultOptimizer extends Optimizer {
+abstract class Optimizer(conf: CatalystConf) extends RuleExecutor[LogicalPlan] {
   val batches =
    // SubQueries are only needed for analysis and can be removed before execution.
     Batch("Remove SubQueries", FixedPoint(100),
       EliminateSubQueries) ::
     Batch("Aggregate", FixedPoint(100),
+      DistinctAggregationRewriter(conf),
       ReplaceDistinctWithAggregate,
       RemoveLiteralFromGroupExpressions) ::
     Batch("Operator Optimizations", FixedPoint(100),
@@ -68,6 +68,18 @@ object DefaultOptimizer extends Optimizer {
     Batch("LocalRelation", FixedPoint(100),
       ConvertToLocalRelation) :: Nil
 }
+case class DefaultOptimizer(conf: CatalystConf) extends Optimizer(conf)
+
+/**
+ * An optimizer used in test code.
+ *
+ * To ensure extendability, we leave the standard rules in the abstract optimizer rules, while
+ * specific rules go to the subclasses
+ */
+object SimpleTestOptimizer extends SimpleTestOptimizer
+
+class SimpleTestOptimizer extends Optimizer(
+  new SimpleCatalystConf(caseSensitiveAnalysis = true))
 
 /**
  * Pushes operations down into a Sample.

http://git-wip-us.apache.org/repos/asf/spark/blob/d1654864/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 465f7d0..074785e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -24,7 +24,7 @@ import org.scalatest.prop.GeneratorDrivenPropertyChecks
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
+import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
 import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
 import org.apache.spark.sql.types.DataType
 
@@ -189,7 +189,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
       expected: Any,
       inputRow: InternalRow = EmptyRow): Unit = {
     val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
-    val optimizedPlan = DefaultOptimizer.execute(plan)
+    val optimizedPlan = SimpleTestOptimizer.execute(plan)
     checkEvaluationWithoutCodegen(optimizedPlan.expressions.head, expected, inputRow)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d1654864/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
index aacc56f..90f9096 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateProjection, GenerateMutableProjection}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
+import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
 import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
 import org.apache.spark.sql.types._
 
@@ -150,7 +150,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     expression: Expression,
     inputRow: InternalRow = EmptyRow): Unit = {
     val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
-    val optimizedPlan = DefaultOptimizer.execute(plan)
+    val optimizedPlan = SimpleTestOptimizer.execute(plan)
     checkNaNWithoutCodegen(optimizedPlan.expressions.head, inputRow)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d1654864/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 47fd7fc..8a07cee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -202,7 +202,7 @@ class SQLContext private[sql](
     }
 
   @transient
-  protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer
+  protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer(conf)
 
   @transient
   protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))

http://git-wip-us.apache.org/repos/asf/spark/blob/d1654864/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 2fb439f..7d7c39c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -44,10 +44,12 @@ class PlannerSuite extends SharedSQLContext {
         fail(s"Could query play aggregation query $query. Is it an aggregation 
query?"))
     val aggregations = planned.collect { case n if n.nodeName contains 
"Aggregate" => n }
 
-    // For the new aggregation code path, there will be four aggregate operator for
-    // distinct aggregations.
+    // For the new aggregation code path, there will be three aggregate operator for
+    // distinct aggregations. There used to be four aggregate operators because single
+    // distinct aggregate used to trigger DistinctAggregationRewriter rewrite. Now the
+    // the rewrite only happens when there are multiple distinct aggregations.
     assert(
-      aggregations.size == 2 || aggregations.size == 4,
+      aggregations.size == 2 || aggregations.size == 3,
       s"The plan of query $query does not have partial aggregations.")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d1654864/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 64bff82..d21227a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -930,6 +930,22 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
         Row(11) :: Nil)
     }
   }
+
+  test("SPARK-14495: distinct aggregate in having clause") {
+    checkAnswer(
+      sqlContext.sql(
+        """
+          |select key, count(distinct value1), count(distinct value2)
+          |from agg2 group by key
+          |having count(distinct value1) > 0
+        """.stripMargin),
+      Seq(
+        Row(null, 3, 3),
+        Row(1, 2, 3),
+        Row(2, 2, 1)
+      )
+    )
+  }
 }
 
 

