Repository: spark
Updated Branches:
  refs/heads/master 5fccdae18 -> 1e3b8762a


[SPARK-21479][SQL] Outer join filter pushdown in null supplying table when 
condition is on one of the joined columns

## What changes were proposed in this pull request?

Extended the `InferFiltersFromConstraints` optimization rule so that, for 
left/right outer joins, it transfers constraints from the preserved side of the 
join to the null-supplying side. The constraints of the join operator itself 
remain unchanged.

## How was this patch tested?

Added 3 tests in `InferFiltersFromConstraintsSuite`.

Author: maryannxue <maryann....@gmail.com>

Closes #20816 from maryannxue/spark-21479.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e3b8762
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e3b8762
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e3b8762

Branch: refs/heads/master
Commit: 1e3b8762a854a07c317f69fba7fa1a7bcdc58ff3
Parents: 5fccdae
Author: maryannxue <maryann....@gmail.com>
Authored: Wed Apr 18 10:36:41 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Apr 18 10:36:41 2018 +0800

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 42 ++++++++++++++++++--
 .../plans/logical/QueryPlanConstraints.scala    | 25 ++++++++++--
 .../InferFiltersFromConstraintsSuite.scala      | 36 +++++++++++++++++
 3 files changed, 96 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1e3b8762/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 5fb59ef..913354e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -637,8 +637,11 @@ object CollapseWindow extends Rule[LogicalPlan] {
  * constraints. These filters are currently inserted to the existing 
conditions in the Filter
  * operators and on either side of Join operators.
  *
- * Note: While this optimization is applicable to all types of join, it 
primarily benefits Inner and
- * LeftSemi joins.
+ * In addition, for left/right outer joins, infer predicate from the preserved 
side of the Join
+ * operator and push the inferred filter over to the null-supplying side. For 
example, if the
+ * preserved side has constraints of the form 'a > 5' and the join condition 
is 'a = b', in
+ * which 'b' is an attribute from the null-supplying side, a [[Filter]] 
operator of 'b > 5' will
+ * be applied to the null-supplying side.
  */
 object InferFiltersFromConstraints extends Rule[LogicalPlan] with 
PredicateHelper {
 
@@ -671,11 +674,42 @@ object InferFiltersFromConstraints extends 
Rule[LogicalPlan] with PredicateHelpe
       val newConditionOpt = conditionOpt match {
         case Some(condition) =>
           val newFilters = additionalConstraints -- 
splitConjunctivePredicates(condition)
-          if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), 
condition)) else None
+          if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), 
condition)) else conditionOpt
         case None =>
           additionalConstraints.reduceOption(And)
       }
-      if (newConditionOpt.isDefined) Join(left, right, joinType, 
newConditionOpt) else join
+      // Infer filter for left/right outer joins
+      val newLeftOpt = joinType match {
+        case RightOuter if newConditionOpt.isDefined =>
+          val inferredConstraints = left.getRelevantConstraints(
+            left.constraints
+              .union(right.constraints)
+              .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+          val newFilters = inferredConstraints
+            .filterNot(left.constraints.contains)
+            .reduceLeftOption(And)
+          newFilters.map(Filter(_, left))
+        case _ => None
+      }
+      val newRightOpt = joinType match {
+        case LeftOuter if newConditionOpt.isDefined =>
+          val inferredConstraints = right.getRelevantConstraints(
+            right.constraints
+              .union(left.constraints)
+              .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+          val newFilters = inferredConstraints
+            .filterNot(right.constraints.contains)
+            .reduceLeftOption(And)
+          newFilters.map(Filter(_, right))
+        case _ => None
+      }
+
+      if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))
+        || newLeftOpt.isDefined || newRightOpt.isDefined) {
+        Join(newLeftOpt.getOrElse(left), newRightOpt.getOrElse(right), 
joinType, newConditionOpt)
+      } else {
+        join
+      }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1e3b8762/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
index 0468488..a29f3d2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
@@ -41,9 +41,7 @@ trait QueryPlanConstraints { self: LogicalPlan =>
    * example, if this set contains the expression `a = 2` then that expression 
is guaranteed to
    * evaluate to `true` for all rows produced.
    */
-  lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter { 
c =>
-        c.references.nonEmpty && c.references.subsetOf(outputSet) && 
c.deterministic
-      })
+  lazy val constraints: ExpressionSet = 
ExpressionSet(allConstraints.filter(selfReferenceOnly))
 
   /**
    * This method can be overridden by any child class of QueryPlan to specify 
a set of constraints
@@ -56,6 +54,23 @@ trait QueryPlanConstraints { self: LogicalPlan =>
   protected def validConstraints: Set[Expression] = Set.empty
 
   /**
+   * Returns an [[ExpressionSet]] that contains an additional set of 
constraints, such as
+   * equality constraints and `isNotNull` constraints, etc., and that only 
contains references
+   * to this [[LogicalPlan]] node.
+   */
+  def getRelevantConstraints(constraints: Set[Expression]): ExpressionSet = {
+    val allRelevantConstraints =
+      if (conf.constraintPropagationEnabled) {
+        constraints
+          .union(inferAdditionalConstraints(constraints))
+          .union(constructIsNotNullConstraints(constraints))
+      } else {
+        constraints
+      }
+    ExpressionSet(allRelevantConstraints.filter(selfReferenceOnly))
+  }
+
+  /**
    * Infers a set of `isNotNull` constraints from null intolerant expressions 
as well as
    * non-nullable attributes. For e.g., if an expression is of the form (`a > 
5`), this
    * returns a constraint of the form `isNotNull(a)`
@@ -120,4 +135,8 @@ trait QueryPlanConstraints { self: LogicalPlan =>
       destination: Attribute): Set[Expression] = constraints.map(_ transform {
     case e: Expression if e.semanticEquals(source) => destination
   })
+
+  private def selfReferenceOnly(e: Expression): Boolean = {
+    e.references.nonEmpty && e.references.subsetOf(outputSet) && 
e.deterministic
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1e3b8762/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
index f78c235..e068f51 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -204,4 +204,40 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
     val optimized = Optimize.execute(originalQuery)
     comparePlans(optimized, correctAnswer)
   }
+
+  test("SPARK-21479: Outer join after-join filters push down to null-supplying 
side") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val condition = Some("x.a".attr === "y.a".attr)
+    val originalQuery = x.join(y, LeftOuter, condition).where("x.a".attr === 
2).analyze
+    val left = x.where(IsNotNull('a) && 'a === 2)
+    val right = y.where(IsNotNull('a) && 'a === 2)
+    val correctAnswer = left.join(right, LeftOuter, condition).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("SPARK-21479: Outer join pre-existing filters push down to 
null-supplying side") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val condition = Some("x.a".attr === "y.a".attr)
+    val originalQuery = x.join(y.where("y.a".attr > 5), RightOuter, 
condition).analyze
+    val left = x.where(IsNotNull('a) && 'a > 5)
+    val right = y.where(IsNotNull('a) && 'a > 5)
+    val correctAnswer = left.join(right, RightOuter, condition).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("SPARK-21479: Outer join no filter push down to preserved side") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val condition = Some("x.a".attr === "y.a".attr)
+    val originalQuery = x.join(y.where("y.a".attr === 1), LeftOuter, 
condition).analyze
+    val left = x
+    val right = y.where(IsNotNull('a) && 'a === 1)
+    val correctAnswer = left.join(right, LeftOuter, condition).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to