Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22288#discussion_r225193634
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -415,9 +419,61 @@ private[spark] class TaskSchedulerImpl(
                 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
               } while (launchedTaskAtCurrentMaxLocality)
             }
    +
             if (!launchedAnyTask) {
    -          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    +          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
    +            case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
    +
    +              // If the taskSet is unschedulable we try to find an existing idle blacklisted
    +              // executor. If we cannot find one, we abort immediately. Else we kill the idle
    +              // executor and kick off an abortTimer which will abort the taskSet if we were
    +              // still unable to schedule any of its tasks within the timeout.
    +              // Note 1: We keep track of schedulability on a per taskSet basis rather than
    +              // on a per task basis.
    +              // Note 2: The taskSet can still be aborted when there is more than one idle
    +              // blacklisted executor and dynamic allocation is on. This can happen when a
    +              // killed idle executor isn't replaced in time by ExecutorAllocationManager as
    +              // it relies on pending tasks and doesn't kill executors on idle timeouts,
    +              // causing the abort timer to expire and abort the taskSet.
    +              executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
    +                case Some(x) =>
    +                  val executorId = x._1
    +                  if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
    +                    blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
    +
    +                    unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis()
    +                    val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
    +                    logInfo(s"Waiting for $timeout ms for completely "
    +                      + s"blacklisted task to be schedulable again before aborting $taskSet.")
    +                    abortTimer.schedule(new TimerTask() {
    +                      override def run() {
    +                        if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
    +                          (unschedulableTaskSetToExpiryTime(taskSet) + timeout) <= clock.getTimeMillis()) {
    +                          logInfo("Cannot schedule any task because of complete blacklisting. " +
    +                            s"Wait time for scheduling expired. Aborting $taskSet.")
    +                          taskSet.abortSinceCompletelyBlacklisted(taskIndex.get)
    +                        } else {
    +                          this.cancel()
    +                        }
    +                      }
    +                    }, timeout)
    +                  }
    +                case _ => // Abort immediately
    +                  logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
    +                    s" executors can be found to kill. Aborting $taskSet.")
    +                  taskSet.abortSinceCompletelyBlacklisted(taskIndex.get)
    +              }
    +            case _ => // Do nothing if no tasks are completely blacklisted.
    +          }
    +        } else {
    +          // If a task was scheduled, we clear the expiry time for the taskSet. The abort
    +          // timer checks this entry to decide if we want to abort the taskSet.
    +          unschedulableTaskSetToExpiryTime.remove(taskSet)
    --- End diff --
    
    Here we have to handle the situation where two tasksets may both have chosen the same 
executor to kill. If one of the tasksets kills the executor and launches a task, it clears 
its own expiry, but if a second taskset had tried to kill the same executor we don't clear 
its entry, and the abort timer could end up aborting the second taskset and killing the job 
even though it shouldn't have.

