Repository: spark Updated Branches: refs/heads/master b0768538e -> aa0364510
[SPARK-12447][YARN] Only update the states when executor is successfully launched The details are described in https://issues.apache.org/jira/browse/SPARK-12447. vanzin Please help to review, thanks a lot. Author: jerryshao <ss...@hortonworks.com> Closes #10412 from jerryshao/SPARK-12447. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa036451 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa036451 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa036451 Branch: refs/heads/master Commit: aa0364510792c18a0973b6096cd38f611fc1c1a6 Parents: b076853 Author: jerryshao <ss...@hortonworks.com> Authored: Thu Jun 9 17:31:19 2016 -0700 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Thu Jun 9 17:31:19 2016 -0700 ---------------------------------------------------------------------- .../spark/deploy/yarn/ExecutorRunnable.scala | 5 +- .../spark/deploy/yarn/YarnAllocator.scala | 72 ++++++++++++-------- 2 files changed, 47 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/aa036451/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index fc753b7..3d0e996 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -55,15 +55,14 @@ private[yarn] class ExecutorRunnable( executorCores: Int, appId: String, securityMgr: SecurityManager, - localResources: Map[String, LocalResource]) - extends Runnable with Logging { + localResources: Map[String, LocalResource]) extends Logging { var rpc: YarnRPC = YarnRPC.create(conf) var nmClient: NMClient 
= _ val yarnConf: YarnConfiguration = new YarnConfiguration(conf) lazy val env = prepareEnvironment(container) - override def run(): Unit = { + def run(): Unit = { logInfo("Starting Executor Container") nmClient = NMClient.createNMClient() nmClient.init(yarnConf) http://git-wip-us.apache.org/repos/asf/spark/blob/aa036451/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 066c665..b110d82 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -24,6 +24,7 @@ import java.util.regex.Pattern import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} import scala.collection.JavaConverters._ +import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.records._ @@ -472,41 +473,58 @@ private[yarn] class YarnAllocator( */ private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = { for (container <- containersToUse) { - numExecutorsRunning += 1 - assert(numExecutorsRunning <= targetNumExecutors) + executorIdCounter += 1 val executorHostname = container.getNodeId.getHost val containerId = container.getId - executorIdCounter += 1 val executorId = executorIdCounter.toString - assert(container.getResource.getMemory >= resource.getMemory) - logInfo("Launching container %s for on host %s".format(containerId, executorHostname)) - executorIdToContainer(executorId) = container - containerIdToExecutorId(container.getId) = executorId - - val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, - new HashSet[ContainerId]) - - containerSet += containerId - 
allocatedContainerToHostMap.put(containerId, executorHostname) - - val executorRunnable = new ExecutorRunnable( - container, - conf, - sparkConf, - driverUrl, - executorId, - executorHostname, - executorMemory, - executorCores, - appAttemptId.getApplicationId.toString, - securityMgr, - localResources) + + def updateInternalState(): Unit = synchronized { + numExecutorsRunning += 1 + assert(numExecutorsRunning <= targetNumExecutors) + executorIdToContainer(executorId) = container + containerIdToExecutorId(container.getId) = executorId + + val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, + new HashSet[ContainerId]) + containerSet += containerId + allocatedContainerToHostMap.put(containerId, executorHostname) + } + if (launchContainers) { logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format( driverUrl, executorHostname)) - launcherPool.execute(executorRunnable) + + launcherPool.execute(new Runnable { + override def run(): Unit = { + try { + new ExecutorRunnable( + container, + conf, + sparkConf, + driverUrl, + executorId, + executorHostname, + executorMemory, + executorCores, + appAttemptId.getApplicationId.toString, + securityMgr, + localResources + ).run() + updateInternalState() + } catch { + case NonFatal(e) => + logError(s"Failed to launch executor $executorId on container $containerId", e) + // Assigned container should be released immediately to avoid unnecessary resource + // occupation. + amClient.releaseAssignedContainer(containerId) + } + } + }) + } else { + // For test only + updateInternalState() } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org