Repository: spark
Updated Branches:
  refs/heads/master 3ed91c9b8 -> fdd3bace1


[SPARK-22148][SPARK-15815][SCHEDULER] Acquire new executors to avoid hang 
because of blacklisting

## What changes were proposed in this pull request?
Every time a task becomes unschedulable because it is blacklisted on every 
available executor (typically when there are fewer executors than 
spark.task.maxFailures), we currently abort the taskSet, failing the job. This 
change tries to acquire new executors so that we can complete the job 
successfully. We try to acquire a new executor only when we can kill an 
existing idle executor. We fall back to the older implementation, where we 
abort the job, if we cannot find an idle executor.

## How was this patch tested?

I performed some manual tests to check and validate the behavior.

```scala
val rdd = sc.parallelize(Seq(1 to 10), 3)

import org.apache.spark.TaskContext

val mapped = rdd.mapPartitionsWithIndex ( (index, iterator) => { if (index == 
2) { Thread.sleep(30 * 1000); val attemptNum = TaskContext.get.attemptNumber; 
if (attemptNum < 3) throw new Exception("Fail for blacklisting")};  
iterator.toList.map (x => x + " -> " + index).iterator } )

mapped.collect
```

Closes #22288 from dhruve/bug/SPARK-22148.

Lead-authored-by: Dhruve Ashar <dhruveas...@gmail.com>
Co-authored-by: Dhruve Ashar <dhr...@users.noreply.github.com>
Co-authored-by: Tom Graves <tgra...@apache.org>
Signed-off-by: Thomas Graves <tgra...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdd3bace
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdd3bace
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdd3bace

Branch: refs/heads/master
Commit: fdd3bace1da01e5958fe0345c38e889e740ce25e
Parents: 3ed91c9
Author: Dhruve Ashar <dhruveas...@gmail.com>
Authored: Tue Nov 6 08:25:32 2018 -0600
Committer: Thomas Graves <tgra...@apache.org>
Committed: Tue Nov 6 08:25:32 2018 -0600

----------------------------------------------------------------------
 .../apache/spark/internal/config/package.scala  |   8 +
 .../spark/scheduler/BlacklistTracker.scala      |  30 ++-
 .../spark/scheduler/TaskSchedulerImpl.scala     |  71 ++++++-
 .../apache/spark/scheduler/TaskSetManager.scala |  41 ++--
 .../scheduler/BlacklistIntegrationSuite.scala   |   7 +-
 .../scheduler/TaskSchedulerImplSuite.scala      | 189 ++++++++++++++++++-
 docs/configuration.md                           |   8 +
 7 files changed, 318 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fdd3bace/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index c8993e1..2b3ba3c 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -622,6 +622,14 @@ package object config {
       .checkValue(v => v > 0, "The value should be a positive time value.")
       .createWithDefaultString("365d")
 
+  private[spark] val UNSCHEDULABLE_TASKSET_TIMEOUT =
+    ConfigBuilder("spark.scheduler.blacklist.unschedulableTaskSetTimeout")
+      .doc("The timeout in seconds to wait to acquire a new executor and 
schedule a task " +
+        "before aborting a TaskSet which is unschedulable because of being 
completely blacklisted.")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(v => v >= 0, "The value should be a non negative time 
value.")
+      .createWithDefault(120)
+
   private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
     ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
       .doc("Time in seconds to wait between a max concurrent tasks check 
failure and the next " +

http://git-wip-us.apache.org/repos/asf/spark/blob/fdd3bace/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala 
b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index 980fbbe..ef6d02d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -146,21 +146,31 @@ private[scheduler] class BlacklistTracker (
     nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
   }
 
+  private def killExecutor(exec: String, msg: String): Unit = {
+    allocationClient match {
+      case Some(a) =>
+        logInfo(msg)
+        a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, 
countFailures = false,
+          force = true)
+      case None =>
+        logInfo(s"Not attempting to kill blacklisted executor id $exec " +
+          s"since allocation client is not defined.")
+    }
+  }
+
   private def killBlacklistedExecutor(exec: String): Unit = {
     if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
-      allocationClient match {
-        case Some(a) =>
-          logInfo(s"Killing blacklisted executor id $exec " +
-            s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
-          a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, 
countFailures = false,
-            force = true)
-        case None =>
-          logWarning(s"Not attempting to kill blacklisted executor id $exec " +
-            s"since allocation client is not defined.")
-      }
+      killExecutor(exec,
+        s"Killing blacklisted executor id $exec since 
${config.BLACKLIST_KILL_ENABLED.key} is set.")
     }
   }
 
+  private[scheduler] def killBlacklistedIdleExecutor(exec: String): Unit = {
+    killExecutor(exec,
+      s"Killing blacklisted idle executor id $exec because of task 
unschedulability and trying " +
+        "to acquire a new executor.")
+  }
+
   private def killExecutorsOnBlacklistedNode(node: String): Unit = {
     if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
       allocationClient match {

http://git-wip-us.apache.org/repos/asf/spark/blob/fdd3bace/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 4f870e85..61556ea 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -35,7 +35,7 @@ import org.apache.spark.rpc.RpcEndpoint
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
+import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}
 
 /**
  * Schedules tasks for multiple types of clusters by acting through a 
SchedulerBackend.
@@ -117,6 +117,11 @@ private[spark] class TaskSchedulerImpl(
 
   protected val executorIdToHost = new HashMap[String, String]
 
+  private val abortTimer = new Timer(true)
+  private val clock = new SystemClock
+  // Exposed for testing
+  val unschedulableTaskSetToExpiryTime = new HashMap[TaskSetManager, Long]
+
   // Listener object to pass upcalls into
   var dagScheduler: DAGScheduler = null
 
@@ -415,9 +420,53 @@ private[spark] class TaskSchedulerImpl(
             launchedAnyTask |= launchedTaskAtCurrentMaxLocality
           } while (launchedTaskAtCurrentMaxLocality)
         }
+
         if (!launchedAnyTask) {
-          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { 
taskIndex =>
+              // If the taskSet is unschedulable we try to find an existing 
idle blacklisted
+              // executor. If we cannot find one, we abort immediately. Else 
we kill the idle
+              // executor and kick off an abortTimer which if it doesn't 
schedule a task within the
+              // timeout will abort the taskSet if we were unable to 
schedule any task from the
+              // taskSet.
+              // Note 1: We keep track of schedulability on a per taskSet 
basis rather than on a per
+              // task basis.
+              // Note 2: The taskSet can still be aborted when there are more 
than one idle
+              // blacklisted executors and dynamic allocation is on. This can 
happen when a killed
+              // idle executor isn't replaced in time by 
ExecutorAllocationManager as it relies on
+              // pending tasks and doesn't kill executors on idle timeouts, 
resulting in the abort
+              // timer to expire and abort the taskSet.
+              executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) 
match {
+                case Some ((executorId, _)) =>
+                  if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+                    blacklistTrackerOpt.foreach(blt => 
blt.killBlacklistedIdleExecutor(executorId))
+
+                    val timeout = 
conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
+                    unschedulableTaskSetToExpiryTime(taskSet) = 
clock.getTimeMillis() + timeout
+                    logInfo(s"Waiting for $timeout ms for completely "
+                      + s"blacklisted task to be schedulable again before 
aborting $taskSet.")
+                    abortTimer.schedule(
+                      createUnschedulableTaskSetAbortTimer(taskSet, 
taskIndex), timeout)
+                  }
+                case None => // Abort Immediately
+                  logInfo("Cannot schedule any task because of complete 
blacklisting. No idle" +
+                    s" executors can be found to kill. Aborting $taskSet." )
+                  taskSet.abortSinceCompletelyBlacklisted(taskIndex)
+              }
+          }
+        } else {
+          // We want to defer killing any taskSets as long as we have a non 
blacklisted executor
+          // which can be used to schedule a task from any active taskSets. 
This ensures that the
+          // job can make progress.
+          // Note: It is theoretically possible that a taskSet never gets 
scheduled on a
+          // non-blacklisted executor and the abort timer doesn't kick in 
because of a constant
+          // submission of new TaskSets. See the PR for more details.
+          if (unschedulableTaskSetToExpiryTime.nonEmpty) {
+            logInfo("Clearing the expiry times for all unschedulable taskSets 
as a task was " +
+              "recently scheduled.")
+            unschedulableTaskSetToExpiryTime.clear()
+          }
         }
+
         if (launchedAnyTask && taskSet.isBarrier) {
           // Check whether the barrier tasks are partially launched.
           // TODO SPARK-24818 handle the assert failure case (that can happen 
when some locality
@@ -453,6 +502,23 @@ private[spark] class TaskSchedulerImpl(
     return tasks
   }
 
+  private def createUnschedulableTaskSetAbortTimer(
+      taskSet: TaskSetManager,
+      taskIndex: Int): TimerTask = {
+    new TimerTask() {
+      override def run() {
+        if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
+            unschedulableTaskSetToExpiryTime(taskSet) <= 
clock.getTimeMillis()) {
+          logInfo("Cannot schedule any task because of complete blacklisting. 
" +
+            s"Wait time for scheduling expired. Aborting $taskSet.")
+          taskSet.abortSinceCompletelyBlacklisted(taskIndex)
+        } else {
+          this.cancel()
+        }
+      }
+    }
+  }
+
   /**
    * Shuffle offers around to avoid always placing tasks on the same workers.  
Exposed to allow
    * overriding in tests, so it can be deterministic.
@@ -590,6 +656,7 @@ private[spark] class TaskSchedulerImpl(
       barrierCoordinator.stop()
     }
     starvationTimer.cancel()
+    abortTimer.cancel()
   }
 
   override def defaultParallelism(): Int = backend.defaultParallelism()

http://git-wip-us.apache.org/repos/asf/spark/blob/fdd3bace/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index d5e85a1..6bf60dd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -623,8 +623,8 @@ private[spark] class TaskSetManager(
    *
    * It is possible that this taskset has become impossible to schedule 
*anywhere* due to the
    * blacklist.  The most common scenario would be if there are fewer 
executors than
-   * spark.task.maxFailures. We need to detect this so we can fail the task 
set, otherwise the job
-   * will hang.
+   * spark.task.maxFailures. We need to detect this so we can prevent the job 
from hanging.
+   * We try to acquire new executor/s by killing an existing idle blacklisted 
executor.
    *
    * There's a tradeoff here: we could make sure all tasks in the task set are 
schedulable, but that
    * would add extra time to each iteration of the scheduling loop. Here, we 
take the approach of
@@ -635,9 +635,9 @@ private[spark] class TaskSetManager(
    * failures (this is because the method picks one unscheduled task, and then 
iterates through each
    * executor until it finds one that the task isn't blacklisted on).
    */
-  private[scheduler] def abortIfCompletelyBlacklisted(
-      hostToExecutors: HashMap[String, HashSet[String]]): Unit = {
-    taskSetBlacklistHelperOpt.foreach { taskSetBlacklist =>
+  private[scheduler] def getCompletelyBlacklistedTaskIfAny(
+      hostToExecutors: HashMap[String, HashSet[String]]): Option[Int] = {
+    taskSetBlacklistHelperOpt.flatMap { taskSetBlacklist =>
       val appBlacklist = blacklistTracker.get
       // Only look for unschedulable tasks when at least one executor has 
registered. Otherwise,
       // task sets will be (unnecessarily) aborted in cases when no executors 
have registered yet.
@@ -658,11 +658,11 @@ private[spark] class TaskSetManager(
           }
         }
 
-        pendingTask.foreach { indexInTaskSet =>
+        pendingTask.find { indexInTaskSet =>
           // try to find some executor this task can run on.  Its possible 
that some *other*
           // task isn't schedulable anywhere, but we will discover that in 
some later call,
           // when that unschedulable task is the last task remaining.
-          val blacklistedEverywhere = hostToExecutors.forall { case (host, 
execsOnHost) =>
+          hostToExecutors.forall { case (host, execsOnHost) =>
             // Check if the task can run on the node
             val nodeBlacklisted =
               appBlacklist.isNodeBlacklisted(host) ||
@@ -679,22 +679,27 @@ private[spark] class TaskSetManager(
               }
             }
           }
-          if (blacklistedEverywhere) {
-            val partition = tasks(indexInTaskSet).partitionId
-            abort(s"""
-              |Aborting $taskSet because task $indexInTaskSet (partition 
$partition)
-              |cannot run anywhere due to node and executor blacklist.
-              |Most recent failure:
-              |${taskSetBlacklist.getLatestFailureReason}
-              |
-              |Blacklisting behavior can be configured via spark.blacklist.*.
-              |""".stripMargin)
-          }
         }
+      } else {
+        None
       }
     }
   }
 
+  private[scheduler] def abortSinceCompletelyBlacklisted(indexInTaskSet: Int): 
Unit = {
+    taskSetBlacklistHelperOpt.foreach { taskSetBlacklist =>
+      val partition = tasks(indexInTaskSet).partitionId
+      abort(s"""
+         |Aborting $taskSet because task $indexInTaskSet (partition $partition)
+         |cannot run anywhere due to node and executor blacklist.
+         |Most recent failure:
+         |${taskSetBlacklist.getLatestFailureReason}
+         |
+         |Blacklisting behavior can be configured via spark.blacklist.*.
+         |""".stripMargin)
+    }
+  }
+
   /**
    * Marks the task as getting result and notifies the DAG Scheduler
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/fdd3bace/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index fe22d70..29bb823 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -96,15 +96,16 @@ class BlacklistIntegrationSuite extends 
SchedulerIntegrationSuite[MultiExecutorM
     assertDataStructuresEmpty(noFailure = true)
   }
 
-  // Make sure that if we've failed on all executors, but haven't hit 
task.maxFailures yet, the job
-  // doesn't hang
+  // Make sure that if we've failed on all executors, but haven't hit 
task.maxFailures yet, we try
+  // to acquire a new executor and if we aren't able to get one, the job 
doesn't hang and we abort
   testScheduler(
     "SPARK-15865 Progress with fewer executors than maxTaskFailures",
     extraConfs = Seq(
       config.BLACKLIST_ENABLED.key -> "true",
       "spark.testing.nHosts" -> "2",
       "spark.testing.nExecutorsPerHost" -> "1",
-      "spark.testing.nCoresPerExecutor" -> "1"
+      "spark.testing.nCoresPerExecutor" -> "1",
+      "spark.scheduler.blacklist.unschedulableTaskSetTimeout" -> "0s"
     )
   ) {
     def runBackend(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/fdd3bace/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 9e1d13e..29172b4 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -20,10 +20,12 @@ package org.apache.spark.scheduler
 import java.nio.ByteBuffer
 
 import scala.collection.mutable.HashMap
+import scala.concurrent.duration._
 
 import org.mockito.Matchers.{anyInt, anyObject, anyString, eq => meq}
 import org.mockito.Mockito.{atLeast, atMost, never, spy, times, verify, when}
 import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
 import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark._
@@ -40,7 +42,7 @@ class FakeSchedulerBackend extends SchedulerBackend {
 }
 
 class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with 
BeforeAndAfterEach
-    with Logging with MockitoSugar {
+    with Logging with MockitoSugar with Eventually {
 
   var failedTaskSetException: Option[Throwable] = None
   var failedTaskSetReason: String = null
@@ -82,10 +84,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
     setupHelper()
   }
 
-  def setupSchedulerWithMockTaskSetBlacklist(): TaskSchedulerImpl = {
+  def setupSchedulerWithMockTaskSetBlacklist(confs: (String, String)*): 
TaskSchedulerImpl = {
     blacklist = mock[BlacklistTracker]
     val conf = new 
SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
     conf.set(config.BLACKLIST_ENABLED, true)
+    confs.foreach { case (k, v) => conf.set(k, v) }
+
     sc = new SparkContext(conf)
     taskScheduler =
       new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) {
@@ -466,7 +470,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
     }
   }
 
-  test("abort stage when all executors are blacklisted") {
+  test("abort stage when all executors are blacklisted and we cannot acquire 
new executor") {
     taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
     val taskSet = FakeTask.createTaskSet(numTasks = 10, stageAttemptId = 0)
     taskScheduler.submitTasks(taskSet)
@@ -503,6 +507,185 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
     verify(tsm).abort(anyString(), anyObject())
   }
 
+  test("SPARK-22148 abort timer should kick in when task is completely 
blacklisted & no new " +
+      "executor can be acquired") {
+    // set the abort timer to fail immediately
+    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
+
+    // We have only 1 task remaining with 1 executor
+    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
+    taskScheduler.submitTasks(taskSet)
+    val tsm = stageToMockTaskSetManager(0)
+
+    // submit an offer with one executor
+    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten
+
+    // Fail the running task
+    val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
+    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, 
ByteBuffer.allocate(0))
+    // we explicitly call the handleFailedTask method here to avoid adding a 
sleep in the test suite
+    // Reason being - handleFailedTask is run by an executor service and there 
is a momentary delay
+    // before it is launched and this fails the assertion check.
+    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
+    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+      "executor0", failedTask.index)).thenReturn(true)
+
+    // make an offer on the blacklisted executor.  We won't schedule anything, 
and set the abort
+    // timer to kick in immediately
+    assert(taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten.size === 0)
+    // Wait for the abort timer to kick in. Even though we configure the 
timeout to be 0, there is a
+    // slight delay as the abort timer is launched in a separate thread.
+    eventually(timeout(500.milliseconds)) {
+      assert(tsm.isZombie)
+    }
+  }
+
+  test("SPARK-22148 try to acquire a new executor when task is unschedulable 
with 1 executor") {
+    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10")
+
+    // We have only 1 task remaining with 1 executor
+    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
+    taskScheduler.submitTasks(taskSet)
+    val tsm = stageToMockTaskSetManager(0)
+
+    // submit an offer with one executor
+    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten
+
+    // Fail the running task
+    val failedTask = firstTaskAttempts.head
+    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, 
ByteBuffer.allocate(0))
+    // we explicitly call the handleFailedTask method here to avoid adding a 
sleep in the test suite
+    // Reason being - handleFailedTask is run by an executor service and there 
is a momentary delay
+    // before it is launched and this fails the assertion check.
+    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
+    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+      "executor0", failedTask.index)).thenReturn(true)
+
+    // make an offer on the blacklisted executor.  We won't schedule anything, 
and set the abort
+    // timer to expire if no new executors could be acquired. We kill the 
existing idle blacklisted
+    // executor and try to acquire a new one.
+    assert(taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten.size === 0)
+    assert(taskScheduler.unschedulableTaskSetToExpiryTime.contains(tsm))
+    assert(!tsm.isZombie)
+
+    // Offer a new executor which should be accepted
+    assert(taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor1", "host0", 1)
+    )).flatten.size === 1)
+    assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty)
+    assert(!tsm.isZombie)
+  }
+
+  // This is to test a scenario where we have two taskSets completely 
blacklisted and on acquiring
+  // a new executor we don't want the abort timer for the second taskSet to 
expire and abort the job
+  test("SPARK-22148 abort timer should clear unschedulableTaskSetToExpiryTime 
for all TaskSets") {
+    taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+
+    // We have 2 taskSets with 1 task remaining in each with 1 executor 
completely blacklisted
+    val taskSet1 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, 
stageAttemptId = 0)
+    taskScheduler.submitTasks(taskSet1)
+    val taskSet2 = FakeTask.createTaskSet(numTasks = 1, stageId = 1, 
stageAttemptId = 0)
+    taskScheduler.submitTasks(taskSet2)
+    val tsm = stageToMockTaskSetManager(0)
+
+    // submit an offer with one executor
+    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten
+
+    assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty)
+
+    // Fail the running task
+    val failedTask = firstTaskAttempts.head
+    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, 
ByteBuffer.allocate(0))
+    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
+    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+      "executor0", failedTask.index)).thenReturn(true)
+
+    // make an offer. We will schedule the task from the second taskSet. Since 
a task was scheduled
+    // we do not kick off the abort timer for taskSet1
+    val secondTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten
+
+    assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty)
+
+    val tsm2 = stageToMockTaskSetManager(1)
+    val failedTask2 = secondTaskAttempts.head
+    taskScheduler.statusUpdate(failedTask2.taskId, TaskState.FAILED, 
ByteBuffer.allocate(0))
+    tsm2.handleFailedTask(failedTask2.taskId, TaskState.FAILED, UnknownReason)
+    when(tsm2.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+      "executor0", failedTask2.index)).thenReturn(true)
+
+    // make an offer on the blacklisted executor.  We won't schedule anything, 
and set the abort
+    // timer for taskSet1 and taskSet2
+    assert(taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten.size === 0)
+    assert(taskScheduler.unschedulableTaskSetToExpiryTime.contains(tsm))
+    assert(taskScheduler.unschedulableTaskSetToExpiryTime.contains(tsm2))
+    assert(taskScheduler.unschedulableTaskSetToExpiryTime.size == 2)
+
+    // Offer a new executor which should be accepted
+    assert(taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor1", "host1", 1)
+    )).flatten.size === 1)
+
+    // Check if all the taskSets are cleared
+    assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty)
+
+    assert(!tsm.isZombie)
+  }
+
+  // this test is to check that we don't abort a taskSet which is not being 
scheduled on other
+  // executors as it is waiting on locality timeout and not being aborted 
because it is still not
+  // completely blacklisted.
+  test("SPARK-22148 Ensure we don't abort the taskSet if we haven't been 
completely blacklisted") {
+    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0",
+      // This is to avoid any potential flakiness in the test because of large 
pauses in jenkins
+      config.LOCALITY_WAIT.key -> "30s"
+    )
+
+    val preferredLocation = Seq(ExecutorCacheTaskLocation("host0", 
"executor0"))
+    val taskSet1 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, 
stageAttemptId = 0,
+      preferredLocation)
+    taskScheduler.submitTasks(taskSet1)
+
+    val tsm = stageToMockTaskSetManager(0)
+
+    // submit an offer with one executor
+    var taskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten
+
+    // Fail the running task
+    val failedTask = taskAttempts.head
+    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, 
ByteBuffer.allocate(0))
+    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
+    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+      "executor0", failedTask.index)).thenReturn(true)
+
+    // make an offer but we won't schedule anything yet as scheduler locality 
is still PROCESS_LOCAL
+    assert(taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor1", "host0", 1)
+    )).flatten.isEmpty)
+
+    assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty)
+
+    assert(!tsm.isZombie)
+  }
+
   /**
    * Helper for performance tests.  Takes the explicitly blacklisted nodes and 
executors; verifies
    * that the blacklists are used efficiently to ensure scheduling is not 
O(numPendingTasks).

http://git-wip-us.apache.org/repos/asf/spark/blob/fdd3bace/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 11ee7a9..f8937b0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1611,6 +1611,14 @@ Apart from these, the following properties are also 
available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.scheduler.blacklist.unschedulableTaskSetTimeout</code></td>
+  <td>120s</td>
+  <td>
+    The timeout in seconds to wait to acquire a new executor and schedule a 
task before aborting a
+    TaskSet which is unschedulable because of being completely blacklisted.
+  </td>
+</tr>
+<tr>
   <td><code>spark.blacklist.enabled</code></td>
   <td>
     false


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to