Repository: spark
Updated Branches:
  refs/heads/master 456c11f15 -> 81f8f3406


[SPARK-4955] With executor dynamic scaling enabled, executors should be added or 
killed in yarn-cluster mode.

With executor dynamic scaling enabled, the executor count should be increased or 
decreased in yarn-cluster mode. So in yarn-cluster mode, the ApplicationMaster starts an 
AMActor that adds or kills executors. Then the YarnSchedulerActor in 
YarnSchedulerBackend sends messages to the AM's AMActor.
andrewor14 ChengXiangLi tdas

Author: lianhuiwang <[email protected]>

Closes #3962 from lianhuiwang/SPARK-4955 and squashes the following commits:

48d9ebb [lianhuiwang] update with andrewor14's comments
12426af [lianhuiwang] refactor am's code
45da3b0 [lianhuiwang] remove unrelated code
9318fc1 [lianhuiwang] update with andrewor14's comments
08ba473 [lianhuiwang] address andrewor14's comments
265c36d [lianhuiwang] fix small change
f43bda8 [lianhuiwang] fix address andrewor14's comments
7a7767a [lianhuiwang] fix address andrewor14's comments
bbc4d5a [lianhuiwang] address andrewor14's comments
1b029a4 [lianhuiwang] fix bug
7d33791 [lianhuiwang] in AM create a new actorSystem
2164ea8 [lianhuiwang] fix a min bug
6dfeeec [lianhuiwang] in yarn-cluster mode, executor number can be added or 
killed.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81f8f340
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81f8f340
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81f8f340

Branch: refs/heads/master
Commit: 81f8f3406284c391dfad14fb70147fa8e20692a8
Parents: 456c11f
Author: lianhuiwang <[email protected]>
Authored: Wed Jan 28 12:50:57 2015 -0800
Committer: Andrew Or <[email protected]>
Committed: Wed Jan 28 12:51:15 2015 -0800

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 57 +++++++++++++++-----
 1 file changed, 43 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/81f8f340/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 902bdda..d3e327b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -43,8 +43,11 @@ import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
 /**
  * Common application master functionality for Spark on Yarn.
  */
-private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
-  client: YarnRMClient) extends Logging {
+private[spark] class ApplicationMaster(
+    args: ApplicationMasterArguments,
+    client: YarnRMClient)
+  extends Logging {
+
   // TODO: Currently, task to container is computed once (TaskSetManager) - 
which need not be
   // optimal as more containers are available. Might need to handle this 
better.
 
@@ -231,6 +234,24 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments,
     reporterThread = launchReporterThread()
   }
 
+  /**
+   * Create an actor that communicates with the driver.
+   *
+   * In cluster mode, the AM and the driver belong to same process
+   * so the AM actor need not monitor lifecycle of the driver.
+   */
+  private def runAMActor(
+      host: String,
+      port: String,
+      isDriver: Boolean): Unit = {
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      host,
+      port,
+      YarnSchedulerBackend.ACTOR_NAME)
+    actor = actorSystem.actorOf(Props(new AMActor(driverUrl, isDriver)), name 
= "YarnAM")
+  }
+
   private def runDriver(securityMgr: SecurityManager): Unit = {
     addAmIpFilter()
     userClassThread = startUserClass()
@@ -245,6 +266,11 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments,
         ApplicationMaster.EXIT_SC_NOT_INITED,
         "Timed out waiting for SparkContext.")
     } else {
+      actorSystem = sc.env.actorSystem
+      runAMActor(
+        sc.getConf.get("spark.driver.host"),
+        sc.getConf.get("spark.driver.port"),
+        isDriver = true)
       registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
       userClassThread.join()
     }
@@ -253,7 +279,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments,
   private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
     actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", 
Utils.localHostName, 0,
       conf = sparkConf, securityManager = securityMgr)._1
-    actor = waitForSparkDriver()
+    waitForSparkDriver()
     addAmIpFilter()
     registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
 
@@ -367,7 +393,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments,
     }
   }
 
-  private def waitForSparkDriver(): ActorRef = {
+  private def waitForSparkDriver(): Unit = {
     logInfo("Waiting for Spark driver to be reachable.")
     var driverUp = false
     val hostport = args.userArgs(0)
@@ -399,12 +425,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments,
     sparkConf.set("spark.driver.host", driverHost)
     sparkConf.set("spark.driver.port", driverPort.toString)
 
-    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
-      SparkEnv.driverActorSystemName,
-      driverHost,
-      driverPort.toString,
-      YarnSchedulerBackend.ACTOR_NAME)
-    actorSystem.actorOf(Props(new AMActor(driverUrl)), name = "YarnAM")
+    runAMActor(driverHost, driverPort.toString, isDriver = false)
   }
 
   /** Add the Yarn IP filter that is required for properly securing the UI. */
@@ -462,9 +483,9 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments,
   }
 
   /**
-   * Actor that communicates with the driver in client deploy mode.
+   * An actor that communicates with the driver's scheduler backend.
    */
-  private class AMActor(driverUrl: String) extends Actor {
+  private class AMActor(driverUrl: String, isDriver: Boolean) extends Actor {
     var driver: ActorSelection = _
 
     override def preStart() = {
@@ -474,13 +495,21 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments,
       // we can monitor Lifecycle Events.
       driver ! "Hello"
       driver ! RegisterClusterManager
-      context.system.eventStream.subscribe(self, 
classOf[RemotingLifecycleEvent])
+      // In cluster mode, the AM can directly monitor the driver status instead
+      // of trying to deduce it from the lifecycle of the driver's actor
+      if (!isDriver) {
+        context.system.eventStream.subscribe(self, 
classOf[RemotingLifecycleEvent])
+      }
     }
 
     override def receive = {
       case x: DisassociatedEvent =>
         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
-        finish(FinalApplicationStatus.SUCCEEDED, 
ApplicationMaster.EXIT_SUCCESS)
+        // In cluster mode, do not rely on the disassociated event to exit
+        // This avoids potentially reporting incorrect exit codes if the 
driver fails
+        if (!isDriver) {
+          finish(FinalApplicationStatus.SUCCEEDED, 
ApplicationMaster.EXIT_SUCCESS)
+        }
 
       case x: AddWebUIFilter =>
         logInfo(s"Add WebUI Filter. $x")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to