This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0678afe  [SPARK-21040][CORE] Speculate tasks which are running on decommission executors
0678afe is described below

commit 0678afe393b1e4f65b70470483fe0cdb1fe139dc
Author: Prakhar Jain <prakharjai...@gmail.com>
AuthorDate: Fri Jul 17 16:11:02 2020 -0700

    [SPARK-21040][CORE] Speculate tasks which are running on decommission executors
    
    ### What changes were proposed in this pull request?
    This PR adds functionality to consider the tasks running on decommissioned executors for speculation, based on a new config.
    When running Spark in the cloud, we sometimes already know that an executor won't be alive for more than a fixed amount of time. For example, with AWS Spot nodes, once we receive the notification we know the node will be gone in 120 seconds.
    So if a task running on a decommissioning executor may run beyond currentTime+120 seconds, it is a candidate for speculation.
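    
    A rough, self-contained sketch of that decision (illustrative only; all names here are hypothetical stand-ins for the fields the TaskSetManager change below actually uses):
    
        // Hypothetical helper: should a task running on a decommissioning
        // executor be speculated?
        def shouldSpeculate(
            launchTimeMs: Long,        // when the task attempt started
            medianDurationMs: Long,    // median duration of successful tasks in the stage
            decommissionTimeMs: Long,  // when the decommission notification arrived
            killIntervalMs: Long       // spark.executor.decommission.killInterval, in ms
        ): Boolean = {
          val executorKillTime = decommissionTimeMs + killIntervalMs
          val expectedTaskEnd = launchTimeMs + medianDurationMs
          // The task likely outlives the executor, so launch a speculative copy.
          executorKillTime < expectedTaskEnd
        }
    
        // Example mirroring the AWS Spot case: a task launched at t=0s with a 200s
        // median duration cannot finish before t=60s + 120s, so it is speculated.
        shouldSpeculate(launchTimeMs = 0L, medianDurationMs = 200000L,
          decommissionTimeMs = 60000L, killIntervalMs = 120000L)  // => true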
    
    ### Why are the changes needed?
    Currently, when an executor is decommissioned, we stop scheduling new tasks on it, but the already running tasks keep running on it. Depending on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time. Different cloud providers give different timeouts before they take away the nodes. For example, in the case of AWS Spot nodes, an executor won't be alive for more than 120 seconds. We can utilize this information in cloud environments a [...]
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. This PR adds a new config "spark.executor.decommission.killInterval" which users can explicitly set based on the cloud environment where they are running.
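    
    For example, on AWS Spot instances a user might set something like the following (illustrative values only; speculation itself must also be enabled for the new config to have any effect):
    
        import org.apache.spark.SparkConf
    
        val conf = new SparkConf()
          .set("spark.speculation", "true")
          .set("spark.executor.decommission.killInterval", "120s")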
    
    ### How was this patch tested?
    Added UT.
    
    Closes #28619 from prakharjain09/SPARK-21040-speculate-decommission-exec-tasks.
    
    Authored-by: Prakhar Jain <prakharjai...@gmail.com>
    Signed-off-by: Holden Karau <hka...@apple.com>
---
 .../org/apache/spark/internal/config/package.scala |  11 +++
 .../apache/spark/scheduler/TaskSetManager.scala    |  28 +++++-
 .../spark/scheduler/TaskSetManagerSuite.scala      | 106 +++++++++++++++++++++
 3 files changed, 141 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index ca75a19..f0b292b 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1843,6 +1843,17 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL =
+    ConfigBuilder("spark.executor.decommission.killInterval")
+      .doc("Duration after which a decommissioned executor will be killed 
forcefully." +
+        "This config is useful for cloud environments where we know in advance 
when " +
+        "an executor is going to go down after decommissioning signal i.e. 
around 2 mins " +
+        "in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is 
currently " +
+        "used to decide what tasks running on decommission executors to 
speculate.")
+      .version("3.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createOptional
+
   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
     .doc("Staging directory used while submitting applications.")
     .version("2.0.0")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a302f68..4b31ff0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.io.NotSerializableException
 import java.nio.ByteBuffer
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
 
 import scala.collection.immutable.Map
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -102,6 +102,8 @@ private[spark] class TaskSetManager(
     }
     numTasks <= slots
   }
+  val executorDecommissionKillInterval = conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL).map(
+    TimeUnit.SECONDS.toMillis)
 
   // For each task, tracks whether a copy of the task has succeeded. A task will also be
   // marked as "succeeded" if it failed with a fetch failure, in which case it should not
@@ -165,6 +167,7 @@ private[spark] class TaskSetManager(
 
   // Task index, start and finish time for each task attempt (indexed by task ID)
   private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]
+  private[scheduler] val tidToExecutorKillTimeMapping = new HashMap[Long, Long]
 
   // Use a MedianHeap to record durations of successful tasks so we know when to launch
   // speculative tasks. This is only used when speculation is enabled, to avoid the overhead
@@ -933,6 +936,7 @@ private[spark] class TaskSetManager(
 
   /** If the given task ID is in the set of running tasks, removes it. */
   def removeRunningTask(tid: Long): Unit = {
+    tidToExecutorKillTimeMapping.remove(tid)
     if (runningTasksSet.remove(tid) && parent != null) {
       parent.decreaseRunningTasks(1)
     }
@@ -1042,7 +1046,19 @@ private[spark] class TaskSetManager(
       // bound based on that.
       logDebug("Task length threshold for speculation: " + threshold)
       for (tid <- runningTasksSet) {
-        foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+        var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
+        if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
+          // Check whether this task will finish before the executorKillTime assuming
+          // it will take medianDuration overall. If this task cannot finish within
+          // executorKillInterval, then this task is a candidate for speculation
+          val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
+          val canExceedDeadline = tidToExecutorKillTimeMapping(tid) <
+            taskEndTimeBasedOnMedianDuration
+          if (canExceedDeadline) {
+            speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
+          }
+        }
+        foundTasks |= speculated
       }
     } else if (speculationTaskDurationThresOpt.isDefined && speculationTasksLessEqToSlots) {
       val time = clock.getTimeMillis()
@@ -1100,8 +1116,12 @@ private[spark] class TaskSetManager(
 
   def executorDecommission(execId: String): Unit = {
     recomputeLocality()
-    // Future consideration: if an executor is decommissioned it may make sense to add the current
-    // tasks to the spec exec queue.
+    executorDecommissionKillInterval.foreach { interval =>
+      val executorKillTime = clock.getTimeMillis() + interval
+      runningTasksSet.filter(taskInfos(_).executorId == execId).foreach { tid =>
+        tidToExecutorKillTimeMapping(tid) = executorKillTime
+      }
+    }
   }
 
   def recomputeLocality(): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 95c8197..ae51b55 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1911,6 +1911,112 @@ class TaskSetManagerSuite
     testSpeculationDurationThreshold(true, 2, 1)
   }
 
+  test("SPARK-21040: Check speculative tasks are launched when an executor is 
decommissioned" +
+    " and the tasks running on it cannot finish within 
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    val taskSet = FakeTask.createTaskSet(4)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5s")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Start TASK 0,1 on exec1, TASK 2 on exec2
+    (0 until 2).foreach { _ =>
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === "exec1")
+    }
+    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption2.isDefined)
+    assert(taskOption2.get.executorId === "exec2")
+
+    clock.advance(6*1000) // time = 6s
+    // Start TASK 3 on exec2 after some delay
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption3.isDefined)
+    assert(taskOption3.get.executorId === "exec2")
+
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+    clock.advance(4*1000) // time = 10s
+    // Complete the first 2 tasks and leave the other 2 tasks in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, tasks need to be running for
+    // > 15s for speculation
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set())
+
+    // Decommission exec-2. All tasks running on exec-2 (i.e. TASK 2, 3) will be added to
+    // tidToExecutorKillTimeMapping with their expected executor kill times
+    // (TASK 2 -> 15s, TASK 3 -> 15s)
+    manager.executorDecommission("exec2")
+    assert(manager.tidToExecutorKillTimeMapping.keySet === Set(2, 3))
+    assert(manager.tidToExecutorKillTimeMapping(2) === 15*1000)
+    assert(manager.tidToExecutorKillTimeMapping(3) === 15*1000)
+
+    assert(manager.checkSpeculatableTasks(0))
+    // TASK 2 started at t=0s, so it can still finish before t=15s (Median task runtime = 10s)
+    // TASK 3 started at t=6s, so it might not finish before t=15s. So TASK 3 should be part
+    // of speculativeTasks
+    assert(sched.speculativeTasks.toSet === Set(3))
+    assert(manager.copiesRunning(3) === 1)
+
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    // Offer more resources. Nothing should get scheduled now.
+    assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+    assert(taskOption3New.isDefined)
+
+    // Assert info about the newly launched speculative task
+    val speculativeTask3 = taskOption3New.get
+    assert(speculativeTask3.index === 3)
+    assert(speculativeTask3.taskId === 4)
+    assert(speculativeTask3.executorId === "exec3")
+    assert(speculativeTask3.attemptNumber === 1)
+
+    clock.advance(1*1000) // time = 11s
+    // Running checkSpeculatableTasks again should return false
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+
+    clock.advance(5*1000) // time = 16s
+    // At t=16s, TASK 2 has been running for 16s. It is more than the
+    // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15s. So now TASK 2 will
+    // be selected for speculation. Here we are verifying that regular speculation configs
+    // should still take effect even when an EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
+    // the corresponding executor is decommissioned
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(2, 3))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+    val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    assert(taskOption2New.isDefined)
+    val speculativeTask2 = taskOption2New.get
+    // Ensure that TASK 2 is re-launched on exec3, host3
+    assert(speculativeTask2.index === 2)
+    assert(speculativeTask2.taskId === 5)
+    assert(speculativeTask2.executorId === "exec3")
+    assert(speculativeTask2.attemptNumber === 1)
+
+    assert(manager.copiesRunning(2) === 2)
+    assert(manager.copiesRunning(3) === 2)
+
+    // Offering additional resources should not lead to any speculative tasks being respawned
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
+  }
+
   test("SPARK-29976 Regular speculation configs should still take effect even 
when a " +
       "threshold is provided") {
     val (manager, clock) = testSpeculationDurationSetup(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
