Repository: spark Updated Branches: refs/heads/branch-2.0 81004f13f -> a804c9260
[SPARK-16644][SQL] Aggregate should not propagate constraints containing aggregate expressions aggregate expressions can only be executed inside `Aggregate`, if we propagate it up with constraints, the parent operator can not execute it and will fail at runtime. new test in SQLQuerySuite Author: Wenchen Fan <wenc...@databricks.com> Author: Yin Huai <yh...@databricks.com> Closes #14281 from cloud-fan/bug. (cherry picked from commit cfa5ae84ed0f48b3b108d0614dbf6fcd79ef5179) Signed-off-by: Yin Huai <yh...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a804c926 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a804c926 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a804c926 Branch: refs/heads/branch-2.0 Commit: a804c92604253720fdccf117915db58c7768a6b8 Parents: 81004f1 Author: Wenchen Fan <wenc...@databricks.com> Authored: Wed Jul 20 18:37:15 2016 -0700 Committer: Yin Huai <yh...@databricks.com> Committed: Wed Jul 20 18:47:42 2016 -0700 ---------------------------------------------------------------------- .../plans/logical/basicLogicalOperators.scala | 6 ++++-- .../plans/ConstraintPropagationSuite.scala | 6 ++++-- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 17 +++++++++++++++++ 3 files changed, 25 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a804c926/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index c0e400f..b31f5aa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -483,8 +483,10 @@ case class Aggregate( override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute) override def maxRows: Option[Long] = child.maxRows - override def validConstraints: Set[Expression] = - child.constraints.union(getAliasedConstraints(aggregateExpressions)) + override def validConstraints: Set[Expression] = { + val nonAgg = aggregateExpressions.filter(_.find(_.isInstanceOf[AggregateExpression]).isEmpty) + child.constraints.union(getAliasedConstraints(nonAgg)) + } override lazy val statistics: Statistics = { if (groupingExpressions.isEmpty) { http://git-wip-us.apache.org/repos/asf/spark/blob/a804c926/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala index 0b73b5e..5a76969 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala @@ -79,13 +79,15 @@ class ConstraintPropagationSuite extends SparkFunSuite { assert(tr.analyze.constraints.isEmpty) val aliasedRelation = tr.where('c.attr > 10 && 'a.attr < 5) - .groupBy('a, 'c, 'b)('a, 'c.as("c1"), count('a).as("a3")).select('c1, 'a).analyze + .groupBy('a, 'c, 'b)('a, 'c.as("c1"), count('a).as("a3")).select('c1, 'a, 'a3).analyze + // SPARK-16644: aggregate expression count(a) should not appear in the constraints. verifyConstraints(aliasedRelation.analyze.constraints, ExpressionSet(Seq(resolveColumn(aliasedRelation.analyze, "c1") > 10, IsNotNull(resolveColumn(aliasedRelation.analyze, "c1")), resolveColumn(aliasedRelation.analyze, "a") < 5, - IsNotNull(resolveColumn(aliasedRelation.analyze, "a"))))) + IsNotNull(resolveColumn(aliasedRelation.analyze, "a")), + IsNotNull(resolveColumn(aliasedRelation.analyze, "a3"))))) } test("propagating constraints in expand") { http://git-wip-us.apache.org/repos/asf/spark/blob/a804c926/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index ede7d9a..8a5ff2c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2929,4 +2929,21 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { sql(s"SELECT '$literal' AS DUMMY"), Row(s"$expected") :: Nil) } + + test("SPARK-16644: Aggregate should not put aggregate expressions to constraints") { + withTable("tbl") { + sql("CREATE TABLE tbl(a INT, b INT) USING parquet") + checkAnswer(sql( + """ + |SELECT + | a, + | MAX(b) AS c1, + | b AS c2 + |FROM tbl + |WHERE a = b + |GROUP BY a, b + |HAVING c1 = 1 + """.stripMargin), Nil) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org