GitHub user nishkamravi2 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7274#discussion_r34108869
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---
    @@ -544,38 +544,60 @@ private[master] class Master(
        * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
        * worker by default, in which case only one executor may be launched on each worker.
        */
    -  private def startExecutorsOnWorkers(): Unit = {
    -    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    -    // in the queue, then the second app, etc.
    +
    +  private[master] def scheduleExecutorsOnWorkers(app: ApplicationInfo, usableWorkers: Array[WorkerInfo],
    +    spreadOutApps: Boolean): Array[Int] = {
    +    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
    +    val memoryPerExecutor = app.desc.memoryPerExecutorMB
    +    val numUsable = usableWorkers.length
    +    val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
    +    val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker
    +    var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
    +    var pos = 0
         if (spreadOutApps) {
    -      // Try to spread out each app among all the workers, until it has all its cores
    -      for (app <- waitingApps if app.coresLeft > 0) {
    -        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
    -          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
    -            worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
    -          .sortBy(_.coresFree).reverse
    -        val numUsable = usableWorkers.length
    -        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
    -        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
    -        var pos = 0
    -        while (toAssign > 0) {
    -          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
    -            toAssign -= 1
    -            assigned(pos) += 1
    -          }
    -          pos = (pos + 1) % numUsable
    -        }
    -        // Now that we've decided how many cores to give on each node, let's actually give them
    -        for (pos <- 0 until numUsable if assigned(pos) > 0) {
    -          allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
    +      // Try to spread out executors among workers (sparse scheduling)
    +      while (toAssign > 0) {
    +        if (usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
    +            usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor) {
    +          toAssign -= coresPerExecutor
    --- End diff --
    
    Consider the following: 4 workers with 16 cores each, spark.cores.max=48, and spark.executor.cores=16. When we spread out, we allocate one core at a time and so end up taking 12 cores from each worker (48 / 4). First, that ignores spark.executor.cores during allocation, which isn't right. Second, when the condition while (coresLeft >= coresPerExecutor) is checked, coresLeft is 12 and coresPerExecutor is 16, so the condition is false on every worker. As a result, no executors launch.
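
To make the arithmetic concrete, here is a minimal, self-contained sketch of the scenario. It is not the code in Master.scala: the object and method names are hypothetical, memory is ignored, and `wholeExecutors` is only one possible way to assign cores in executor-sized steps, not necessarily what this PR ends up doing.

```scala
// Hypothetical sketch of the scenario above; names are illustrative only.
object SpreadOutSketch {

  // Mirrors the removed spread-out loop: hand out one core at a time,
  // round-robin over the workers, until toAssign reaches zero.
  def oneCoreAtATime(coresFree: Array[Int], coresMax: Int): Array[Int] = {
    val assigned = new Array[Int](coresFree.length)
    var toAssign = math.min(coresMax, coresFree.sum)
    var pos = 0
    while (toAssign > 0) {
      if (coresFree(pos) - assigned(pos) > 0) {
        toAssign -= 1
        assigned(pos) += 1
      }
      pos = (pos + 1) % coresFree.length
    }
    assigned
  }

  // Each worker can only launch as many executors as fit into the
  // cores it was assigned (the coresLeft >= coresPerExecutor check).
  def executorsLaunched(assigned: Array[Int], coresPerExecutor: Int): Int =
    assigned.map(_ / coresPerExecutor).sum

  // Assumed alternative: assign cores in steps of coresPerExecutor,
  // still visiting workers round-robin. Stops when a full pass over
  // the workers makes no progress.
  def wholeExecutors(coresFree: Array[Int], coresMax: Int,
      coresPerExecutor: Int): Array[Int] = {
    val assigned = new Array[Int](coresFree.length)
    var toAssign = math.min(coresMax, coresFree.sum)
    var progress = true
    while (toAssign >= coresPerExecutor && progress) {
      progress = false
      for (pos <- coresFree.indices) {
        if (toAssign >= coresPerExecutor &&
            coresFree(pos) - assigned(pos) >= coresPerExecutor) {
          assigned(pos) += coresPerExecutor
          toAssign -= coresPerExecutor
          progress = true
        }
      }
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    val workers = Array(16, 16, 16, 16) // 4 workers, 16 free cores each
    val perCore = oneCoreAtATime(workers, coresMax = 48)
    println(s"one core at a time: ${perCore.mkString(",")} -> " +
      s"${executorsLaunched(perCore, 16)} executors")
    val perExec = wholeExecutors(workers, coresMax = 48, coresPerExecutor = 16)
    println(s"executor-sized steps: ${perExec.mkString(",")} -> " +
      s"${executorsLaunched(perExec, 16)} executors")
  }
}
```

With the one-core-at-a-time loop every worker ends up with 12 cores, which is below the 16 an executor needs, so nothing can launch. Stepping by coresPerExecutor instead fills three workers completely and leaves the fourth untouched.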


