Repository: spark
Updated Branches:
  refs/heads/branch-1.2 6a46cc3c8 -> 01adf45a9
[SPARK-4802] [streaming] Remove receiverInfo once receiver is de-registered

Once the streaming receiver is de-registered at executor, the `ReceiverTrackerActor` needs to
remove the corresponding receiverInfo from the `receiverInfo` map at `ReceiverTracker`.

Author: Ilayaperumal Gopinathan <igopinat...@pivotal.io>

Closes #3647 from ilayaperumalg/receiverInfo-RTracker and squashes the following commits:

6eb97d5 [Ilayaperumal Gopinathan] Polishing based on the review
3640c86 [Ilayaperumal Gopinathan] Remove receiverInfo once receiver is de-registered

(cherry picked from commit 10d69e9cbfdabe95d0e513176d5347d7b59da0ee)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01adf45a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01adf45a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01adf45a

Branch: refs/heads/branch-1.2
Commit: 01adf45a9b2e0264ee4571dd51c04a57126b666a
Parents: 6a46cc3
Author: Ilayaperumal Gopinathan <igopinat...@pivotal.io>
Authored: Tue Dec 23 15:14:54 2014 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Dec 23 15:15:10 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/01adf45a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 32e481d..1f0e442 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -150,8 +150,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         logWarning("No prior receiver info")
         ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
     }
-    receiverInfo(streamId) = newReceiverInfo
-    listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId)))
+    receiverInfo -= streamId
+    listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
     val messageWithError = if (error != null && !error.isEmpty) {
       s"$message - $error"
     } else {
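
The bookkeeping change in this diff can be sketched in isolation. The snippet below is only an illustration, not the Spark code: `SketchReceiverInfo`, `SketchTracker`, and `postedStopEvents` are hypothetical names that stand in for `ReceiverInfo`, `ReceiverTracker`, and the listener bus. How the "prior info exists" branch builds the stopped record is an assumption, since the diff shows only the "No prior receiver info" branch.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's ReceiverInfo (not the real class).
case class SketchReceiverInfo(streamId: Int, active: Boolean, lastErrorMessage: String = "")

// Hypothetical tracker that models only the map bookkeeping touched by the diff.
class SketchTracker {
  // Maps stream id to the info of the currently registered receiver.
  val receiverInfo = mutable.HashMap.empty[Int, SketchReceiverInfo]
  // Stand-in for listenerBus.post(...): records what would have been posted.
  val postedStopEvents = mutable.ArrayBuffer.empty[SketchReceiverInfo]

  def register(streamId: Int): Unit =
    receiverInfo(streamId) = SketchReceiverInfo(streamId, active = true)

  def deregister(streamId: Int, message: String): Unit = {
    // Build the final "stopped" record, with or without prior info (assumed shape).
    val newReceiverInfo = receiverInfo.get(streamId) match {
      case Some(old) => old.copy(active = false, lastErrorMessage = message)
      case None      => SketchReceiverInfo(streamId, active = false, lastErrorMessage = message)
    }
    // After the patch: drop the entry instead of storing the stopped record back.
    receiverInfo -= streamId
    // Post the locally built record, since the map no longer holds it.
    postedStopEvents += newReceiverInfo
  }
}
```

In this sketch the event has to carry `newReceiverInfo` directly: once the entry is removed, looking it up again with `receiverInfo(streamId)` would throw a `NoSuchElementException`, which is presumably why both changed lines in the diff go together.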