Repository: spark
Updated Branches:
  refs/heads/master f79aa285c -> c0189abc7


[SPARK-20373][SQL][SS] Batch queries with `Dataset/DataFrame.withWatermark()` do not execute

## What changes were proposed in this pull request?

Any Dataset/DataFrame batch query that uses the `withWatermark` operation fails to execute, because the batch planner has no rule to explicitly handle the `EventTimeWatermark` logical plan node.
The right fix is to simply remove the plan node, as the watermark should not affect a batch query in any way.
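
For illustration, a minimal sketch of the failure mode; the local session setup here is hypothetical and only for the example, not part of this patch:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical local session, for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("SPARK-20373-repro")
  .getOrCreate()
import spark.implicits._

val df = Seq(1L, 2L, 3L).toDF("key")
  .withColumn("eventTime", $"key".cast("timestamp"))
  .withWatermark("eventTime", "1 minute") // a no-op on a batch Dataset after this fix

// Before this fix, executing the batch query failed because the planner
// had no rule for the EventTimeWatermark node; with this fix it runs normally.
df.collect()
```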

Changes:
- This PR adds a new rule, `EliminateEventTimeWatermark`, which checks whether the event time watermark should be ignored; the watermark is ignored in any batch query (see the sketch after this list).
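
As a preview, the rule (shown in full in the `Analyzer.scala` diff below) boils down to a single transform that drops the `EventTimeWatermark` node whenever the child plan is not streaming:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object EliminateEventTimeWatermark extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // A watermark never affects batch execution, so remove the node entirely.
    case EventTimeWatermark(_, _, child) if !child.isStreaming => child
  }
}
```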

Depends upon:
- [SPARK-20672](https://issues.apache.org/jira/browse/SPARK-20672). We cannot add this rule to the analyzer's rule list directly, because the streaming query is copied to `triggerLogicalPlan` on every trigger, and the rule would then be mistakenly applied to `triggerLogicalPlan`.

Others:
- A typo fix in an example.

## How was this patch tested?

Added a new unit test.

Author: uncleGen <husty...@gmail.com>

Closes #17896 from uncleGen/SPARK-20373.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0189abc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0189abc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0189abc

Branch: refs/heads/master
Commit: c0189abc7c6ddbecc1832d2ff0cfc5546a010b60
Parents: f79aa28
Author: uncleGen <husty...@gmail.com>
Authored: Tue May 9 15:08:09 2017 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue May 9 15:08:09 2017 -0700

----------------------------------------------------------------------
 docs/structured-streaming-programming-guide.md            |  3 +++
 .../examples/sql/streaming/StructuredSessionization.scala |  4 ++--
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++++++++++
 .../src/main/scala/org/apache/spark/sql/Dataset.scala     |  3 ++-
 .../spark/sql/streaming/EventTimeWatermarkSuite.scala     | 10 ++++++++++
 5 files changed, 27 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c0189abc/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 53b3db2..bd01be9 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -901,6 +901,9 @@ Some sinks (e.g. files) may not supported fine-grained updates that Update Mode
 with them, we have also support Append Mode, where only the *final counts* are written to sink.
 This is illustrated below.
 
+Note that using `withWatermark` on a non-streaming Dataset is a no-op. As the watermark should not affect
+any batch query in any way, we will ignore it directly.
+
 ![Watermarking in Append Mode](img/structured-streaming-watermark-append-mode.png)
 
 Similar to the Update Mode earlier, the engine maintains intermediate counts for each window.

http://git-wip-us.apache.org/repos/asf/spark/blob/c0189abc/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
index 2ce792c..ed63fb6 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
@@ -34,14 +34,14 @@ import org.apache.spark.sql.streaming._
  * To run this on your local machine, you need to first run a Netcat server
  * `$ nc -lk 9999`
  * and then run the example
- * `$ bin/run-example sql.streaming.StructuredNetworkWordCount
+ * `$ bin/run-example sql.streaming.StructuredSessionization
  * localhost 9999`
  */
 object StructuredSessionization {
 
   def main(args: Array[String]): Unit = {
     if (args.length < 2) {
-      System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>")
+      System.err.println("Usage: StructuredSessionization <hostname> <port>")
       System.exit(1)
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c0189abc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 72e7d5d..c56dd36 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2457,6 +2457,16 @@ object CleanupAliases extends Rule[LogicalPlan] {
 }
 
 /**
+ * Ignore event time watermark in batch query, which is only supported in Structured Streaming.
+ * TODO: add this rule into analyzer rule list.
+ */
+object EliminateEventTimeWatermark extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case EventTimeWatermark(_, _, child) if !child.isStreaming => child
+  }
+}
+
+/**
  * Maps a time column to multiple time windows using the Expand operator. Since it's non-trivial to
  * figure out how many windows a time column can map to, we over-estimate the number of windows and
  * filter out the rows where the time column is not inside the time window.

http://git-wip-us.apache.org/repos/asf/spark/blob/c0189abc/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 620c8bd..61154e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -615,7 +615,8 @@ class Dataset[T] private[sql](
         .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
     require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
       s"delay threshold ($delayThreshold) should not be negative.")
-    EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)
+    EliminateEventTimeWatermark(
+      EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c0189abc/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index fd850a7..1b60a06 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -344,6 +344,16 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
     assert(eventTimeColumns(0).name === "second")
   }
 
+  test("EventTime watermark should be ignored in batch query.") {
+    val df = testData
+      .withColumn("eventTime", $"key".cast("timestamp"))
+      .withWatermark("eventTime", "1 minute")
+      .select("eventTime")
+      .as[Long]
+
+    checkDataset[Long](df, 1L to 100L: _*)
+  }
+
   private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q =>
     val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get
     assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows)

