Repository: spark
Updated Branches:
  refs/heads/master cd47e2337 -> 938434dc7


[SPARK-15913][CORE] Dispatcher.stopped should be enclosed by synchronized block.

## What changes were proposed in this pull request?

`Dispatcher.stopped` is guarded by `this`, but it is read without
synchronization in the `postMessage` function. This PR moves that read inside
the `synchronized` block and also makes the exception message more accurate.

## How was this patch tested?

Pass the existing Jenkins tests.

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #13634 from dongjoon-hyun/SPARK-15913.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/938434dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/938434dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/938434dc

Branch: refs/heads/master
Commit: 938434dc78f35f77cdebd15dcce8d5e7871b396b
Parents: cd47e23
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Mon Jun 13 10:30:17 2016 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Mon Jun 13 10:30:17 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rpc/netty/Dispatcher.scala | 21 ++++++++------------
 1 file changed, 8 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/938434dc/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index 4f8fe01..d305de2 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -144,25 +144,20 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
       endpointName: String,
       message: InboxMessage,
       callbackIfStopped: (Exception) => Unit): Unit = {
-    val shouldCallOnStop = synchronized {
+    val error = synchronized {
       val data = endpoints.get(endpointName)
-      if (stopped || data == null) {
-        true
+      if (stopped) {
+        Some(new RpcEnvStoppedException())
+      } else if (data == null) {
+        Some(new SparkException(s"Could not find $endpointName."))
       } else {
         data.inbox.post(message)
         receivers.offer(data)
-        false
+        None
       }
     }
-    if (shouldCallOnStop) {
-      // We don't need to call `onStop` in the `synchronized` block
-      val error = if (stopped) {
-          new RpcEnvStoppedException()
-        } else {
-          new SparkException(s"Could not find $endpointName or it has been stopped.")
-        }
-      callbackIfStopped(error)
-    }
+    // We don't need to call `onStop` in the `synchronized` block
+    error.foreach(callbackIfStopped)
   }
 
   def stop(): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to