Repository: spark
Updated Branches:
  refs/heads/master b55a36bc3 -> 76520955f


[SPARK-9082] [SQL] Filter using non-deterministic expressions should not be 
pushed down

Author: Wenchen Fan <cloud0...@outlook.com>

Closes #7446 from cloud-fan/filter and squashes the following commits:

330021e [Wenchen Fan] add exists to tree node
2cab68c [Wenchen Fan] more enhance
949be07 [Wenchen Fan] push down part of predicate if possible
3912f84 [Wenchen Fan] address comments
8ce15ca [Wenchen Fan] fix bug
557158e [Wenchen Fan] Filter using non-deterministic expressions should not be 
pushed down


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76520955
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76520955
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76520955

Branch: refs/heads/master
Commit: 76520955fddbda87a5c53d0a394dedc91dce67e8
Parents: b55a36b
Author: Wenchen Fan <cloud0...@outlook.com>
Authored: Wed Jul 22 11:45:51 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Jul 22 11:45:51 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 50 ++++++++++++++++----
 .../optimizer/FilterPushdownSuite.scala         | 45 +++++++++++++++++-
 2 files changed, 84 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/76520955/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index e42f0b9..d2db3dd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -541,20 +541,50 @@ object SimplifyFilters extends Rule[LogicalPlan] {
  *
  * This heuristic is valid assuming the expression evaluation cost is minimal.
  */
-object PushPredicateThroughProject extends Rule[LogicalPlan] {
+object PushPredicateThroughProject extends Rule[LogicalPlan] with 
PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case filter @ Filter(condition, project @ Project(fields, grandChild)) =>
-      val sourceAliases = fields.collect { case a @ Alias(c, _) =>
-        (a.toAttribute: Attribute) -> c
-      }.toMap
-      project.copy(child = filter.copy(
-        replaceAlias(condition, sourceAliases),
-        grandChild))
+      // Create a map of Aliases to their values from the child projection.
+      // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
+      val aliasMap = AttributeMap(fields.collect {
+        case a: Alias => (a.toAttribute, a.child)
+      })
+
+      // Split the condition into its conjuncts (by `And`), so that the conjuncts that do not
+      // involve nondeterministic expressions can still be pushed down.
+      val andConditions = splitConjunctivePredicates(condition)
+      val nondeterministicConditions = 
andConditions.filter(hasNondeterministic(_, aliasMap))
+
+      // If there are no nondeterministic conditions, push down the whole condition.
+      if (nondeterministicConditions.isEmpty) {
+        project.copy(child = Filter(replaceAlias(condition, aliasMap), 
grandChild))
+      } else {
+        // If all of the conditions are nondeterministic, leave the plan unchanged.
+        if (nondeterministicConditions.length == andConditions.length) {
+          filter
+        } else {
+          val deterministicConditions = 
andConditions.filterNot(hasNondeterministic(_, aliasMap))
+          // Push down the small conditions without nondeterministic 
expressions.
+          val pushedCondition = deterministicConditions.map(replaceAlias(_, 
aliasMap)).reduce(And)
+          Filter(nondeterministicConditions.reduce(And),
+            project.copy(child = Filter(pushedCondition, grandChild)))
+        }
+      }
+  }
+
+  private def hasNondeterministic(
+      condition: Expression,
+      sourceAliases: AttributeMap[Expression]) = {
+    condition.collect {
+      case a: Attribute if sourceAliases.contains(a) => sourceAliases(a)
+    }.exists(!_.deterministic)
   }
 
-  private def replaceAlias(condition: Expression, sourceAliases: 
Map[Attribute, Expression]) = {
-    condition transform {
-      case a: AttributeReference => sourceAliases.getOrElse(a, a)
+  // Substitute any attributes produced by the child projection with the expressions they alias,
+  // so that the condition can be safely evaluated below the projection.
+  private def replaceAlias(condition: Expression, sourceAliases: 
AttributeMap[Expression]) = {
+    condition.transform {
+      case a: Attribute => sourceAliases.getOrElse(a, a)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/76520955/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index dc28b3f..0f1fde2 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
-import org.apache.spark.sql.catalyst.expressions.{SortOrder, Ascending, Count, 
Explode}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest, LeftOuter, 
RightOuter}
 import org.apache.spark.sql.catalyst.rules._
@@ -146,6 +146,49 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("nondeterministic: can't push down filter through project") {
+    val originalQuery = testRelation
+      .select(Rand(10).as('rand), 'a)
+      .where('rand > 5 || 'a > 5)
+      .analyze
+
+    val optimized = Optimize.execute(originalQuery)
+
+    comparePlans(optimized, originalQuery)
+  }
+
+  test("nondeterministic: push down part of filter through project") {
+    val originalQuery = testRelation
+      .select(Rand(10).as('rand), 'a)
+      .where('rand > 5 && 'a > 5)
+      .analyze
+
+    val optimized = Optimize.execute(originalQuery)
+
+    val correctAnswer = testRelation
+      .where('a > 5)
+      .select(Rand(10).as('rand), 'a)
+      .where('rand > 5)
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("nondeterministic: push down filter through project") {
+    val originalQuery = testRelation
+      .select(Rand(10).as('rand), 'a)
+      .where('a > 5 && 'a < 10)
+      .analyze
+
+    val optimized = Optimize.execute(originalQuery)
+    val correctAnswer = testRelation
+      .where('a > 5 && 'a < 10)
+      .select(Rand(10).as('rand), 'a)
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("filters: combines filters") {
     val originalQuery = testRelation
       .select('a)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to