This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 801e07996a4 [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic 801e07996a4 is described below commit 801e07996a4d4ea448b6fc468cc6c9d6904ceef2 Author: wangyazhi <wangya...@baidu.com> AuthorDate: Tue Dec 20 21:35:37 2022 -0600 [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic ### What changes were proposed in this pull request? ExecutorAllocationManager only records a count of speculative tasks: `stageAttemptToNumSpeculativeTasks` is incremented when a speculative task is submitted and is only decremented when a speculative task ends. If the original task finishes before its speculative task starts, the speculative task will never be scheduled, which leaks an entry in `stageAttemptToNumSpeculativeTasks` and skews the calculation of target executors. This PR fixes the issue by adding the task index to the `SparkListenerSpeculativeTaskSubmitted` event and recording each pending speculative task by its task index when it is submitted; the task index is removed when the speculative task starts or when the task finishes (whether the finishing attempt is speculative or not). ### Why are the changes needed? To fix idle executors caused by pending speculative tasks whose original tasks have already finished. ### Does this PR introduce _any_ user-facing change? The DeveloperApi `SparkListenerSpeculativeTaskSubmitted` adds `taskIndex` with a default value of -1. ### How was this patch tested? Added a comprehensive unit test. Passes the GA. Closes #38711 from toujours33/SPARK-41192. 
Lead-authored-by: wangyazhi <wangya...@baidu.com> Co-authored-by: toujours33 <wangya...@baidu.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../apache/spark/ExecutorAllocationManager.scala | 38 +++++---- .../org/apache/spark/scheduler/DAGScheduler.scala | 14 ++-- .../apache/spark/scheduler/DAGSchedulerEvent.scala | 2 +- .../org/apache/spark/scheduler/SparkListener.scala | 16 +++- .../apache/spark/scheduler/TaskSetManager.scala | 2 +- .../spark/ExecutorAllocationManagerSuite.scala | 97 ++++++++++++++++++++-- .../spark/scheduler/TaskSetManagerSuite.scala | 2 +- 7 files changed, 139 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 204ffc39a11..f06312c15cf 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager( // Should be 0 when no stages are active. private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int] private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]] - // Number of speculative tasks pending/running in each stageAttempt - private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int] - // The speculative tasks started in each stageAttempt + // Map from each stageAttempt to a set of running speculative task indexes + // TODO(SPARK-41192): We simply need an Int for this. 
private val stageAttemptToSpeculativeTaskIndices = + new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]() + // Map from each stageAttempt to a set of pending speculative task indexes + private val stageAttemptToPendingSpeculativeTasks = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]] private val resourceProfileIdToStageAttempt = @@ -722,7 +724,7 @@ private[spark] class ExecutorAllocationManager( // because the attempt may still have running tasks, // even after another attempt for the stage is submitted. stageAttemptToNumTasks -= stageAttempt - stageAttemptToNumSpeculativeTasks -= stageAttempt + stageAttemptToPendingSpeculativeTasks -= stageAttempt stageAttemptToTaskIndices -= stageAttempt stageAttemptToSpeculativeTaskIndices -= stageAttempt stageAttemptToExecutorPlacementHints -= stageAttempt @@ -733,7 +735,9 @@ private[spark] class ExecutorAllocationManager( // If this is the last stage with pending tasks, mark the scheduler queue as empty // This is needed in case the stage is aborted for any reason - if (stageAttemptToNumTasks.isEmpty && stageAttemptToNumSpeculativeTasks.isEmpty) { + if (stageAttemptToNumTasks.isEmpty + && stageAttemptToPendingSpeculativeTasks.isEmpty + && stageAttemptToSpeculativeTaskIndices.isEmpty) { allocationManager.onSchedulerQueueEmpty() } } @@ -751,6 +755,8 @@ private[spark] class ExecutorAllocationManager( if (taskStart.taskInfo.speculative) { stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt, new mutable.HashSet[Int]) += taskIndex + stageAttemptToPendingSpeculativeTasks + .get(stageAttempt).foreach(_.remove(taskIndex)) } else { stageAttemptToTaskIndices.getOrElseUpdate(stageAttempt, new mutable.HashSet[Int]) += taskIndex @@ -776,15 +782,14 @@ private[spark] class ExecutorAllocationManager( } if (taskEnd.taskInfo.speculative) { stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}} - // If the previous task attempt succeeded first and it was the last task in a stage, - // the 
stage may have been removed before handing this speculative TaskEnd event. - if (stageAttemptToNumSpeculativeTasks.contains(stageAttempt)) { - stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1 - } } taskEnd.reason match { - case Success | _: TaskKilled => + case Success => + // Remove pending speculative task in case the normal task + // is finished before starting the speculative task + stageAttemptToPendingSpeculativeTasks.get(stageAttempt).foreach(_.remove(taskIndex)) + case _: TaskKilled => case _ => if (!hasPendingTasks) { // If the task failed (not intentionally killed), we expect it to be resubmitted @@ -810,9 +815,10 @@ private[spark] class ExecutorAllocationManager( val stageId = speculativeTask.stageId val stageAttemptId = speculativeTask.stageAttemptId val stageAttempt = StageAttempt(stageId, stageAttemptId) + val taskIndex = speculativeTask.taskIndex allocationManager.synchronized { - stageAttemptToNumSpeculativeTasks(stageAttempt) = - stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1 + stageAttemptToPendingSpeculativeTasks.getOrElseUpdate(stageAttempt, + new mutable.HashSet[Int]).add(taskIndex) allocationManager.onSchedulerBacklogged() } } @@ -843,7 +849,7 @@ private[spark] class ExecutorAllocationManager( def removeStageFromResourceProfileIfUnused(stageAttempt: StageAttempt): Unit = { if (!stageAttemptToNumRunningTask.contains(stageAttempt) && !stageAttemptToNumTasks.contains(stageAttempt) && - !stageAttemptToNumSpeculativeTasks.contains(stageAttempt) && + !stageAttemptToPendingSpeculativeTasks.contains(stageAttempt) && !stageAttemptToTaskIndices.contains(stageAttempt) && !stageAttemptToSpeculativeTaskIndices.contains(stageAttempt) ) { @@ -896,9 +902,7 @@ private[spark] class ExecutorAllocationManager( } private def getPendingSpeculativeTaskSum(attempt: StageAttempt): Int = { - val numTotalTasks = stageAttemptToNumSpeculativeTasks.getOrElse(attempt, 0) - val numRunning = 
stageAttemptToSpeculativeTaskIndices.get(attempt).map(_.size).getOrElse(0) - numTotalTasks - numRunning + stageAttemptToPendingSpeculativeTasks.get(attempt).map(_.size).getOrElse(0) } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index c55d44dfd59..bb17a987717 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -383,8 +383,8 @@ private[spark] class DAGScheduler( /** * Called by the TaskSetManager when it decides a speculative task is needed. */ - def speculativeTaskSubmitted(task: Task[_]): Unit = { - eventProcessLoop.post(SpeculativeTaskSubmitted(task)) + def speculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = { + eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex)) } /** @@ -1178,8 +1178,10 @@ private[spark] class DAGScheduler( listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo)) } - private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = { - listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId)) + private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = { + val speculativeTaskSubmittedEvent = new SparkListenerSpeculativeTaskSubmitted( + task.stageId, task.stageAttemptId, taskIndex, task.partitionId) + listenerBus.post(speculativeTaskSubmittedEvent) } private[scheduler] def handleUnschedulableTaskSetAdded( @@ -2962,8 +2964,8 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) - case SpeculativeTaskSubmitted(task) => - dagScheduler.handleSpeculativeTaskSubmitted(task) + case SpeculativeTaskSubmitted(task, taskIndex) => + dagScheduler.handleSpeculativeTaskSubmitted(task, taskIndex) case 
UnschedulableTaskSetAdded(stageId, stageAttemptId) => dagScheduler.handleUnschedulableTaskSetAdded(stageId, stageAttemptId) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index f9df8de620f..c16e5ea03d7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -99,7 +99,7 @@ case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Thr private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent private[scheduler] -case class SpeculativeTaskSubmitted(task: Task[_]) extends DAGSchedulerEvent +case class SpeculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1) extends DAGSchedulerEvent private[scheduler] case class UnschedulableTaskSetAdded(stageId: Int, stageAttemptId: Int) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index a9d86347940..d3bbbaffd59 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -56,7 +56,21 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe case class SparkListenerSpeculativeTaskSubmitted( stageId: Int, stageAttemptId: Int = 0) - extends SparkListenerEvent + extends SparkListenerEvent { + // Note: this is here for backwards-compatibility with older versions of this event which + // didn't stored taskIndex + private var _taskIndex: Int = -1 + private var _partitionId: Int = -1 + + def taskIndex: Int = _taskIndex + def partitionId: Int = _partitionId + + def this(stageId: Int, stageAttemptId: Int, taskIndex: Int, partitionId: Int) = { + this(stageId, stageAttemptId) + _partitionId = partitionId + _taskIndex = taskIndex + } +} @DeveloperApi case class 
SparkListenerTaskEnd( diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 2ed1ce1a49e..cbb8fd0a334 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1143,7 +1143,7 @@ private[spark] class TaskSetManager( " than %.0f ms(%d speculatable tasks in this taskset now)") .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1)) speculatableTasks += index - sched.dagScheduler.speculativeTaskSubmitted(tasks(index)) + sched.dagScheduler.speculativeTaskSubmitted(tasks(index), index) } foundTasksResult |= speculated } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index c616c43fe1b..1cb913b248f 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -461,6 +461,16 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { assert(numExecutorsTargetForDefaultProfileId(manager) === 10) } + private def speculativeTaskSubmitEventFromTaskIndex( + stageId: Int, + stageAttemptId: Int = 0, + taskIndex: Int = -1, + partitionId: Int = -1): SparkListenerSpeculativeTaskSubmitted = { + val event = new SparkListenerSpeculativeTaskSubmitted(stageId, stageAttemptId, + taskIndex = taskIndex, partitionId = partitionId) + event + } + test("add executors when speculative tasks added") { val manager = createManager(createConf(0, 10, 0)) @@ -469,13 +479,13 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { post(SparkListenerStageSubmitted(createStageInfo(1, 2))) // Verify that we're capped at number of tasks including the speculative ones in the stage - post(SparkListenerSpeculativeTaskSubmitted(1)) + 
post(speculativeTaskSubmitEventFromTaskIndex(1, taskIndex = 0)) assert(numExecutorsTargetForDefaultProfileId(manager) === 0) assert(numExecutorsToAddForDefaultProfile(manager) === 1) assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1) doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) - post(SparkListenerSpeculativeTaskSubmitted(1)) - post(SparkListenerSpeculativeTaskSubmitted(1)) + post(speculativeTaskSubmitEventFromTaskIndex(1, taskIndex = 1)) + post(speculativeTaskSubmitEventFromTaskIndex(1, taskIndex = 2)) assert(numExecutorsTargetForDefaultProfileId(manager) === 1) assert(numExecutorsToAddForDefaultProfile(manager) === 2) assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2) @@ -671,6 +681,83 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { onExecutorRemoved(manager, "5") } + test("SPARK-41192: remove executors when task finished before speculative task scheduled") { + val clock = new ManualClock() + val stage = createStageInfo(0, 40) + val conf = createConf(0, 10, 0).set(config.EXECUTOR_CORES, 4) + val manager = createManager(conf, clock = clock) + val updatesNeeded = + new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates] + + // submit 40 tasks, total executors needed = 40/4 = 10 + post(SparkListenerStageSubmitted(stage)) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 4) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 3) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + + (0 until 10).foreach(execId => 
onExecutorAddedDefaultProfile(manager, execId.toString)) + (0 until 40).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach { + info => post(SparkListenerTaskStart(0, 0, info)) + } + assert(numExecutorsTarget(manager, defaultProfile.id) === 10) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 10) + // 30 tasks (0 - 29) finished + (0 until 30).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach { + info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null)) } + // 10 speculative tasks (30 - 39) launch for the remaining tasks + (30 until 40).foreach { index => + post(speculativeTaskSubmitEventFromTaskIndex(0, taskIndex = index))} + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + assert(numExecutorsTarget(manager, defaultProfile.id) === 5) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 5) + (0 until 5).foreach { i => assert(removeExecutorDefaultProfile(manager, i.toString))} + (0 until 5).foreach { i => onExecutorRemoved(manager, i.toString)} + + // 5 original tasks (30 - 34) finished before speculative task start, + // the speculative task will be removed from pending tasks + // executors needed = (5 + 5) / 4 + 1 + (30 until 35).map { i => + createTaskInfo(i, i, executorId = s"${i / 4}")} + .foreach { info => post( + SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null))} + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + assert(numExecutorsTarget(manager, defaultProfile.id) === 3) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 3) + + (40 until 45).map { i => + createTaskInfo(i, i - 5, executorId = s"${i / 4}", speculative = true) + }.foreach { + info => post(SparkListenerTaskStart(0, 0, info)) + } + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + 
assert(numExecutorsTarget(manager, defaultProfile.id) === 3) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 3) + + (35 until 39).map { i => + createTaskInfo(i, i, executorId = s"${i / 4}") + }.foreach { + info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null)) + } + (35 until 39).map { i => + createTaskInfo(i + 5, i, executorId = s"${(i + 5) / 4}", speculative = true) + }.foreach { + info => post(SparkListenerTaskEnd(0, 0, null, TaskKilled("attempt"), + info, new ExecutorMetrics, null)) + } + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + assert(numExecutorsTarget(manager, defaultProfile.id) === 1) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1) + } + test("SPARK-30511 remove executors when speculative tasks end") { val clock = new ManualClock() val stage = createStageInfo(0, 40) @@ -707,7 +794,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { (0 to 6).foreach { i => onExecutorRemoved(manager, i.toString)} // 10 speculative tasks (30 - 39) launch for the remaining tasks - (30 to 39).foreach { _ => post(SparkListenerSpeculativeTaskSubmitted(0))} + (30 to 39).foreach { i => post(speculativeTaskSubmitEventFromTaskIndex(0, taskIndex = i))} assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1) doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1) @@ -785,7 +872,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { createTaskInfo(38, 38, executorId = "9"), new ExecutorMetrics, null)) post(SparkListenerTaskEnd(0, 0, null, UnknownReason, createTaskInfo(49, 39, executorId = "12", speculative = true), new ExecutorMetrics, null)) - post(SparkListenerSpeculativeTaskSubmitted(0)) + post(speculativeTaskSubmitEventFromTaskIndex(0, taskIndex = 39)) clock.advance(1000) manager invokePrivate 
_updateAndSyncNumExecutorsTarget(clock.nanoTime()) // maxNeeded = 1, allocate one more to satisfy speculation locality requirement diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 2dc7f0d0dfa..a3b9eff8084 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -74,7 +74,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler) taskScheduler.taskSetsFailed += taskSet.id } - override def speculativeTaskSubmitted(task: Task[_]): Unit = { + override def speculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = { taskScheduler.speculativeTasks += task.partitionId } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org