Repository: spark
Updated Branches:
  refs/heads/master f64a1ddd0 -> 540e91280


[SPARK-17244] Catalyst should not pushdown non-deterministic join conditions

## What changes were proposed in this pull request?

Given that non-deterministic expressions can be stateful, pushing them down the
query plan during the optimization phase can change query results. This patch
fixes the issue by explicitly preventing the optimizer from pushing
non-deterministic join conditions below the join.
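
To see why, consider a minimal, Spark-free sketch (not from this patch; `statefulPred` is a hypothetical stand-in for a stateful expression like `Rand`) of how moving a stateful predicate past a deterministic one changes which rows it accepts:

```scala
object StatefulPredicateDemo extends App {
  // Hypothetical stand-in for a stateful, non-deterministic predicate: its
  // answer depends on how many rows it has already been evaluated against.
  var calls = 0
  def statefulPred(row: Int): Boolean = { calls += 1; calls <= 2 }

  val rows = Seq(1, 2, 3, 4)

  // Stateful predicate evaluated after the deterministic filter
  // (i.e., the predicate stays at its original position in the plan):
  calls = 0
  println(rows.filter(_ % 2 == 0).filter(statefulPred)) // List(2, 4)

  // Stateful predicate evaluated first (the analogue of pushing it down):
  calls = 0
  println(rows.filter(statefulPred).filter(_ % 2 == 0)) // List(2)
}
```

The same pair of predicates yields different results depending on where the stateful one is evaluated, which is exactly what predicate pushdown must not be allowed to do.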

## How was this patch tested?

A new test in `FilterPushdownSuite` checks Catalyst's behavior for both
deterministic and non-deterministic join conditions.

Author: Sameer Agarwal <samee...@cs.berkeley.edu>

Closes #14815 from sameeragarwal/constraint-inputfile.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/540e9128
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/540e9128
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/540e9128

Branch: refs/heads/master
Commit: 540e91280147a61727f99592a66c0cbb12328fac
Parents: f64a1dd
Author: Sameer Agarwal <samee...@cs.berkeley.edu>
Authored: Fri Aug 26 16:40:59 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri Aug 26 16:40:59 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 21 +++++++++++++-------
 .../optimizer/FilterPushdownSuite.scala         | 14 +++++++++++++
 2 files changed, 28 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/540e9128/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 82ad0fb..5c83161 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1379,18 +1379,25 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
  */
 object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
   /**
-   * Splits join condition expressions into three categories based on the attributes required
-   * to evaluate them.
+   * Splits join condition expressions or filter predicates (on a given join's output) into three
+   * categories based on the attributes required to evaluate them. Note that we explicitly exclude
+   * non-deterministic (i.e., stateful) condition expressions in canEvaluateInLeft or
+   * canEvaluateInRight to prevent pushing these predicates on either side of the join.
    *
    * @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
    */
   private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
+    // Note: In order to ensure correctness, it's important to not change the relative ordering of
+    // any deterministic expression that follows a non-deterministic expression. To achieve this,
+    // we only consider pushing down those expressions that precede the first non-deterministic
+    // expression in the condition.
+    val (pushDownCandidates, containingNonDeterministic) = condition.span(_.deterministic)
     val (leftEvaluateCondition, rest) =
-        condition.partition(_.references subsetOf left.outputSet)
+      pushDownCandidates.partition(_.references.subsetOf(left.outputSet))
     val (rightEvaluateCondition, commonCondition) =
-        rest.partition(_.references subsetOf right.outputSet)
+        rest.partition(expr => expr.references.subsetOf(right.outputSet))
 
-    (leftEvaluateCondition, rightEvaluateCondition, commonCondition)
+    (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ containingNonDeterministic)
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -1441,7 +1448,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
       }
 
     // push down the join filter into sub query scanning if applicable
-    case f @ Join(left, right, joinType, joinCondition) =>
+    case j @ Join(left, right, joinType, joinCondition) =>
       val (leftJoinConditions, rightJoinConditions, commonJoinCondition) =
         split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
 
@@ -1471,7 +1478,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
           val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
 
           Join(newLeft, newRight, LeftOuter, newJoinCond)
-        case FullOuter => f
+        case FullOuter => j
         case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node")
         case UsingJoin(_, _) => sys.error("Untransformed Using join node")
       }
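
For intuition, here is a minimal, Spark-free sketch (the `Pred` case class is hypothetical) of how the `span(_.deterministic)` call above splits a condition list: everything before the first non-deterministic predicate becomes a pushdown candidate, and everything from that predicate onward, including later deterministic predicates, stays at the join:

```scala
case class Pred(name: String, deterministic: Boolean)

object SpanSplitDemo extends App {
  // Mirrors the condition used in the new FilterPushdownSuite test below.
  val condition = Seq(
    Pred("x.a = 5", deterministic = true),
    Pred("y.a = 5", deterministic = true),
    Pred("x.a = rand(10)", deterministic = false),
    Pred("y.b = 5", deterministic = true))

  // span keeps the longest prefix of deterministic predicates as candidates.
  val (pushDownCandidates, containingNonDeterministic) = condition.span(_.deterministic)

  println(pushDownCandidates.map(_.name))         // List(x.a = 5, y.a = 5)
  println(containingNonDeterministic.map(_.name)) // List(x.a = rand(10), y.b = 5)
}
```

Note that `y.b = 5` is deterministic but still not pushed down: evaluating it before `x.a = rand(10)` would change how many rows the stateful expression sees.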

http://git-wip-us.apache.org/repos/asf/spark/blob/540e9128/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 9f25e9d..55836f9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -987,4 +987,18 @@ class FilterPushdownSuite extends PlanTest {
 
     comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
   }
+
+  test("join condition pushdown: deterministic and non-deterministic") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    // Verify that all conditions preceding the first non-deterministic condition are pushed down
+    // by the optimizer and others are not.
+    val originalQuery = x.join(y, condition = Some("x.a".attr === 5 && "y.a".attr === 5 &&
+      "x.a".attr === Rand(10) && "y.b".attr === 5))
+    val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5),
+        condition = Some("x.a".attr === Rand(10) && "y.b".attr === 5))
+
+    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze)
+  }
 }

