This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bd5039f  [SPARK-33741][CORE] Add min threshold time speculation config
bd5039f is described below

commit bd5039fc3542dc4ac96bea28639f1896d7919388
Author: schintap <schin...@verizonmedia.com>
AuthorDate: Wed Jan 13 08:57:56 2021 -0600

    [SPARK-33741][CORE] Add min threshold time speculation config
    
    ### What changes were proposed in this pull request?
    Add min threshold time speculation config
    
    ### Why are the changes needed?
    When we turn on speculation with the default configs, the last 10% of the
    tasks in a stage are subject to speculation. Many stages run for only a few
    seconds to minutes, and in general we don't want to speculate tasks that
    finish within some minimum threshold. Setting a minimum threshold for the
    speculation config gives us better control over speculative tasks.
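
    As a minimal sketch (the 5s value below is illustrative, not part of this
    patch), a user could combine the new config with speculation like this:

    ```scala
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.speculation", "true")
      // Only consider a task for speculation after it has run for at least 5s
      .set("spark.speculation.min.threshold", "5s")
    ```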
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit test
    
    Closes #30710 from redsanket/SPARK-33741.
    
    Lead-authored-by: schintap <schin...@verizonmedia.com>
    Co-authored-by: Sanket Chintapalli <chintapalli.sanketre...@gmail.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../org/apache/spark/internal/config/package.scala |  8 +++++
 .../scala/org/apache/spark/scheduler/Pool.scala    |  2 +-
 .../org/apache/spark/scheduler/Schedulable.scala   |  2 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  2 +-
 .../apache/spark/scheduler/TaskSetManager.scala    |  2 +-
 .../spark/scheduler/TaskSetManagerSuite.scala      | 37 ++++++++++++++++++++--
 docs/configuration.md                              |  9 ++++++
 7 files changed, 55 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 84c6647..f962bc6 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1889,6 +1889,14 @@ package object config {
       .doubleConf
       .createWithDefault(0.75)
 
+  private[spark] val SPECULATION_MIN_THRESHOLD =
+    ConfigBuilder("spark.speculation.min.threshold")
+      .doc("Minimum amount of time a task runs before being considered for 
speculation. " +
+        "This can be used to avoid launching speculative copies of tasks that 
are very short.")
+      .version("3.2.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(100)
+
   private[spark] val SPECULATION_TASK_DURATION_THRESHOLD =
     ConfigBuilder("spark.speculation.task.duration.threshold")
       .doc("Task duration after which scheduler would try to speculative run 
the task. If " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 7333b31..de4c9d3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -94,7 +94,7 @@ private[spark] class Pool(
     schedulableQueue.asScala.foreach(_.executorDecommission(executorId))
   }
 
-  override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
+  override def checkSpeculatableTasks(minTimeToSpeculation: Long): Boolean = {
     var shouldRevive = false
     for (schedulable <- schedulableQueue.asScala) {
       shouldRevive |= schedulable.checkSpeculatableTasks(minTimeToSpeculation)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
index 0626f8f..e549ce6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -45,6 +45,6 @@ private[spark] trait Schedulable {
   def getSchedulableByName(name: String): Schedulable
   def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
   def executorDecommission(executorId: String): Unit
-  def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
+  def checkSpeculatableTasks(minTimeToSpeculation: Long): Boolean
   def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index b939e40..71b8bc2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -100,7 +100,7 @@ private[spark] class TaskSchedulerImpl(
   // Duplicate copies of a task will only be launched if the original copy has been running for
   // at least this amount of time. This is to avoid the overhead of launching speculative copies
   // of tasks that are very short.
-  val MIN_TIME_TO_SPECULATION = 100
+  val MIN_TIME_TO_SPECULATION = conf.get(SPECULATION_MIN_THRESHOLD)
 
   private val speculationScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index ad0791f..a3819fe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1035,7 +1035,7 @@ private[spark] class TaskSetManager(
    * by the TaskScheduler.
    *
    */
-  override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
+  override def checkSpeculatableTasks(minTimeToSpeculation: Long): Boolean = {
     // No need to speculate if the task set is zombie or is from a barrier stage. If there is only
     // one task we don't speculate since we don't have metrics to decide whether it's taking too
     // long or not, unless a task duration threshold is explicitly provided.
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 3bf6cc2..da281b1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1901,7 +1901,8 @@ class TaskSetManagerSuite
       speculationQuantile: Double,
       numTasks: Int,
       numExecutorCores: Int,
-      numCoresPerTask: Int): (TaskSetManager, ManualClock) = {
+      numCoresPerTask: Int,
+      speculationMinimumThreshold: Option[String]): (TaskSetManager, ManualClock) = {
     val conf = new SparkConf()
     conf.set(config.SPECULATION_ENABLED, true)
     conf.set(config.SPECULATION_QUANTILE.key, speculationQuantile.toString)
@@ -1911,6 +1912,9 @@ class TaskSetManagerSuite
     if (speculationThresholdOpt.isDefined) {
       conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, speculationThresholdOpt.get)
     }
+    if (speculationMinimumThreshold.isDefined) {
+      conf.set(config.SPECULATION_MIN_THRESHOLD.key, speculationMinimumThreshold.get)
+    }
     sc = new SparkContext("local", "test", conf)
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     // Create a task set with the given number of tasks
@@ -1937,7 +1941,8 @@ class TaskSetManagerSuite
       speculationQuantile = 1.0,
       numTasks,
       numSlots,
-      numCoresPerTask = 1
+      numCoresPerTask = 1,
+      None
     )
 
     // if the time threshold has not been exceeded, no speculative run should be triggered
@@ -2091,7 +2096,8 @@ class TaskSetManagerSuite
       speculationQuantile = 0.5,
       numTasks = 2,
       numExecutorCores = 2,
-      numCoresPerTask = 1
+      numCoresPerTask = 1,
+      None
     )
 
     // Task duration can't be 0, advance 1 sec
@@ -2211,6 +2217,31 @@ class TaskSetManagerSuite
       assert(manager.invokePrivate(numFailures())(index1) === 1)
     }
   }
+
+  test("SPARK-33741 Test minimum amount of time a task runs " +
+    "before being considered for speculation") {
+    val (manager, clock) = testSpeculationDurationSetup(
+      None,
+      speculationQuantile = 0.5,
+      numTasks = 2,
+      numExecutorCores = 2,
+      numCoresPerTask = 1,
+      Some("3000") // spark.speculation.min.threshold
+    )
+    // Task duration can't be 0, advance 1 sec
+    clock.advance(1000)
+    // Mark one of the task succeeded, which should satisfy the quantile
+    manager.handleSuccessfulTask(0, createTaskResult(0))
+    // Advance 1 more second so the remaining task takes longer
+    clock.advance(1000)
+    manager.checkSpeculatableTasks(sched.MIN_TIME_TO_SPECULATION)
+    // The task is not considered as speculative task due to minimum threshold interval of 3s
+    assert(sched.speculativeTasks.size == 0)
+    clock.advance(2000)
+    manager.checkSpeculatableTasks(sched.MIN_TIME_TO_SPECULATION)
+    // After 3s have elapsed now the task is marked as speculative task
+    assert(sched.speculativeTasks.size == 1)
+  }
 }
 
 class FakeLongTasks(stageId: Int, partitionId: Int) extends FakeTask(stageId, partitionId) {
diff --git a/docs/configuration.md b/docs/configuration.md
index fe1fc3e..612d62a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2309,6 +2309,15 @@ Apart from these, the following properties are also available, and may be useful
   <td>0.6.0</td>
 </tr>
 <tr>
+  <td><code>spark.speculation.min.threshold</code></td>
+  <td>100ms</td>
+  <td>
+    Minimum amount of time a task runs before being considered for speculation.
+    This can be used to avoid launching speculative copies of tasks that are very short.
+  </td>
+  <td>3.2.0</td>
+</tr>
+<tr>
   <td><code>spark.speculation.task.duration.threshold</code></td>
   <td>None</td>
   <td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
