Repository: spark
Updated Branches:
  refs/heads/branch-2.1 4b1df0e89 -> b4bad04c5


[SPARK-18497][SS] Make ForeachSink support watermark

## What changes were proposed in this pull request?

The issue in ForeachSink is the new created DataSet still uses the old 
QueryExecution. When `foreachPartition` is called, `QueryExecution.toString` 
will be called and then fail because it doesn't know how to plan 
EventTimeWatermark.

This PR just replaces the QueryExecution with IncrementalExecution to fix the 
issue.

## How was this patch tested?

`test("foreach with watermark")`.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #15934 from zsxwing/SPARK-18497.

(cherry picked from commit 2a40de408b5eb47edba92f9fe92a42ed1e78bf98)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4bad04c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4bad04c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4bad04c

Branch: refs/heads/branch-2.1
Commit: b4bad04c5e20b06992100c1d44ece9d3a5b4f817
Parents: 4b1df0e
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Fri Nov 18 16:34:38 2016 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Fri Nov 18 16:34:48 2016 -0800

----------------------------------------------------------------------
 .../sql/execution/streaming/ForeachSink.scala   | 16 ++++-----
 .../execution/streaming/ForeachSinkSuite.scala  | 35 ++++++++++++++++++++
 2 files changed, 43 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b4bad04c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index f5c550d..c93fcfb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -47,22 +47,22 @@ class ForeachSink[T : Encoder](writer: ForeachWriter[T]) 
extends Sink with Seria
     // method supporting incremental planning. But in the long run, we should 
generally make newly
     // created Datasets use `IncrementalExecution` where necessary (which is 
SPARK-16264 tries to
     // resolve).
-
+    val incrementalExecution = 
data.queryExecution.asInstanceOf[IncrementalExecution]
     val datasetWithIncrementalExecution =
-      new Dataset(data.sparkSession, data.logicalPlan, implicitly[Encoder[T]]) 
{
+      new Dataset(data.sparkSession, incrementalExecution, 
implicitly[Encoder[T]]) {
         override lazy val rdd: RDD[T] = {
           val objectType = exprEnc.deserializer.dataType
           val deserialized = CatalystSerde.deserialize[T](logicalPlan)
 
           // was originally: 
sparkSession.sessionState.executePlan(deserialized) ...
-          val incrementalExecution = new IncrementalExecution(
+          val newIncrementalExecution = new IncrementalExecution(
             this.sparkSession,
             deserialized,
-            data.queryExecution.asInstanceOf[IncrementalExecution].outputMode,
-            
data.queryExecution.asInstanceOf[IncrementalExecution].checkpointLocation,
-            
data.queryExecution.asInstanceOf[IncrementalExecution].currentBatchId,
-            
data.queryExecution.asInstanceOf[IncrementalExecution].currentEventTimeWatermark)
-          incrementalExecution.toRdd.mapPartitions { rows =>
+            incrementalExecution.outputMode,
+            incrementalExecution.checkpointLocation,
+            incrementalExecution.currentBatchId,
+            incrementalExecution.currentEventTimeWatermark)
+          newIncrementalExecution.toRdd.mapPartitions { rows =>
             rows.map(_.get(0, objectType))
           }.asInstanceOf[RDD[T]]
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/b4bad04c/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
index 9e05921..ee626103 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.ForeachWriter
+import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, 
StreamTest}
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -169,6 +170,40 @@ class ForeachSinkSuite extends StreamTest with 
SharedSQLContext with BeforeAndAf
       assert(errorEvent.error.get.getMessage === "error")
     }
   }
+
+  test("foreach with watermark") {
+    val inputData = MemoryStream[Int]
+
+    val windowedAggregation = inputData.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"count".as[Long])
+      .map(_.toInt)
+      .repartition(1)
+
+    val query = windowedAggregation
+      .writeStream
+      .outputMode(OutputMode.Complete)
+      .foreach(new TestForeachWriter())
+      .start()
+    try {
+      inputData.addData(10, 11, 12)
+      query.processAllAvailable()
+
+      val allEvents = ForeachSinkSuite.allEvents()
+      assert(allEvents.size === 1)
+      val expectedEvents = Seq(
+        ForeachSinkSuite.Open(partition = 0, version = 0),
+        ForeachSinkSuite.Process(value = 3),
+        ForeachSinkSuite.Close(None)
+      )
+      assert(allEvents === Seq(expectedEvents))
+    } finally {
+      query.stop()
+    }
+  }
 }
 
 /** A global object to collect events in the executor */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to