Repository: spark
Updated Branches:
  refs/heads/branch-1.5 b8b1fbfc8 -> dc058f2ff


[SPARK-11511][STREAMING] Fix NPE when an InputDStream is not used

Just ignored `InputDStream`s that have null `rememberDuration` in 
`DStreamGraph.getMaxInputStreamRememberDuration`.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #9476 from zsxwing/SPARK-11511.

(cherry picked from commit cf69ce136590fea51843bc54f44f0f45c7d0ac36)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc058f2f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc058f2f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc058f2f

Branch: refs/heads/branch-1.5
Commit: dc058f2fff6ee378aeaccd756a272a5e44be1c6c
Parents: b8b1fbf
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Fri Nov 6 14:51:53 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Nov 6 14:52:21 2015 +0000

----------------------------------------------------------------------
 .../org/apache/spark/streaming/DStreamGraph.scala   |  3 ++-
 .../spark/streaming/StreamingContextSuite.scala     | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dc058f2f/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 40789c6..8138caa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -169,7 +169,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
    * safe remember duration which can be used to perform cleanup operations.
    */
   def getMaxInputStreamRememberDuration(): Duration = {
-    inputStreams.map { _.rememberDuration }.maxBy { _.milliseconds }
+    // If an InputDStream is not used, its `rememberDuration` will be null and we can ignore them
+    inputStreams.map(_.rememberDuration).filter(_ != null).maxBy(_.milliseconds)
   }
 
   @throws(classOf[IOException])

http://git-wip-us.apache.org/repos/asf/spark/blob/dc058f2f/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index c7a8771..860fac2 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -780,6 +780,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
       "Please don't use queueStream when checkpointing is enabled."))
   }
 
+  test("Creating an InputDStream but not using it should not crash") {
+    ssc = new StreamingContext(master, appName, batchDuration)
+    val input1 = addInputStream(ssc)
+    val input2 = addInputStream(ssc)
+    val output = new TestOutputStream(input2)
+    output.register()
+    val batchCount = new BatchCounter(ssc)
+    ssc.start()
+    // Just wait for completing 2 batches to make sure it triggers
+    // `DStream.getMaxInputStreamRememberDuration`
+    batchCount.waitUntilBatchesCompleted(2, 10000)
+    // Throw the exception if crash
+    ssc.awaitTerminationOrTimeout(1)
+    ssc.stop()
+  }
+
   def addInputStream(s: StreamingContext): DStream[Int] = {
     val input = (1 to 100).map(i => 1 to i)
     val inputStream = new TestInputStream(s, input, 1)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to