Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r228675680

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -415,9 +420,54 @@ private[spark] class TaskSchedulerImpl(
             launchedAnyTask |= launchedTaskAtCurrentMaxLocality
           } while (launchedTaskAtCurrentMaxLocality)
         }
+
         if (!launchedAnyTask) {
-          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+            case Some(taskIndex) => // Returns the taskIndex which was unschedulable
+
+              // If the taskSet is unschedulable we try to find an existing idle blacklisted
+              // executor. If we cannot find one, we abort immediately. Else we kill the idle
+              // executor and kick off an abortTimer which if it doesn't schedule a task within the
+              // the timeout will abort the taskSet if we were unable to schedule any task from the
+              // taskSet.
+              // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
+              // task basis.
+              // Note 2: The taskSet can still be aborted when there are more than one idle
+              // blacklisted executors and dynamic allocation is on. This can happen when a killed
+              // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
+              // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
+              // timer to expire and abort the taskSet.
+              executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
+                case Some ((executorId, _)) =>
+                  if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+                    blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
+
+                    val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
+                    unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
+                    logInfo(s"Waiting for $timeout ms for completely " +
+                      s"blacklisted task to be schedulable again before aborting $taskSet.")
+                    abortTimer.schedule(
+                      createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
+                  }
+                case _ => // Abort Immediately
+                  logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
+                    s" executors can be found to kill. Aborting $taskSet." )
+                  taskSet.abortSinceCompletelyBlacklisted(taskIndex)
+              }
+            case _ => // Do nothing if no tasks completely blacklisted.
+          }
+        } else {
+          // We want to defer killing any taskSets as long as we have a non blacklisted executor
+          // which can be used to schedule a task from any active taskSets. This ensures that the
+          // job can make progress and if we encounter a flawed taskSet it will eventually either
+          // fail or abort due to being completely blacklisted.
--- End diff --

Here's the scenario I'm worried about:

1) taskset1 and taskset2 are both running. taskset1 has enough failures to get blacklisted everywhere.
2) There is an idle executor, even though taskset2 is running (e.g. the available executor doesn't meet the locality preferences of taskset2), so the abort timer is started.
3) The idle executor is killed, and you get a new one.
4) Just by luck, taskset2 gets hold of the new idle executor (e.g. the executor is on a node blacklisted by taskset1, or taskset2 just has a higher priority). The abort timer is cleared.
5) taskset2 finishes, but meanwhile taskset3 has been launched, and it uses the idle executor.

And so on for taskSetN: you keep launching tasks and the abort timer keeps getting cleared, but nothing ever gets scheduled on taskset1.

Admittedly this would not be the normal scenario: you'd need more tasksets to keep coming, and resource constraints tight enough that taskset1 never gets hold of anything, even the new executor.
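
To make the concern concrete, here is a rough toy model of the scenario. It is only a sketch under my own assumptions and not the actual TaskSchedulerImpl logic: time is an integer tick, one executor slot frees up per round, taskset1 can never use it, and some other taskset always grabs it. The names `AbortTimerStarvation`, `Outcome`, `simulate` and `clearOnAnyLaunch` are all hypothetical.

```scala
object AbortTimerStarvation {
  // Hypothetical result type: when (if ever) taskset1 was aborted, and how many
  // tasks the other tasksets launched in the meantime.
  final case class Outcome(abortedAt: Option[Int], launchesByOthers: Int)

  /**
   * Toy simulation, not Spark code.
   *
   * @param rounds           number of scheduling rounds (ticks) to simulate
   * @param timeout          ticks a completely blacklisted taskset may wait before abort
   * @param clearOnAnyLaunch if true, a launch by ANY taskset clears taskset1's abort timer;
   *                         if false, only a launch by taskset1 itself would clear it
   */
  def simulate(rounds: Int, timeout: Int, clearOnAnyLaunch: Boolean): Outcome = {
    var expiry: Option[Int] = None     // tick at which taskset1's abort timer fires
    var abortedAt: Option[Int] = None
    var launches = 0
    var tick = 0
    while (tick < rounds && abortedAt.isEmpty) {
      if (expiry.exists(_ <= tick)) {
        // The abort timer fired before taskset1 scheduled anything.
        abortedAt = Some(tick)
      } else {
        // taskset1 is blacklisted everywhere, so it cannot launch; arm the timer if needed.
        if (expiry.isEmpty) expiry = Some(tick + timeout)
        // Assumption: another taskset (taskset2, taskset3, ...) always takes the free executor.
        launches += 1
        if (clearOnAnyLaunch) expiry = None
      }
      tick += 1
    }
    Outcome(abortedAt, launches)
  }

  def main(args: Array[String]): Unit = {
    println(simulate(rounds = 100, timeout = 5, clearOnAnyLaunch = true))
    println(simulate(rounds = 100, timeout = 5, clearOnAnyLaunch = false))
  }
}
```

In this model, with `clearOnAnyLaunch = true` taskset1 is never aborted across all 100 rounds while the other tasksets launch 100 tasks; with `clearOnAnyLaunch = false` it is aborted at tick 5. Whether the real scheduler behaves like the first case depends on details the toy model leaves out (locality, priorities, executor replacement timing).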