Repository: spark Updated Branches: refs/heads/branch-2.0 bc683f037 -> 0fb01496c
[SPARK-17022][YARN] Handle potential deadlock in driver handling messages ## What changes were proposed in this pull request? We send RequestExecutors directly to the AM instead of first routing it through the YarnSchedulerEndpoint, to avoid a potential deadlock. ## How was this patch tested? Manual tests. Author: WangTaoTheTonic <wangtao...@huawei.com> Closes #14605 from WangTaoTheTonic/lock. (cherry picked from commit ea0bf91b4a2ca3ef472906e50e31fd6268b6f53e) Signed-off-by: Marcelo Vanzin <van...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0fb01496 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0fb01496 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0fb01496 Branch: refs/heads/branch-2.0 Commit: 0fb01496c09defa1436dbb7f5e1cbc5461617a31 Parents: bc683f0 Author: WangTaoTheTonic <wangtao...@huawei.com> Authored: Thu Aug 11 15:09:23 2016 -0700 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Thu Aug 11 15:09:32 2016 -0700 ---------------------------------------------------------------------- .../scheduler/cluster/YarnSchedulerBackend.scala | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0fb01496/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 6b3c831..ea63ff5 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -125,8 +125,20 @@ private[spark] abstract class YarnSchedulerBackend( * This includes executors
already pending or running. */ override def doRequestTotalExecutors(requestedTotal: Int): Boolean = { - yarnSchedulerEndpointRef.askWithRetry[Boolean]( - RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)) + val r = RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) + yarnSchedulerEndpoint.amEndpoint match { + case Some(am) => + try { + am.askWithRetry[Boolean](r) + } catch { + case NonFatal(e) => + logError(s"Sending $r to AM was unsuccessful", e) + return false + } + case None => + logWarning("Attempted to request executors before the AM has registered!") + return false + } } /** @@ -209,7 +221,7 @@ private[spark] abstract class YarnSchedulerBackend( */ private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { - private var amEndpoint: Option[RpcEndpointRef] = None + var amEndpoint: Option[RpcEndpointRef] = None private val askAmThreadPool = ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org