Repository: spark
Updated Branches:
  refs/heads/branch-2.2 7123ec8e1 -> f14246959


[SPARK-20714][SS] Fix match error when watermark is set with timeout = no 
timeout / processing timeout

## What changes were proposed in this pull request?

When a watermark is set and the timeout conf is NoTimeout or ProcessingTimeTimeout 
(neither of which needs the watermark), the query fails at runtime with the following 
exception.
```
MatchException: 
Some(org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate1a9b798e)
 (of class scala.Some)
    
org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1.apply(FlatMapGroupsWithStateExec.scala:120)
    
org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1.apply(FlatMapGroupsWithStateExec.scala:116)
    
org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70)
    
org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65)
    
org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
```

The match did not correctly handle cases where a watermark was defined but the 
timeout was different from EventTimeTimeout.

## How was this patch tested?
New unit tests.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #17954 from tdas/SPARK-20714.

(cherry picked from commit 0d3a63193c691ece88bb256d04156258a1c03a81)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1424695
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1424695
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1424695

Branch: refs/heads/branch-2.2
Commit: f14246959c3ed84d4a69a02d7609676810039dc3
Parents: 7123ec8
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Fri May 12 10:49:50 2017 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri May 12 10:49:58 2017 -0700

----------------------------------------------------------------------
 .../streaming/FlatMapGroupsWithStateExec.scala  |  2 +-
 .../streaming/FlatMapGroupsWithStateSuite.scala | 40 +++++++++++++++++++-
 2 files changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f1424695/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index e42df5d..5e79232 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -120,7 +120,7 @@ case class FlatMapGroupsWithStateExec(
         val filteredIter = watermarkPredicateForData match {
           case Some(predicate) if timeoutConf == EventTimeTimeout =>
             iter.filter(row => !predicate.eval(row))
-          case None =>
+          case _ =>
             iter
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f1424695/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index 85aa7db..89cfba6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -589,7 +589,7 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
     )
   }
 
-  test("flatMapGroupsWithState - streaming with event time timeout") {
+  test("flatMapGroupsWithState - streaming with event time timeout + 
watermark") {
     // Function to maintain the max event time
     // Returns the max event time in the state, or -1 if the state was removed 
by timeout
     val stateFunc = (
@@ -761,6 +761,44 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
     assert(e.getMessage === "The output mode of function should be append or 
update")
   }
 
+  def testWithTimeout(timeoutConf: GroupStateTimeout): Unit = {
+    test("SPARK-20714: watermark does not fail query when timeout = " + 
timeoutConf) {
+      // Function to maintain running count up to 2, and then remove the count
+      // Returns the data and the count (-1 if count reached beyond 2 and 
state was just removed)
+      val stateFunc =
+      (key: String, values: Iterator[(String, Long)], state: 
GroupState[RunningCount]) => {
+        if (state.hasTimedOut) {
+          state.remove()
+          Iterator((key, "-1"))
+        } else {
+          val count = state.getOption.map(_.count).getOrElse(0L) + values.size
+          state.update(RunningCount(count))
+          state.setTimeoutDuration("10 seconds")
+          Iterator((key, count.toString))
+        }
+      }
+
+      val clock = new StreamManualClock
+      val inputData = MemoryStream[(String, Long)]
+      val result =
+        inputData.toDF().toDF("key", "time")
+          .selectExpr("key", "cast(time as timestamp) as timestamp")
+          .withWatermark("timestamp", "10 second")
+          .as[(String, Long)]
+          .groupByKey(x => x._1)
+          .flatMapGroupsWithState(Update, ProcessingTimeTimeout)(stateFunc)
+
+      testStream(result, Update)(
+        StartStream(ProcessingTime("1 second"), triggerClock = clock),
+        AddData(inputData, ("a", 1L)),
+        AdvanceManualClock(1 * 1000),
+        CheckLastBatch(("a", "1"))
+      )
+    }
+  }
+  testWithTimeout(NoTimeout)
+  testWithTimeout(ProcessingTimeTimeout)
+
   def testStateUpdateWithData(
       testName: String,
       stateUpdates: GroupState[Int] => Unit,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to