GitHub user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14079#discussion_r72538818
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
    @@ -611,15 +620,31 @@ private[spark] class TaskSetManager(
     
         // If no executors have registered yet, don't abort the stage, just wait.  We probably
         // got here because a task set was added before the executors registered.
    -    if (executors.nonEmpty) {
    +    if (executorsByHost.nonEmpty) {
           // take any task that needs to be scheduled, and see if we can find some executor it *could*
           // run on
    -      pendingTask.foreach { taskId =>
    -        if (executors.forall(executorIsBlacklisted(_, taskId))) {
    -          val execs = executors.toIndexedSeq.sorted.mkString("(", ",", ")")
    -          val partition = tasks(taskId).partitionId
    -          abort(s"Aborting ${taskSet} because task $taskId (partition $partition)" +
    -            s" has already failed on executors $execs, and no other executors are available.")
    +      pendingTask.foreach { indexInTaskSet =>
    +        // try to find some executor this task can run on.  Its possible that some *other*
    +        // task isn't schedulable anywhere, but we will discover that in some later call,
    +        // when that unschedulable task is the last task remaining.
    +        val blacklistedEverywhere = executorsByHost.forall { case (host, execs) =>
    +          val nodeBlacklisted = blacklist.isNodeBlacklisted(host) ||
    +            isNodeBlacklistedForTaskSet(host) ||
    +            isNodeBlacklistedForTask(host, indexInTaskSet)
    +          if (nodeBlacklisted) {
    +            true
    +          } else {
    +            execs.forall { exec =>
    +              blacklist.isExecutorBlacklisted(exec) ||
    +                isExecutorBlacklistedForTaskSet(exec) ||
    +                isExecutorBlacklistedForTask(exec, indexInTaskSet)
    +            }
    +          }
    +        }
    +        if (blacklistedEverywhere) {
    +          val partition = tasks(indexInTaskSet).partitionId
    +          abort(s"Aborting ${taskSet} because task $indexInTaskSet (partition $partition) cannot " +
    +            s"run anywhere due to node and executor blacklist.")
    --- End diff --
    
    Maybe refer to the config options here? Anticipating confused users...
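
    For illustration, here is a rough sketch of what the suggestion could look
    like. The `BlacklistAbortMessage` object and its `build` method are
    hypothetical and not part of TaskSetManager, and the sketch assumes the
    blacklist options share a `spark.blacklist.` prefix; the actual option
    names in this PR may differ.

```scala
// Hypothetical helper, not part of TaskSetManager: builds the abort message
// and appends a pointer to the blacklist configuration.
object BlacklistAbortMessage {
  // Assumption: the blacklist options share the "spark.blacklist." prefix.
  val ConfigHint: String =
    "Blacklisting behavior can be configured via spark.blacklist.*."

  // Same wording as the message in the diff, followed by the config hint.
  def build(taskSet: String, indexInTaskSet: Int, partition: Int): String =
    s"Aborting $taskSet because task $indexInTaskSet (partition $partition) cannot " +
      s"run anywhere due to node and executor blacklist. $ConfigHint"
}
```

    With something along these lines, a user who hits the abort sees which
    family of settings to look at without having to search the docs.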

