Repository: spark
Updated Branches:
  refs/heads/branch-2.0 af2e6e0c9 -> b113b5d9f


[SPARK-17929][CORE] Fix deadlock when CoarseGrainedSchedulerBackend reset

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-17929

Currently, `CoarseGrainedSchedulerBackend.reset` acquires the backend's lock:
```
  protected def reset(): Unit = synchronized {
    numPendingExecutors = 0
    executorsPendingToRemove.clear()

    // Remove all the lingering executors that should be removed but not yet. The reason might be
    // because (1) disconnected event is not yet received; (2) executors die silently.
    executorDataMap.toMap.foreach { case (eid, _) =>
      driverEndpoint.askWithRetry[Boolean](
        RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
    }
  }
```
but `removeExecutor` also needs that same lock (`CoarseGrainedSchedulerBackend.this.synchronized`). Because `reset` still holds the lock while `askWithRetry` blocks waiting for the driver endpoint to process `RemoveExecutor`, the handler can never acquire it, so the two deadlock:

```
   private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
      logDebug(s"Asked to remove executor $executorId with reason $reason")
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          // This must be synchronized because variables mutated
          // in this block are read when requesting executors
          val killed = CoarseGrainedSchedulerBackend.this.synchronized {
            addressToExecutorId -= executorInfo.executorAddress
            executorDataMap -= executorId
            executorsPendingLossReason -= executorId
            executorsPendingToRemove.remove(executorId).getOrElse(false)
          }
      ...
```
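
A minimal, standalone sketch of this deadlock shape, using plain JVM threads and latches rather than Spark's RPC layer (all names here are illustrative, not Spark code): one thread holds the monitor while waiting for a reply that can only come from a thread that needs the same monitor. The timeout exists only so the demo terminates.

```
import java.util.concurrent.{CountDownLatch, TimeUnit}

object DeadlockSketch {
  private val lock = new Object
  private val resetEntered = new CountDownLatch(1)
  private val removeDone = new CountDownLatch(1)

  // Stand-in for the RemoveExecutor handler: it must take the shared monitor.
  private def handleRemove(): Unit = lock.synchronized {
    removeDone.countDown()
  }

  // Stand-in for reset(): it holds the monitor while waiting for the handler,
  // the way askWithRetry blocks on the driver endpoint.
  private def reset(): Unit = lock.synchronized {
    resetEntered.countDown()
    // The handler cannot enter lock.synchronized while we hold the monitor,
    // so without the timeout this wait would block forever.
    if (!removeDone.await(2, TimeUnit.SECONDS)) {
      println("timed out waiting for the handler: deadlock reproduced")
    }
  }

  def main(args: Array[String]): Unit = {
    val handler = new Thread(() => { resetEntered.await(); handleRemove() })
    handler.start()
    reset()
    handler.join() // once reset releases the monitor, the handler completes
  }
}
```

Running it prints the timeout message; without the timeout, both threads would be stuck forever.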

## How was this patch tested?

Manual test.

Author: w00228970 <wangf...@huawei.com>

Closes #15481 from scwf/spark-17929.

(cherry picked from commit c1f344f1a09b8834bec70c1ece30b9bff63e55ea)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b113b5d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b113b5d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b113b5d9

Branch: refs/heads/branch-2.0
Commit: b113b5d9ff100385154ef0f836feb9805db163d2
Parents: af2e6e0
Author: w00228970 <wangf...@huawei.com>
Authored: Fri Oct 21 14:43:55 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri Oct 21 14:44:05 2016 -0700

----------------------------------------------------------------------
 .../cluster/CoarseGrainedSchedulerBackend.scala       | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b113b5d9/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index c6b3fdf..2c173db 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -386,15 +386,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only
    * be called in the yarn-client mode when AM re-registers after a failure.
    * */
-  protected def reset(): Unit = synchronized {
-    numPendingExecutors = 0
-    executorsPendingToRemove.clear()
+  protected def reset(): Unit = {
+    val executors = synchronized {
+      numPendingExecutors = 0
+      executorsPendingToRemove.clear()
+      Set() ++ executorDataMap.keys
+    }
 
     // Remove all the lingering executors that should be removed but not yet. The reason might be
     // because (1) disconnected event is not yet received; (2) executors die silently.
-    executorDataMap.toMap.foreach { case (eid, _) =>
-      driverEndpoint.askWithRetry[Boolean](
-        RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
+    executors.foreach { eid =>
+      removeExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))
     }
   }
 

