This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 70910e6  [SPARK-26755][SCHEDULER] : Optimize Spark Scheduler to dequeue speculative tasks…
70910e6 is described below

commit 70910e6ad00f0de4075217d5305d87a477ff1dc4
Author: pgandhi <pgan...@verizonmedia.com>
AuthorDate: Tue Jul 30 09:54:51 2019 -0500

    [SPARK-26755][SCHEDULER] : Optimize Spark Scheduler to dequeue speculative tasks…
    
    … more efficiently
    
    This PR improves the performance of scheduling speculative tasks to be O(1) instead of O(numSpeculativeTasks), using the same approach used for scheduling regular tasks. The performance of this method is particularly important because a lock is held on the TaskSchedulerImpl, which is a bottleneck for all scheduling operations. We ran a join query on a large dataset with speculation enabled and, out of 100000 tasks for the ShuffleMapStage, the maximum number of speculatable tasks that wa [...]
    
    In particular, this works by storing a separate stack of tasks for each executor, node, and rack locality preference. Then, when trying to schedule a speculative task, rather than scanning all speculative tasks to find ones that match the given executor (or node, or rack) preference, we can jump straight to a quick check of the tasks matching the resource offer. This technique was already used for regular tasks; this change refactors the code so that it is shared between regular and speculative task scheduling.
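    
    As a rough illustration of the idea (not the code from this patch), the following minimal Scala sketch keeps one stack of task indices per executor and per host, plus a catch-all list, and answers an offer by looking up only the lists keyed by that offer. The class `LocalityIndex`, its methods, and the `launched` predicate are hypothetical names made up for this example; the rack and no-preference lists, blacklisting, and locality-level gating are left out.

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Hypothetical, simplified stand-in for a per-locality index of pending tasks.
final class LocalityIndex {
  val forExecutor = new HashMap[String, ArrayBuffer[Int]]
  val forHost = new HashMap[String, ArrayBuffer[Int]]
  val all = new ArrayBuffer[Int]

  // Register a task under every key it prefers, plus the catch-all list.
  def add(task: Int, executors: Seq[String], hosts: Seq[String]): Unit = {
    executors.foreach(e => forExecutor.getOrElseUpdate(e, new ArrayBuffer[Int]) += task)
    hosts.foreach(h => forHost.getOrElseUpdate(h, new ArrayBuffer[Int]) += task)
    all += task
  }

  // Pop from the end of a list, dropping entries that were already launched
  // from another list (stale entries are only cleaned up lazily).
  private def pop(list: ArrayBuffer[Int], launched: Int => Boolean): Option[Int] = {
    while (list.nonEmpty) {
      val task = list.remove(list.length - 1)
      if (!launched(task)) return Some(task)
    }
    None
  }

  // Consult only the lists keyed by this offer instead of scanning every task:
  // executor-local first, then host-local, then anything.
  def dequeue(execId: String, host: String, launched: Int => Boolean): Option[Int] =
    forExecutor.get(execId).flatMap(pop(_, launched))
      .orElse(forHost.get(host).flatMap(pop(_, launched)))
      .orElse(pop(all, launched))
}
```

    In this sketch the cost of answering an offer depends on how many stale entries sit at the end of the consulted lists, not on the total number of pending tasks, which is the property the description above relies on.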
    
    ## What changes were proposed in this pull request?
    
    The speculatable tasks, previously held only in the single "speculatableTasks" set, are now also indexed in 5 separate queues based on locality preference, similar to how normal tasks are enqueued. Thus, dequeuing a speculative task avoids performing locality checks for each task at runtime and simply returns the preferable task to be executed.
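    
    With locality handled by the choice of queue, what remains at dequeue time is a cheap per-task eligibility check. The Scala sketch below restates, as a standalone function, the condition that the diff further down applies in `dequeueTaskFromList`; the names `TaskState`, `SpeculationRules`, and `canLaunch` are hypothetical and exist only for this example, and blacklisting is ignored.

```scala
// Hypothetical snapshot of what the scheduler knows about one task.
final case class TaskState(
    successful: Boolean,        // has any attempt already finished successfully?
    copiesRunning: Int,         // number of attempts currently running
    attemptHosts: Set[String])  // hosts that already run an attempt of this task

object SpeculationRules {
  // A regular launch needs no running copy. A speculative launch additionally
  // tolerates exactly one running copy, but never on a host that already has
  // an attempt of the same task (that host may be the slow one).
  def canLaunch(state: TaskState, host: String, speculative: Boolean): Boolean = {
    val hostAlreadyUsed = speculative && state.attemptHosts.contains(host)
    val copiesOk =
      state.copiesRunning == 0 || (speculative && state.copiesRunning == 1)
    !state.successful && !hostAlreadyUsed && copiesOk
  }
}
```

    For example, a task with one running copy on host1 would be eligible for a speculative launch on host2 but not on host1, and would not be eligible for a regular launch anywhere.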
    
    ## How was this patch tested?
    We ran a Spark job that performed a join on a 10 TB dataset to test the code change.
    Original Code:
    <img width="1433" alt="screen shot 2019-01-28 at 5 07 22 pm" src="https://user-images.githubusercontent.com/22228190/51873321-572df280-2322-11e9-9149-0aae08d5edc6.png">
    
    Optimized Code:
    <img width="1435" alt="screen shot 2019-01-28 at 5 08 19 pm" src="https://user-images.githubusercontent.com/22228190/51873343-6745d200-2322-11e9-947b-2cfd0f06bcab.png">
    
    As the screenshots show, the run time of the ShuffleMapStage came down from approximately 40 min to 6 min, significantly reducing the overall running time of the Spark job.
    
    Another example for the same job:
    
    Original Code:
    <img width="1440" alt="screen shot 2019-01-28 at 5 11 30 pm" src="https://user-images.githubusercontent.com/22228190/51873355-70cf3a00-2322-11e9-9c3a-af035449a306.png">
    
    Optimized Code:
    <img width="1440" alt="screen shot 2019-01-28 at 5 12 16 pm" src="https://user-images.githubusercontent.com/22228190/51873367-7dec2900-2322-11e9-8d07-1b1b49285f71.png">
    
    Closes #23677 from pgandhi999/SPARK-26755.
    
    Lead-authored-by: pgandhi <pgan...@verizonmedia.com>
    Co-authored-by: pgandhi <pgan...@oath.com>
    Signed-off-by: Imran Rashid <iras...@cloudera.com>
---
 .../apache/spark/scheduler/TaskSetManager.scala    | 292 ++++++++-------------
 .../scheduler/OutputCommitCoordinatorSuite.scala   |  12 +-
 .../spark/scheduler/TaskSetManagerSuite.scala      | 122 ++++++++-
 3 files changed, 242 insertions(+), 184 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index e7645fc..79a1afc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -131,37 +131,17 @@ private[spark] class TaskSetManager(
   // same time for a barrier stage.
   private[scheduler] def isBarrier = taskSet.tasks.nonEmpty && taskSet.tasks(0).isBarrier
 
-  // Set of pending tasks for each executor. These collections are actually
-  // treated as stacks, in which new tasks are added to the end of the
-  // ArrayBuffer and removed from the end. This makes it faster to detect
-  // tasks that repeatedly fail because whenever a task failed, it is put
-  // back at the head of the stack. These collections may contain duplicates
-  // for two reasons:
-  // (1): Tasks are only removed lazily; when a task is launched, it remains
-  // in all the pending lists except the one that it was launched from.
-  // (2): Tasks may be re-added to these lists multiple times as a result
-  // of failures.
-  // Duplicates are handled in dequeueTaskFromList, which ensures that a
-  // task hasn't already started running before launching it.
-  private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
-
-  // Set of pending tasks for each host. Similar to pendingTasksForExecutor,
-  // but at host level.
-  private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
-
-  // Set of pending tasks for each rack -- similar to the above.
-  private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
-
-  // Set containing pending tasks with no locality preferences.
-  private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
-
-  // Set containing all pending tasks (also used as a stack, as above).
-  private val allPendingTasks = new ArrayBuffer[Int]
+  // Store tasks waiting to be scheduled by locality preferences
+  private[scheduler] val pendingTasks = new PendingTasksByLocality()
 
   // Tasks that can be speculated. Since these will be a small fraction of total
-  // tasks, we'll just hold them in a HashSet.
+  // tasks, we'll just hold them in a HashSet. The HashSet here ensures that we do not add
+  // duplicate speculatable tasks.
   private[scheduler] val speculatableTasks = new HashSet[Int]
 
+  // Store speculatable tasks by locality preferences
+  private[scheduler] val pendingSpeculatableTasks = new PendingTasksByLocality()
+
   // Task index, start and finish time for each task attempt (indexed by task ID)
   private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]
 
@@ -197,11 +177,11 @@ private[spark] class TaskSetManager(
       }
       // Resolve the rack for each host. This can be slow, so de-dupe the list of hosts,
       // and assign the rack to all relevant task indices.
-      val (hosts, indicesForHosts) = pendingTasksForHost.toSeq.unzip
+      val (hosts, indicesForHosts) = pendingTasks.forHost.toSeq.unzip
       val racks = sched.getRacksForHosts(hosts)
       racks.zip(indicesForHosts).foreach {
         case (Some(rack), indices) =>
-          pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) ++= indices
+          pendingTasks.forRack.getOrElseUpdate(rack, new ArrayBuffer) ++= indices
         case (None, _) => // no rack, nothing to do
       }
     }
@@ -234,63 +214,41 @@ private[spark] class TaskSetManager(
   /** Add a task to all the pending-task lists that it should be on. */
   private[spark] def addPendingTask(
       index: Int,
-      resolveRacks: Boolean = true): Unit = {
+      resolveRacks: Boolean = true,
+      speculatable: Boolean = false): Unit = {
+    val pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasks
     for (loc <- tasks(index).preferredLocations) {
       loc match {
         case e: ExecutorCacheTaskLocation =>
-          pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
+          pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
         case e: HDFSCacheTaskLocation =>
           val exe = sched.getExecutorsAliveOnHost(loc.host)
           exe match {
             case Some(set) =>
               for (e <- set) {
-                pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
+                pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
               }
               logInfo(s"Pending task $index has a cached location at ${e.host} " +
                 ", where there are executors " + set.mkString(","))
             case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
-                ", but there are no executors alive there.")
+              ", but there are no executors alive there.")
           }
         case _ =>
       }
-      pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
+      pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
 
       if (resolveRacks) {
         sched.getRackForHost(loc.host).foreach { rack =>
-          pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
+          pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index
         }
       }
     }
 
     if (tasks(index).preferredLocations == Nil) {
-      pendingTasksWithNoPrefs += index
+      pendingTaskSetToAddTo.noPrefs += index
     }
 
-    allPendingTasks += index  // No point scanning this whole list to find the old task there
-  }
-
-  /**
-   * Return the pending tasks list for a given executor ID, or an empty list if
-   * there is no map entry for that host
-   */
-  private def getPendingTasksForExecutor(executorId: String): ArrayBuffer[Int] = {
-    pendingTasksForExecutor.getOrElse(executorId, ArrayBuffer())
-  }
-
-  /**
-   * Return the pending tasks list for a given host, or an empty list if
-   * there is no map entry for that host
-   */
-  private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
-    pendingTasksForHost.getOrElse(host, ArrayBuffer())
-  }
-
-  /**
-   * Return the pending rack-local task list for a given rack, or an empty list if
-   * there is no map entry for that rack
-   */
-  private def getPendingTasksForRack(rack: String): ArrayBuffer[Int] = {
-    pendingTasksForRack.getOrElse(rack, ArrayBuffer())
+    pendingTaskSetToAddTo.all += index
   }
 
   /**
@@ -302,16 +260,24 @@ private[spark] class TaskSetManager(
   private def dequeueTaskFromList(
       execId: String,
       host: String,
-      list: ArrayBuffer[Int]): Option[Int] = {
+      list: ArrayBuffer[Int],
+      speculative: Boolean = false): Option[Int] = {
     var indexOffset = list.size
     while (indexOffset > 0) {
       indexOffset -= 1
       val index = list(indexOffset)
-      if (!isTaskBlacklistedOnExecOrNode(index, execId, host)) {
+      if (!isTaskBlacklistedOnExecOrNode(index, execId, host) &&
+          !(speculative && hasAttemptOnHost(index, host))) {
         // This should almost always be list.trimEnd(1) to remove tail
         list.remove(indexOffset)
-        if (copiesRunning(index) == 0 && !successful(index)) {
-          return Some(index)
+        // Speculatable task should only be launched when at most one copy of the
+        // original task is running
+        if (!successful(index)) {
+          if (copiesRunning(index) == 0) {
+            return Some(index)
+          } else if (speculative && copiesRunning(index) == 1) {
+            return Some(index)
+          }
         }
       }
     }
@@ -331,127 +297,70 @@ private[spark] class TaskSetManager(
   }
 
   /**
-   * Return a speculative task for a given executor if any are available. The task should not have
-   * an attempt running on this host, in case the host is slow. In addition, the task should meet
-   * the given locality constraint.
+   * Dequeue a pending task for a given node and return its index and locality level.
+   * Only search for tasks matching the given locality constraint.
+   *
+   * @return An option containing (task index within the task set, locality, is speculative?)
    */
-  // Labeled as protected to allow tests to override providing speculative tasks if necessary
-  protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
-    : Option[(Int, TaskLocality.Value)] =
-  {
-    speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set
+  private def dequeueTask(
+      execId: String,
+      host: String,
+      maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
+    // Tries to schedule a regular task first; if it returns None, then schedules
+    // a speculative task
+    dequeueTaskHelper(execId, host, maxLocality, false).orElse(
+      dequeueTaskHelper(execId, host, maxLocality, true))
+  }
 
-    def canRunOnHost(index: Int): Boolean = {
-      !hasAttemptOnHost(index, host) &&
-        !isTaskBlacklistedOnExecOrNode(index, execId, host)
+  protected def dequeueTaskHelper(
+      execId: String,
+      host: String,
+      maxLocality: TaskLocality.Value,
+      speculative: Boolean): Option[(Int, TaskLocality.Value, Boolean)] = {
+    if (speculative && speculatableTasks.isEmpty) {
+      return None
     }
-
-    if (!speculatableTasks.isEmpty) {
-      // Check for process-local tasks; note that tasks can be process-local
-      // on multiple nodes when we replicate cached blocks, as in Spark Streaming
-      for (index <- speculatableTasks if canRunOnHost(index)) {
-        val prefs = tasks(index).preferredLocations
-        val executors = prefs.flatMap(_ match {
-          case e: ExecutorCacheTaskLocation => Some(e.executorId)
-          case _ => None
-        })
-        if (executors.contains(execId)) {
-          speculatableTasks -= index
-          return Some((index, TaskLocality.PROCESS_LOCAL))
-        }
-      }
-
-      // Check for node-local tasks
-      if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
-        for (index <- speculatableTasks if canRunOnHost(index)) {
-          val locations = tasks(index).preferredLocations.map(_.host)
-          if (locations.contains(host)) {
-            speculatableTasks -= index
-            return Some((index, TaskLocality.NODE_LOCAL))
-          }
-        }
-      }
-
-      // Check for no-preference tasks
-      if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) {
-        for (index <- speculatableTasks if canRunOnHost(index)) {
-          val locations = tasks(index).preferredLocations
-          if (locations.size == 0) {
-            speculatableTasks -= index
-            return Some((index, TaskLocality.PROCESS_LOCAL))
-          }
-        }
-      }
-
-      // Check for rack-local tasks
-      if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
-        for (rack <- sched.getRackForHost(host)) {
-          for (index <- speculatableTasks if canRunOnHost(index)) {
-            val racks = tasks(index).preferredLocations.map(_.host).flatMap(sched.getRackForHost)
-            if (racks.contains(rack)) {
-              speculatableTasks -= index
-              return Some((index, TaskLocality.RACK_LOCAL))
-            }
-          }
-        }
-      }
-
-      // Check for non-local tasks
-      if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
-        for (index <- speculatableTasks if canRunOnHost(index)) {
-          speculatableTasks -= index
-          return Some((index, TaskLocality.ANY))
-        }
+    val pendingTaskSetToUse = if (speculative) pendingSpeculatableTasks else pendingTasks
+    def dequeue(list: ArrayBuffer[Int]): Option[Int] = {
+      val task = dequeueTaskFromList(execId, host, list, speculative)
+      if (speculative && task.isDefined) {
+        speculatableTasks -= task.get
       }
+      task
     }
 
-    None
-  }
-
-  /**
-   * Dequeue a pending task for a given node and return its index and locality level.
-   * Only search for tasks matching the given locality constraint.
-   *
-   * @return An option containing (task index within the task set, locality, is speculative?)
-   */
-  private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
-    : Option[(Int, TaskLocality.Value, Boolean)] =
-  {
-    for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
-      return Some((index, TaskLocality.PROCESS_LOCAL, false))
+    dequeue(pendingTaskSetToUse.forExecutor.getOrElse(execId, ArrayBuffer())).foreach { index =>
+      return Some((index, TaskLocality.PROCESS_LOCAL, speculative))
     }
 
     if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
-      for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
-        return Some((index, TaskLocality.NODE_LOCAL, false))
+      dequeue(pendingTaskSetToUse.forHost.getOrElse(host, ArrayBuffer())).foreach { index =>
+        return Some((index, TaskLocality.NODE_LOCAL, speculative))
       }
     }
 
+    // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
     if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
-      // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
-      for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
-        return Some((index, TaskLocality.PROCESS_LOCAL, false))
+      dequeue(pendingTaskSetToUse.noPrefs).foreach { index =>
+        return Some((index, TaskLocality.PROCESS_LOCAL, speculative))
       }
     }
 
     if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
       for {
         rack <- sched.getRackForHost(host)
-        index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
+        index <- dequeue(pendingTaskSetToUse.forRack.getOrElse(rack, ArrayBuffer()))
       } {
-        return Some((index, TaskLocality.RACK_LOCAL, false))
+        return Some((index, TaskLocality.RACK_LOCAL, speculative))
       }
     }
 
     if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
-      for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
-        return Some((index, TaskLocality.ANY, false))
+      dequeue(pendingTaskSetToUse.all).foreach { index =>
+        return Some((index, TaskLocality.ANY, speculative))
       }
     }
-
-    // find a speculative task if all others tasks have been scheduled
-    dequeueSpeculativeTask(execId, host, maxLocality).map {
-      case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
+    None
   }
 
   /**
@@ -616,10 +525,10 @@ private[spark] class TaskSetManager(
 
     while (currentLocalityIndex < myLocalityLevels.length - 1) {
       val moreTasks = myLocalityLevels(currentLocalityIndex) match {
-        case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
-        case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
-        case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
-        case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
+        case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasks.forExecutor)
+        case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasks.forHost)
+        case TaskLocality.NO_PREF => pendingTasks.noPrefs.nonEmpty
+        case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasks.forRack)
       }
       if (!moreTasks) {
         // This is a performance optimization: if there are no more tasks that can
@@ -686,13 +595,13 @@ private[spark] class TaskSetManager(
           // from each list, we may need to go deeper in the list.  We poll from the end because
           // failed tasks are put back at the end of allPendingTasks, so we're more likely to find
           // an unschedulable task this way.
-          val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet =>
+          val indexOffset = pendingTasks.all.lastIndexWhere { indexInTaskSet =>
             copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)
           }
           if (indexOffset == -1) {
             None
           } else {
-            Some(allPendingTasks(indexOffset))
+            Some(pendingTasks.all(indexOffset))
           }
         }
 
@@ -1064,10 +973,12 @@ private[spark] class TaskSetManager(
         val info = taskInfos(tid)
         val index = info.index
         if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
-          !speculatableTasks.contains(index)) {
+            !speculatableTasks.contains(index)) {
+          addPendingTask(index, speculatable = true)
           logInfo(
-            "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
-              .format(index, taskSet.id, info.host, threshold))
+            ("Marking task %d in stage %s (on %s) as speculatable because it ran more" +
+            " than %.0f ms(%d speculatable tasks in this taskset now)")
+            .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
           speculatableTasks += index
           sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
           foundTasks = true
@@ -1100,19 +1011,19 @@ private[spark] class TaskSetManager(
   private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
     import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
     val levels = new ArrayBuffer[TaskLocality.TaskLocality]
-    if (!pendingTasksForExecutor.isEmpty &&
-        pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
+    if (!pendingTasks.forExecutor.isEmpty &&
+        pendingTasks.forExecutor.keySet.exists(sched.isExecutorAlive(_))) {
       levels += PROCESS_LOCAL
     }
-    if (!pendingTasksForHost.isEmpty &&
-        pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
+    if (!pendingTasks.forHost.isEmpty &&
+        pendingTasks.forHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
       levels += NODE_LOCAL
     }
-    if (!pendingTasksWithNoPrefs.isEmpty) {
+    if (!pendingTasks.noPrefs.isEmpty) {
       levels += NO_PREF
     }
-    if (!pendingTasksForRack.isEmpty &&
-        pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
+    if (!pendingTasks.forRack.isEmpty &&
+        pendingTasks.forRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
       levels += RACK_LOCAL
     }
     levels += ANY
@@ -1137,3 +1048,32 @@ private[spark] object TaskSetManager {
   // this.
   val TASK_SIZE_TO_WARN_KIB = 1000
 }
+
+/**
+ * Set of pending tasks for various levels of locality: executor, host, rack,
+ * noPrefs and anyPrefs. These collections are actually
+ * treated as stacks, in which new tasks are added to the end of the
+ * ArrayBuffer and removed from the end. This makes it faster to detect
+ * tasks that repeatedly fail because whenever a task failed, it is put
+ * back at the head of the stack. These collections may contain duplicates
+ * for two reasons:
+ * (1): Tasks are only removed lazily; when a task is launched, it remains
+ * in all the pending lists except the one that it was launched from.
+ * (2): Tasks may be re-added to these lists multiple times as a result
+ * of failures.
+ * Duplicates are handled in dequeueTaskFromList, which ensures that a
+ * task hasn't already started running before launching it.
+ */
+private[scheduler] class PendingTasksByLocality {
+
+  // Set of pending tasks for each executor.
+  val forExecutor = new HashMap[String, ArrayBuffer[Int]]
+  // Set of pending tasks for each host. Similar to pendingTasksForExecutor, but at host level.
+  val forHost = new HashMap[String, ArrayBuffer[Int]]
+  // Set containing pending tasks with no locality preferences.
+  val noPrefs = new ArrayBuffer[Int]
+  // Set of pending tasks for each rack -- similar to the above.
+  val forRack = new HashMap[String, ArrayBuffer[Int]]
+  // Set containing all pending tasks (also used as a stack, as above).
+  val all = new ArrayBuffer[Int]
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index f582ef5..d696406 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -107,14 +107,18 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
       val taskSet = invoke.getArguments()(0).asInstanceOf[TaskSet]
       new TaskSetManager(mockTaskScheduler, taskSet, 4) {
         private var hasDequeuedSpeculatedTask = false
-        override def dequeueSpeculativeTask(execId: String,
+        override def dequeueTaskHelper(
+            execId: String,
             host: String,
-            locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] = {
-          if (hasDequeuedSpeculatedTask) {
+            locality: TaskLocality.Value,
+            speculative: Boolean): Option[(Int, TaskLocality.Value, Boolean)] = {
+          if (!speculative) {
+            super.dequeueTaskHelper(execId, host, locality, speculative)
+          } else if (hasDequeuedSpeculatedTask) {
             None
           } else {
             hasDequeuedSpeculatedTask = true
-            Some((0, TaskLocality.PROCESS_LOCAL))
+            Some((0, TaskLocality.PROCESS_LOCAL, true))
           }
         }
       }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index da566dd..4bc8ee4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -740,6 +740,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // Mark the task as available for speculation, and then offer another resource,
     // which should be used to launch a speculative copy of the task.
     manager.speculatableTasks += singleTask.partitionId
+    manager.addPendingTask(singleTask.partitionId, speculatable = true)
     val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get
 
     assert(manager.runningTasks === 2)
@@ -885,6 +886,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1)
 
     manager.speculatableTasks += 1
+    manager.addPendingTask(1, speculatable = true)
     clock.advance(LOCALITY_WAIT_MS)
     // schedule the nonPref task
     assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2)
@@ -975,7 +977,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     sched.addExecutor("execA", "host1")
     sched.addExecutor("execB.2", "host2")
     manager.executorAdded()
-    assert(manager.pendingTasksWithNoPrefs.size === 0)
+    assert(manager.pendingTasks.noPrefs.size === 0)
     // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY
     assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
     assert(manager.resourceOffer("execA", "host1", ANY) !== None)
@@ -1166,7 +1168,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be
     // killed, so the FakeTaskScheduler is only told about the successful completion
     // of the speculated task.
-    assert(sched.endedTasks(3) === Success)
+    assert(sched.endedTasks(4) === Success)
     // also because the scheduler is a mock, our manager isn't notified about the task killed event,
     // so we do that manually
     manager.handleFailedTask(origTask.taskId, TaskState.KILLED, TaskKilled("test"))
@@ -1327,7 +1329,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val taskDesc = taskSetManagerSpy.resourceOffer(exec, host, TaskLocality.ANY)
 
     // Assert the task has been black listed on the executor it was last executed on.
-    when(taskSetManagerSpy.addPendingTask(anyInt(), anyBoolean())).thenAnswer(
+    when(taskSetManagerSpy.addPendingTask(anyInt(), anyBoolean(), anyBoolean())).thenAnswer(
       (invocationOnMock: InvocationOnMock) => {
         val task: Int = invocationOnMock.getArgument(0)
         assert(taskSetManager.taskSetBlacklistHelperOpt.get.
@@ -1339,7 +1341,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val e = new ExceptionFailure("a", "b", Array(), "c", None)
     taskSetManagerSpy.handleFailedTask(taskDesc.get.taskId, TaskState.FAILED, e)
 
-    verify(taskSetManagerSpy, times(1)).addPendingTask(0, false)
+    verify(taskSetManagerSpy, times(1)).addPendingTask(0, false, false)
   }
 
   test("SPARK-21563 context's added jars shouldn't change mid-TaskSet") {
@@ -1655,4 +1657,116 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // get removed inside TaskSchedulerImpl later.
     assert(availableResources(GPU) sameElements Array("0", "1", "2", "3"))
   }
+
+  test("SPARK-26755 Ensure that a speculative task is submitted only once for execution") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val taskSet = FakeTask.createTaskSet(4)
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+    // Offer resources for 4 tasks to start, 2 on each exec
+    Seq("exec1" -> "host1", "exec2" -> "host2").foreach { case (exec, host) =>
+      (0 until 2).foreach { _ =>
+        val taskOption = manager.resourceOffer(exec, host, NO_PREF)
+        assert(taskOption.isDefined)
+        assert(taskOption.get.executorId === exec)
+      }
+    }
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+    clock.advance(1)
+    // Complete the first 2 tasks and leave the other 2 tasks in running
+    for (id <- Set(0, 2)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
+    // > 0ms, so advance the clock by 1ms here.
+    clock.advance(1)
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(1, 3))
+    assert(manager.copiesRunning(1) === 1)
+    assert(manager.copiesRunning(3) === 1)
+
+    // Offer resource to start the speculative attempt for the running task. We offer more
+    // resources, and ensure that speculative tasks get scheduled appropriately -- only one extra
+    // copy per speculatable task
+    val taskOption2 = manager.resourceOffer("exec1", "host1", NO_PREF)
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)
+    assert(taskOption2.isDefined)
+    val task2 = taskOption2.get
+    // Ensure that task index 3 is launched on host1 and task index 4 on host2
+    assert(task2.index === 3)
+    assert(task2.taskId === 4)
+    assert(task2.executorId === "exec1")
+    assert(task2.attemptNumber === 1)
+    assert(taskOption3.isDefined)
+    val task3 = taskOption3.get
+    assert(task3.index === 1)
+    assert(task3.taskId === 5)
+    assert(task3.executorId === "exec2")
+    assert(task3.attemptNumber === 1)
+    clock.advance(1)
+    // Running checkSpeculatableTasks again should return false
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(manager.copiesRunning(1) === 2)
+    assert(manager.copiesRunning(3) === 2)
+    // Offering additional resources should not lead to any speculative tasks being respawned
+    assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
+    assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty)
+    assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty)
+  }
+
+  test("SPARK-26755 Ensure that a speculative task obeys original locality preferences") {
+    sc = new SparkContext("local", "test")
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    // Launch a new set of tasks with locality preferences
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+      ("exec2", "host2"), ("exec3", "host3"), ("exec4", "host4"))
+    val taskSet = FakeTask.createTaskSet(3,
+      Seq(TaskLocation("host1"), TaskLocation("host3")),
+      Seq(TaskLocation("host2")),
+      Seq(TaskLocation("host3")))
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask2: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+    // Offer resources for 3 tasks to start
+    Seq("exec1" -> "host1", "exec2" -> "host2", "exec3" -> "host3").foreach { case (exec, host) =>
+      val taskOption = manager.resourceOffer(exec, host, NO_PREF)
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === exec)
+    }
+    assert(sched.startedTasks.toSet === Set(0, 1, 2))
+    clock.advance(1)
+    // Finish one task and mark the others as speculatable
+    manager.handleSuccessfulTask(2, createTaskResult(2, accumUpdatesByTask2(2)))
+    assert(sched.endedTasks(2) === Success)
+    clock.advance(1)
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(0, 1))
+    // Ensure that the speculatable tasks obey the original locality preferences
+    assert(manager.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty)
+    // task 1 does have a node-local preference for host2 -- but we've already got a regular
+    // task running there, so we should not schedule a speculative there as well.
+    assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty)
+    assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL).isDefined)
+    assert(manager.resourceOffer("exec4", "host4", ANY).isDefined)
+    // Since, all speculatable tasks have been launched, making another offer
+    // should not schedule any more tasks
+    assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
+  }
 }

