Repository: spark
Updated Branches:
  refs/heads/branch-1.2 6a46cc3c8 -> 01adf45a9


[SPARK-4802] [streaming] Remove receiverInfo once receiver is de-registered

  Once the streaming receiver is de-registered at the executor, the
`ReceiverTrackerActor` needs to remove the corresponding entry from the
`receiverInfo` map at `ReceiverTracker`.
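
A minimal sketch of the intended behaviour, assuming a much-simplified tracker: `TrackerSketch`, `ReceiverStopped`, the `posted` buffer, and the reduced `ReceiverInfo` fields are hypothetical stand-ins for illustration, not the actual Spark classes. It only shows the ordering this change introduces: build the final info, drop the map entry, then post the final info to listeners.

```scala
import scala.collection.mutable

// Hypothetical, reduced stand-in for Spark's ReceiverInfo (fields are illustrative).
case class ReceiverInfo(
    streamId: Int,
    name: String,
    active: Boolean,
    lastErrorMessage: String = "",
    lastError: String = "")

// Hypothetical stand-in for the StreamingListenerReceiverStopped event.
case class ReceiverStopped(info: ReceiverInfo)

class TrackerSketch {
  // Maps stream id to the info of the currently registered receiver.
  val receiverInfo = mutable.HashMap.empty[Int, ReceiverInfo]
  // Stands in for the listener bus: records the events that would be posted.
  val posted = mutable.ArrayBuffer.empty[ReceiverStopped]

  def register(streamId: Int, name: String): Unit =
    receiverInfo(streamId) = ReceiverInfo(streamId, name, active = true)

  def deregister(streamId: Int, message: String, error: String): Unit = {
    // Build the final snapshot from the prior entry, if there is one.
    val newReceiverInfo = receiverInfo.get(streamId) match {
      case Some(old) =>
        old.copy(active = false, lastErrorMessage = message, lastError = error)
      case None =>
        ReceiverInfo(streamId, "", active = false,
          lastErrorMessage = message, lastError = error)
    }
    // Remove the entry instead of writing the stopped info back into the map.
    receiverInfo -= streamId
    // Post the snapshot itself; the map no longer holds it.
    posted += ReceiverStopped(newReceiverInfo)
  }
}

object TrackerSketchDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new TrackerSketch
    tracker.register(0, "socket-receiver")
    tracker.deregister(0, "stopped by driver", "")
    println(tracker.receiverInfo.contains(0))
    println(tracker.posted.head.info.active)
  }
}
```

The event has to carry the locally built value because, after the removal, looking the stream id up in the map again would fail.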

Author: Ilayaperumal Gopinathan <igopinat...@pivotal.io>

Closes #3647 from ilayaperumalg/receiverInfo-RTracker and squashes the following commits:

6eb97d5 [Ilayaperumal Gopinathan] Polishing based on the review
3640c86 [Ilayaperumal Gopinathan] Remove receiverInfo once receiver is de-registered

(cherry picked from commit 10d69e9cbfdabe95d0e513176d5347d7b59da0ee)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01adf45a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01adf45a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01adf45a

Branch: refs/heads/branch-1.2
Commit: 01adf45a9b2e0264ee4571dd51c04a57126b666a
Parents: 6a46cc3
Author: Ilayaperumal Gopinathan <igopinat...@pivotal.io>
Authored: Tue Dec 23 15:14:54 2014 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Dec 23 15:15:10 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/scheduler/ReceiverTracker.scala   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/01adf45a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 32e481d..1f0e442 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -150,8 +150,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         logWarning("No prior receiver info")
         ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
     }
-    receiverInfo(streamId) = newReceiverInfo
-    listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId)))
+    receiverInfo -= streamId
+    listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
     val messageWithError = if (error != null && !error.isEmpty) {
       s"$message - $error"
     } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
