This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ad238a2  [SPARK-29976][CORE] Trigger speculation for stages with too few tasks
ad238a2 is described below

commit ad238a2238a9d0da89be4424574436cbfaee579d
Author: Yuchen Huo <yuchen....@databricks.com>
AuthorDate: Tue Dec 10 14:43:26 2019 -0600

    [SPARK-29976][CORE] Trigger speculation for stages with too few tasks
    
    ### What changes were proposed in this pull request?
    This PR adds an optional Spark conf for speculation to allow speculative
    runs for stages where there are only a few tasks.
    ```
    spark.speculation.task.duration.threshold
    ```
    
    If provided, tasks would be speculatively run if the TaskSet contains no
    more tasks than the number of slots on a single executor and a task is
    taking longer than the threshold.
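
    A minimal sketch of enabling this from user code (the app name, master, and
    the 60min value are illustrative, not defaults from this PR; speculation
    itself must be enabled for the threshold to matter):
    ```
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("speculation-demo")
      .setMaster("local[4]")
      // Speculation must be enabled for the new threshold to take effect.
      .set("spark.speculation", "true")
      // New config from this PR; accepts time strings such as "60min".
      .set("spark.speculation.task.duration.threshold", "60min")
    val sc = new SparkContext(conf)
    ```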
    
    ### Why are the changes needed?
    This change helps avoid the scenario where a single executor hangs forever
    due to a disk issue, the only task in a TaskSet happens to be assigned to
    that executor, and the whole job therefore hangs forever.
    
    ### Does this PR introduce any user-facing change?
    Yes. If the new config `spark.speculation.task.duration.threshold` is
    provided, the TaskSet contains no more tasks than the number of slots on a
    single executor, and a task is taking longer than the threshold, then
    speculative tasks are submitted for the running tasks in the TaskSet.
    
    ### How was this patch tested?
    Unit tests are added to TaskSetManagerSuite.
    
    Closes #26614 from yuchenhuo/SPARK-29976.
    
    Authored-by: Yuchen Huo <yuchen....@databricks.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../org/apache/spark/internal/config/package.scala | 12 +++
 .../apache/spark/scheduler/TaskSetManager.scala    | 60 +++++++++----
 .../spark/scheduler/TaskSetManagerSuite.scala      | 98 ++++++++++++++++++++++
 docs/configuration.md                              | 13 +++
 4 files changed, 167 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 25dc4c6..9d7b31a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1499,6 +1499,18 @@ package object config {
       .doubleConf
       .createWithDefault(0.75)
 
+  private[spark] val SPECULATION_TASK_DURATION_THRESHOLD =
+    ConfigBuilder("spark.speculation.task.duration.threshold")
+      .doc("Task duration after which scheduler would try to speculative run 
the task. If " +
+        "provided, tasks would be speculatively run if current stage contains 
less tasks " +
+        "than or equal to the number of slots on a single executor and the 
task is taking " +
+        "longer time than the threshold. This config helps speculate stage 
with very few " +
+        "tasks. Regular speculation configs may also apply if the executor 
slots are " +
+        "large enough. E.g. tasks might be re-launched if there are enough 
successful runs " +
+        "even though the threshold hasn't been reached.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
+
   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
     .doc("Staging directory used while submitting applications.")
     .stringConf
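
Since the new entry is declared with .timeConf(TimeUnit.MILLISECONDS) and .createOptional,
readers receive an Option[Long] in milliseconds; a string such as "60min" parses to 3600000.
A standalone sketch of that parse-then-compare pattern (the names below are illustrative,
not code from this patch):

```
// Illustrative sketch: an optional millisecond threshold gating a per-task
// decision, mirroring how TaskSetManager consumes the new config below.
val thresholdMsOpt: Option[Long] = Some(60L * 60 * 1000) // "60min" == 3600000 ms

def shouldSpeculate(taskRunningMs: Long): Boolean =
  thresholdMsOpt.exists(threshold => taskRunningMs > threshold)

println(shouldSpeculate(3600000L)) // false: exactly at the threshold, not past it
println(shouldSpeculate(3600001L)) // true: strictly longer than the threshold
```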
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 5c0bc49..e026e90 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -81,6 +81,13 @@ private[spark] class TaskSetManager(
   val speculationQuantile = conf.get(SPECULATION_QUANTILE)
   val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
   val minFinishedForSpeculation = math.max((speculationQuantile * numTasks).floor.toInt, 1)
+  // User provided threshold for speculation regardless of whether the quantile has been reached
+  val speculationTaskDurationThresOpt = conf.get(SPECULATION_TASK_DURATION_THRESHOLD)
+  // SPARK-29976: Only when the total number of tasks in the stage is less than or equal to the
+  // number of slots on a single executor, would the task manager speculatively run the tasks if
+  // their duration is longer than the given threshold. In this way, we wouldn't speculate too
+  // aggressively but still handle basic cases.
+  val speculationTasksLessEqToSlots = numTasks <= (conf.get(EXECUTOR_CORES) / sched.CPUS_PER_TASK)
 
   // For each task, tracks whether a copy of the task has succeeded. A task will also be
   // marked as "succeeded" if it failed with a fetch failure, in which case it should not
@@ -958,14 +965,40 @@ private[spark] class TaskSetManager(
   }
 
   /**
+   * Check if the task associated with the given tid has passed the time threshold and should be
+   * speculatively run.
+   */
+  private def checkAndSubmitSpeculatableTask(
+      tid: Long,
+      currentTimeMillis: Long,
+      threshold: Double): Boolean = {
+    val info = taskInfos(tid)
+    val index = info.index
+    if (!successful(index) && copiesRunning(index) == 1 &&
+        info.timeRunning(currentTimeMillis) > threshold && !speculatableTasks.contains(index)) {
+      addPendingTask(index, speculatable = true)
+      logInfo(
+        ("Marking task %d in stage %s (on %s) as speculatable because it ran more" +
+          " than %.0f ms(%d speculatable tasks in this taskset now)")
+          .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
+      speculatableTasks += index
+      sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
    * Check for tasks to be speculated and return true if there are any. This is called periodically
    * by the TaskScheduler.
    *
    */
   override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
-    // Can't speculate if we only have one task, and no need to speculate if the task set is a
-    // zombie or is from a barrier stage.
-    if (isZombie || isBarrier || numTasks == 1) {
+    // No need to speculate if the task set is a zombie or is from a barrier stage. If there is
+    // only one task we don't speculate, since we don't have metrics to decide whether it's
+    // taking too long or not, unless a task duration threshold is explicitly provided.
+    if (isZombie || isBarrier || (numTasks == 1 && !speculationTaskDurationThresOpt.isDefined)) {
       return false
     }
     var foundTasks = false
@@ -983,19 +1016,14 @@ private[spark] class TaskSetManager(
       // bound based on that.
       logDebug("Task length threshold for speculation: " + threshold)
       for (tid <- runningTasksSet) {
-        val info = taskInfos(tid)
-        val index = info.index
-        if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
-            !speculatableTasks.contains(index)) {
-          addPendingTask(index, speculatable = true)
-          logInfo(
-            ("Marking task %d in stage %s (on %s) as speculatable because it ran more" +
-            " than %.0f ms(%d speculatable tasks in this taskset now)")
-            .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
-          speculatableTasks += index
-          sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
-          foundTasks = true
-        }
+        foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+      }
+      } else if (speculationTaskDurationThresOpt.isDefined && speculationTasksLessEqToSlots) {
+      val time = clock.getTimeMillis()
+      val threshold = speculationTaskDurationThresOpt.get
+      logDebug(s"Tasks taking longer time than provided speculation threshold: 
$threshold")
+      for (tid <- runningTasksSet) {
+        foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
       }
     }
     foundTasks
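
For intuition about the speculationTasksLessEqToSlots guard above: with illustrative values
of 4 executor cores and 1 CPU per task (neither value comes from this patch), an executor has
4 slots, so only stages of at most 4 tasks take the duration-threshold path. A standalone sketch:

```
// Illustrative values, not defaults set by this patch:
val executorCores = 4                                // spark.executor.cores
val cpusPerTask = 1                                  // spark.task.cpus
val slotsOnOneExecutor = executorCores / cpusPerTask // 4 slots

// Mirrors: numTasks <= (conf.get(EXECUTOR_CORES) / sched.CPUS_PER_TASK)
def qualifies(numTasks: Int): Boolean = numTasks <= slotsOnOneExecutor

println(qualifies(4)) // true: threshold-based speculation may apply
println(qualifies(5)) // false: only regular quantile-based speculation applies
```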
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 34bcae8..1d64832 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1778,6 +1778,104 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
   }
 
+  private def testSpeculationDurationSetup(
+      speculationThresholdOpt: Option[String],
+      speculationQuantile: Double,
+      numTasks: Int,
+      numSlots: Int): (TaskSetManager, ManualClock) = {
+    sc = new SparkContext("local", "test")
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_QUANTILE.key, speculationQuantile.toString)
+    // Set the number of slots per executor
+    sc.conf.set(config.EXECUTOR_CORES.key, numSlots.toString)
+    sc.conf.set(config.CPUS_PER_TASK.key, "1")
+    if (speculationThresholdOpt.isDefined) {
+      sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, speculationThresholdOpt.get)
+    }
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    // Create a task set with the given number of tasks
+    val taskSet = FakeTask.createTaskSet(numTasks)
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    manager.isZombie = false
+
+    // Offer resources for the task to start
+    for (i <- 1 to numTasks) {
+      manager.resourceOffer(s"exec$i", s"host$i", NO_PREF)
+    }
+    (manager, clock)
+  }
+
+  private def testSpeculationDurationThreshold(
+      speculationThresholdProvided: Boolean,
+      numTasks: Int,
+      numSlots: Int): Unit = {
+    val (manager, clock) = testSpeculationDurationSetup(
+      // Set the threshold to be 60 minutes
+      if (speculationThresholdProvided) Some("60min") else None,
+      // Set the quantile to be 1.0 so that regular speculation would not be triggered
+      1.0,
+      numTasks,
+      numSlots
+    )
+
+    // If the time threshold has not been exceeded, no speculative run should be triggered
+    clock.advance(1000*60*60)
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.size == 0)
+
+    // Now the task should have been running for 60 minutes and 1 millisecond
+    clock.advance(1)
+    if (speculationThresholdProvided && numSlots >= numTasks) {
+      assert(manager.checkSpeculatableTasks(0))
+      assert(sched.speculativeTasks.size == numTasks)
+      // Should not submit duplicated tasks
+      assert(!manager.checkSpeculatableTasks(0))
+      assert(sched.speculativeTasks.size == numTasks)
+    } else {
+      // If the feature flag is turned off, or the stage contains too many tasks
+      assert(!manager.checkSpeculatableTasks(0))
+      assert(sched.speculativeTasks.size == 0)
+    }
+  }
+
+  Seq(1, 2).foreach { numTasks =>
+    test("SPARK-29976 when a speculation time threshold is provided, should 
speculative " +
+      s"run the task even if there are not enough successful runs, total 
tasks: $numTasks") {
+      testSpeculationDurationThreshold(true, numTasks, numTasks)
+    }
+
+    test("SPARK-29976: when the speculation time threshold is not provided," +
+      s"don't speculative run if there are not enough successful runs, total 
tasks: $numTasks") {
+      testSpeculationDurationThreshold(false, numTasks, numTasks)
+    }
+  }
+
+  test("SPARK-29976 when a speculation time threshold is provided, should not 
speculative " +
+      "if there are too many tasks in the stage even though time threshold is 
provided") {
+    testSpeculationDurationThreshold(true, 2, 1)
+  }
+
+  test("SPARK-29976 Regular speculation configs should still take effect even 
when a " +
+      "threshold is provided") {
+    val (manager, clock) = testSpeculationDurationSetup(
+      Some("60min"),
+      speculationQuantile = 0.5,
+      numTasks = 2,
+      numSlots = 2
+    )
+
+    // Task duration can't be 0, advance 1 sec
+    clock.advance(1000)
+    // Mark one of the tasks as succeeded, which should satisfy the quantile
+    manager.handleSuccessfulTask(0, createTaskResult(0))
+    // Advance 1 more second so the remaining task takes longer than the median but doesn't
+    // satisfy the duration threshold yet
+    clock.advance(1000)
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.size == 1)
+  }
+
   test("TaskOutputFileAlreadyExistException lead to task set abortion") {
     sc = new SparkContext("local", "test")
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
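
The tests above drive time with Spark's ManualClock instead of real sleeps. A condensed sketch
of the timing pattern (ManualClock is Spark-internal, so this compiles only inside the
org.apache.spark namespace; the comments describe the expected flow, not copied assertions):

```
import org.apache.spark.util.ManualClock

val clock = new ManualClock()
clock.advance(1000L * 60 * 60) // exactly 60 minutes: the "60min" threshold is not exceeded yet
// checkSpeculatableTasks(0) would return false here
clock.advance(1)               // one more millisecond: now strictly past the threshold
// checkSpeculatableTasks(0) would return true and mark the task speculatable
```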
diff --git a/docs/configuration.md b/docs/configuration.md
index 8cd285c..9375896 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2034,6 +2034,19 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.speculation.task.duration.threshold</code></td>
+  <td>None</td>
+  <td>
+    Task duration after which the scheduler would try to speculatively run the task. If provided,
+    tasks would be speculatively run if the current stage contains no more tasks than the number
+    of slots on a single executor and the task is taking longer than the threshold. This config
+    helps speculate stages with very few tasks. Regular speculation configs may also apply if the
+    executor slots are large enough, e.g. tasks might be re-launched if there are enough
+    successful runs even though the threshold hasn't been reached.
+    Default unit is milliseconds, unless otherwise specified.
+  </td>
+</tr>
+<tr>
   <td><code>spark.task.cpus</code></td>
   <td>1</td>
   <td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
