Github user dhruve commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22288#discussion_r226754849

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -415,9 +420,65 @@ private[spark] class TaskSchedulerImpl(
             launchedAnyTask |= launchedTaskAtCurrentMaxLocality
           } while (launchedTaskAtCurrentMaxLocality)
         }
    +
         if (!launchedAnyTask) {
    -      taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    +      taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
    +        case Some(taskIndex) => // Returns the taskIndex which was unschedulable
    +
    +          // If the taskSet is unschedulable we try to find an existing idle blacklisted
    +          // executor. If we cannot find one, we abort immediately. Otherwise we kill the
    +          // idle executor and kick off an abortTimer which aborts the taskSet if we are
    +          // still unable to schedule any of its tasks within the timeout.
    +          // Note 1: We keep track of schedulability on a per-taskSet basis rather than
    +          // on a per-task basis.
    +          // Note 2: The taskSet can still be aborted when there is more than one idle
    +          // blacklisted executor and dynamic allocation is on. This can happen when a
    +          // killed idle executor isn't replaced in time by ExecutorAllocationManager,
    +          // which relies on pending tasks and doesn't kill executors on idle timeouts,
    +          // causing the abort timer to expire and abort the taskSet.
    +          executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
    +            case Some((executorId, _)) =>
    +              if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
    +                blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
    +
    +                unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis()
    +                val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
    +                logInfo(s"Waiting for $timeout ms for completely " +
    +                  s"blacklisted task to be schedulable again before aborting $taskSet.")
    +                abortTimer.schedule(new TimerTask() {
    +                  override def run(): Unit = {
    +                    if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
    +                        (unschedulableTaskSetToExpiryTime(taskSet) + timeout)
    +                          <= clock.getTimeMillis()) {
    +                      logInfo("Cannot schedule any task because of complete blacklisting. " +
    +                        s"Wait time for scheduling expired. Aborting $taskSet.")
    +                      taskSet.abortSinceCompletelyBlacklisted(taskIndex)
    +                    } else {
    +                      this.cancel()
    +                    }
    +                  }
    +                }, timeout)
    +              }
    +            case None => // Abort immediately: there is no idle executor to kill.
    +              logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
    +                s" executors can be found to kill. Aborting $taskSet.")
    +              taskSet.abortSinceCompletelyBlacklisted(taskIndex)
    +          }
    +        case None => // Do nothing if no tasks are completely blacklisted.
    +      }
    +    } else {
    +      // If a task was scheduled, we clear the expiry time for all the taskSets. This
    +      // ensures that we have at least one non-blacklisted executor and the job can
    +      // progress. The abort timer checks this entry to decide whether to abort the
    +      // taskSet.
    --- End diff --

That is correct. It also covers another scenario that @tgravescs originally pointed out. Say you have multiple taskSets running which are all completely blacklisted. If you were able to get an executor, you would clear the timer only for that specific taskSet. Then, if resource constraints prevented you from obtaining another executor within the timeout for the other taskSet, you would abort it even though it could simply have waited to be scheduled on the newly obtained executor.
So clearing the timer for all the taskSets ensures that we aren't currently in a completely blacklisted state and should try to run to completion. However, if the taskSet itself is flawed, it will eventually fail anyway. This could result in wasted effort, but we don't have a way to determine that yet, so this should be okay.
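To make the clearing semantics concrete, here is a minimal, self-contained sketch of the bookkeeping being discussed. Names mirror the diff, but the types and helpers are simplified stand-ins (strings for taskSets, System.currentTimeMillis for the clock), not the PR code:

    import java.util.{Timer, TimerTask}
    import scala.collection.mutable

    // Toy model of the abort-timer bookkeeping: the real scheduler tracks
    // TaskSetManagers and uses a Clock abstraction instead of the system clock.
    object AbortTimerSketch {
      private val unschedulableTaskSetToExpiryTime = mutable.HashMap[String, Long]()
      private val abortTimer = new Timer("abort-timer", true)
      private val timeoutMs = 1000L

      // Called when a taskSet is found to be completely blacklisted and an idle
      // blacklisted executor has been killed.
      def onCompletelyBlacklisted(taskSet: String): Unit = {
        if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
          unschedulableTaskSetToExpiryTime(taskSet) = System.currentTimeMillis()
          abortTimer.schedule(new TimerTask {
            override def run(): Unit = {
              // Abort only if the entry survived the whole timeout. A successful
              // launch anywhere clears the map, so the entry will be gone.
              val expired = unschedulableTaskSetToExpiryTime.get(taskSet)
                .exists(_ + timeoutMs <= System.currentTimeMillis())
              if (expired) println(s"Aborting $taskSet") else this.cancel()
            }
          }, timeoutMs)
        }
      }

      // Called whenever any task from any taskSet launches: clear *all* entries,
      // so every waiting taskSet gets a shot at the newly available executor.
      def onAnyTaskLaunched(): Unit = unschedulableTaskSetToExpiryTime.clear()
    }

With this model, a taskSet whose abort timer is pending survives as long as any task from any taskSet launches before its timeout expires, which is exactly the multi-taskSet behavior described above.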