Repository: spark
Updated Branches:
  refs/heads/master b0768538e -> aa0364510


[SPARK-12447][YARN] Only update internal state when an executor is successfully launched

The details are described in https://issues.apache.org/jira/browse/SPARK-12447.

vanzin, please help review. Thanks a lot.

Author: jerryshao <ss...@hortonworks.com>

Closes #10412 from jerryshao/SPARK-12447.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa036451
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa036451
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa036451

Branch: refs/heads/master
Commit: aa0364510792c18a0973b6096cd38f611fc1c1a6
Parents: b076853
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Jun 9 17:31:19 2016 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Thu Jun 9 17:31:19 2016 -0700

----------------------------------------------------------------------
 .../spark/deploy/yarn/ExecutorRunnable.scala    |  5 +-
 .../spark/deploy/yarn/YarnAllocator.scala       | 72 ++++++++++++--------
 2 files changed, 47 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aa036451/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index fc753b7..3d0e996 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -55,15 +55,14 @@ private[yarn] class ExecutorRunnable(
     executorCores: Int,
     appId: String,
     securityMgr: SecurityManager,
-    localResources: Map[String, LocalResource])
-  extends Runnable with Logging {
+    localResources: Map[String, LocalResource]) extends Logging {
 
   var rpc: YarnRPC = YarnRPC.create(conf)
   var nmClient: NMClient = _
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   lazy val env = prepareEnvironment(container)
 
-  override def run(): Unit = {
+  def run(): Unit = {
     logInfo("Starting Executor Container")
     nmClient = NMClient.createNMClient()
     nmClient.init(yarnConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/aa036451/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 066c665..b110d82 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -24,6 +24,7 @@ import java.util.regex.Pattern
 import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records._
@@ -472,41 +473,58 @@ private[yarn] class YarnAllocator(
    */
   private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): 
Unit = {
     for (container <- containersToUse) {
-      numExecutorsRunning += 1
-      assert(numExecutorsRunning <= targetNumExecutors)
+      executorIdCounter += 1
       val executorHostname = container.getNodeId.getHost
       val containerId = container.getId
-      executorIdCounter += 1
       val executorId = executorIdCounter.toString
-
       assert(container.getResource.getMemory >= resource.getMemory)
-
       logInfo("Launching container %s for on host %s".format(containerId, 
executorHostname))
-      executorIdToContainer(executorId) = container
-      containerIdToExecutorId(container.getId) = executorId
-
-      val containerSet = 
allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
-        new HashSet[ContainerId])
-
-      containerSet += containerId
-      allocatedContainerToHostMap.put(containerId, executorHostname)
-
-      val executorRunnable = new ExecutorRunnable(
-        container,
-        conf,
-        sparkConf,
-        driverUrl,
-        executorId,
-        executorHostname,
-        executorMemory,
-        executorCores,
-        appAttemptId.getApplicationId.toString,
-        securityMgr,
-        localResources)
+
+      def updateInternalState(): Unit = synchronized {
+        numExecutorsRunning += 1
+        assert(numExecutorsRunning <= targetNumExecutors)
+        executorIdToContainer(executorId) = container
+        containerIdToExecutorId(container.getId) = executorId
+
+        val containerSet = 
allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
+          new HashSet[ContainerId])
+        containerSet += containerId
+        allocatedContainerToHostMap.put(containerId, executorHostname)
+      }
+
       if (launchContainers) {
         logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: 
%s".format(
           driverUrl, executorHostname))
-        launcherPool.execute(executorRunnable)
+
+        launcherPool.execute(new Runnable {
+          override def run(): Unit = {
+            try {
+              new ExecutorRunnable(
+                container,
+                conf,
+                sparkConf,
+                driverUrl,
+                executorId,
+                executorHostname,
+                executorMemory,
+                executorCores,
+                appAttemptId.getApplicationId.toString,
+                securityMgr,
+                localResources
+              ).run()
+              updateInternalState()
+            } catch {
+              case NonFatal(e) =>
+                logError(s"Failed to launch executor $executorId on container 
$containerId", e)
+                // Assigned container should be released immediately to avoid 
unnecessary resource
+                // occupation.
+                amClient.releaseAssignedContainer(containerId)
+            }
+          }
+        })
+      } else {
+        // For test only
+        updateInternalState()
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to