This is an automated email from the ASF dual-hosted git repository. irashid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 70910e6 [SPARK-26755][SCHEDULER] : Optimize Spark Scheduler to dequeue speculative tasks… 70910e6 is described below commit 70910e6ad00f0de4075217d5305d87a477ff1dc4 Author: pgandhi <pgan...@verizonmedia.com> AuthorDate: Tue Jul 30 09:54:51 2019 -0500 [SPARK-26755][SCHEDULER] : Optimize Spark Scheduler to dequeue speculative tasks… … more efficiently This PR improves the performance of scheduling speculative tasks from O(numSpeculativeTasks) to O(1), using the same approach that is already used for scheduling regular tasks. The performance of this method is particularly important because it runs while a lock is held on the TaskSchedulerImpl, which is a bottleneck for all scheduling operations. We ran a join query on a large dataset with speculation enabled, and out of 100000 tasks for the ShuffleMapStage, the maximum number of speculatable tasks that wa [...] In particular, this works by storing a separate stack of tasks for each executor, node, and rack locality preference. Then, when trying to schedule a speculative task, rather than scanning all speculative tasks to find ones that match the given executor (or node, or rack) preference, we can jump directly to a quick check of the tasks matching the resource offer. This technique was already used for regular tasks -- this change refactors the code so that it is shared between regular and speculative task scheduling. ## What changes were proposed in this pull request? Split the main queue "speculatableTasks" into 5 separate queues based on locality preference, similar to how normal tasks are enqueued. As a result, speculative tasks no longer require locality checks for each task at runtime: the old "dequeueSpeculativeTask" method is replaced by "dequeueTaskHelper", which serves both regular and speculative tasks and simply returns the preferable task to be executed. ## How was this patch tested? We ran a Spark job that performed a join on a 10 TB dataset to test the code change. 
Original Code: <img width="1433" alt="screen shot 2019-01-28 at 5 07 22 pm" src="https://user-images.githubusercontent.com/22228190/51873321-572df280-2322-11e9-9149-0aae08d5edc6.png"> Optimized Code: <img width="1435" alt="screen shot 2019-01-28 at 5 08 19 pm" src="https://user-images.githubusercontent.com/22228190/51873343-6745d200-2322-11e9-947b-2cfd0f06bcab.png"> As you can see, the run time of the ShuffleMapStage came down from 40 min to 6 min approximately, thus, reducing the overall running time of the spark job by a significant amount. Another example for the same job: Original Code: <img width="1440" alt="screen shot 2019-01-28 at 5 11 30 pm" src="https://user-images.githubusercontent.com/22228190/51873355-70cf3a00-2322-11e9-9c3a-af035449a306.png"> Optimized Code: <img width="1440" alt="screen shot 2019-01-28 at 5 12 16 pm" src="https://user-images.githubusercontent.com/22228190/51873367-7dec2900-2322-11e9-8d07-1b1b49285f71.png"> Closes #23677 from pgandhi999/SPARK-26755. Lead-authored-by: pgandhi <pgan...@verizonmedia.com> Co-authored-by: pgandhi <pgan...@oath.com> Signed-off-by: Imran Rashid <iras...@cloudera.com> --- .../apache/spark/scheduler/TaskSetManager.scala | 292 ++++++++------------- .../scheduler/OutputCommitCoordinatorSuite.scala | 12 +- .../spark/scheduler/TaskSetManagerSuite.scala | 122 ++++++++- 3 files changed, 242 insertions(+), 184 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index e7645fc..79a1afc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -131,37 +131,17 @@ private[spark] class TaskSetManager( // same time for a barrier stage. private[scheduler] def isBarrier = taskSet.tasks.nonEmpty && taskSet.tasks(0).isBarrier - // Set of pending tasks for each executor. 
These collections are actually - // treated as stacks, in which new tasks are added to the end of the - // ArrayBuffer and removed from the end. This makes it faster to detect - // tasks that repeatedly fail because whenever a task failed, it is put - // back at the head of the stack. These collections may contain duplicates - // for two reasons: - // (1): Tasks are only removed lazily; when a task is launched, it remains - // in all the pending lists except the one that it was launched from. - // (2): Tasks may be re-added to these lists multiple times as a result - // of failures. - // Duplicates are handled in dequeueTaskFromList, which ensures that a - // task hasn't already started running before launching it. - private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]] - - // Set of pending tasks for each host. Similar to pendingTasksForExecutor, - // but at host level. - private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]] - - // Set of pending tasks for each rack -- similar to the above. - private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]] - - // Set containing pending tasks with no locality preferences. - private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int] - - // Set containing all pending tasks (also used as a stack, as above). - private val allPendingTasks = new ArrayBuffer[Int] + // Store tasks waiting to be scheduled by locality preferences + private[scheduler] val pendingTasks = new PendingTasksByLocality() // Tasks that can be speculated. Since these will be a small fraction of total - // tasks, we'll just hold them in a HashSet. + // tasks, we'll just hold them in a HashSet. The HashSet here ensures that we do not add + // duplicate speculatable tasks. 
private[scheduler] val speculatableTasks = new HashSet[Int] + // Store speculatable tasks by locality preferences + private[scheduler] val pendingSpeculatableTasks = new PendingTasksByLocality() + // Task index, start and finish time for each task attempt (indexed by task ID) private[scheduler] val taskInfos = new HashMap[Long, TaskInfo] @@ -197,11 +177,11 @@ private[spark] class TaskSetManager( } // Resolve the rack for each host. This can be slow, so de-dupe the list of hosts, // and assign the rack to all relevant task indices. - val (hosts, indicesForHosts) = pendingTasksForHost.toSeq.unzip + val (hosts, indicesForHosts) = pendingTasks.forHost.toSeq.unzip val racks = sched.getRacksForHosts(hosts) racks.zip(indicesForHosts).foreach { case (Some(rack), indices) => - pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) ++= indices + pendingTasks.forRack.getOrElseUpdate(rack, new ArrayBuffer) ++= indices case (None, _) => // no rack, nothing to do } } @@ -234,63 +214,41 @@ private[spark] class TaskSetManager( /** Add a task to all the pending-task lists that it should be on. 
*/ private[spark] def addPendingTask( index: Int, - resolveRacks: Boolean = true): Unit = { + resolveRacks: Boolean = true, + speculatable: Boolean = false): Unit = { + val pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasks for (loc <- tasks(index).preferredLocations) { loc match { case e: ExecutorCacheTaskLocation => - pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index + pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index case e: HDFSCacheTaskLocation => val exe = sched.getExecutorsAliveOnHost(loc.host) exe match { case Some(set) => for (e <- set) { - pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index + pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index } logInfo(s"Pending task $index has a cached location at ${e.host} " + ", where there are executors " + set.mkString(",")) case None => logDebug(s"Pending task $index has a cached location at ${e.host} " + - ", but there are no executors alive there.") + ", but there are no executors alive there.") } case _ => } - pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index + pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index if (resolveRacks) { sched.getRackForHost(loc.host).foreach { rack => - pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index + pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index } } } if (tasks(index).preferredLocations == Nil) { - pendingTasksWithNoPrefs += index + pendingTaskSetToAddTo.noPrefs += index } - allPendingTasks += index // No point scanning this whole list to find the old task there - } - - /** - * Return the pending tasks list for a given executor ID, or an empty list if - * there is no map entry for that host - */ - private def getPendingTasksForExecutor(executorId: String): ArrayBuffer[Int] = { - pendingTasksForExecutor.getOrElse(executorId, 
ArrayBuffer()) - } - - /** - * Return the pending tasks list for a given host, or an empty list if - * there is no map entry for that host - */ - private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = { - pendingTasksForHost.getOrElse(host, ArrayBuffer()) - } - - /** - * Return the pending rack-local task list for a given rack, or an empty list if - * there is no map entry for that rack - */ - private def getPendingTasksForRack(rack: String): ArrayBuffer[Int] = { - pendingTasksForRack.getOrElse(rack, ArrayBuffer()) + pendingTaskSetToAddTo.all += index } /** @@ -302,16 +260,24 @@ private[spark] class TaskSetManager( private def dequeueTaskFromList( execId: String, host: String, - list: ArrayBuffer[Int]): Option[Int] = { + list: ArrayBuffer[Int], + speculative: Boolean = false): Option[Int] = { var indexOffset = list.size while (indexOffset > 0) { indexOffset -= 1 val index = list(indexOffset) - if (!isTaskBlacklistedOnExecOrNode(index, execId, host)) { + if (!isTaskBlacklistedOnExecOrNode(index, execId, host) && + !(speculative && hasAttemptOnHost(index, host))) { // This should almost always be list.trimEnd(1) to remove tail list.remove(indexOffset) - if (copiesRunning(index) == 0 && !successful(index)) { - return Some(index) + // Speculatable task should only be launched when at most one copy of the + // original task is running + if (!successful(index)) { + if (copiesRunning(index) == 0) { + return Some(index) + } else if (speculative && copiesRunning(index) == 1) { + return Some(index) + } } } } @@ -331,127 +297,70 @@ private[spark] class TaskSetManager( } /** - * Return a speculative task for a given executor if any are available. The task should not have - * an attempt running on this host, in case the host is slow. In addition, the task should meet - * the given locality constraint. + * Dequeue a pending task for a given node and return its index and locality level. + * Only search for tasks matching the given locality constraint. 
+ * + * @return An option containing (task index within the task set, locality, is speculative?) */ - // Labeled as protected to allow tests to override providing speculative tasks if necessary - protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value) - : Option[(Int, TaskLocality.Value)] = - { - speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set + private def dequeueTask( + execId: String, + host: String, + maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = { + // Tries to schedule a regular task first; if it returns None, then schedules + // a speculative task + dequeueTaskHelper(execId, host, maxLocality, false).orElse( + dequeueTaskHelper(execId, host, maxLocality, true)) + } - def canRunOnHost(index: Int): Boolean = { - !hasAttemptOnHost(index, host) && - !isTaskBlacklistedOnExecOrNode(index, execId, host) + protected def dequeueTaskHelper( + execId: String, + host: String, + maxLocality: TaskLocality.Value, + speculative: Boolean): Option[(Int, TaskLocality.Value, Boolean)] = { + if (speculative && speculatableTasks.isEmpty) { + return None } - - if (!speculatableTasks.isEmpty) { - // Check for process-local tasks; note that tasks can be process-local - // on multiple nodes when we replicate cached blocks, as in Spark Streaming - for (index <- speculatableTasks if canRunOnHost(index)) { - val prefs = tasks(index).preferredLocations - val executors = prefs.flatMap(_ match { - case e: ExecutorCacheTaskLocation => Some(e.executorId) - case _ => None - }) - if (executors.contains(execId)) { - speculatableTasks -= index - return Some((index, TaskLocality.PROCESS_LOCAL)) - } - } - - // Check for node-local tasks - if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { - for (index <- speculatableTasks if canRunOnHost(index)) { - val locations = tasks(index).preferredLocations.map(_.host) - if (locations.contains(host)) { - speculatableTasks -= index - 
return Some((index, TaskLocality.NODE_LOCAL)) - } - } - } - - // Check for no-preference tasks - if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) { - for (index <- speculatableTasks if canRunOnHost(index)) { - val locations = tasks(index).preferredLocations - if (locations.size == 0) { - speculatableTasks -= index - return Some((index, TaskLocality.PROCESS_LOCAL)) - } - } - } - - // Check for rack-local tasks - if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) { - for (rack <- sched.getRackForHost(host)) { - for (index <- speculatableTasks if canRunOnHost(index)) { - val racks = tasks(index).preferredLocations.map(_.host).flatMap(sched.getRackForHost) - if (racks.contains(rack)) { - speculatableTasks -= index - return Some((index, TaskLocality.RACK_LOCAL)) - } - } - } - } - - // Check for non-local tasks - if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) { - for (index <- speculatableTasks if canRunOnHost(index)) { - speculatableTasks -= index - return Some((index, TaskLocality.ANY)) - } + val pendingTaskSetToUse = if (speculative) pendingSpeculatableTasks else pendingTasks + def dequeue(list: ArrayBuffer[Int]): Option[Int] = { + val task = dequeueTaskFromList(execId, host, list, speculative) + if (speculative && task.isDefined) { + speculatableTasks -= task.get } + task } - None - } - - /** - * Dequeue a pending task for a given node and return its index and locality level. - * Only search for tasks matching the given locality constraint. - * - * @return An option containing (task index within the task set, locality, is speculative?) 
- */ - private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value) - : Option[(Int, TaskLocality.Value, Boolean)] = - { - for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) { - return Some((index, TaskLocality.PROCESS_LOCAL, false)) + dequeue(pendingTaskSetToUse.forExecutor.getOrElse(execId, ArrayBuffer())).foreach { index => + return Some((index, TaskLocality.PROCESS_LOCAL, speculative)) } if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) { - for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) { - return Some((index, TaskLocality.NODE_LOCAL, false)) + dequeue(pendingTaskSetToUse.forHost.getOrElse(host, ArrayBuffer())).foreach { index => + return Some((index, TaskLocality.NODE_LOCAL, speculative)) } } + // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) { - // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic - for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) { - return Some((index, TaskLocality.PROCESS_LOCAL, false)) + dequeue(pendingTaskSetToUse.noPrefs).foreach { index => + return Some((index, TaskLocality.PROCESS_LOCAL, speculative)) } } if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) { for { rack <- sched.getRackForHost(host) - index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack)) + index <- dequeue(pendingTaskSetToUse.forRack.getOrElse(rack, ArrayBuffer())) } { - return Some((index, TaskLocality.RACK_LOCAL, false)) + return Some((index, TaskLocality.RACK_LOCAL, speculative)) } } if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) { - for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) { - return Some((index, TaskLocality.ANY, false)) + dequeue(pendingTaskSetToUse.all).foreach { index => + return Some((index, TaskLocality.ANY, speculative)) } } - - // find a speculative 
task if all others tasks have been scheduled - dequeueSpeculativeTask(execId, host, maxLocality).map { - case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)} + None } /** @@ -616,10 +525,10 @@ private[spark] class TaskSetManager( while (currentLocalityIndex < myLocalityLevels.length - 1) { val moreTasks = myLocalityLevels(currentLocalityIndex) match { - case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor) - case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost) - case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty - case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack) + case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasks.forExecutor) + case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasks.forHost) + case TaskLocality.NO_PREF => pendingTasks.noPrefs.nonEmpty + case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasks.forRack) } if (!moreTasks) { // This is a performance optimization: if there are no more tasks that can @@ -686,13 +595,13 @@ private[spark] class TaskSetManager( // from each list, we may need to go deeper in the list. We poll from the end because // failed tasks are put back at the end of allPendingTasks, so we're more likely to find // an unschedulable task this way. 
- val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet => + val indexOffset = pendingTasks.all.lastIndexWhere { indexInTaskSet => copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet) } if (indexOffset == -1) { None } else { - Some(allPendingTasks(indexOffset)) + Some(pendingTasks.all(indexOffset)) } } @@ -1064,10 +973,12 @@ private[spark] class TaskSetManager( val info = taskInfos(tid) val index = info.index if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold && - !speculatableTasks.contains(index)) { + !speculatableTasks.contains(index)) { + addPendingTask(index, speculatable = true) logInfo( - "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms" - .format(index, taskSet.id, info.host, threshold)) + ("Marking task %d in stage %s (on %s) as speculatable because it ran more" + + " than %.0f ms(%d speculatable tasks in this taskset now)") + .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1)) speculatableTasks += index sched.dagScheduler.speculativeTaskSubmitted(tasks(index)) foundTasks = true @@ -1100,19 +1011,19 @@ private[spark] class TaskSetManager( private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = { import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY} val levels = new ArrayBuffer[TaskLocality.TaskLocality] - if (!pendingTasksForExecutor.isEmpty && - pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) { + if (!pendingTasks.forExecutor.isEmpty && + pendingTasks.forExecutor.keySet.exists(sched.isExecutorAlive(_))) { levels += PROCESS_LOCAL } - if (!pendingTasksForHost.isEmpty && - pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) { + if (!pendingTasks.forHost.isEmpty && + pendingTasks.forHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) { levels += NODE_LOCAL } - if (!pendingTasksWithNoPrefs.isEmpty) { + if (!pendingTasks.noPrefs.isEmpty) { levels += NO_PREF } - 
if (!pendingTasksForRack.isEmpty && - pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) { + if (!pendingTasks.forRack.isEmpty && + pendingTasks.forRack.keySet.exists(sched.hasHostAliveOnRack(_))) { levels += RACK_LOCAL } levels += ANY @@ -1137,3 +1048,32 @@ private[spark] object TaskSetManager { // this. val TASK_SIZE_TO_WARN_KIB = 1000 } + +/** + * Set of pending tasks for various levels of locality: executor, host, rack, + * noPrefs and anyPrefs. These collections are actually + * treated as stacks, in which new tasks are added to the end of the + * ArrayBuffer and removed from the end. This makes it faster to detect + * tasks that repeatedly fail because whenever a task failed, it is put + * back at the head of the stack. These collections may contain duplicates + * for two reasons: + * (1): Tasks are only removed lazily; when a task is launched, it remains + * in all the pending lists except the one that it was launched from. + * (2): Tasks may be re-added to these lists multiple times as a result + * of failures. + * Duplicates are handled in dequeueTaskFromList, which ensures that a + * task hasn't already started running before launching it. + */ +private[scheduler] class PendingTasksByLocality { + + // Set of pending tasks for each executor. + val forExecutor = new HashMap[String, ArrayBuffer[Int]] + // Set of pending tasks for each host. Similar to pendingTasksForExecutor, but at host level. + val forHost = new HashMap[String, ArrayBuffer[Int]] + // Set containing pending tasks with no locality preferences. + val noPrefs = new ArrayBuffer[Int] + // Set of pending tasks for each rack -- similar to the above. + val forRack = new HashMap[String, ArrayBuffer[Int]] + // Set containing all pending tasks (also used as a stack, as above). 
+ val all = new ArrayBuffer[Int] +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index f582ef5..d696406 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -107,14 +107,18 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { val taskSet = invoke.getArguments()(0).asInstanceOf[TaskSet] new TaskSetManager(mockTaskScheduler, taskSet, 4) { private var hasDequeuedSpeculatedTask = false - override def dequeueSpeculativeTask(execId: String, + override def dequeueTaskHelper( + execId: String, host: String, - locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] = { - if (hasDequeuedSpeculatedTask) { + locality: TaskLocality.Value, + speculative: Boolean): Option[(Int, TaskLocality.Value, Boolean)] = { + if (!speculative) { + super.dequeueTaskHelper(execId, host, locality, speculative) + } else if (hasDequeuedSpeculatedTask) { None } else { hasDequeuedSpeculatedTask = true - Some((0, TaskLocality.PROCESS_LOCAL)) + Some((0, TaskLocality.PROCESS_LOCAL, true)) } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index da566dd..4bc8ee4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -740,6 +740,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // Mark the task as available for speculation, and then offer another resource, // which should be used to launch a speculative copy of the task. 
manager.speculatableTasks += singleTask.partitionId + manager.addPendingTask(singleTask.partitionId, speculatable = true) val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get assert(manager.runningTasks === 2) @@ -885,6 +886,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1) manager.speculatableTasks += 1 + manager.addPendingTask(1, speculatable = true) clock.advance(LOCALITY_WAIT_MS) // schedule the nonPref task assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2) @@ -975,7 +977,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg sched.addExecutor("execA", "host1") sched.addExecutor("execB.2", "host2") manager.executorAdded() - assert(manager.pendingTasksWithNoPrefs.size === 0) + assert(manager.pendingTasks.noPrefs.size === 0) // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY))) assert(manager.resourceOffer("execA", "host1", ANY) !== None) @@ -1166,7 +1168,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be // killed, so the FakeTaskScheduler is only told about the successful completion // of the speculated task. - assert(sched.endedTasks(3) === Success) + assert(sched.endedTasks(4) === Success) // also because the scheduler is a mock, our manager isn't notified about the task killed event, // so we do that manually manager.handleFailedTask(origTask.taskId, TaskState.KILLED, TaskKilled("test")) @@ -1327,7 +1329,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val taskDesc = taskSetManagerSpy.resourceOffer(exec, host, TaskLocality.ANY) // Assert the task has been black listed on the executor it was last executed on. 
- when(taskSetManagerSpy.addPendingTask(anyInt(), anyBoolean())).thenAnswer( + when(taskSetManagerSpy.addPendingTask(anyInt(), anyBoolean(), anyBoolean())).thenAnswer( (invocationOnMock: InvocationOnMock) => { val task: Int = invocationOnMock.getArgument(0) assert(taskSetManager.taskSetBlacklistHelperOpt.get. @@ -1339,7 +1341,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val e = new ExceptionFailure("a", "b", Array(), "c", None) taskSetManagerSpy.handleFailedTask(taskDesc.get.taskId, TaskState.FAILED, e) - verify(taskSetManagerSpy, times(1)).addPendingTask(0, false) + verify(taskSetManagerSpy, times(1)).addPendingTask(0, false, false) } test("SPARK-21563 context's added jars shouldn't change mid-TaskSet") { @@ -1655,4 +1657,116 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // get removed inside TaskSchedulerImpl later. assert(availableResources(GPU) sameElements Array("0", "1", "2", "3")) } + + test("SPARK-26755 Ensure that a speculative task is submitted only once for execution") { + sc = new SparkContext("local", "test") + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val taskSet = FakeTask.createTaskSet(4) + // Set the speculation multiplier to be 0 so speculative tasks are launched immediately + sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0) + sc.conf.set(config.SPECULATION_ENABLED, true) + sc.conf.set(config.SPECULATION_QUANTILE, 0.5) + val clock = new ManualClock() + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => + task.metrics.internalAccums + } + // Offer resources for 4 tasks to start, 2 on each exec + Seq("exec1" -> "host1", "exec2" -> "host2").foreach { case (exec, host) => + (0 until 2).foreach { _ => + val taskOption = manager.resourceOffer(exec, host, NO_PREF) + assert(taskOption.isDefined) + 
assert(taskOption.get.executorId === exec) + } + } + assert(sched.startedTasks.toSet === Set(0, 1, 2, 3)) + clock.advance(1) + // Complete the first 2 tasks and leave the other 2 tasks in running + for (id <- Set(0, 2)) { + manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) + assert(sched.endedTasks(id) === Success) + } + // checkSpeculatableTasks checks that the task runtime is greater than the threshold for + // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for + // > 0ms, so advance the clock by 1ms here. + clock.advance(1) + assert(manager.checkSpeculatableTasks(0)) + assert(sched.speculativeTasks.toSet === Set(1, 3)) + assert(manager.copiesRunning(1) === 1) + assert(manager.copiesRunning(3) === 1) + + // Offer resource to start the speculative attempt for the running task. We offer more + // resources, and ensure that speculative tasks get scheduled appropriately -- only one extra + // copy per speculatable task + val taskOption2 = manager.resourceOffer("exec1", "host1", NO_PREF) + val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF) + assert(taskOption2.isDefined) + val task2 = taskOption2.get + // Ensure that task index 3 is launched on host1 and task index 4 on host2 + assert(task2.index === 3) + assert(task2.taskId === 4) + assert(task2.executorId === "exec1") + assert(task2.attemptNumber === 1) + assert(taskOption3.isDefined) + val task3 = taskOption3.get + assert(task3.index === 1) + assert(task3.taskId === 5) + assert(task3.executorId === "exec2") + assert(task3.attemptNumber === 1) + clock.advance(1) + // Running checkSpeculatableTasks again should return false + assert(!manager.checkSpeculatableTasks(0)) + assert(manager.copiesRunning(1) === 2) + assert(manager.copiesRunning(3) === 2) + // Offering additional resources should not lead to any speculative tasks being respawned + assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) + 
assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty) + assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty) + } + + test("SPARK-26755 Ensure that a speculative task obeys original locality preferences") { + sc = new SparkContext("local", "test") + // Set the speculation multiplier to be 0 so speculative tasks are launched immediately + sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0) + sc.conf.set(config.SPECULATION_ENABLED, true) + sc.conf.set(config.SPECULATION_QUANTILE, 0.5) + // Launch a new set of tasks with locality preferences + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), + ("exec2", "host2"), ("exec3", "host3"), ("exec4", "host4")) + val taskSet = FakeTask.createTaskSet(3, + Seq(TaskLocation("host1"), TaskLocation("host3")), + Seq(TaskLocation("host2")), + Seq(TaskLocation("host3"))) + val clock = new ManualClock() + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + val accumUpdatesByTask2: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => + task.metrics.internalAccums + } + // Offer resources for 3 tasks to start + Seq("exec1" -> "host1", "exec2" -> "host2", "exec3" -> "host3").foreach { case (exec, host) => + val taskOption = manager.resourceOffer(exec, host, NO_PREF) + assert(taskOption.isDefined) + assert(taskOption.get.executorId === exec) + } + assert(sched.startedTasks.toSet === Set(0, 1, 2)) + clock.advance(1) + // Finish one task and mark the others as speculatable + manager.handleSuccessfulTask(2, createTaskResult(2, accumUpdatesByTask2(2))) + assert(sched.endedTasks(2) === Success) + clock.advance(1) + assert(manager.checkSpeculatableTasks(0)) + assert(sched.speculativeTasks.toSet === Set(0, 1)) + // Ensure that the speculatable tasks obey the original locality preferences + assert(manager.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty) + // task 1 does have a node-local preference for host2 -- but we've already got a regular + // task running there, so we should not 
schedule a speculative there as well. + assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty) + assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL).isDefined) + assert(manager.resourceOffer("exec4", "host4", ANY).isDefined) + // Since, all speculatable tasks have been launched, making another offer + // should not schedule any more tasks + assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) + assert(!manager.checkSpeculatableTasks(0)) + assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org