mridulm commented on a change in pull request #28287: URL: https://github.com/apache/spark/pull/28287#discussion_r447233292
########## File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ##########
@@ -696,7 +713,9 @@ private[spark] class ExecutorAllocationManager(
       // If this is the last stage with pending tasks, mark the scheduler queue as empty
       // This is needed in case the stage is aborted for any reason
-      if (stageAttemptToNumTasks.isEmpty && stageAttemptToNumSpeculativeTasks.isEmpty) {
+      if (stageAttemptToNumTasks.isEmpty &&
+        stageAttemptToNumSpeculativeTasks.isEmpty &&
+        unschedulableTaskSets.isEmpty) {

Review comment:
   Is this required? If `unschedulableTaskSets` is non-empty, then `stageAttemptToNumTasks` should also be non-empty, right?

########## File path: core/src/main/java/org/apache/spark/SparkFirehoseListener.java ##########
@@ -162,6 +162,11 @@
   public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted speculativeTask) {
     onEvent(speculativeTask);
   }

+  public void onUnschedulableBlacklistTaskSubmitted(

Review comment:
   A couple of things here:
   * The task is not submitted; rather, it has become unschedulable (perhaps due to lost executors).
   * The event details are about an unschedulable task set and do not give task information.

   Can we rename this to something better?
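One direction the rename could take is sketched below. These names (`SparkListenerUnschedulableTaskSetAdded`, `onUnschedulableTaskSetAdded`) are hypothetical, chosen only to illustrate describing an unschedulable task *set* rather than a "submitted task"; they are not the PR's final API.

```java
// Hypothetical renamed event and callback, sketching the reviewer's suggestion.
// Neither name is taken from the PR; both are illustrative only.
public class UnschedulableTaskSetEventSketch {
    // Stage-level payload: the event identifies the task set, not individual tasks.
    static class SparkListenerUnschedulableTaskSetAdded {
        final int stageId;
        final int stageAttemptId;
        SparkListenerUnschedulableTaskSetAdded(int stageId, int stageAttemptId) {
            this.stageId = stageId;
            this.stageAttemptId = stageAttemptId;
        }
    }

    // No-op default, mirroring how SparkFirehoseListener gives every callback
    // an overridable default body.
    public void onUnschedulableTaskSetAdded(SparkListenerUnschedulableTaskSetAdded event) {
        // a real listener would forward this to a generic onEvent(...) handler
    }

    public static void main(String[] args) {
        SparkListenerUnschedulableTaskSetAdded e =
            new SparkListenerUnschedulableTaskSetAdded(3, 0);
        System.out.println(e.stageId + "," + e.stageAttemptId); // prints "3,0"
    }
}
```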
########## File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ##########
@@ -789,6 +811,23 @@ private[spark] class ExecutorAllocationManager(
     }
   }

+    override def onUnschedulableBlacklistTaskSubmitted(
+        blacklistedTask: SparkListenerUnschedulableBlacklistTaskSubmitted): Unit = {
+      val stageId = blacklistedTask.stageId
+      val stageAttemptId = blacklistedTask.stageAttemptId
+      allocationManager.synchronized {
+        (stageId, stageAttemptId) match {
+          case (Some(stageId), Some(stageAttemptId)) =>
+            val stageAttempt = StageAttempt(stageId, stageAttemptId)
+            unschedulableTaskSets.add(stageAttempt)
+          case (None, _) =>
+            // Clear unschedulableTaskSets since at least one task becomes schedulable now
+            unschedulableTaskSets.clear()

Review comment:
   Why are we clearing all task sets? Shouldn't this remove only the specific task set that became schedulable?

########## File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ##########
@@ -289,13 +290,23 @@ private[spark] class ExecutorAllocationManager(
         s" tasksperexecutor: $tasksPerExecutor")
       val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
         tasksPerExecutor).toInt
-      if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
+      val totalNeed = if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
         // If we have pending speculative tasks and only need a single executor, allocate one more
         // to satisfy the locality requirements of speculation
         maxNeeded + 1
       } else {
         maxNeeded
       }
+
+      // Request additional executors to schedule the unschedulable tasks as well
+      if (numUnschedulables > 0) {
+        val maxNeededForUnschedulables = math.ceil(numUnschedulables * executorAllocationRatio /
+          tasksPerExecutor).toInt
+        math.max(totalNeed, executorMonitor.executorCountWithResourceProfile(rpId)) +
+          maxNeededForUnschedulables
+      } else {

Review comment:
   Can you update with details about how this was resolved?
The original looked fine to me, but I want to make sure I am not missing something.

########## File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ##########
@@ -829,12 +868,25 @@ private[spark] class ExecutorAllocationManager(
       numTotalTasks - numRunning
     }

+    def pendingUnschedulableTasksPerResourceProfile(rp: Int): Int = {

Review comment:
   This method returns the number of task sets that are not schedulable, not the number of tasks that are not schedulable. `maxNumExecutorsNeededPerResourceProfile` relies on this being a task count; we will need to fix it both here and in the event being fired.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
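The sizing concern in the last review comment can be made concrete with a small arithmetic sketch. This is a hedged illustration with made-up names (`maxExecutorsNeeded` and the task-set keys are stand-ins for the PR's internals, not Spark's actual API), and it omits the pending-speculative-task adjustment from the real hunk. It models the new branch: when task sets are unschedulable, keep the currently allocated executors and add capacity on top, so the blacklisted tasks can land on fresh executors.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; names are stand-ins for the PR's internals.
public class UnschedulableSizingSketch {
    // Executor target modeled on the diff hunk above: size for pending tasks,
    // and when tasks are unschedulable, hold current executors and add extras.
    static int maxExecutorsNeeded(int numRunningOrPendingTasks,
                                  int numUnschedulables,
                                  int currentExecutors,
                                  double executorAllocationRatio,
                                  int tasksPerExecutor) {
        int maxNeeded = (int) Math.ceil(
            numRunningOrPendingTasks * executorAllocationRatio / tasksPerExecutor);
        if (numUnschedulables > 0) {
            int extraForUnschedulables = (int) Math.ceil(
                numUnschedulables * executorAllocationRatio / tasksPerExecutor);
            return Math.max(maxNeeded, currentExecutors) + extraForUnschedulables;
        }
        return maxNeeded;
    }

    public static void main(String[] args) {
        // Two unschedulable task sets holding 7 and 3 pending tasks respectively.
        Map<String, Integer> pendingTasksPerTaskSet = new HashMap<>();
        pendingTasksPerTaskSet.put("stage1-attempt0", 7);
        pendingTasksPerTaskSet.put("stage2-attempt0", 3);

        // Counting task *sets* (the bug the reviewer points out) understates demand:
        int fromSetCount = maxExecutorsNeeded(20, pendingTasksPerTaskSet.size(), 5, 1.0, 4);
        // Counting the actual unschedulable *tasks* requests the right capacity:
        int totalTasks = pendingTasksPerTaskSet.values().stream()
            .mapToInt(Integer::intValue).sum();
        int fromTaskCount = maxExecutorsNeeded(20, totalTasks, 5, 1.0, 4);

        System.out.println(fromSetCount);  // 6 = max(ceil(20/4), 5) + ceil(2/4)
        System.out.println(fromTaskCount); // 8 = max(ceil(20/4), 5) + ceil(10/4)
    }
}
```

With 4 task slots per executor, sizing from the set count (2) adds only one executor, while the true demand of 10 unschedulable tasks needs three more, which is why the count fed into `maxNumExecutorsNeededPerResourceProfile` matters.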