Repository: spark
Updated Branches:
  refs/heads/branch-1.5 08c5962a2 -> a6f8979c8


[SPARK-10102] [STREAMING] Fix a race condition that startReceiver may happen 
before setting trackerState to Started

Test failure: 
https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-Maven-with-YARN/HADOOP_PROFILE=hadoop-2.4,label=spark-test/3305/testReport/junit/org.apache.spark.streaming/StreamingContextSuite/stop_gracefully/

There is a race condition: `trackerState` may be set to `Started` only after 
`startReceiver` has already been called. In that case `startReceiver` does not 
start the receivers, because it uses `!isTrackerStarted` to check whether 
ReceiverTracker is stopping or stopped. In fact, `trackerState` is still 
`Initialized` at that point and will change to `Started` shortly.

Therefore, the check should be `isTrackerStopping || isTrackerStopped` instead.
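
To make the difference between the two checks concrete, here is a minimal, 
self-contained sketch. It is not the ReceiverTracker code: the `TrackerState` 
type and the `oldShouldStart` / `newShouldStart` helpers are hypothetical names 
introduced only for illustration, and the four states are assumed from the names 
used in this message.

```scala
object TrackerStateSketch {
  // Hypothetical stand-in for the tracker's lifecycle states (assumed from the
  // state names mentioned in the commit message).
  sealed trait TrackerState
  case object Initialized extends TrackerState
  case object Started extends TrackerState
  case object Stopping extends TrackerState
  case object Stopped extends TrackerState

  // Old guard: equivalent to `isTrackerStarted`. It also rejects Initialized,
  // which is the state the tracker may still be in when startReceiver runs.
  def oldShouldStart(state: TrackerState): Boolean =
    state == Started

  // New guard: equivalent to `!(isTrackerStopping || isTrackerStopped)`.
  // It accepts both Initialized and Started.
  def newShouldStart(state: TrackerState): Boolean =
    !(state == Stopping || state == Stopped)

  def main(args: Array[String]): Unit = {
    // Print how each guard treats every state.
    for (s <- Seq[TrackerState](Initialized, Started, Stopping, Stopped)) {
      println(s"$s: old=${oldShouldStart(s)} new=${newShouldStart(s)}")
    }
  }
}
```

The two guards differ only for `Initialized`, which is exactly the window in 
which the race occurs.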

Author: zsxwing <zsxw...@gmail.com>

Closes #8294 from zsxwing/SPARK-9504.

(cherry picked from commit 90273eff9604439a5a5853077e232d34555c67d7)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6f8979c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6f8979c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6f8979c

Branch: refs/heads/branch-1.5
Commit: a6f8979c81c5355759f74e8b3c9eb3cafb6a9c7f
Parents: 08c5962
Author: zsxwing <zsxw...@gmail.com>
Authored: Tue Aug 18 20:15:54 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Aug 18 20:16:18 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/scheduler/ReceiverTracker.scala      | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a6f8979c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index e076fb5..aae3acf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -468,8 +468,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
      * Start a receiver along with its scheduled executors
      */
     private def startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]): Unit = {
+      def shouldStartReceiver: Boolean = {
+        // It's okay to start when trackerState is Initialized or Started
+        !(isTrackerStopping || isTrackerStopped)
+      }
+
       val receiverId = receiver.streamId
-      if (!isTrackerStarted) {
+      if (!shouldStartReceiver) {
         onReceiverJobFinish(receiverId)
         return
       }
@@ -494,14 +499,14 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       // We will keep restarting the receiver job until ReceiverTracker is stopped
       future.onComplete {
         case Success(_) =>
-          if (!isTrackerStarted) {
+          if (!shouldStartReceiver) {
             onReceiverJobFinish(receiverId)
           } else {
             logInfo(s"Restarting Receiver $receiverId")
             self.send(RestartReceiver(receiver))
           }
         case Failure(e) =>
-          if (!isTrackerStarted) {
+          if (!shouldStartReceiver) {
             onReceiverJobFinish(receiverId)
           } else {
             logError("Receiver has been stopped. Try to restart it.", e)

