Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7532#discussion_r35915249
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
---
    @@ -563,32 +570,48 @@ private[master] class Master(
           app: ApplicationInfo,
           usableWorkers: Array[WorkerInfo],
           spreadOutApps: Boolean): Array[Int] = {
    -    // If the number of cores per executor is not specified, then we can 
just schedule
    -    // 1 core at a time since we expect a single executor to be launched 
on each worker
    -    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
    +    val coresPerExecutor = app.desc.coresPerExecutor
    +    val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
         val memoryPerExecutor = app.desc.memoryPerExecutorMB
         val numUsable = usableWorkers.length
         val assignedCores = new Array[Int](numUsable) // Number of cores to 
give to each worker
    -    val assignedMemory = new Array[Int](numUsable) // Amount of memory to 
give to each worker
    +    val assignedExecutors = new Array[Int](numUsable) // Number of new 
executors on each worker
         var coresToAssign = math.min(app.coresLeft, 
usableWorkers.map(_.coresFree).sum)
    -    var freeWorkers = (0 until numUsable).toIndexedSeq
     
    +    /** Return whether the specified worker can launch an executor for 
this app. */
         def canLaunchExecutor(pos: Int): Boolean = {
    -      usableWorkers(pos).coresFree - assignedCores(pos) >= 
coresPerExecutor &&
    -      usableWorkers(pos).memoryFree - assignedMemory(pos) >= 
memoryPerExecutor
    +      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
    +      val underLimit =
    +        if (app.oneExecutorPerWorker() && assignedExecutors(pos) == 1) {
    +          // We only have one executor per worker and have already started 
to assign cores to it,
    +          // so assigning more to it does not change the number of 
executors we'll end up with
    +          true
    +        } else {
    +          // Otherwise, we should launch a new executor only if we do not 
exceed the limit
    +          assignedExecutors.sum + app.executors.size < app.executorLimit
    --- End diff --
    
    It feels a little weird to insert app limit logic here, since the function 
seems to be about whether the worker can take a new executor.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to