mridulm commented on a change in pull request #28287:
URL: https://github.com/apache/spark/pull/28287#discussion_r447233292



##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -696,7 +713,9 @@ private[spark] class ExecutorAllocationManager(
 
         // If this is the last stage with pending tasks, mark the scheduler queue as empty
         // This is needed in case the stage is aborted for any reason
-        if (stageAttemptToNumTasks.isEmpty && stageAttemptToNumSpeculativeTasks.isEmpty) {
+        if (stageAttemptToNumTasks.isEmpty &&
+          stageAttemptToNumSpeculativeTasks.isEmpty &&
+          unschedulableTaskSets.isEmpty) {

Review comment:
       Is this required? If `unschedulableTaskSets` is not empty, then
`stageAttemptToNumTasks` should also be non-empty, right?

##########
File path: core/src/main/java/org/apache/spark/SparkFirehoseListener.java
##########
@@ -162,6 +162,11 @@ public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted spe
     onEvent(speculativeTask);
   }
 
+  public void onUnschedulableBlacklistTaskSubmitted(

Review comment:
       A couple of things here:
   * The task is not submitted; rather, it has become unschedulable (perhaps
due to lost executors).
   * The event describes an unschedulable task set and does not carry any
task-level information.
   
   Can we rename this to something more accurate?

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -789,6 +811,23 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
+    override def onUnschedulableBlacklistTaskSubmitted
+      (blacklistedTask: SparkListenerUnschedulableBlacklistTaskSubmitted): Unit = {
+      val stageId = blacklistedTask.stageId
+      val stageAttemptId = blacklistedTask.stageAttemptId
+      allocationManager.synchronized {
+        (stageId, stageAttemptId) match {
+          case (Some(stageId), Some(stageAttemptId)) =>
+            val stageAttempt = StageAttempt(stageId, stageAttemptId)
+            unschedulableTaskSets.add(stageAttempt)
+          case (None, _) =>
+            // Clear unschedulableTaskSets since atleast one task becomes schedulable now
+            unschedulableTaskSets.clear()

Review comment:
       Why are we clearing all task sets? Shouldn't this remove only the
specific task set which became schedulable?
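
To illustrate the alternative, here is a minimal sketch (not the PR's code) of tracking that removes only the task set which became schedulable again. It assumes the "became schedulable" event would carry the stage and stage attempt ids, which the event in this PR does not do today; `UnschedulableTracker`, `markUnschedulable` and `markSchedulable` are hypothetical names, and `StageAttempt` is a stand-in for the case class used in `ExecutorAllocationManager`.

```scala
import scala.collection.mutable

// Stand-in for the StageAttempt case class in ExecutorAllocationManager.
final case class StageAttempt(stageId: Int, stageAttemptId: Int)

// Hypothetical helper: tracks which task sets are currently unschedulable.
final class UnschedulableTracker {
  private val unschedulableTaskSets = mutable.HashSet.empty[StageAttempt]

  // Called when a task set is reported as unschedulable.
  def markUnschedulable(attempt: StageAttempt): Unit = synchronized {
    unschedulableTaskSets += attempt
  }

  // Called when a specific task set becomes schedulable again.
  // Only that entry is removed; other unschedulable task sets stay tracked.
  def markSchedulable(attempt: StageAttempt): Unit = synchronized {
    unschedulableTaskSets -= attempt
  }

  def contains(attempt: StageAttempt): Boolean = synchronized {
    unschedulableTaskSets.contains(attempt)
  }

  def count: Int = synchronized { unschedulableTaskSets.size }
}
```

With two task sets marked unschedulable, marking one of them schedulable leaves the other in the set, whereas `clear()` would drop both.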

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -289,13 +290,23 @@ private[spark] class ExecutorAllocationManager(
       s" tasksperexecutor: $tasksPerExecutor")
     val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
       tasksPerExecutor).toInt
-    if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
+    val totalNeed = if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
       // If we have pending speculative tasks and only need a single executor, allocate one more
       // to satisfy the locality requirements of speculation
       maxNeeded + 1
     } else {
       maxNeeded
     }
+
+    // Request additional executors to schedule the unschedulable tasks as well
+    if (numUnschedulables > 0) {
+      val maxNeededForUnschedulables = math.ceil(numUnschedulables * executorAllocationRatio /
+        tasksPerExecutor).toInt
+      math.max(totalNeed, executorMonitor.executorCountWithResourceProfile(rpId)) +
+        maxNeededForUnschedulables
+    } else {

Review comment:
       Can you update this thread with details about how this was resolved? The
original looked fine to me, but I want to make sure I am not missing something.

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -829,12 +868,25 @@ private[spark] class ExecutorAllocationManager(
       numTotalTasks - numRunning
     }
 
+    def pendingUnschedulableTasksPerResourceProfile(rp: Int): Int = {

Review comment:
       The method returns the number of task sets which are not schedulable,
not the number of tasks which are not schedulable.
   `maxNumExecutorsNeededPerResourceProfile` relies on this being a task
count, so we will need to fix it both here and in the event being fired.
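
A small sketch of why the distinction matters, using made-up numbers. It assumes the event would report a per-task-set count of unschedulable tasks (it does not in this PR); `UnschedulableCounts` and its methods are hypothetical names, and `executorsNeeded` only mirrors the `math.ceil(... * executorAllocationRatio / tasksPerExecutor)` expression from the hunk above.

```scala
// Stand-in for the StageAttempt case class in ExecutorAllocationManager.
final case class StageAttempt(stageId: Int, stageAttemptId: Int)

object UnschedulableCounts {
  // What the method effectively returns today: the number of task sets.
  def taskSetCount(unschedulable: Map[StageAttempt, Int]): Int =
    unschedulable.size

  // What the executor calculation expects: the number of tasks.
  def taskCount(unschedulable: Map[StageAttempt, Int]): Int =
    unschedulable.values.sum

  // Same shape as the maxNeededForUnschedulables expression in the diff.
  def executorsNeeded(numTasks: Int, ratio: Double, tasksPerExecutor: Int): Int =
    math.ceil(numTasks * ratio / tasksPerExecutor).toInt
}
```

For example, with two unschedulable task sets holding 8 and 4 unschedulable tasks, a ratio of 1.0 and 4 tasks per executor, the task-set count asks for 1 extra executor while the task count asks for 3.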



