spark git commit: [SPARK-17022][YARN] Handle potential deadlock in driver handling messages

2016-08-12 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 bc683f037 -> 0fb01496c


[SPARK-17022][YARN] Handle potential deadlock in driver handling messages

## What changes were proposed in this pull request?

We send RequestExecutors directly to the AM instead of transferring it to
yarnSchedulerBackend first, to avoid a potential deadlock.

## How was this patch tested?

manual tests

Author: WangTaoTheTonic 

Closes #14605 from WangTaoTheTonic/lock.

(cherry picked from commit ea0bf91b4a2ca3ef472906e50e31fd6268b6f53e)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0fb01496
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0fb01496
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0fb01496

Branch: refs/heads/branch-2.0
Commit: 0fb01496c09defa1436dbb7f5e1cbc5461617a31
Parents: bc683f0
Author: WangTaoTheTonic 
Authored: Thu Aug 11 15:09:23 2016 -0700
Committer: Marcelo Vanzin 
Committed: Thu Aug 11 15:09:32 2016 -0700

--
 .../scheduler/cluster/YarnSchedulerBackend.scala  | 18 +++---
 1 file changed, 15 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0fb01496/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
--
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 6b3c831..ea63ff5 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -125,8 +125,20 @@ private[spark] abstract class YarnSchedulerBackend(
    * This includes executors already pending or running.
    */
   override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
-    yarnSchedulerEndpointRef.askWithRetry[Boolean](
-      RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
+    val r = RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)
+    yarnSchedulerEndpoint.amEndpoint match {
+      case Some(am) =>
+        try {
+          am.askWithRetry[Boolean](r)
+        } catch {
+          case NonFatal(e) =>
+            logError(s"Sending $r to AM was unsuccessful", e)
+            return false
+        }
+      case None =>
+        logWarning("Attempted to request executors before the AM has registered!")
+        return false
+    }
   }
 
   /**
@@ -209,7 +221,7 @@ private[spark] abstract class YarnSchedulerBackend(
    */
   private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
     extends ThreadSafeRpcEndpoint with Logging {
-    private var amEndpoint: Option[RpcEndpointRef] = None
+    var amEndpoint: Option[RpcEndpointRef] = None
 
     private val askAmThreadPool =
       ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-17022][YARN] Handle potential deadlock in driver handling messages

2016-08-11 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 4ec5c360c -> ea0bf91b4


[SPARK-17022][YARN] Handle potential deadlock in driver handling messages

## What changes were proposed in this pull request?

We send RequestExecutors directly to the AM instead of transferring it to
yarnSchedulerBackend first, to avoid a potential deadlock.

## How was this patch tested?

manual tests

Author: WangTaoTheTonic 

Closes #14605 from WangTaoTheTonic/lock.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea0bf91b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea0bf91b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea0bf91b

Branch: refs/heads/master
Commit: ea0bf91b4a2ca3ef472906e50e31fd6268b6f53e
Parents: 4ec5c36
Author: WangTaoTheTonic 
Authored: Thu Aug 11 15:09:23 2016 -0700
Committer: Marcelo Vanzin 
Committed: Thu Aug 11 15:09:23 2016 -0700

--
 .../scheduler/cluster/YarnSchedulerBackend.scala  | 18 +++---
 1 file changed, 15 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ea0bf91b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
--
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 6b3c831..ea63ff5 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -125,8 +125,20 @@ private[spark] abstract class YarnSchedulerBackend(
    * This includes executors already pending or running.
    */
   override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
-    yarnSchedulerEndpointRef.askWithRetry[Boolean](
-      RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
+    val r = RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)
+    yarnSchedulerEndpoint.amEndpoint match {
+      case Some(am) =>
+        try {
+          am.askWithRetry[Boolean](r)
+        } catch {
+          case NonFatal(e) =>
+            logError(s"Sending $r to AM was unsuccessful", e)
+            return false
+        }
+      case None =>
+        logWarning("Attempted to request executors before the AM has registered!")
+        return false
+    }
   }
 
   /**
@@ -209,7 +221,7 @@ private[spark] abstract class YarnSchedulerBackend(
    */
   private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
     extends ThreadSafeRpcEndpoint with Logging {
-    private var amEndpoint: Option[RpcEndpointRef] = None
+    var amEndpoint: Option[RpcEndpointRef] = None
 
     private val askAmThreadPool =
       ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org