Repository: spark Updated Branches: refs/heads/master 7d6ff3910 -> 0d3a63193
[SPARK-20714][SS] Fix match error when watermark is set with timeout = no timeout / processing timeout ## What changes were proposed in this pull request? When watermark is set, and timeout conf is NoTimeout or ProcessingTimeTimeout (both do not need the watermark), the query fails at runtime with the following exception. ``` MatchException: Some(org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate1a9b798e) (of class scala.Some) org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1.apply(FlatMapGroupsWithStateExec.scala:120) org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1.apply(FlatMapGroupsWithStateExec.scala:116) org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70) org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65) org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64) ``` The match did not correctly handle cases where watermark was defined by the timeout was different from EventTimeTimeout. ## How was this patch tested? New unit tests. Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #17954 from tdas/SPARK-20714. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d3a6319 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d3a6319 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d3a6319 Branch: refs/heads/master Commit: 0d3a63193c691ece88bb256d04156258a1c03a81 Parents: 7d6ff39 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Fri May 12 10:49:50 2017 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Fri May 12 10:49:50 2017 -0700 ---------------------------------------------------------------------- .../streaming/FlatMapGroupsWithStateExec.scala | 2 +- .../streaming/FlatMapGroupsWithStateSuite.scala | 40 +++++++++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0d3a6319/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala index e42df5d..5e79232 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala @@ -120,7 +120,7 @@ case class FlatMapGroupsWithStateExec( val filteredIter = watermarkPredicateForData match { case Some(predicate) if timeoutConf == EventTimeTimeout => iter.filter(row => !predicate.eval(row)) - case None => + case _ => iter } http://git-wip-us.apache.org/repos/asf/spark/blob/0d3a6319/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala index 85aa7db..89cfba6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala @@ -589,7 +589,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf ) } - test("flatMapGroupsWithState - streaming with event time timeout") { + test("flatMapGroupsWithState - streaming with event time timeout + watermark") { // Function to maintain the max event time // Returns the max event time in the state, or -1 if the state was removed by timeout val stateFunc = ( @@ -761,6 +761,44 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf assert(e.getMessage === "The output mode of function should be append or update") } + def testWithTimeout(timeoutConf: GroupStateTimeout): Unit = { + test("SPARK-20714: watermark does not fail query when timeout = " + timeoutConf) { + // Function to maintain running count up to 2, and then remove the count + // Returns the data and the count (-1 if count reached beyond 2 and state was just removed) + val stateFunc = + (key: String, values: Iterator[(String, Long)], state: GroupState[RunningCount]) => { + if (state.hasTimedOut) { + state.remove() + Iterator((key, "-1")) + } else { + val count = state.getOption.map(_.count).getOrElse(0L) + values.size + state.update(RunningCount(count)) + state.setTimeoutDuration("10 seconds") + Iterator((key, count.toString)) + } + } + + val clock = new StreamManualClock + val inputData = MemoryStream[(String, Long)] + val result = + inputData.toDF().toDF("key", "time") + .selectExpr("key", "cast(time as timestamp) as timestamp") + .withWatermark("timestamp", "10 second") + .as[(String, Long)] + .groupByKey(x => x._1) + .flatMapGroupsWithState(Update, ProcessingTimeTimeout)(stateFunc) + + testStream(result, Update)( + StartStream(ProcessingTime("1 second"), triggerClock = clock), + AddData(inputData, ("a", 1L)), + AdvanceManualClock(1 * 1000), + CheckLastBatch(("a", "1")) + ) + } + } + testWithTimeout(NoTimeout) + testWithTimeout(ProcessingTimeTimeout) + def testStateUpdateWithData( testName: String, stateUpdates: GroupState[Int] => Unit, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org