Repository: spark Updated Branches: refs/heads/branch-1.6 abe36c53d -> d98fb19c1
[SPARK-15606][CORE] Use non-blocking removeExecutor call to avoid deadlocks ## What changes were proposed in this pull request? Set minimum number of dispatcher threads to 3 to avoid deadlocks on machines with only 2 cores ## How was this patch tested? Spark test builds Author: Pete Robbins <robbin...@gmail.com> Closes #13355 from robbinspg/SPARK-13906. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d98fb19c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d98fb19c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d98fb19c Branch: refs/heads/branch-1.6 Commit: d98fb19c18f0122f335e5d810a2f8ff752b98d86 Parents: abe36c5 Author: Pete Robbins <robbin...@gmail.com> Authored: Thu Jun 2 10:14:51 2016 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Tue Jun 21 14:21:51 2016 -0700 ---------------------------------------------------------------------- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +- .../scala/org/apache/spark/storage/BlockManagerMaster.scala | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d98fb19c/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 87f2dbf..75b1d29 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -273,7 +273,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // manager to reregister itself. If that happens, the block manager master will know // about the executor, but the scheduler will not. Therefore, we should remove the // executor from the block manager when we hit this case. - scheduler.sc.env.blockManager.master.removeExecutor(executorId) + scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId) logInfo(s"Asked to remove non-existent executor $executorId") } } http://git-wip-us.apache.org/repos/asf/spark/blob/d98fb19c/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 440c4c1..10e1d9e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -41,6 +41,14 @@ class BlockManagerMaster( logInfo("Removed " + execId + " successfully in removeExecutor") } + /** Request removal of a dead executor from the driver endpoint. + * This is only called on the driver side. Non-blocking + */ + def removeExecutorAsync(execId: String) { + driverEndpoint.ask[Boolean](RemoveExecutor(execId)) + logInfo("Removal of executor " + execId + " requested") + } + /** Register the BlockManager's id with the driver. */ def registerBlockManager( blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org