Repository: spark
Updated Branches:
  refs/heads/master 8c06a5faa -> 2eeada373


SPARK-1714. Take advantage of AMRMClient APIs to simplify logic in YarnAllocator

The goal of this PR is to simplify YarnAllocator as much as possible and get it 
up to the level of code quality we see in the rest of Spark.

In service of this, it does a few things:
* Uses AMRMClient APIs for matching containers to requests (a rough sketch of 
this flow appears after this list).
* Adds calls to AMRMClient.removeContainerRequest so that, when we use a 
container, we don't end up requesting it again.
* Removes YarnAllocator's host->rack cache. YARN's RackResolver already does 
this caching, so this is redundant.
* Adds tests for basic YarnAllocator functionality.
* Breaks up the allocateResources method, which was previously nearly 300 lines.
* A little bit of stylistic cleanup.
* Fixes a bug that caused three times as many container requests as needed to be 
filed when preferred host locations were given.
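
For reference, here is a rough, untested sketch of the request/match/remove cycle 
the new allocator relies on. It assumes an already-initialized, registered 
AMRMClient; the method name requestAndMatch and the surplus-release at the end 
are illustrative rather than the exact code in this patch:

  import scala.collection.JavaConversions._

  import org.apache.hadoop.yarn.api.records.{Container, Priority, Resource}
  import org.apache.hadoop.yarn.client.api.AMRMClient
  import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

  def requestAndMatch(
      amClient: AMRMClient[ContainerRequest],
      resource: Resource,
      priority: Priority,
      numToRequest: Int): Seq[Container] = {
    // Record our needs locally; they are sent to the RM on the next allocate().
    for (_ <- 0 until numToRequest) {
      amClient.addContainerRequest(new ContainerRequest(resource, null, null, priority))
    }

    // Sync with the ResourceManager. This also doubles as the AM heartbeat.
    val response = amClient.allocate(0.1f)

    // For each granted container, find a matching outstanding request and remove
    // it so we don't end up requesting the same container again.
    response.getAllocatedContainers().flatMap { container =>
      val matching = amClient.getMatchingRequests(
        container.getPriority, container.getNodeId.getHost, container.getResource)
      if (!matching.isEmpty) {
        amClient.removeContainerRequest(matching.get(0).iterator.next)
        Some(container)
      } else {
        // Nothing outstanding matches: give the surplus container back.
        amClient.releaseAssignedContainer(container.getId)
        None
      }
    }
  }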

The patch is lossy. In particular, it loses the logic for trying to avoid 
containers bunching up on nodes. As I understand it, the logic that's gone is:

* If, in a single response from the RM, we receive a set of containers on a 
node, and prefer some number of containers on that node greater than 0 but less 
than the number we received, give back the delta between what we preferred and 
what we received.

This seems like a weird way to avoid bunching. For example, it does nothing to 
avoid bunching when we don't request containers on particular nodes.
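
For clarity, a rough reconstruction of that removed heuristic, following the 
description above rather than the original code. The names preferredHostToCount 
and alreadyAllocatedOnHost are stand-ins for the old allocator's internal state; 
the second element of the returned pair is what the old code either released 
(when a host preference existed) or went on to consider for rack-level matching:

  import org.apache.hadoop.yarn.api.records.Container

  // Split the containers YARN just gave us on `host` into those kept as
  // host-local and the leftover, per the old "avoid bunching" heuristic.
  def splitByHostPreference(
      host: String,
      received: Seq[Container],
      preferredHostToCount: Map[String, Int],
      alreadyAllocatedOnHost: Int): (Seq[Container], Seq[Container]) = {
    val stillWanted = preferredHostToCount.getOrElse(host, 0) - alreadyAllocatedOnHost
    if (stillWanted >= received.size) {
      (received, Nil)                // keep everything we were given
    } else if (stillWanted > 0) {
      received.splitAt(stillWanted)  // keep what we preferred, give back the delta
    } else {
      (Nil, received)                // no remaining preference for this host
    }
  }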

Author: Sandy Ryza <sa...@cloudera.com>

Closes #3765 from sryza/sandy-spark-1714 and squashes the following commits:

32a5942 [Sandy Ryza] Muffle RackResolver logs
74f56dd [Sandy Ryza] Fix a couple comments and simplify requestTotalExecutors
60ea4bd [Sandy Ryza] Fix scalastyle
ca35b53 [Sandy Ryza] Simplify further
e9cf8a6 [Sandy Ryza] Fix YarnClusterSuite
257acf3 [Sandy Ryza] Remove locality stuff and more cleanup
59a3c5e [Sandy Ryza] Take out rack stuff
5f72fd5 [Sandy Ryza] Further documentation and cleanup
89edd68 [Sandy Ryza] SPARK-1714. Take advantage of AMRMClient APIs to simplify 
logic in YarnAllocator


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2eeada37
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2eeada37
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2eeada37

Branch: refs/heads/master
Commit: 2eeada373e59d63b774ba92eb5d75fcd3a1cf8f4
Parents: 8c06a5f
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Wed Jan 21 10:31:54 2015 -0600
Committer: Thomas Graves <tgra...@apache.org>
Committed: Wed Jan 21 10:31:54 2015 -0600

----------------------------------------------------------------------
 .../org/apache/spark/log4j-defaults.properties  |   1 +
 .../spark/deploy/yarn/YarnAllocator.scala       | 733 ++++++-------------
 .../apache/spark/deploy/yarn/YarnRMClient.scala |   3 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  41 +-
 .../cluster/YarnClientClusterScheduler.scala    |   5 +-
 .../cluster/YarnClusterScheduler.scala          |   6 +-
 .../spark/deploy/yarn/YarnAllocatorSuite.scala  | 150 +++-
 7 files changed, 389 insertions(+), 550 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2eeada37/core/src/main/resources/org/apache/spark/log4j-defaults.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/log4j-defaults.properties 
b/core/src/main/resources/org/apache/spark/log4j-defaults.properties
index 89eec7d..c99a61f 100644
--- a/core/src/main/resources/org/apache/spark/log4j-defaults.properties
+++ b/core/src/main/resources/org/apache/spark/log4j-defaults.properties
@@ -10,3 +10,4 @@ log4j.logger.org.eclipse.jetty=WARN
 log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
 log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
 log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.hadoop.yarn.util.RackResolver=WARN

http://git-wip-us.apache.org/repos/asf/spark/blob/2eeada37/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index de65ef2..4c35b60 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.util.Collections
 import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicInteger
 import java.util.regex.Pattern
 
 import scala.collection.JavaConversions._
@@ -28,33 +28,26 @@ import 
com.google.common.util.concurrent.ThreadFactoryBuilder
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.util.Records
+import org.apache.hadoop.yarn.util.RackResolver
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 
-object AllocationType extends Enumeration {
-  type AllocationType = Value
-  val HOST, RACK, ANY = Value
-}
-
-// TODO:
-// Too many params.
-// Needs to be mt-safe
-// Need to refactor this to make it 'cleaner' ... right now, all computation 
is reactive - should
-// make it more proactive and decoupled.
-
-// Note that right now, we assume all node asks as uniform in terms of 
capabilities and priority
-// Refer to 
http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/
 for
-// more info on how we are requesting for containers.
-
 /**
- * Acquires resources for executors from a ResourceManager and launches 
executors in new containers.
+ * YarnAllocator is charged with requesting containers from the YARN 
ResourceManager and deciding
+ * what to do with containers when YARN fulfills these requests.
+ *
+ * This class makes use of YARN's AMRMClient APIs. We interact with the 
AMRMClient in three ways:
+ * * Making our resource needs known, which updates local bookkeeping about 
containers requested.
+ * * Calling "allocate", which syncs our local container requests with the RM, 
and returns any
+ *   containers that YARN has granted to us.  This also functions as a 
heartbeat.
+ * * Processing the containers granted to us to possibly launch executors 
inside of them.
+ *
+ * The public methods of this class are thread-safe.  All methods that mutate 
state are
+ * synchronized.
  */
 private[yarn] class YarnAllocator(
     conf: Configuration,
@@ -62,50 +55,42 @@ private[yarn] class YarnAllocator(
     amClient: AMRMClient[ContainerRequest],
     appAttemptId: ApplicationAttemptId,
     args: ApplicationMasterArguments,
-    preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
     securityMgr: SecurityManager)
   extends Logging {
 
   import YarnAllocator._
 
-  // These three are locked on allocatedHostToContainersMap. Complementary 
data structures
-  // allocatedHostToContainersMap : containers which are running : host, 
Set<ContainerId>
-  // allocatedContainerToHostMap: container to host mapping.
-  private val allocatedHostToContainersMap =
-    new HashMap[String, collection.mutable.Set[ContainerId]]()
+  // These two complementary data structures are locked on 
allocatedHostToContainersMap.
+  // Visible for testing.
+  val allocatedHostToContainersMap =
+    new HashMap[String, collection.mutable.Set[ContainerId]]
+  val allocatedContainerToHostMap = new HashMap[ContainerId, String]
 
-  private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
+  // Containers that we no longer care about. We've either already told the RM 
to release them or
+  // will on the next heartbeat. Containers get removed from this map after 
the RM tells us they've
+  // completed.
+  private val releasedContainers = Collections.newSetFromMap[ContainerId](
+    new ConcurrentHashMap[ContainerId, java.lang.Boolean])
 
-  // allocatedRackCount is populated ONLY if allocation happens (or 
decremented if this is an
-  // allocated node)
-  // As with the two data structures above, tightly coupled with them, and to 
be locked on
-  // allocatedHostToContainersMap
-  private val allocatedRackCount = new HashMap[String, Int]()
+  @volatile private var numExecutorsRunning = 0
+  // Used to generate a unique ID per executor
+  private var executorIdCounter = 0
+  @volatile private var numExecutorsFailed = 0
 
-  // Containers to be released in next request to RM
-  private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
-
-  // Number of container requests that have been sent to, but not yet 
allocated by the
-  // ApplicationMaster.
-  private val numPendingAllocate = new AtomicInteger()
-  private val numExecutorsRunning = new AtomicInteger()
-  // Used to generate a unique id per executor
-  private val executorIdCounter = new AtomicInteger()
-  private val numExecutorsFailed = new AtomicInteger()
-
-  private var maxExecutors = args.numExecutors
+  @volatile private var maxExecutors = args.numExecutors
 
   // Keep track of which container is running which executor to remove the 
executors later
   private val executorIdToContainer = new HashMap[String, Container]
 
+  // Executor memory in MB.
   protected val executorMemory = args.executorMemory
-  protected val executorCores = args.executorCores
-  protected val (preferredHostToCount, preferredRackToCount) =
-    generateNodeToWeight(conf, preferredNodes)
-
-  // Additional memory overhead - in mb.
+  // Additional memory overhead.
   protected val memoryOverhead: Int = 
sparkConf.getInt("spark.yarn.executor.memoryOverhead",
     math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, 
MEMORY_OVERHEAD_MIN))
+  // Number of cores per executor.
+  protected val executorCores = args.executorCores
+  // Resource capability requested for each executors
+  private val resource = Resource.newInstance(executorMemory + memoryOverhead, 
executorCores)
 
   private val launcherPool = new ThreadPoolExecutor(
     // max pool size of Integer.MAX_VALUE is ignored because we use an 
unbounded queue
@@ -115,26 +100,34 @@ private[yarn] class YarnAllocator(
     new ThreadFactoryBuilder().setNameFormat("ContainerLauncher 
#%d").setDaemon(true).build())
   launcherPool.allowCoreThreadTimeOut(true)
 
-  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
+  private val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
+    sparkConf.get("spark.driver.host"),
+    sparkConf.get("spark.driver.port"),
+    CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+  // For testing
+  private val launchContainers = 
sparkConf.getBoolean("spark.yarn.launchContainers", true)
 
-  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
+  def getNumExecutorsRunning: Int = numExecutorsRunning
+
+  def getNumExecutorsFailed: Int = numExecutorsFailed
+
+  /**
+   * Number of container requests that have not yet been fulfilled.
+   */
+  def getNumPendingAllocate: Int = getNumPendingAtLocation(ANY_HOST)
+
+  /**
+   * Number of container requests at the given location that have not yet been 
fulfilled.
+   */
+  private def getNumPendingAtLocation(location: String): Int =
+    amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, 
resource).map(_.size).sum
 
   /**
    * Request as many executors from the ResourceManager as needed to reach the 
desired total.
-   * This takes into account executors already running or pending.
    */
   def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
-    val currentTotal = numPendingAllocate.get + numExecutorsRunning.get
-    if (requestedTotal > currentTotal) {
-      maxExecutors += (requestedTotal - currentTotal)
-      // We need to call `allocateResources` here to avoid the following race 
condition:
-      // If we request executors twice before `allocateResources` is called, 
then we will end up
-      // double counting the number requested because `numPendingAllocate` is 
not updated yet.
-      allocateResources()
-    } else {
-      logInfo(s"Not allocating more executors because there are already 
$currentTotal " +
-        s"(application requested $requestedTotal total)")
-    }
+    maxExecutors = requestedTotal
   }
 
   /**
@@ -144,7 +137,7 @@ private[yarn] class YarnAllocator(
     if (executorIdToContainer.contains(executorId)) {
       val container = executorIdToContainer.remove(executorId).get
       internalReleaseContainer(container)
-      numExecutorsRunning.decrementAndGet()
+      numExecutorsRunning -= 1
       maxExecutors -= 1
       assert(maxExecutors >= 0, "Allocator killed more executors than are 
allocated!")
     } else {
@@ -153,498 +146,236 @@ private[yarn] class YarnAllocator(
   }
 
   /**
-   * Allocate missing containers based on the number of executors currently 
pending and running.
+   * Request resources such that, if YARN gives us all we ask for, we'll have 
a number of containers
+   * equal to maxExecutors.
    *
-   * This method prioritizes the allocated container responses from the RM 
based on node and
-   * rack locality. Additionally, it releases any extra containers allocated 
for this application
-   * but are not needed. This must be synchronized because variables read in 
this block are
-   * mutated by other methods.
+   * Deal with any containers YARN has granted to us by possibly launching 
executors in them.
+   *
+   * This must be synchronized because variables read in this method are 
mutated by other methods.
    */
   def allocateResources(): Unit = synchronized {
-    val missing = maxExecutors - numPendingAllocate.get() - 
numExecutorsRunning.get()
+    val numPendingAllocate = getNumPendingAllocate
+    val missing = maxExecutors - numPendingAllocate - numExecutorsRunning
 
     if (missing > 0) {
-      val totalExecutorMemory = executorMemory + memoryOverhead
-      numPendingAllocate.addAndGet(missing)
-      logInfo(s"Will allocate $missing executor containers, each with 
$totalExecutorMemory MB " +
-        s"memory including $memoryOverhead MB overhead")
-    } else {
-      logDebug("Empty allocation request ...")
+      logInfo(s"Will request $missing executor containers, each with 
${resource.getVirtualCores} " +
+        s"cores and ${resource.getMemory} MB memory including $memoryOverhead 
MB overhead")
     }
 
-    val allocateResponse = allocateContainers(missing)
+    addResourceRequests(missing)
+    val progressIndicator = 0.1f
+    // Poll the ResourceManager. This doubles as a heartbeat if there are no 
pending container
+    // requests.
+    val allocateResponse = amClient.allocate(progressIndicator)
+
     val allocatedContainers = allocateResponse.getAllocatedContainers()
 
     if (allocatedContainers.size > 0) {
-      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * 
allocatedContainers.size)
-
-      if (numPendingAllocateNow < 0) {
-        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * 
numPendingAllocateNow)
-      }
-
-      logDebug("""
-        Allocated containers: %d
-        Current executor count: %d
-        Containers released: %s
-        Cluster resources: %s
-        """.format(
+      logDebug("Allocated containers: %d. Current executor count: %d. Cluster 
resources: %s."
+        .format(
           allocatedContainers.size,
-          numExecutorsRunning.get(),
-          releasedContainers,
+          numExecutorsRunning,
           allocateResponse.getAvailableResources))
 
-      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
-
-      for (container <- allocatedContainers) {
-        if (isResourceConstraintSatisfied(container)) {
-          // Add the accepted `container` to the host's list of already 
accepted,
-          // allocated containers
-          val host = container.getNodeId.getHost
-          val containersForHost = hostToContainers.getOrElseUpdate(host,
-            new ArrayBuffer[Container]())
-          containersForHost += container
-        } else {
-          // Release container, since it doesn't satisfy resource constraints.
-          internalReleaseContainer(container)
-        }
-      }
-
-       // Find the appropriate containers to use.
-      // TODO: Cleanup this group-by...
-      val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
-      val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
-      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
-
-      for (candidateHost <- hostToContainers.keySet) {
-        val maxExpectedHostCount = 
preferredHostToCount.getOrElse(candidateHost, 0)
-        val requiredHostCount = maxExpectedHostCount - 
allocatedContainersOnHost(candidateHost)
-
-        val remainingContainersOpt = hostToContainers.get(candidateHost)
-        assert(remainingContainersOpt.isDefined)
-        var remainingContainers = remainingContainersOpt.get
-
-        if (requiredHostCount >= remainingContainers.size) {
-          // Since we have <= required containers, add all remaining 
containers to
-          // `dataLocalContainers`.
-          dataLocalContainers.put(candidateHost, remainingContainers)
-          // There are no more free containers remaining.
-          remainingContainers = null
-        } else if (requiredHostCount > 0) {
-          // Container list has more containers than we need for data locality.
-          // Split the list into two: one based on the data local container 
count,
-          // (`remainingContainers.size` - `requiredHostCount`), and the other 
to hold remaining
-          // containers.
-          val (dataLocal, remaining) = remainingContainers.splitAt(
-            remainingContainers.size - requiredHostCount)
-          dataLocalContainers.put(candidateHost, dataLocal)
-
-          // Invariant: remainingContainers == remaining
-
-          // YARN has a nasty habit of allocating a ton of containers on a 
host - discourage this.
-          // Add each container in `remaining` to list of containers to 
release. If we have an
-          // insufficient number of containers, then the next allocation cycle 
will reallocate
-          // (but won't treat it as data local).
-          // TODO(harvey): Rephrase this comment some more.
-          for (container <- remaining) internalReleaseContainer(container)
-          remainingContainers = null
-        }
-
-        // For rack local containers
-        if (remainingContainers != null) {
-          val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
-          if (rack != null) {
-            val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
-            val requiredRackCount = maxExpectedRackCount - 
allocatedContainersOnRack(rack) -
-              rackLocalContainers.getOrElse(rack, List()).size
-
-            if (requiredRackCount >= remainingContainers.size) {
-              // Add all remaining containers to to `dataLocalContainers`.
-              dataLocalContainers.put(rack, remainingContainers)
-              remainingContainers = null
-            } else if (requiredRackCount > 0) {
-              // Container list has more containers that we need for data 
locality.
-              // Split the list into two: one based on the data local 
container count,
-              // (`remainingContainers.size` - `requiredHostCount`), and the 
other to hold remaining
-              // containers.
-              val (rackLocal, remaining) = remainingContainers.splitAt(
-                remainingContainers.size - requiredRackCount)
-              val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
-                new ArrayBuffer[Container]())
-
-              existingRackLocal ++= rackLocal
-
-              remainingContainers = remaining
-            }
-          }
-        }
-
-        if (remainingContainers != null) {
-          // Not all containers have been consumed - add them to the list of 
off-rack containers.
-          offRackContainers.put(candidateHost, remainingContainers)
-        }
-      }
-
-      // Now that we have split the containers into various groups, go through 
them in order:
-      // first host-local, then rack-local, and finally off-rack.
-      // Note that the list we create below tries to ensure that not all 
containers end up within
-      // a host if there is a sufficiently large number of hosts/containers.
-      val allocatedContainersToProcess = new 
ArrayBuffer[Container](allocatedContainers.size)
-      allocatedContainersToProcess ++= 
TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
-      allocatedContainersToProcess ++= 
TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
-      allocatedContainersToProcess ++= 
TaskSchedulerImpl.prioritizeContainers(offRackContainers)
-
-      // Run each of the allocated containers.
-      for (container <- allocatedContainersToProcess) {
-        val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
-        val executorHostname = container.getNodeId.getHost
-        val containerId = container.getId
-
-        val executorMemoryOverhead = (executorMemory + memoryOverhead)
-        assert(container.getResource.getMemory >= executorMemoryOverhead)
-
-        if (numExecutorsRunningNow > maxExecutors) {
-          logInfo("""Ignoring container %s at host %s, since we already have 
the required number of
-            containers for it.""".format(containerId, executorHostname))
-          internalReleaseContainer(container)
-          numExecutorsRunning.decrementAndGet()
-        } else {
-          val executorId = executorIdCounter.incrementAndGet().toString
-          val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
-            SparkEnv.driverActorSystemName,
-            sparkConf.get("spark.driver.host"),
-            sparkConf.get("spark.driver.port"),
-            CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
-          logInfo("Launching container %s for on host %s".format(containerId, 
executorHostname))
-          executorIdToContainer(executorId) = container
-
-          // To be safe, remove the container from `releasedContainers`.
-          releasedContainers.remove(containerId)
-
-          val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
-          allocatedHostToContainersMap.synchronized {
-            val containerSet = 
allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
-              new HashSet[ContainerId]())
-
-            containerSet += containerId
-            allocatedContainerToHostMap.put(containerId, executorHostname)
-
-            if (rack != null) {
-              allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 
0) + 1)
-            }
-          }
-          logInfo("Launching ExecutorRunnable. driverUrl: %s,  
executorHostname: %s".format(
-            driverUrl, executorHostname))
-          val executorRunnable = new ExecutorRunnable(
-            container,
-            conf,
-            sparkConf,
-            driverUrl,
-            executorId,
-            executorHostname,
-            executorMemory,
-            executorCores,
-            appAttemptId.getApplicationId.toString,
-            securityMgr)
-          launcherPool.execute(executorRunnable)
-        }
-      }
-      logDebug("""
-        Finished allocating %s containers (from %s originally).
-        Current number of executors running: %d,
-        Released containers: %s
-        """.format(
-          allocatedContainersToProcess,
-          allocatedContainers,
-          numExecutorsRunning.get(),
-          releasedContainers))
+      handleAllocatedContainers(allocatedContainers)
     }
 
     val completedContainers = allocateResponse.getCompletedContainersStatuses()
     if (completedContainers.size > 0) {
       logDebug("Completed %d containers".format(completedContainers.size))
 
-      for (completedContainer <- completedContainers) {
-        val containerId = completedContainer.getContainerId
-
-        if (releasedContainers.containsKey(containerId)) {
-          // Already marked the container for release, so remove it from
-          // `releasedContainers`.
-          releasedContainers.remove(containerId)
-        } else {
-          // Decrement the number of executors running. The next iteration of
-          // the ApplicationMaster's reporting thread will take care of 
allocating.
-          numExecutorsRunning.decrementAndGet()
-          logInfo("Completed container %s (state: %s, exit status: %s)".format(
-            containerId,
-            completedContainer.getState,
-            completedContainer.getExitStatus))
-          // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
-          // there are some exit status' we shouldn't necessarily count 
against us, but for
-          // now I think its ok as none of the containers are expected to exit
-          if (completedContainer.getExitStatus == -103) { // vmem limit 
exceeded
-            logWarning(memLimitExceededLogMessage(
-              completedContainer.getDiagnostics,
-              VMEM_EXCEEDED_PATTERN))
-          } else if (completedContainer.getExitStatus == -104) { // pmem limit 
exceeded
-            logWarning(memLimitExceededLogMessage(
-              completedContainer.getDiagnostics,
-              PMEM_EXCEEDED_PATTERN))
-          } else if (completedContainer.getExitStatus != 0) {
-            logInfo("Container marked as failed: " + containerId +
-              ". Exit status: " + completedContainer.getExitStatus +
-              ". Diagnostics: " + completedContainer.getDiagnostics)
-            numExecutorsFailed.incrementAndGet()
-          }
-        }
+      processCompletedContainers(completedContainers)
 
-        allocatedHostToContainersMap.synchronized {
-          if (allocatedContainerToHostMap.containsKey(containerId)) {
-            val hostOpt = allocatedContainerToHostMap.get(containerId)
-            assert(hostOpt.isDefined)
-            val host = hostOpt.get
-
-            val containerSetOpt = allocatedHostToContainersMap.get(host)
-            assert(containerSetOpt.isDefined)
-            val containerSet = containerSetOpt.get
-
-            containerSet.remove(containerId)
-            if (containerSet.isEmpty) {
-              allocatedHostToContainersMap.remove(host)
-            } else {
-              allocatedHostToContainersMap.update(host, containerSet)
-            }
-
-            allocatedContainerToHostMap.remove(containerId)
-
-            // TODO: Move this part outside the synchronized block?
-            val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
-            if (rack != null) {
-              val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
-              if (rackCount > 0) {
-                allocatedRackCount.put(rack, rackCount)
-              } else {
-                allocatedRackCount.remove(rack)
-              }
-            }
-          }
-        }
-      }
-      logDebug("""
-        Finished processing %d completed containers.
-        Current number of executors running: %d,
-        Released containers: %s
-        """.format(
-          completedContainers.size,
-          numExecutorsRunning.get(),
-          releasedContainers))
+      logDebug("Finished processing %d completed containers. Current running 
executor count: %d."
+        .format(completedContainers.size, numExecutorsRunning))
     }
   }
 
-  private def allocatedContainersOnHost(host: String): Int = {
-    allocatedHostToContainersMap.synchronized {
-     allocatedHostToContainersMap.getOrElse(host, Set()).size
+  /**
+   * Request numExecutors additional containers from YARN. Visible for testing.
+   */
+  def addResourceRequests(numExecutors: Int): Unit = {
+    for (i <- 0 until numExecutors) {
+      val request = new ContainerRequest(resource, null, null, 
RM_REQUEST_PRIORITY)
+      amClient.addContainerRequest(request)
+      val nodes = request.getNodes
+      val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
+      logInfo("Container request (host: %s, capability: %s".format(hostStr, 
resource))
     }
   }
 
-  private def allocatedContainersOnRack(rack: String): Int = {
-    allocatedHostToContainersMap.synchronized {
-      allocatedRackCount.getOrElse(rack, 0)
+  /**
+   * Handle containers granted by the RM by launching executors on them.
+   *
+   * Due to the way the YARN allocation protocol works, certain healthy race 
conditions can result
+   * in YARN granting containers that we no longer need. In this case, we 
release them.
+   *
+   * Visible for testing.
+   */
+  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
+    val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
+
+    // Match incoming requests by host
+    val remainingAfterHostMatches = new ArrayBuffer[Container]
+    for (allocatedContainer <- allocatedContainers) {
+      matchContainerToRequest(allocatedContainer, 
allocatedContainer.getNodeId.getHost,
+        containersToUse, remainingAfterHostMatches)
     }
-  }
-
-  private def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (executorMemory + memoryOverhead)
-  }
 
-  // A simple method to copy the split info map.
-  private def generateNodeToWeight(
-      conf: Configuration,
-      input: collection.Map[String, collection.Set[SplitInfo]])
-    : (Map[String, Int], Map[String, Int]) = {
-    if (input == null) {
-      return (Map[String, Int](), Map[String, Int]())
+    // Match remaining by rack
+    val remainingAfterRackMatches = new ArrayBuffer[Container]
+    for (allocatedContainer <- remainingAfterHostMatches) {
+      val rack = RackResolver.resolve(conf, 
allocatedContainer.getNodeId.getHost).getNetworkLocation
+      matchContainerToRequest(allocatedContainer, rack, containersToUse,
+        remainingAfterRackMatches)
     }
 
-    val hostToCount = new HashMap[String, Int]
-    val rackToCount = new HashMap[String, Int]
-
-    for ((host, splits) <- input) {
-      val hostCount = hostToCount.getOrElse(host, 0)
-      hostToCount.put(host, hostCount + splits.size)
+    // Assign remaining that are neither node-local nor rack-local
+    val remainingAfterOffRackMatches = new ArrayBuffer[Container]
+    for (allocatedContainer <- remainingAfterRackMatches) {
+      matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
+        remainingAfterOffRackMatches)
+    }
 
-      val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
-      if (rack != null) {
-        val rackCount = rackToCount.getOrElse(host, 0)
-        rackToCount.put(host, rackCount + splits.size)
+    if (!remainingAfterOffRackMatches.isEmpty) {
+      logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded 
containers that were " +
+        s"allocated to us")
+      for (container <- remainingAfterOffRackMatches) {
+        internalReleaseContainer(container)
       }
     }
 
-    (hostToCount.toMap, rackToCount.toMap)
-  }
+    runAllocatedContainers(containersToUse)
 
-  private def internalReleaseContainer(container: Container): Unit = {
-    releasedContainers.put(container.getId(), true)
-    amClient.releaseAssignedContainer(container.getId())
+    logInfo("Received %d containers from YARN, launching executors on %d of 
them."
+      .format(allocatedContainers.size, containersToUse.size))
   }
 
   /**
-   * Called to allocate containers in the cluster.
+   * Looks for requests for the given location that match the given container 
allocation. If it
+   * finds one, removes the request so that it won't be submitted again. 
Places the container into
+   * containersToUse or remaining.
    *
-   * @param count Number of containers to allocate.
-   *              If zero, should still contact RM (as a heartbeat).
-   * @return Response to the allocation request.
+   * @param allocatedContainer container that was given to us by YARN
+   * @location resource name, either a node, rack, or *
+   * @param containersToUse list of containers that will be used
+   * @param remaining list of containers that will not be used
    */
-  private def allocateContainers(count: Int): AllocateResponse = {
-    addResourceRequests(count)
-
-    // We have already set the container request. Poll the ResourceManager for 
a response.
-    // This doubles as a heartbeat if there are no pending container requests.
-    val progressIndicator = 0.1f
-    amClient.allocate(progressIndicator)
+  private def matchContainerToRequest(
+      allocatedContainer: Container,
+      location: String,
+      containersToUse: ArrayBuffer[Container],
+      remaining: ArrayBuffer[Container]): Unit = {
+    val matchingRequests = 
amClient.getMatchingRequests(allocatedContainer.getPriority, location,
+      allocatedContainer.getResource)
+
+    // Match the allocation to a request
+    if (!matchingRequests.isEmpty) {
+      val containerRequest = matchingRequests.get(0).iterator.next
+      amClient.removeContainerRequest(containerRequest)
+      containersToUse += allocatedContainer
+    } else {
+      remaining += allocatedContainer
+    }
   }
 
-  private def createRackResourceRequests(hostContainers: 
ArrayBuffer[ContainerRequest])
-    : ArrayBuffer[ContainerRequest] = {
-    // Generate modified racks and new set of hosts under it before issuing 
requests.
-    val rackToCounts = new HashMap[String, Int]()
-
-    for (container <- hostContainers) {
-      val candidateHost = container.getNodes.last
-      assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
-
-      val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
-      if (rack != null) {
-        var count = rackToCounts.getOrElse(rack, 0)
-        count += 1
-        rackToCounts.put(rack, count)
+  /**
+   * Launches executors in the allocated containers.
+   */
+  private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): 
Unit = {
+    for (container <- containersToUse) {
+      numExecutorsRunning += 1
+      assert(numExecutorsRunning <= maxExecutors)
+      val executorHostname = container.getNodeId.getHost
+      val containerId = container.getId
+      executorIdCounter += 1
+      val executorId = executorIdCounter.toString
+
+      assert(container.getResource.getMemory >= resource.getMemory)
+
+      logInfo("Launching container %s for on host %s".format(containerId, 
executorHostname))
+
+      val containerSet = 
allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
+        new HashSet[ContainerId])
+
+      containerSet += containerId
+      allocatedContainerToHostMap.put(containerId, executorHostname)
+
+      val executorRunnable = new ExecutorRunnable(
+        container,
+        conf,
+        sparkConf,
+        driverUrl,
+        executorId,
+        executorHostname,
+        executorMemory,
+        executorCores,
+        appAttemptId.getApplicationId.toString,
+        securityMgr)
+      if (launchContainers) {
+        logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: 
%s".format(
+          driverUrl, executorHostname))
+        launcherPool.execute(executorRunnable)
       }
     }
-
-    val requestedContainers = new 
ArrayBuffer[ContainerRequest](rackToCounts.size)
-    for ((rack, count) <- rackToCounts) {
-      requestedContainers ++= createResourceRequests(
-        AllocationType.RACK,
-        rack,
-        count,
-        RM_REQUEST_PRIORITY)
-    }
-
-    requestedContainers
   }
 
-  private def addResourceRequests(numExecutors: Int): Unit = {
-    val containerRequests: List[ContainerRequest] =
-      if (numExecutors <= 0) {
-        logDebug("numExecutors: " + numExecutors)
-        List()
-      } else if (preferredHostToCount.isEmpty) {
-        logDebug("host preferences is empty")
-        createResourceRequests(
-          AllocationType.ANY,
-          resource = null,
-          numExecutors,
-          RM_REQUEST_PRIORITY).toList
+  private def processCompletedContainers(completedContainers: 
Seq[ContainerStatus]): Unit = {
+    for (completedContainer <- completedContainers) {
+      val containerId = completedContainer.getContainerId
+
+      if (releasedContainers.contains(containerId)) {
+        // Already marked the container for release, so remove it from
+        // `releasedContainers`.
+        releasedContainers.remove(containerId)
       } else {
-        // Request for all hosts in preferred nodes and for numExecutors -
-        // candidates.size, request by default allocation policy.
-        val hostContainerRequests = new 
ArrayBuffer[ContainerRequest](preferredHostToCount.size)
-        for ((candidateHost, candidateCount) <- preferredHostToCount) {
-          val requiredCount = candidateCount - 
allocatedContainersOnHost(candidateHost)
-
-          if (requiredCount > 0) {
-            hostContainerRequests ++= createResourceRequests(
-              AllocationType.HOST,
-              candidateHost,
-              requiredCount,
-              RM_REQUEST_PRIORITY)
-          }
+        // Decrement the number of executors running. The next iteration of
+        // the ApplicationMaster's reporting thread will take care of 
allocating.
+        numExecutorsRunning -= 1
+        logInfo("Completed container %s (state: %s, exit status: %s)".format(
+          containerId,
+          completedContainer.getState,
+          completedContainer.getExitStatus))
+        // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
+        // there are some exit status' we shouldn't necessarily count against 
us, but for
+        // now I think its ok as none of the containers are expected to exit
+        if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
+          logWarning(memLimitExceededLogMessage(
+            completedContainer.getDiagnostics,
+            VMEM_EXCEEDED_PATTERN))
+        } else if (completedContainer.getExitStatus == -104) { // pmem limit 
exceeded
+          logWarning(memLimitExceededLogMessage(
+            completedContainer.getDiagnostics,
+            PMEM_EXCEEDED_PATTERN))
+        } else if (completedContainer.getExitStatus != 0) {
+          logInfo("Container marked as failed: " + containerId +
+            ". Exit status: " + completedContainer.getExitStatus +
+            ". Diagnostics: " + completedContainer.getDiagnostics)
+          numExecutorsFailed += 1
         }
-        val rackContainerRequests: List[ContainerRequest] = 
createRackResourceRequests(
-          hostContainerRequests).toList
-
-        val anyContainerRequests = createResourceRequests(
-          AllocationType.ANY,
-          resource = null,
-          numExecutors,
-          RM_REQUEST_PRIORITY)
-
-        val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
-          hostContainerRequests.size + rackContainerRequests.size + 
anyContainerRequests.size)
-
-        containerRequestBuffer ++= hostContainerRequests
-        containerRequestBuffer ++= rackContainerRequests
-        containerRequestBuffer ++= anyContainerRequests
-        containerRequestBuffer.toList
       }
 
-    for (request <- containerRequests) {
-      amClient.addContainerRequest(request)
-    }
+      allocatedHostToContainersMap.synchronized {
+        if (allocatedContainerToHostMap.containsKey(containerId)) {
+          val host = allocatedContainerToHostMap.get(containerId).get
+          val containerSet = allocatedHostToContainersMap.get(host).get
 
-    for (request <- containerRequests) {
-      val nodes = request.getNodes
-      val hostStr = if (nodes == null || nodes.isEmpty) {
-        "Any"
-      } else {
-        nodes.last
-      }
-      logInfo("Container request (host: %s, priority: %s, capability: 
%s".format(
-        hostStr,
-        request.getPriority().getPriority,
-        request.getCapability))
-    }
-  }
+          containerSet.remove(containerId)
+          if (containerSet.isEmpty) {
+            allocatedHostToContainersMap.remove(host)
+          } else {
+            allocatedHostToContainersMap.update(host, containerSet)
+          }
 
-  private def createResourceRequests(
-      requestType: AllocationType.AllocationType,
-      resource: String,
-      numExecutors: Int,
-      priority: Int): ArrayBuffer[ContainerRequest] = {
-    // If hostname is specified, then we need at least two requests - node 
local and rack local.
-    // There must be a third request, which is ANY. That will be specially 
handled.
-    requestType match {
-      case AllocationType.HOST => {
-        assert(YarnSparkHadoopUtil.ANY_HOST != resource)
-        val hostname = resource
-        val nodeLocal = constructContainerRequests(
-          Array(hostname),
-          racks = null,
-          numExecutors,
-          priority)
-
-        // Add `hostname` to the global (singleton) host->rack mapping in 
YarnAllocationHandler.
-        YarnSparkHadoopUtil.populateRackInfo(conf, hostname)
-        nodeLocal
-      }
-      case AllocationType.RACK => {
-        val rack = resource
-        constructContainerRequests(hosts = null, Array(rack), numExecutors, 
priority)
+          allocatedContainerToHostMap.remove(containerId)
+        }
       }
-      case AllocationType.ANY => constructContainerRequests(
-        hosts = null, racks = null, numExecutors, priority)
-      case _ => throw new IllegalArgumentException(
-        "Unexpected/unsupported request type: " + requestType)
     }
   }
 
-  private def constructContainerRequests(
-      hosts: Array[String],
-      racks: Array[String],
-      numExecutors: Int,
-      priority: Int
-    ): ArrayBuffer[ContainerRequest] = {
-    val memoryRequest = executorMemory + memoryOverhead
-    val resource = Resource.newInstance(memoryRequest, executorCores)
-
-    val prioritySetting = Records.newRecord(classOf[Priority])
-    prioritySetting.setPriority(priority)
-
-    val requests = new ArrayBuffer[ContainerRequest]()
-    for (i <- 0 until numExecutors) {
-      requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
-    }
-    requests
+  private def internalReleaseContainer(container: Container): Unit = {
+    releasedContainers.add(container.getId())
+    amClient.releaseAssignedContainer(container.getId())
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2eeada37/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index b45e599..b134751 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -72,8 +72,7 @@ private[spark] class YarnRMClient(args: 
ApplicationMasterArguments) extends Logg
       amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
       registered = true
     }
-    new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args,
-      preferredNodeLocations, securityMgr)
+    new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args, 
securityMgr)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2eeada37/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index d7cf904..4bff846 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType
+import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
 import org.apache.hadoop.yarn.util.RackResolver
 import org.apache.hadoop.conf.Configuration
 
@@ -99,13 +99,7 @@ object YarnSparkHadoopUtil {
 
   // All RM requests are issued with same priority : we do not (yet) have any 
distinction between
   // request types (like map/reduce in hadoop for example)
-  val RM_REQUEST_PRIORITY = 1
-
-  // Host to rack map - saved from allocation requests. We are expecting this 
not to change.
-  // Note that it is possible for this to change : and ResourceManager will 
indicate that to us via
-  // update response to allocate. But we are punting on handling that for now.
-  private val hostToRack = new ConcurrentHashMap[String, String]()
-  private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+  val RM_REQUEST_PRIORITY = Priority.newInstance(1)
 
   /**
    * Add a path variable to the given environment map.
@@ -184,37 +178,6 @@ object YarnSparkHadoopUtil {
     }
   }
 
-  def lookupRack(conf: Configuration, host: String): String = {
-    if (!hostToRack.contains(host)) {
-      populateRackInfo(conf, host)
-    }
-    hostToRack.get(host)
-  }
-
-  def populateRackInfo(conf: Configuration, hostname: String) {
-    Utils.checkHost(hostname)
-
-    if (!hostToRack.containsKey(hostname)) {
-      // If there are repeated failures to resolve, all to an ignore list.
-      val rackInfo = RackResolver.resolve(conf, hostname)
-      if (rackInfo != null && rackInfo.getNetworkLocation != null) {
-        val rack = rackInfo.getNetworkLocation
-        hostToRack.put(hostname, rack)
-        if (! rackToHostSet.containsKey(rack)) {
-          rackToHostSet.putIfAbsent(rack,
-            Collections.newSetFromMap(new ConcurrentHashMap[String, 
JBoolean]()))
-        }
-        rackToHostSet.get(rack).add(hostname)
-
-        // TODO(harvey): Figure out what this comment means...
-        // Since RackResolver caches, we are disabling this for now ...
-      } /* else {
-        // right ? Else we will keep calling rack resolver in case we cant 
resolve rack info ...
-        hostToRack.put(hostname, null)
-      } */
-    }
-  }
-
   def getApplicationAclsForYarn(securityMgr: SecurityManager)
       : Map[ApplicationAccessType, String] = {
     Map[ApplicationAccessType, String] (

http://git-wip-us.apache.org/repos/asf/spark/blob/2eeada37/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
 
b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 254774a..2fa24cc 100644
--- 
a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ 
b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.scheduler.cluster
 
+import org.apache.hadoop.yarn.util.RackResolver
+
 import org.apache.spark._
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
 
@@ -30,6 +31,6 @@ private[spark] class YarnClientClusterScheduler(sc: 
SparkContext) extends TaskSc
   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
     val host = Utils.parseHostPort(hostPort)._1
-    Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
+    Option(RackResolver.resolve(sc.hadoopConfiguration, 
host).getNetworkLocation)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2eeada37/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
 
b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 4157ff9..be55d26 100644
--- 
a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ 
b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.scheduler.cluster
 
+import org.apache.hadoop.yarn.util.RackResolver
+
 import org.apache.spark._
-import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.ApplicationMaster
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
 
@@ -39,7 +41,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext) 
extends TaskSchedule
   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
     val host = Utils.parseHostPort(hostPort)._1
-    Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
+    Option(RackResolver.resolve(sc.hadoopConfiguration, 
host).getNetworkLocation)
   }
 
   override def postStartHook() {

http://git-wip-us.apache.org/repos/asf/spark/blob/2eeada37/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 8d184a0..024b25f 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -17,18 +17,160 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.util.{Arrays, List => JList}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic
+import org.apache.hadoop.net.DNSToSwitchMapping
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.YarnAllocator._
-import org.scalatest.FunSuite
+import org.apache.spark.scheduler.SplitInfo
+
+import org.scalatest.{BeforeAndAfterEach, FunSuite, Matchers}
+
+class MockResolver extends DNSToSwitchMapping {
+
+  override def resolve(names: JList[String]): JList[String] = {
+    if (names.size > 0 && names.get(0) == "host3") Arrays.asList("/rack2")
+    else Arrays.asList("/rack1")
+  }
+
+  override def reloadCachedMappings() {}
+
+  def reloadCachedMappings(names: JList[String]) {}
+}
+
+class YarnAllocatorSuite extends FunSuite with Matchers with 
BeforeAndAfterEach {
+  val conf = new Configuration()
+  conf.setClass(
+    CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+    classOf[MockResolver], classOf[DNSToSwitchMapping])
+
+  val sparkConf = new SparkConf()
+  sparkConf.set("spark.driver.host", "localhost")
+  sparkConf.set("spark.driver.port", "4040")
+  sparkConf.set("spark.yarn.jar", "notarealjar.jar")
+  sparkConf.set("spark.yarn.launchContainers", "false")
+
+  val appAttemptId = 
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0)
+
+  // Resource returned by YARN.  YARN can give larger containers than 
requested, so give 6 cores
+  // instead of the 5 requested and 3 GB instead of the 2 requested.
+  val containerResource = Resource.newInstance(3072, 6)
+
+  var rmClient: AMRMClient[ContainerRequest] = _
+
+  var containerNum = 0
+
+  override def beforeEach() {
+    rmClient = AMRMClient.createAMRMClient()
+    rmClient.init(conf)
+    rmClient.start()
+  }
+
+  override def afterEach() {
+    rmClient.stop()
+  }
+
+  class MockSplitInfo(host: String) extends SplitInfo(null, host, null, 1, 
null) {
+    override def equals(other: Any) = false
+  }
+
+  def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
+    val args = Array(
+      "--num-executors", s"$maxExecutors",
+      "--executor-cores", "5",
+      "--executor-memory", "2048",
+      "--jar", "somejar.jar",
+      "--class", "SomeClass")
+    new YarnAllocator(
+      conf,
+      sparkConf,
+      rmClient,
+      appAttemptId,
+      new ApplicationMasterArguments(args),
+      new SecurityManager(sparkConf))
+  }
+
+  def createContainer(host: String): Container = {
+    val containerId = ContainerId.newInstance(appAttemptId, containerNum)
+    containerNum += 1
+    val nodeId = NodeId.newInstance(host, 1000)
+    Container.newInstance(containerId, nodeId, "", containerResource, 
RM_REQUEST_PRIORITY, null)
+  }
+
+  test("single container allocated") {
+    // request a single container and receive it
+    val handler = createAllocator()
+    handler.addResourceRequests(1)
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (1)
+
+    val container = createContainer("host1")
+    handler.handleAllocatedContainers(Array(container))
+
+    handler.getNumExecutorsRunning should be (1)
+    handler.allocatedContainerToHostMap.get(container.getId).get should be 
("host1")
+    handler.allocatedHostToContainersMap.get("host1").get should contain 
(container.getId)
+    rmClient.getMatchingRequests(container.getPriority, "host1", 
containerResource).size should be (0)
+  }
+
+  test("some containers allocated") {
+    // request a few containers and receive some of them
+    val handler = createAllocator()
+    handler.addResourceRequests(4)
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (4)
+
+    val container1 = createContainer("host1")
+    val container2 = createContainer("host1")
+    val container3 = createContainer("host2")
+    handler.handleAllocatedContainers(Array(container1, container2, 
container3))
+
+    handler.getNumExecutorsRunning should be (3)
+    handler.allocatedContainerToHostMap.get(container1.getId).get should be 
("host1")
+    handler.allocatedContainerToHostMap.get(container2.getId).get should be 
("host1")
+    handler.allocatedContainerToHostMap.get(container3.getId).get should be 
("host2")
+    handler.allocatedHostToContainersMap.get("host1").get should contain 
(container1.getId)
+    handler.allocatedHostToContainersMap.get("host1").get should contain 
(container2.getId)
+    handler.allocatedHostToContainersMap.get("host2").get should contain 
(container3.getId)
+  }
+
+  test("receive more containers than requested") {
+    val handler = createAllocator(2)
+    handler.addResourceRequests(2)
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (2)
+
+    val container1 = createContainer("host1")
+    val container2 = createContainer("host2")
+    val container3 = createContainer("host4")
+    handler.handleAllocatedContainers(Array(container1, container2, 
container3))
+
+    handler.getNumExecutorsRunning should be (2)
+    handler.allocatedContainerToHostMap.get(container1.getId).get should be 
("host1")
+    handler.allocatedContainerToHostMap.get(container2.getId).get should be 
("host2")
+    handler.allocatedContainerToHostMap.contains(container3.getId) should be 
(false)
+    handler.allocatedHostToContainersMap.get("host1").get should contain 
(container1.getId)
+    handler.allocatedHostToContainersMap.get("host2").get should contain 
(container2.getId)
+    handler.allocatedHostToContainersMap.contains("host4") should be (false)
+  }
 
-class YarnAllocatorSuite extends FunSuite {
   test("memory exceeded diagnostic regexes") {
     val diagnostics =
       "Container 
[pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
-      "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical 
memory used; " +
-      "5.8 GB of 4.2 GB virtual memory used. Killing container."
+        "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical 
memory used; " +
+        "5.8 GB of 4.2 GB virtual memory used. Killing container."
     val vmemMsg = memLimitExceededLogMessage(diagnostics, 
VMEM_EXCEEDED_PATTERN)
     val pmemMsg = memLimitExceededLogMessage(diagnostics, 
PMEM_EXCEEDED_PATTERN)
     assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
     assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
   }
+
 }

