Repository: spark
Updated Branches:
  refs/heads/master 2d799d080 -> 0fb73253f


[SPARK-21587][SS] Added filter pushdown through watermarks.

## What changes were proposed in this pull request?

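Push filter predicates through EventTimeWatermark when they are deterministic and 
do not reference the watermarked (event-time) attribute. Only the leading run of 
such conjuncts is pushed: once a conjunct fails either check, it and every 
conjunct after it stay above the watermark. (This is similar, but not identical, 
to the logic for pushing through UnaryNode.)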

## How was this patch tested?
New unit tests in FilterPushdownSuite.

Author: Jose Torres <joseph-tor...@databricks.com>

Closes #18790 from joseph-torres/SPARK-21587.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0fb73253
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0fb73253
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0fb73253

Branch: refs/heads/master
Commit: 0fb73253fc832361d5d89ba85692ae653961e104
Parents: 2d799d0
Author: Jose Torres <joseph-tor...@databricks.com>
Authored: Wed Aug 9 12:50:04 2017 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Aug 9 12:50:04 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 19 +++++++
 .../optimizer/FilterPushdownSuite.scala         | 57 ++++++++++++++++++++
 2 files changed, 76 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0fb73253/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d82af94..a51b385 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -867,6 +867,25 @@ object PushDownPredicate extends Rule[LogicalPlan] with 
PredicateHelper {
         filter
       }
 
+    case filter @ Filter(condition, watermark: EventTimeWatermark) =>
+      // We can only push deterministic predicates which don't reference the 
watermark attribute.
+      // We could in theory span() only on determinism and pull out 
deterministic predicates
+      // on the watermark separately. But it seems unnecessary and a bit 
confusing to not simply
+      // use the prefix as we do for nondeterminism in other cases.
+
+      val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(
+        p => p.deterministic && !p.references.contains(watermark.eventTime))
+
+      if (pushDown.nonEmpty) {
+        val pushDownPredicate = pushDown.reduceLeft(And)
+        val newWatermark = watermark.copy(child = Filter(pushDownPredicate, 
watermark.child))
+        // If there is no more filter to stay up, just eliminate the filter.
+        // Otherwise, create "Filter(stayUp) <- watermark <- 
Filter(pushDownPredicate)".
+        if (stayUp.isEmpty) newWatermark else Filter(stayUp.reduceLeft(And), 
newWatermark)
+      } else {
+        filter
+      }
+
     case filter @ Filter(_, u: UnaryNode)
         if canPushThrough(u) && u.expressions.forall(_.deterministic) =>
       pushDownPredicate(filter, u.child) { predicate =>

http://git-wip-us.apache.org/repos/asf/spark/blob/0fb73253/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 3553d23..582b3ea 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.unsafe.types.CalendarInterval
 
 class FilterPushdownSuite extends PlanTest {
 
@@ -1134,4 +1135,60 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(Optimize.execute(originalQuery.analyze), 
correctAnswer.analyze,
       checkAnalysis = false)
   }
+
+  test("watermark pushdown: no pushdown on watermark attribute") {
+    val interval = new CalendarInterval(2, 2000L)
+
+    // Verify that only conditions before the first one touching the watermark attribute ('b)
+    // are pushed down; 'c === 5 stays up even though on its own it would qualify.
+    val originalQuery = EventTimeWatermark('b, interval, testRelation)
+      .where('a === 5 && 'b === 10 && 'c === 5)
+    val correctAnswer = EventTimeWatermark(
+      'b, interval, testRelation.where('a === 5))
+      .where('b === 10 && 'c === 5)
+
+    comparePlans(Optimize.execute(originalQuery.analyze), 
correctAnswer.analyze,
+      checkAnalysis = false)
+  }
+
+  test("watermark pushdown: no pushdown for nondeterministic filter") {
+    val interval = new CalendarInterval(2, 2000L)
+
+    // Verify that a nondeterministic condition is not pushed down: only 'a === 5, which
+    // precedes it, moves below the watermark; the Rand condition and all after it stay up.
+    val originalQuery = EventTimeWatermark('c, interval, testRelation)
+      .where('a === 5 && 'b === Rand(10) && 'c === 5)
+    val correctAnswer = EventTimeWatermark(
+      'c, interval, testRelation.where('a === 5))
+      .where('b === Rand(10) && 'c === 5)
+
+    comparePlans(Optimize.execute(originalQuery.analyze), 
correctAnswer.analyze,
+      checkAnalysis = false)
+  }
+
+  test("watermark pushdown: full pushdown") {
+    val interval = new CalendarInterval(2, 2000L)
+
+    // Verify that when no condition is nondeterministic or touches the watermark attribute
+    // ('c), the whole filter moves below the watermark and no Filter remains above it.
+    val originalQuery = EventTimeWatermark('c, interval, testRelation)
+      .where('a === 5 && 'b === 10)
+    val correctAnswer = EventTimeWatermark(
+      'c, interval, testRelation.where('a === 5 && 'b === 10))
+
+    comparePlans(Optimize.execute(originalQuery.analyze), 
correctAnswer.analyze,
+      checkAnalysis = false)
+  }
+
+  test("watermark pushdown: empty pushdown") {
+    val interval = new CalendarInterval(2, 2000L)
+
+    // Verify that nothing is pushed down when the first condition touches the watermark
+    // attribute ('a), even though 'b === 10 alone would qualify: the plan is left unchanged.
+    val originalQuery = EventTimeWatermark('a, interval, testRelation)
+      .where('a === 5 && 'b === 10)
+
+    comparePlans(Optimize.execute(originalQuery.analyze), 
originalQuery.analyze,
+      checkAnalysis = false)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to