bmarcott commented on a change in pull request #26696: [WIP][SPARK-18886][CORE] Make locality wait time be the time since a TSM's available slots were fully utilized
URL: https://github.com/apache/spark/pull/26696#discussion_r360738965
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/Pool.scala
 ##########
 @@ -119,4 +120,72 @@ private[spark] class Pool(
       parent.decreaseRunningTasks(taskNum)
     }
   }
+
+  // Update the number of slots considered available for each TaskSetManager whose ancestor
+  // in the tree is this pool
+  // For FAIR scheduling, slots are distributed among pools based on weights and minshare.
+  //   If a pool requires fewer slots than are available to it, the leftover slots are redistributed
+  //   to the remaining pools using the remaining pools' weights.
+  // For FIFO scheduling, the schedulable queue is iterated over in FIFO order,
+  //   giving each schedulable the remaining slots,
+  //   up to the number of remaining tasks for that schedulable.
+  override def updateAvailableSlots(numSlots: Float): Unit = {
+    schedulingMode match {
+      case SchedulingMode.FAIR =>
+        val queueCopy = new util.LinkedList[Schedulable](schedulableQueue)
+        var shouldRedistribute = true
+        var totalWeights = schedulableQueue.asScala.map(_.weight).sum
+        var totalSlots = numSlots
+        while (totalSlots > 0 && shouldRedistribute) {
+          shouldRedistribute = false
+          var nextWeights = totalWeights
+          var nextSlots = totalSlots
+          val iterator = queueCopy.iterator()
+          while (iterator.hasNext) {
+            val schedulable = iterator.next()
+            val numTasksRemaining = schedulable.getSortedTaskSetQueue
+              .map(tsm => tsm.tasks.length - tsm.tasksSuccessful).sum
+            val allocatedSlots = Math.max(
+              totalSlots * schedulable.weight / totalWeights,
+              schedulable.minShare)
+            if (numTasksRemaining < allocatedSlots) {
 
 Review comment:
   Issue 2 from above can be solved by utilizing the `SchedulingAlgorithm` as mentioned, but issue 1 still remains. A couple more problematic areas:
   
   - Slots can be rejected due to [blacklisting](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L392)
   - Slots can be rejected due to not meeting [resource requirements](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L344)
   
   I'm trying to think of an approach where the TSM reports directly whether it rejected a slot due to delay scheduling, but I'm having trouble seeing how to utilize that data because of problem 1 in my previous comment. A rough sketch of what that reporting could look like is below.
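   For concreteness, here is a minimal standalone sketch of the reporting idea. Everything in it is hypothetical (simplified stand-in types, not Spark's actual `TaskSetManager` API):

   ```scala
   // Hypothetical sketch: make the offer result say *why* no task was launched,
   // so a delay-scheduling rejection is distinguishable from blacklisting or
   // unmet resource requirements. Types are simplified stand-ins, not Spark's.
   object DelayRejectionSketch {
     sealed trait OfferResult
     case object Launched extends OfferResult
     case object NoMatchingTask extends OfferResult
     case object RejectedByDelay extends OfferResult // only the locality wait blocked it

     // A TSM-like decision: a pending task exists for this slot, but its
     // locality wait has not expired, so the slot is declined for now.
     def resourceOffer(hasPendingTask: Boolean, localityWaitExpired: Boolean): OfferResult =
       if (!hasPendingTask) NoMatchingTask
       else if (!localityWaitExpired) RejectedByDelay
       else Launched

     def main(args: Array[String]): Unit = {
       // A slot arrives while a better-locality task is still within its wait:
       println(resourceOffer(hasPendingTask = true, localityWaitExpired = false))
       // prints RejectedByDelay; only the reporting side is sketched here,
       // and consuming that signal still runs into problem 1.
     }
   }
   ```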
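   Separately, for anyone following the diff above, a rough standalone sketch of the weighted redistribution the new comment block describes (pool names, numbers, and identifiers are invented; this approximates rather than reproduces the PR's loop):

   ```scala
   // Rough illustration: a pool needing fewer slots than its weighted share keeps
   // only what it needs; leftover slots are re-divided among the remaining pools
   // by their weights until no pool is over-served.
   object FairRedistributionSketch {
     case class PoolInfo(name: String, weight: Int, minShare: Int, tasksRemaining: Int)

     def distribute(pools: Seq[PoolInfo], numSlots: Float): Map[String, Float] = {
       var remaining = pools
       var totalSlots = numSlots
       var result = Map.empty[String, Float]
       var changed = true
       while (totalSlots > 0 && changed && remaining.nonEmpty) {
         changed = false
         val totalWeights = remaining.map(_.weight).sum.toFloat
         // A pool is "satisfied" if its remaining tasks fit under its share.
         val (satisfied, rest) = remaining.partition { p =>
           val share = math.max(totalSlots * p.weight / totalWeights, p.minShare.toFloat)
           p.tasksRemaining < share
         }
         // Satisfied pools take only what they need; the rest is freed up.
         satisfied.foreach(p => result += p.name -> p.tasksRemaining.toFloat)
         totalSlots -= satisfied.map(_.tasksRemaining).sum
         changed = satisfied.nonEmpty
         remaining = rest
       }
       // Whatever is left is split among the still-hungry pools by weight.
       val totalWeights = remaining.map(_.weight).sum.toFloat
       remaining.foreach(p => result += p.name -> totalSlots * p.weight / totalWeights)
       result
     }

     def main(args: Array[String]): Unit = {
       val pools = Seq(
         PoolInfo("a", weight = 1, minShare = 0, tasksRemaining = 2),
         PoolInfo("b", weight = 1, minShare = 0, tasksRemaining = 100))
       // 10 slots, equal weights: "a" needs only 2 of its 5-slot share, so "b" gets 8.
       println(distribute(pools, 10f)) // Map(a -> 2.0, b -> 8.0)
     }
   }
   ```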
