Repository: spark
Updated Branches:
  refs/heads/master 7d6ff3910 -> 0d3a63193


[SPARK-20714][SS] Fix match error when watermark is set with timeout = no 
timeout / processing timeout

## What changes were proposed in this pull request?

When a watermark is set and the timeout conf is NoTimeout or ProcessingTimeTimeout
(neither of which needs the watermark), the query fails at runtime with the following
exception:
```
scala.MatchError: Some(org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate@1a9b798e) (of class scala.Some)
    org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1.apply(FlatMapGroupsWithStateExec.scala:120)
    org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1.apply(FlatMapGroupsWithStateExec.scala:116)
    org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70)
    org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65)
    org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
```

The match did not correctly handle cases where a watermark was defined but the
timeout was different from EventTimeTimeout.

## How was this patch tested?
New unit tests.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #17954 from tdas/SPARK-20714.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d3a6319
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d3a6319
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d3a6319

Branch: refs/heads/master
Commit: 0d3a63193c691ece88bb256d04156258a1c03a81
Parents: 7d6ff39
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Fri May 12 10:49:50 2017 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri May 12 10:49:50 2017 -0700

----------------------------------------------------------------------
 .../streaming/FlatMapGroupsWithStateExec.scala  |  2 +-
 .../streaming/FlatMapGroupsWithStateSuite.scala | 40 +++++++++++++++++++-
 2 files changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0d3a6319/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index e42df5d..5e79232 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -120,7 +120,7 @@ case class FlatMapGroupsWithStateExec(
         val filteredIter = watermarkPredicateForData match {
           case Some(predicate) if timeoutConf == EventTimeTimeout =>
             iter.filter(row => !predicate.eval(row))
-          case None =>
+          case _ =>
             iter
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0d3a6319/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index 85aa7db..89cfba6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -589,7 +589,7 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
     )
   }
 
-  test("flatMapGroupsWithState - streaming with event time timeout") {
+  test("flatMapGroupsWithState - streaming with event time timeout + 
watermark") {
     // Function to maintain the max event time
     // Returns the max event time in the state, or -1 if the state was removed 
by timeout
     val stateFunc = (
@@ -761,6 +761,44 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
     assert(e.getMessage === "The output mode of function should be append or 
update")
   }
 
+  def testWithTimeout(timeoutConf: GroupStateTimeout): Unit = {
+    test("SPARK-20714: watermark does not fail query when timeout = " + timeoutConf) {
+      // Running count per key; setTimeoutDuration is only legal under ProcessingTimeTimeout.
+      // If the state times out, it is removed and -1 is returned for the key.
+      val stateFunc =
+      (key: String, values: Iterator[(String, Long)], state: GroupState[RunningCount]) => {
+        if (state.hasTimedOut) {
+          state.remove()
+          Iterator((key, "-1"))
+        } else {
+          val count = state.getOption.map(_.count).getOrElse(0L) + values.size
+          state.update(RunningCount(count))
+          if (timeoutConf == ProcessingTimeTimeout) state.setTimeoutDuration("10 seconds")
+          Iterator((key, count.toString))
+        }
+      }
+
+      val clock = new StreamManualClock
+      val inputData = MemoryStream[(String, Long)]
+      val result =
+        inputData.toDF().toDF("key", "time")
+          .selectExpr("key", "cast(time as timestamp) as timestamp")
+          .withWatermark("timestamp", "10 second")
+          .as[(String, Long)]
+          .groupByKey(x => x._1)
+          .flatMapGroupsWithState(Update, timeoutConf)(stateFunc)
+
+      testStream(result, Update)(
+        StartStream(ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, ("a", 1L)),
+        AdvanceManualClock(1 * 1000),
+        CheckLastBatch(("a", "1"))
+      )
+    }
+  }
+  testWithTimeout(NoTimeout)
+  testWithTimeout(ProcessingTimeTimeout)
+
   def testStateUpdateWithData(
       testName: String,
       stateUpdates: GroupState[Int] => Unit,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to