Repository: spark Updated Branches: refs/heads/branch-2.0 c864e8a80 -> 399597b04
[SPARK-17337][SPARK-16804][SQL][BRANCH-2.0] Backport subquery related PRs ## What changes were proposed in this pull request? This PR backports two subquery related PRs to branch-2.0: - https://github.com/apache/spark/pull/14411 - https://github.com/apache/spark/pull/15761 ## How was this patch tested? Added a tests to `SubquerySuite`. Author: Nattavut Sutyanyong <nsy....@gmail.com> Author: Herman van Hovell <hvanhov...@databricks.com> Closes #15772 from hvanhovell/SPARK-17337-2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/399597b0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/399597b0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/399597b0 Branch: refs/heads/branch-2.0 Commit: 399597b04a83bbe3cc748c21446de0d808d08155 Parents: c864e8a Author: Herman van Hovell <hvanhov...@databricks.com> Authored: Fri Nov 4 15:54:58 2016 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Fri Nov 4 15:54:58 2016 -0700 ---------------------------------------------------------------------- .../spark/sql/catalyst/analysis/Analyzer.scala | 13 ++++++ .../sql/catalyst/optimizer/Optimizer.scala | 16 ++++++- .../catalyst/analysis/AnalysisErrorSuite.scala | 17 ++++++++ .../org/apache/spark/sql/SubquerySuite.scala | 44 ++++++++++++++++++++ 4 files changed, 89 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/399597b0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 617f3e0..6332f92 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1025,6 +1025,19 @@ class Analyzer( case e: Expand => failOnOuterReferenceInSubTree(e, "an EXPAND") e + case l : LocalLimit => + failOnOuterReferenceInSubTree(l, "a LIMIT") + l + // Since LIMIT <n> is represented as GlobalLimit(<n>, (LocalLimit (<n>, child)) + // and we are walking bottom up, we will fail on LocalLimit before + // reaching GlobalLimit. + // The code below is just a safety net. + case g : GlobalLimit => + failOnOuterReferenceInSubTree(g, "a LIMIT") + g + case s : Sample => + failOnOuterReferenceInSubTree(s, "a TABLESAMPLE") + s case p => failOnOuterReference(p) p http://git-wip-us.apache.org/repos/asf/spark/blob/399597b0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 4c06038..f0992b3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1020,7 +1020,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { // state and all the input rows processed before. In another word, the order of input rows // matters for non-deterministic expressions, while pushing down predicates changes the order. case filter @ Filter(condition, project @ Project(fields, grandChild)) - if fields.forall(_.deterministic) => + if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) => // Create a map of Aliases to their values from the child projection. // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b). @@ -1161,6 +1161,20 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { filter } } + + /** + * Check if we can safely push a filter through a projection, by making sure that predicate + * subqueries in the condition do not contain the same attributes as the plan they are moved + * into. This can happen when the plan and predicate subquery have the same source. + */ + private def canPushThroughCondition(plan: LogicalPlan, condition: Expression): Boolean = { + val attributes = plan.outputSet + val matched = condition.find { + case PredicateSubquery(p, _, _, _) => p.outputSet.intersect(attributes).nonEmpty + case _ => false + } + matched.isEmpty + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/399597b0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index ff112c5..6438065 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -533,5 +533,22 @@ class AnalysisErrorSuite extends AnalysisTest { Exists(Union(LocalRelation(b), Filter(EqualTo(OuterReference(a), c), LocalRelation(c)))), LocalRelation(a)) assertAnalysisError(plan3, "Accessing outer query column is not allowed in" :: Nil) + + val plan4 = Filter( + Exists( + Limit(1, + Filter(EqualTo(OuterReference(a), b), LocalRelation(b))) + ), + LocalRelation(a)) + assertAnalysisError(plan4, "Accessing outer query column is not allowed in a LIMIT" :: Nil) + + val plan5 = Filter( + Exists( + Sample(0.0, 0.5, false, 1L, + Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))().select('b) + ), + LocalRelation(a)) + assertAnalysisError(plan5, + "Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil) } } http://git-wip-us.apache.org/repos/asf/spark/blob/399597b0/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index afed342..184d1f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -571,4 +571,48 @@ class SubquerySuite extends QueryTest with SharedSQLContext { Row(1.0, false) :: Row(1.0, false) :: Row(2.0, true) :: Row(2.0, true) :: Row(3.0, false) :: Row(5.0, true) :: Row(null, false) :: Row(null, true) :: Nil) } + + test("SPARK-16804: Correlated subqueries containing LIMIT - 1") { + withTempView("onerow") { + Seq(1).toDF("c1").createOrReplaceTempView("onerow") + + checkAnswer( + sql( + """ + | select c1 from onerow t1 + | where exists (select 1 from onerow t2 where t1.c1=t2.c1) + | and exists (select 1 from onerow LIMIT 1)""".stripMargin), + Row(1) :: Nil) + } + } + + test("SPARK-16804: Correlated subqueries containing LIMIT - 2") { + withTempView("onerow") { + Seq(1).toDF("c1").createOrReplaceTempView("onerow") + + checkAnswer( + sql( + """ + | select c1 from onerow t1 + | where exists (select 1 + | from (select 1 from onerow t2 LIMIT 1) + | where t1.c1=t2.c1)""".stripMargin), + Row(1) :: Nil) + } + } + + test("SPARK-17337: Incorrect column resolution leads to incorrect results") { + withTempView("t1", "t2") { + Seq(1, 2).toDF("c1").createOrReplaceTempView("t1") + Seq(1).toDF("c2").createOrReplaceTempView("t2") + checkAnswer( + sql( + """ + | select * + | from (select t2.c2+1 as c3 + | from t1 left join t2 on t1.c1=t2.c2) t3 + | where c3 not in (select c2 from t2)""".stripMargin), + Row(2) :: Nil) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org