Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2169#discussion_r16897472
  
    --- Diff: 
yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
    @@ -17,18 +17,430 @@
     
     package org.apache.spark.deploy.yarn
     
    +import java.util.{List => JList}
    +import java.util.concurrent.ConcurrentHashMap
    +import java.util.concurrent.atomic.AtomicInteger
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
    +
    +import org.apache.spark.{Logging, SparkConf, SparkEnv}
    +import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +
     object AllocationType extends Enumeration {
       type AllocationType = Value
       val HOST, RACK, ANY = Value
     }
     
    +// TODO:
    +// Too many params.
    +// Needs to be mt-safe
    +// Need to refactor this to make it 'cleaner' ... right now, all 
computation is reactive - should
    +// make it more proactive and decoupled.
    +
    +// Note that right now, we assume all node asks as uniform in terms of 
capabilities and priority
    +// Refer to 
http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/
 for
    +// more info on how we are requesting for containers.
    +
     /**
    - * Interface that defines a Yarn allocator.
    + * Common code for the Yarn container allocator. Contains all the 
version-agnostic code to
    + * manage container allocation for a running Spark application.
      */
    -trait YarnAllocator {
    +private[yarn] abstract class YarnAllocator(
    +    conf: Configuration,
    +    sparkConf: SparkConf,
    +    args: ApplicationMasterArguments,
    +    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
    +  extends Logging {
     
    -  def allocateResources(): Unit
    -  def getNumExecutorsFailed: Int
    -  def getNumExecutorsRunning: Int
    +  // These three are locked on allocatedHostToContainersMap. Complementary 
data structures
    +  // allocatedHostToContainersMap : containers which are running : host, 
Set<containerid>
    +  // allocatedContainerToHostMap: container to host mapping.
    +  private val allocatedHostToContainersMap =
    +    new HashMap[String, collection.mutable.Set[ContainerId]]()
     
    -}
    +  private val allocatedContainerToHostMap = new HashMap[ContainerId, 
String]()
    +
    +  // allocatedRackCount is populated ONLY if allocation happens (or 
decremented if this is an
    +  // allocated node)
    +  // As with the two data structures above, tightly coupled with them, and 
to be locked on
    +  // allocatedHostToContainersMap
    +  private val allocatedRackCount = new HashMap[String, Int]()
    +
    +  // Containers to be released in next request to RM
    +  private val releasedContainers = new ConcurrentHashMap[ContainerId, 
Boolean]
    +
    +  // Additional memory overhead - in mb.
    +  protected val memoryOverhead: Int = 
sparkConf.getInt("spark.yarn.executor.memoryOverhead",
    +    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
    +
    +  // Number of container requests that have been sent to, but not yet 
allocated by the
    +  // ApplicationMaster.
    +  private val numPendingAllocate = new AtomicInteger()
    +  private val numExecutorsRunning = new AtomicInteger()
    +  // Used to generate a unique id per executor
    +  private val executorIdCounter = new AtomicInteger()
    +  private val numExecutorsFailed = new AtomicInteger()
    +
    +  private val maxExecutors = args.numExecutors
    +
    +  protected val executorMemory = args.executorMemory
    +  protected val executorCores = args.executorCores
    +  protected val (preferredHostToCount, preferredRackToCount) =
    +    generateNodeToWeight(conf, preferredNodes)
    +
    +  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
    +
    +  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
    +
    +  def allocateResources() = {
    +    val missing = maxExecutors - numPendingAllocate.get() - 
numExecutorsRunning.get()
    +
    +    if (missing > 0) {
    +      numPendingAllocate.addAndGet(missing)
    +      logInfo("Will Allocate %d executor containers, each with %d 
memory".format(
    +        missing,
    +        (executorMemory + memoryOverhead)))
    +    } else {
    +      logDebug("Empty allocation request ...")
    +    }
    +
    +    val allocateResponse = allocateContainers(missing)
    +    val allocatedContainers = allocateResponse.getAllocatedContainers()
    +
    +    if (allocatedContainers.size > 0) {
    +      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * 
allocatedContainers.size)
    +
    +      if (numPendingAllocateNow < 0) {
    +        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * 
numPendingAllocateNow)
    --- End diff --
    
    I know you just copied and pasted this, but it seems wrong. I would have 
thought it would add back whatever amount is below 0, so that the count is reset to 0.
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to