Repository: spark
Updated Branches:
  refs/heads/branch-1.6 2f390d306 -> a907c7c64


[SPARK-13195][STREAMING] Fix NoSuchElementException when a state is not set but timeoutThreshold is defined

Check whether the state exists before calling `get()` on it.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #11081 from zsxwing/SPARK-13195.

(cherry picked from commit 8e2f296306131e6c7c2f06d6672995d3ff8ab021)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a907c7c6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a907c7c6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a907c7c6

Branch: refs/heads/branch-1.6
Commit: a907c7c64887833770cd593eecccf53620de59b7
Parents: 2f390d3
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Feb 4 12:43:16 2016 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Feb 4 12:43:25 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala  | 3 ++-
 .../org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala   | 5 +++++
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a907c7c6/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
index fdf6167..a301ba0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
@@ -57,7 +57,8 @@ private[streaming] object MapWithStateRDDRecord {
       val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
       if (wrappedState.isRemoved) {
         newStateMap.remove(key)
-      } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) {
+      } else if (wrappedState.isUpdated
+          || (wrappedState.exists && timeoutThresholdTime.isDefined)) {
         newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
       }
       mappedData ++= returned

http://git-wip-us.apache.org/repos/asf/spark/blob/a907c7c6/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
index aa95bd3..2b71a1b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
@@ -185,6 +185,11 @@ class MapWithStateRDDSuite extends SparkFunSuite with RDDCheckpointTester with B
       timeoutThreshold = Some(initialTime + 1), removeTimedoutData = true,
       expectedStates = Nil, expectedTimingOutStates = Nil, 
expectedRemovedStates = Seq(123))
 
+    // If a state is not set but timeoutThreshold is defined, we should ignore 
this state.
+    // Previously it threw NoSuchElementException (SPARK-13195).
+    assertRecordUpdate(initStates = Seq(), data = Seq("noop"),
+      timeoutThreshold = Some(initialTime + 1), removeTimedoutData = true,
+      expectedStates = Nil, expectedTimingOutStates = Nil)
   }
 
   test("states generated by MapWithStateRDD") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to