Repository: spark
Updated Branches:
  refs/heads/branch-2.0 bc683f037 -> 0fb01496c


[SPARK-17022][YARN] Handle potential deadlock in driver handling messages

## What changes were proposed in this pull request?

Send `RequestExecutors` directly to the AM instead of first forwarding it
through the `YarnSchedulerEndpoint`, to avoid a potential deadlock.

## How was this patch tested?

Manual tests.

Author: WangTaoTheTonic <wangtao...@huawei.com>

Closes #14605 from WangTaoTheTonic/lock.

(cherry picked from commit ea0bf91b4a2ca3ef472906e50e31fd6268b6f53e)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0fb01496
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0fb01496
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0fb01496

Branch: refs/heads/branch-2.0
Commit: 0fb01496c09defa1436dbb7f5e1cbc5461617a31
Parents: bc683f0
Author: WangTaoTheTonic <wangtao...@huawei.com>
Authored: Thu Aug 11 15:09:23 2016 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Thu Aug 11 15:09:32 2016 -0700

----------------------------------------------------------------------
 .../scheduler/cluster/YarnSchedulerBackend.scala  | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0fb01496/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 6b3c831..ea63ff5 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -125,8 +125,20 @@ private[spark] abstract class YarnSchedulerBackend(
    * This includes executors already pending or running.
    */
   override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
-    yarnSchedulerEndpointRef.askWithRetry[Boolean](
-      RequestExecutors(requestedTotal, localityAwareTasks, 
hostToLocalTaskCount))
+    val r = RequestExecutors(requestedTotal, localityAwareTasks, 
hostToLocalTaskCount)
+    yarnSchedulerEndpoint.amEndpoint match {
+      case Some(am) =>
+        try {
+          am.askWithRetry[Boolean](r)
+        } catch {
+          case NonFatal(e) =>
+            logError(s"Sending $r to AM was unsuccessful", e)
+            return false
+        }
+      case None =>
+        logWarning("Attempted to request executors before the AM has 
registered!")
+        return false
+    }
   }
 
   /**
@@ -209,7 +221,7 @@ private[spark] abstract class YarnSchedulerBackend(
    */
   private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
     extends ThreadSafeRpcEndpoint with Logging {
-    private var amEndpoint: Option[RpcEndpointRef] = None
+    var amEndpoint: Option[RpcEndpointRef] = None
 
     private val askAmThreadPool =
       
ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to