Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22288#discussion_r216725731

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl(
               launchedAnyTask |= launchedTaskAtCurrentMaxLocality
             } while (launchedTaskAtCurrentMaxLocality)
           }
    +
          if (!launchedAnyTask) {
    -        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    -      }
    +        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
    +          case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
    +
    +            // If the taskSet is unschedulable we kill an existing blacklisted executor/s and
    +            // kick off an abortTimer which after waiting will abort the taskSet if we were
    +            // unable to schedule any task from the taskSet.
    +            // Note: We keep a track of schedulability on a per taskSet basis rather than on a
    +            // per task basis.
    +            val executor = hostToExecutors.valuesIterator.next().iterator.next()
    +            logDebug("Killing executor because of task unschedulability: " + executor)
    +            blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor))
    +
    +            if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
    +              unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis()
    --- End diff --

    I'd include a logInfo here noting that Spark can't schedule anything because of blacklisting, but that it's going to try to kill blacklisted executors and acquire new ones. Also mention how long it will wait before giving up, and the associated conf.
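As a rough illustration of the kind of logInfo squito is asking for, the sketch below builds such a message. The helper object, the conf key name (`spark.scheduler.blacklist.unschedulableTaskSetTimeout`), and the exact wording are all illustrative assumptions, not taken from the PR:

```scala
// Hypothetical sketch of the suggested logInfo message. The conf key name
// and message wording are assumptions for illustration, not from the PR.
object UnschedulableTaskSetLogging {
  // Assumed conf controlling how long Spark waits before aborting the task set.
  val TimeoutConfKey = "spark.scheduler.blacklist.unschedulableTaskSetTimeout"

  // Builds the message a scheduler might log when blacklisting blocks all tasks:
  // it states the problem, the mitigation (killing blacklisted executors), and
  // how long Spark will wait before giving up, including the associated conf.
  def buildLogMessage(timeoutMs: Long): String =
    s"Blacklisting is preventing any task in the task set from being scheduled; " +
      s"killing blacklisted executors to try to acquire new ones. The task set " +
      s"will be aborted if nothing can be scheduled within ${timeoutMs}ms " +
      s"(configured via $TimeoutConfKey)."
}
```

Inside `TaskSchedulerImpl` this would presumably be emitted via `logInfo(...)` right before the expiry time is recorded, so operators see both the cause and the deadline in the driver logs.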