Repository: spark
Updated Branches:
  refs/heads/branch-1.5 bf5b2f26b -> 33ce274cd


[SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when 
deregistering a receiver since we may reuse it later

`deregisterReceiver` should not remove the receiver's `ReceiverTrackingInfo`. 
Otherwise, restarting that receiver fails with 
`java.util.NoSuchElementException: key not found`.

Author: zsxwing <zsxw...@gmail.com>

Closes #8538 from zsxwing/SPARK-10369.

(cherry picked from commit 4a5fe091658b1d06f427e404a11a84fc84f953c5)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33ce274c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33ce274c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33ce274c

Branch: refs/heads/branch-1.5
Commit: 33ce274cdf7538b5816f1a400b2fad394ec2a056
Parents: bf5b2f2
Author: zsxwing <zsxw...@gmail.com>
Authored: Mon Aug 31 12:19:11 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Aug 31 12:19:48 2015 -0700

----------------------------------------------------------------------
 .../streaming/scheduler/ReceiverTracker.scala   |  4 +-
 .../scheduler/ReceiverTrackerSuite.scala        | 51 ++++++++++++++++++++
 2 files changed, 53 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/33ce274c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 3d532a6..f86fd44 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -291,7 +291,7 @@ class ReceiverTracker(ssc: StreamingContext, 
skipReceiverLaunch: Boolean = false
         ReceiverTrackingInfo(
           streamId, ReceiverState.INACTIVE, None, None, None, None, 
Some(errorInfo))
     }
-    receiverTrackingInfos -= streamId
+    receiverTrackingInfos(streamId) = newReceiverTrackingInfo
     
listenerBus.post(StreamingListenerReceiverStopped(newReceiverTrackingInfo.toReceiverInfo))
     val messageWithError = if (error != null && !error.isEmpty) {
       s"$message - $error"
@@ -483,7 +483,7 @@ class ReceiverTracker(ssc: StreamingContext, 
skipReceiverLaunch: Boolean = false
         context.reply(true)
       // Local messages
       case AllReceiverIds =>
-        context.reply(receiverTrackingInfos.keys.toSeq)
+        context.reply(receiverTrackingInfos.filter(_._2.state != 
ReceiverState.INACTIVE).keys.toSeq)
       case StopAllReceivers =>
         assert(isTrackerStopping || isTrackerStopped)
         stopReceivers()

http://git-wip-us.apache.org/repos/asf/spark/blob/33ce274c/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index dd292ba..45138b7 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -60,6 +60,26 @@ class ReceiverTrackerSuite extends TestSuiteBase {
       }
     }
   }
+
+  test("should restart receiver after stopping it") {
+    withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc 
=>
+      @volatile var startTimes = 0 // number of onReceiverStarted events observed so far
+      ssc.addStreamingListener(new StreamingListener {
+        override def onReceiverStarted(receiverStarted: 
StreamingListenerReceiverStarted): Unit = {
+          startTimes += 1
+        }
+      })
+      val input = ssc.receiverStream(new StoppableReceiver)
+      val output = new TestOutputStream(input)
+      output.register()
+      ssc.start()
+      StoppableReceiver.shouldStop = true // makes the receiver's polling thread call stop("stop")
+      eventually(timeout(10 seconds), interval(10 millis)) {
+        // The receiver is stopped once, so if it's restarted, it should be 
started twice.
+        assert(startTimes === 2)
+      }
+    }
+  }
 }
 
 /** An input DStream with for testing rate controlling */
@@ -132,3 +152,34 @@ private[streaming] object RateTestReceiver {
 
   def getActive(): Option[RateTestReceiver] = Option(activeReceiver)
 }
+
+/**
+ * A test receiver that stops itself once StoppableReceiver.shouldStop is set to true.
+ */
+class StoppableReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+
+  var receivingThreadOption: Option[Thread] = None // NOTE(review): never assigned in this class
+
+  def onStart() {
+    val thread = new Thread() {
+      override def run() {
+        while (!StoppableReceiver.shouldStop) { // poll the shared flag every 10 ms
+          Thread.sleep(10)
+        }
+        StoppableReceiver.this.stop("stop")
+      }
+    }
+    thread.start()
+  }
+
+  def onStop() {
+    StoppableReceiver.shouldStop = true
+    receivingThreadOption.foreach(_.join()) // no-op while receivingThreadOption is still None
+    // Clear the flag so that a receiver started afterwards does not stop again immediately
+    StoppableReceiver.shouldStop = false
+  }
+}
+
+object StoppableReceiver {
+  @volatile var shouldStop = false // polled by the receiver thread; written by the test and onStop()
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to