Repository: spark Updated Branches: refs/heads/branch-1.6 897599601 -> 3cc938ac8
[SPARK-13473][SQL] Don't push predicate through project with nondeterministic field(s) ## What changes were proposed in this pull request? Predicates shouldn't be pushed through project with nondeterministic field(s). See https://github.com/graphframes/graphframes/pull/23 and SPARK-13473 for more details. This PR targets master, branch-1.6, and branch-1.5. ## How was this patch tested? A test case is added in `FilterPushdownSuite`. It constructs a query plan where a filter is over a project with a nondeterministic field. Optimized query plan shouldn't change in this case. Author: Cheng Lian <l...@databricks.com> Closes #11348 from liancheng/spark-13473-no-ppd-through-nondeterministic-project-field. (cherry picked from commit 3fa6491be66dad690ca5329dd32e7c82037ae8c1) Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cc938ac Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cc938ac Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cc938ac Branch: refs/heads/branch-1.6 Commit: 3cc938ac8124b8445f171baa365fa44a47962cc9 Parents: 8975996 Author: Cheng Lian <l...@databricks.com> Authored: Thu Feb 25 20:43:03 2016 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Thu Feb 25 20:45:18 2016 +0800 ---------------------------------------------------------------------- .../sql/catalyst/optimizer/Optimizer.scala | 9 ++++++- .../optimizer/FilterPushdownSuite.scala | 27 +++----------------- 2 files changed, 11 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3cc938ac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 06d14fc..682b860 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -618,7 +618,14 @@ object SimplifyFilters extends Rule[LogicalPlan] { */ object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case filter @ Filter(condition, project @ Project(fields, grandChild)) => + // SPARK-13473: We can't push the predicate down when the underlying projection output non- + // deterministic field(s). Non-deterministic expressions are essentially stateful. This + // implies that, for a given input row, the output are determined by the expression's initial + // state and all the input rows processed before. In another word, the order of input rows + // matters for non-deterministic expressions, while pushing down predicates changes the order. + case filter @ Filter(condition, project @ Project(fields, grandChild)) + if fields.forall(_.deterministic) => + // Create a map of Aliases to their values from the child projection. // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b). val aliasMap = AttributeMap(fields.collect { http://git-wip-us.apache.org/repos/asf/spark/blob/3cc938ac/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index fba4c5c..6978807 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -147,7 +147,7 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } - test("nondeterministic: can't push down filter through project") { + test("nondeterministic: can't push down filter with nondeterministic condition through project") { val originalQuery = testRelation .select(Rand(10).as('rand), 'a) .where('rand > 5 || 'a > 5) @@ -158,36 +158,15 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, originalQuery) } - test("nondeterministic: push down part of filter through project") { + test("nondeterministic: can't push down filter through project with nondeterministic field") { val originalQuery = testRelation .select(Rand(10).as('rand), 'a) - .where('rand > 5 && 'a > 5) - .analyze - - val optimized = Optimize.execute(originalQuery) - - val correctAnswer = testRelation .where('a > 5) - .select(Rand(10).as('rand), 'a) - .where('rand > 5) - .analyze - - comparePlans(optimized, correctAnswer) - } - - test("nondeterministic: push down filter through project") { - val originalQuery = testRelation - .select(Rand(10).as('rand), 'a) - .where('a > 5 && 'a < 10) .analyze val optimized = Optimize.execute(originalQuery) - val correctAnswer = testRelation - .where('a > 5 && 'a < 10) - .select(Rand(10).as('rand), 'a) - .analyze - comparePlans(optimized, correctAnswer) + comparePlans(optimized, originalQuery) } test("filters: combines filters") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org