Repository: spark
Updated Branches:
  refs/heads/branch-1.6 abe36c53d -> d98fb19c1


[SPARK-15606][CORE] Use non-blocking removeExecutor call to avoid deadlocks

## What changes were proposed in this pull request?
Use a non-blocking `removeExecutorAsync` call (fire-and-forget `ask` on the
driver endpoint) instead of the blocking `removeExecutor` when the scheduler
backend is asked to remove a non-existent executor. The blocking call could
deadlock the dispatcher on machines with only 2 cores.

## How was this patch tested?

Spark test builds

Author: Pete Robbins <robbin...@gmail.com>

Closes #13355 from robbinspg/SPARK-13906.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d98fb19c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d98fb19c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d98fb19c

Branch: refs/heads/branch-1.6
Commit: d98fb19c18f0122f335e5d810a2f8ff752b98d86
Parents: abe36c5
Author: Pete Robbins <robbin...@gmail.com>
Authored: Thu Jun 2 10:14:51 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Jun 21 14:21:51 2016 -0700

----------------------------------------------------------------------
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala    | 2 +-
 .../scala/org/apache/spark/storage/BlockManagerMaster.scala  | 8 ++++++++
 2 files changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d98fb19c/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 87f2dbf..75b1d29 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -273,7 +273,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
           // manager to reregister itself. If that happens, the block manager 
master will know
           // about the executor, but the scheduler will not. Therefore, we 
should remove the
           // executor from the block manager when we hit this case.
-          scheduler.sc.env.blockManager.master.removeExecutor(executorId)
+          scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId)
           logInfo(s"Asked to remove non-existent executor $executorId")
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/d98fb19c/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 440c4c1..10e1d9e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -41,6 +41,14 @@ class BlockManagerMaster(
     logInfo("Removed " + execId + " successfully in removeExecutor")
   }
 
+  /** Request removal of a dead executor from the driver endpoint.
+   *  This is only called on the driver side. Non-blocking
+   */
+  def removeExecutorAsync(execId: String) {
+    driverEndpoint.ask[Boolean](RemoveExecutor(execId))
+    logInfo("Removal of executor " + execId + " requested")
+  }
+
   /** Register the BlockManager's id with the driver. */
   def registerBlockManager(
       blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: 
RpcEndpointRef): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to