bmarcott commented on a change in pull request #26696: [WIP][SPARK-18886][CORE] Make locality wait time be the time since a TSM's available slots were fully utilized
URL: https://github.com/apache/spark/pull/26696#discussion_r360738965
########## File path: core/src/main/scala/org/apache/spark/scheduler/Pool.scala ##########

@@ -119,4 +120,72 @@ private[spark] class Pool(
     parent.decreaseRunningTasks(taskNum)
   }
 }
+
+  // Update the number of slots considered available for each TaskSetManager whose ancestor
+  // in the tree is this pool.
+  // For FAIR scheduling, slots are distributed among pools based on weights and minShare.
+  // If a pool requires fewer slots than are available to it, the leftover slots are redistributed
+  // to the remaining pools using the remaining pools' weights.
+  // For FIFO scheduling, the schedulable queue is iterated over in FIFO order,
+  // giving each schedulable the remaining slots,
+  // up to the number of remaining tasks for that schedulable.
+  override def updateAvailableSlots(numSlots: Float): Unit = {
+    schedulingMode match {
+      case SchedulingMode.FAIR =>
+        val queueCopy = new util.LinkedList[Schedulable](schedulableQueue)
+        var shouldRedistribute = true
+        var totalWeights = schedulableQueue.asScala.map(_.weight).sum
+        var totalSlots = numSlots
+        while (totalSlots > 0 && shouldRedistribute) {
+          shouldRedistribute = false
+          var nextWeights = totalWeights
+          var nextSlots = totalSlots
+          val iterator = queueCopy.iterator()
+          while (iterator.hasNext) {
+            val schedulable = iterator.next()
+            val numTasksRemaining = schedulable.getSortedTaskSetQueue
+              .map(tsm => tsm.tasks.length - tsm.tasksSuccessful).sum
+            val allocatedSlots = Math.max(
+              totalSlots * schedulable.weight / totalWeights,
+              schedulable.minShare)
+            if (numTasksRemaining < allocatedSlots) {

Review comment:

Point 2 above can be solved by utilizing the `SchedulingAlgorithm` as mentioned, but point 1 still remains.
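The redistribution loop is hard to follow in diff form (and the hunk is truncated at the `if`). Here is a small standalone sketch of the same idea, assuming each pool exposes a weight, a minShare, and a remaining-task count. `SimPool` and `distributeSlots` are hypothetical names for illustration only, not Spark APIs:

```scala
// Illustrative sketch of weighted fair-share slot distribution:
// pools that need fewer slots than their weighted share take only what
// they need, and the surplus is re-split among the remaining pools
// by weight on the next pass.
case class SimPool(name: String, weight: Int, minShare: Int, tasksRemaining: Int)

def distributeSlots(pools: Seq[SimPool], numSlots: Float): Map[String, Float] = {
  var remaining = pools
  var slots = numSlots
  val assigned = scala.collection.mutable.Map[String, Float]()
  var changed = true
  while (slots > 0 && changed && remaining.nonEmpty) {
    changed = false
    val totalWeight = remaining.map(_.weight).sum.toFloat
    // Pools whose remaining work is below their weighted (or minShare) allocation.
    val satisfied = remaining.filter { p =>
      val share = math.max(slots * p.weight / totalWeight, p.minShare.toFloat)
      p.tasksRemaining < share
    }
    if (satisfied.nonEmpty) {
      // Satisfied pools take only what they need; redistribute the surplus.
      satisfied.foreach { p => assigned(p.name) = p.tasksRemaining.toFloat }
      slots -= satisfied.map(_.tasksRemaining).sum
      remaining = remaining.filterNot(satisfied.contains)
      changed = true
    } else {
      // Every remaining pool can use its full share; split the rest by weight.
      remaining.foreach { p => assigned(p.name) = slots * p.weight / totalWeight }
      slots = 0
    }
  }
  assigned.toMap
}
```

With two equal-weight pools and 10 slots, a pool that only has 2 tasks left takes 2 slots and the other pool picks up the remaining 8, rather than both being capped at 5.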
A couple more problematic areas:

- Slots can be rejected due to [blacklisting](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L392)
- Slots can be rejected for not meeting [resource requirements](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L344)

I'm trying to think of a way for the TSM to report directly when it rejects an offer due to delay scheduling, but I'm having trouble seeing how to make use of that data because of problem 1 in my previous comment.
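To make that last idea concrete, here is a purely hypothetical sketch of what "the TSM reports directly" could look like: the offer path returns a reason alongside the launch decision, so the caller can tell delay-scheduling rejections apart from blacklist or resource-requirement rejections. None of these names exist in Spark; they only label the concept:

```scala
// Hypothetical outcome type a TaskSetManager could return from its
// offer path instead of a bare Option[TaskDescription].
sealed trait OfferOutcome
case object Launched extends OfferOutcome
case object RejectedByDelayScheduling extends OfferOutcome
case object RejectedByBlacklist extends OfferOutcome
case object RejectedByResources extends OfferOutcome

// The locality-wait bookkeeping could then act only when every rejection
// in a scheduling round came from delay scheduling, ignoring rounds where
// blacklisting or resource requirements were the cause.
def allRejectionsFromDelayScheduling(outcomes: Seq[OfferOutcome]): Boolean = {
  val rejections = outcomes.filter(_ != Launched)
  rejections.nonEmpty && rejections.forall(_ == RejectedByDelayScheduling)
}
```

This doesn't solve problem 1, but it would at least separate the rejection causes listed above.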