Repository: spark
Updated Branches:
  refs/heads/master 42d225f44 -> 7bb6d31cf


[SPARK-11232][CORE] Use 'offer' instead of 'put' to make sure calling send 
won't be interrupted

The current `NettyRpcEndpointRef.send` can be interrupted because it uses 
`LinkedBlockingQueue.put`, which may hang the application.

Imagine the following execution order:

  | thread 1: TaskRunner.kill | thread 2: TaskRunner.run
------------- | ------------- | -------------
1 | killed = true |
2 |  | if (killed) {
3 |  | throw new TaskKilledException
4 |  | case _: TaskKilledException | _: InterruptedException if task.killed =>
5 | task.kill(interruptThread): interruptThread is true |
6 | | execBackend.statusUpdate(taskId, TaskState.KILLED, 
ser.serialize(TaskKilled))
7 | | localEndpoint.send(StatusUpdate(taskId, state, serializedData)): in 
LocalBackend

Then `localEndpoint.send(StatusUpdate(taskId, state, serializedData))` will 
throw `InterruptedException`. This will prevent the executor from updating the 
task status and hang the application.

A failure caused by the above issue can be seen here: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44062/consoleFull

Since `receivers` is an unbounded `LinkedBlockingQueue`, we can just use 
`LinkedBlockingQueue.offer` to resolve this issue.

Author: zsxwing <zsxw...@gmail.com>

Closes #9198 from zsxwing/dont-interrupt-send.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7bb6d31c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7bb6d31c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7bb6d31c

Branch: refs/heads/master
Commit: 7bb6d31cff279776f90744407291682774cfe1c2
Parents: 42d225f
Author: zsxwing <zsxw...@gmail.com>
Authored: Thu Oct 22 11:31:47 2015 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Thu Oct 22 11:31:47 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rpc/netty/Dispatcher.scala     | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7bb6d31c/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index f1a8273..7bf44a6 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -66,7 +66,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) 
extends Logging {
       }
       val data = endpoints.get(name)
       endpointRefs.put(data.endpoint, data.ref)
-      receivers.put(data)  // for the OnStart message
+      receivers.offer(data)  // for the OnStart message
     }
     endpointRef
   }
@@ -80,7 +80,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) 
extends Logging {
     val data = endpoints.remove(name)
     if (data != null) {
       data.inbox.stop()
-      receivers.put(data)  // for the OnStop message
+      receivers.offer(data)  // for the OnStop message
     }
     // Don't clean `endpointRefs` here because it's possible that some 
messages are being processed
     // now and they can use `getRpcEndpointRef`. So `endpointRefs` will be 
cleaned in Inbox via
@@ -163,7 +163,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) 
extends Logging {
         true
       } else {
         data.inbox.post(createMessageFn(data.ref))
-        receivers.put(data)
+        receivers.offer(data)
         false
       }
     }
@@ -183,7 +183,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) 
extends Logging {
     // Stop all endpoints. This will queue all endpoints for processing by the 
message loops.
     endpoints.keySet().asScala.foreach(unregisterRpcEndpoint)
     // Enqueue a message that tells the message loops to stop.
-    receivers.put(PoisonPill)
+    receivers.offer(PoisonPill)
     threadpool.shutdown()
   }
 
@@ -218,7 +218,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) 
extends Logging {
             val data = receivers.take()
             if (data == PoisonPill) {
               // Put PoisonPill back so that other MessageLoops can see it.
-              receivers.put(PoisonPill)
+              receivers.offer(PoisonPill)
               return
             }
             data.inbox.process(Dispatcher.this)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to