Repository: spark
Updated Branches:
  refs/heads/branch-2.0 81004f13f -> a804c9260


[SPARK-16644][SQL] Aggregate should not propagate constraints containing 
aggregate expressions

Aggregate expressions can only be executed inside `Aggregate`. If we propagate 
them up as constraints, the parent operator cannot execute them and will fail 
at runtime.

Tested with a new test in SQLQuerySuite.

Author: Wenchen Fan <wenc...@databricks.com>
Author: Yin Huai <yh...@databricks.com>

Closes #14281 from cloud-fan/bug.

(cherry picked from commit cfa5ae84ed0f48b3b108d0614dbf6fcd79ef5179)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a804c926
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a804c926
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a804c926

Branch: refs/heads/branch-2.0
Commit: a804c92604253720fdccf117915db58c7768a6b8
Parents: 81004f1
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Wed Jul 20 18:37:15 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Jul 20 18:47:42 2016 -0700

----------------------------------------------------------------------
 .../plans/logical/basicLogicalOperators.scala      |  6 ++++--
 .../plans/ConstraintPropagationSuite.scala         |  6 ++++--
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 17 +++++++++++++++++
 3 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a804c926/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index c0e400f..b31f5aa 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -483,8 +483,10 @@ case class Aggregate(
   override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
   override def maxRows: Option[Long] = child.maxRows
 
-  override def validConstraints: Set[Expression] =
-    child.constraints.union(getAliasedConstraints(aggregateExpressions))
+  override def validConstraints: Set[Expression] = {
+    val nonAgg = 
aggregateExpressions.filter(_.find(_.isInstanceOf[AggregateExpression]).isEmpty)
+    child.constraints.union(getAliasedConstraints(nonAgg))
+  }
 
   override lazy val statistics: Statistics = {
     if (groupingExpressions.isEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/a804c926/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index 0b73b5e..5a76969 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -79,13 +79,15 @@ class ConstraintPropagationSuite extends SparkFunSuite {
     assert(tr.analyze.constraints.isEmpty)
 
     val aliasedRelation = tr.where('c.attr > 10 && 'a.attr < 5)
-      .groupBy('a, 'c, 'b)('a, 'c.as("c1"), count('a).as("a3")).select('c1, 
'a).analyze
+      .groupBy('a, 'c, 'b)('a, 'c.as("c1"), count('a).as("a3")).select('c1, 
'a, 'a3).analyze
 
+    // SPARK-16644: aggregate expression count(a) should not appear in the 
constraints.
     verifyConstraints(aliasedRelation.analyze.constraints,
       ExpressionSet(Seq(resolveColumn(aliasedRelation.analyze, "c1") > 10,
         IsNotNull(resolveColumn(aliasedRelation.analyze, "c1")),
         resolveColumn(aliasedRelation.analyze, "a") < 5,
-        IsNotNull(resolveColumn(aliasedRelation.analyze, "a")))))
+        IsNotNull(resolveColumn(aliasedRelation.analyze, "a")),
+        IsNotNull(resolveColumn(aliasedRelation.analyze, "a3")))))
   }
 
   test("propagating constraints in expand") {

http://git-wip-us.apache.org/repos/asf/spark/blob/a804c926/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index ede7d9a..8a5ff2c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2929,4 +2929,21 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
       sql(s"SELECT '$literal' AS DUMMY"),
       Row(s"$expected") :: Nil)
   }
+
+  test("SPARK-16644: Aggregate should not put aggregate expressions to 
constraints") {
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(a INT, b INT) USING parquet")
+      checkAnswer(sql(
+        """
+          |SELECT
+          |  a,
+          |  MAX(b) AS c1,
+          |  b AS c2
+          |FROM tbl
+          |WHERE a = b
+          |GROUP BY a, b
+          |HAVING c1 = 1
+        """.stripMargin), Nil)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to