This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 38263f6  [SPARK-27630][CORE] Properly handle task end events from completed stages
38263f6 is described below

commit 38263f6d153944b6f2f0248d9284861fc82532d6
Author: sychen <syc...@ctrip.com>
AuthorDate: Tue Jun 25 14:30:13 2019 -0500

    [SPARK-27630][CORE] Properly handle task end events from completed stages

    ## What changes were proposed in this pull request?
    Track tasks separately for each stage attempt (instead of tracking by stage), and do NOT reset numRunningTasks to 0 on StageCompleted.

    In the case of a stage retry, the `taskEnd` event from the zombie stage sometimes makes the number of `totalRunningTasks` negative, which causes the job to get stuck.
    A similar problem also exists with `stageIdToTaskIndices` & `stageIdToSpeculativeTaskIndices`: a failed `taskEnd` event from the zombie stage can remove the task index of the active stage from `stageIdToTaskIndices` or `stageIdToSpeculativeTaskIndices`, and the number of `totalPendingTasks` then increases unexpectedly.

    ## How was this patch tested?
    Unit test: "properly handle task end events from completed stages".

    Closes #24497 from cxzl25/fix_stuck_job_follow_up.

    Authored-by: sychen <syc...@ctrip.com>
    Signed-off-by: Imran Rashid <iras...@cloudera.com>
---
 .../apache/spark/ExecutorAllocationManager.scala   | 113 ++++++++++++---------
 .../org/apache/spark/scheduler/DAGScheduler.scala  |   2 +-
 .../org/apache/spark/scheduler/SparkListener.scala |   5 +-
 .../spark/ExecutorAllocationManagerSuite.scala     |  33 +++++-
 project/MimaExcludes.scala                         |   6 ++
 5 files changed, 104 insertions(+), 55 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index bb95fea..bceb26c 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -491,6 +491,10 @@ private[spark] class ExecutorAllocationManager(
     numExecutorsToAdd = 1
   }
 
+  private case class StageAttempt(stageId: Int, stageAttemptId: Int) {
+    override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)"
+  }
+
   /**
    * A listener that notifies the given allocation manager of when to add and remove executors.
    *
@@ -499,29 +503,32 @@ private[spark] class ExecutorAllocationManager(
    */
   private[spark] class ExecutorAllocationListener extends SparkListener {
 
-    private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
-    // Number of running tasks per stage including speculative tasks.
+    private val stageAttemptToNumTasks = new mutable.HashMap[StageAttempt, Int]
+    // Number of running tasks per stageAttempt including speculative tasks.
     // Should be 0 when no stages are active.
-    private val stageIdToNumRunningTask = new mutable.HashMap[Int, Int]
-    private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
-    // Number of speculative tasks to be scheduled in each stage
-    private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
-    // The speculative tasks started in each stage
-    private val stageIdToSpeculativeTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
-
-    // stageId to tuple (the number of task with locality preferences, a map where each pair is a
-    // node and the number of tasks that would like to be scheduled on that node) map,
-    // maintain the executor placement hints for each stage Id used by resource framework to better
-    // place the executors.
-    private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
+    private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
+    private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
+    // Number of speculative tasks to be scheduled in each stageAttempt
+    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
+    // The speculative tasks started in each stageAttempt
+    private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
+
+    // stageAttempt to tuple (the number of task with locality preferences, a map where each pair
+    // is a node and the number of tasks that would like to be scheduled on that node) map,
+    // maintain the executor placement hints for each stageAttempt used by resource framework
+    // to better place the executors.
+    private val stageAttemptToExecutorPlacementHints =
+      new mutable.HashMap[StageAttempt, (Int, Map[String, Int])]
 
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
       initializing = false
       val stageId = stageSubmitted.stageInfo.stageId
+      val stageAttemptId = stageSubmitted.stageInfo.attemptNumber()
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
       val numTasks = stageSubmitted.stageInfo.numTasks
       allocationManager.synchronized {
-        stageIdToNumTasks(stageId) = numTasks
-        stageIdToNumRunningTask(stageId) = 0
+        stageAttemptToNumTasks(stageAttempt) = numTasks
         allocationManager.onSchedulerBacklogged()
 
         // Compute the number of tasks requested by the stage on each host
@@ -536,7 +543,7 @@ private[spark] class ExecutorAllocationManager(
           }
         }
       }
-      stageIdToExecutorPlacementHints.put(stageId,
+      stageAttemptToExecutorPlacementHints.put(stageAttempt,
         (numTasksPending, hostToLocalTaskCountPerStage.toMap))
 
       // Update the executor placement hints
@@ -546,20 +553,24 @@ private[spark] class ExecutorAllocationManager(
 
     override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
       val stageId = stageCompleted.stageInfo.stageId
+      val stageAttemptId = stageCompleted.stageInfo.attemptNumber()
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
       allocationManager.synchronized {
-        stageIdToNumTasks -= stageId
-        stageIdToNumRunningTask -= stageId
-        stageIdToNumSpeculativeTasks -= stageId
-        stageIdToTaskIndices -= stageId
-        stageIdToSpeculativeTaskIndices -= stageId
-        stageIdToExecutorPlacementHints -= stageId
+        // do NOT remove stageAttempt from stageAttemptToNumRunningTasks,
+        // because the attempt may still have running tasks,
+        // even after another attempt for the stage is submitted.
+        stageAttemptToNumTasks -= stageAttempt
+        stageAttemptToNumSpeculativeTasks -= stageAttempt
+        stageAttemptToTaskIndices -= stageAttempt
+        stageAttemptToSpeculativeTaskIndices -= stageAttempt
+        stageAttemptToExecutorPlacementHints -= stageAttempt
 
         // Update the executor placement hints
         updateExecutorPlacementHints()
 
         // If this is the last stage with pending tasks, mark the scheduler queue as empty
         // This is needed in case the stage is aborted for any reason
-        if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
+        if (stageAttemptToNumTasks.isEmpty && stageAttemptToNumSpeculativeTasks.isEmpty) {
           allocationManager.onSchedulerQueueEmpty()
         }
       }
@@ -567,19 +578,19 @@ private[spark] class ExecutorAllocationManager(
 
     override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
       val stageId = taskStart.stageId
+      val stageAttemptId = taskStart.stageAttemptId
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
       val taskIndex = taskStart.taskInfo.index
-
       allocationManager.synchronized {
-        if (stageIdToNumRunningTask.contains(stageId)) {
-          stageIdToNumRunningTask(stageId) += 1
-        }
-
+        stageAttemptToNumRunningTask(stageAttempt) =
+          stageAttemptToNumRunningTask.getOrElse(stageAttempt, 0) + 1
         // If this is the last pending task, mark the scheduler queue as empty
         if (taskStart.taskInfo.speculative) {
-          stageIdToSpeculativeTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) +=
-            taskIndex
+          stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
+            new mutable.HashSet[Int]) += taskIndex
         } else {
-          stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
+          stageAttemptToTaskIndices.getOrElseUpdate(stageAttempt,
+            new mutable.HashSet[Int]) += taskIndex
         }
         if (totalPendingTasks() == 0) {
           allocationManager.onSchedulerQueueEmpty()
@@ -588,13 +599,17 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-      val taskIndex = taskEnd.taskInfo.index
       val stageId = taskEnd.stageId
+      val stageAttemptId = taskEnd.stageAttemptId
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
+      val taskIndex = taskEnd.taskInfo.index
       allocationManager.synchronized {
-        if (stageIdToNumRunningTask.contains(stageId)) {
-          stageIdToNumRunningTask(stageId) -= 1
+        if (stageAttemptToNumRunningTask.contains(stageAttempt)) {
+          stageAttemptToNumRunningTask(stageAttempt) -= 1
+          if (stageAttemptToNumRunningTask(stageAttempt) == 0) {
+            stageAttemptToNumRunningTask -= stageAttempt
+          }
         }
-
         // If the task failed, we expect it to be resubmitted later. To ensure we have
         // enough resources to run the resubmitted task, we need to mark the scheduler
         // as backlogged again if it's not already marked as such (SPARK-8366)
@@ -603,9 +618,9 @@ private[spark] class ExecutorAllocationManager(
           allocationManager.onSchedulerBacklogged()
         }
         if (taskEnd.taskInfo.speculative) {
-          stageIdToSpeculativeTaskIndices.get(stageId).foreach {_.remove(taskIndex)}
+          stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
         } else {
-          stageIdToTaskIndices.get(stageId).foreach {_.remove(taskIndex)}
+          stageAttemptToTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
         }
       }
     }
 
@@ -613,11 +628,12 @@ private[spark] class ExecutorAllocationManager(
 
     override def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted)
       : Unit = {
-      val stageId = speculativeTask.stageId
-
+      val stageId = speculativeTask.stageId
+      val stageAttemptId = speculativeTask.stageAttemptId
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
       allocationManager.synchronized {
-        stageIdToNumSpeculativeTasks(stageId) =
-          stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+        stageAttemptToNumSpeculativeTasks(stageAttempt) =
+          stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1
         allocationManager.onSchedulerBacklogged()
       }
     }
 
@@ -629,14 +645,14 @@ private[spark] class ExecutorAllocationManager(
      * Note: This is not thread-safe without the caller owning the `allocationManager` lock.
      */
     def pendingTasks(): Int = {
-      stageIdToNumTasks.map { case (stageId, numTasks) =>
-        numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
+      stageAttemptToNumTasks.map { case (stageAttempt, numTasks) =>
+        numTasks - stageAttemptToTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
       }.sum
     }
 
     def pendingSpeculativeTasks(): Int = {
-      stageIdToNumSpeculativeTasks.map { case (stageId, numTasks) =>
-        numTasks - stageIdToSpeculativeTaskIndices.get(stageId).map(_.size).getOrElse(0)
+      stageAttemptToNumSpeculativeTasks.map { case (stageAttempt, numTasks) =>
+        numTasks - stageAttemptToSpeculativeTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
       }.sum
     }
 
@@ -646,9 +662,10 @@ private[spark] class ExecutorAllocationManager(
 
     /**
      * The number of tasks currently running across all stages.
+     * Include running-but-zombie stage attempts
      */
     def totalRunningTasks(): Int = {
-      stageIdToNumRunningTask.values.sum
+      stageAttemptToNumRunningTask.values.sum
     }
 
     /**
@@ -662,7 +679,7 @@ private[spark] class ExecutorAllocationManager(
     def updateExecutorPlacementHints(): Unit = {
       var localityAwareTasks = 0
       val localityToCount = new mutable.HashMap[String, Int]()
-      stageIdToExecutorPlacementHints.values.foreach { case (numTasksPending, localities) =>
+      stageAttemptToExecutorPlacementHints.values.foreach { case (numTasksPending, localities) =>
         localityAwareTasks += numTasksPending
         localities.foreach { case (hostname, count) =>
           val updatedCount = localityToCount.getOrElse(hostname, 0) + count
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index de57807..5072e61 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -933,7 +933,7 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
-    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId))
+    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
   }
 
   private[scheduler] def handleTaskSetFailed(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 1acfd90..666ce3d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -52,7 +52,10 @@ case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: T
 case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent
+case class SparkListenerSpeculativeTaskSubmitted(
+    stageId: Int,
+    stageAttemptId: Int = 0)
+  extends SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerTaskEnd(
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 2b75f2e..3ba33e3 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -254,14 +254,19 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(numExecutorsToAdd(manager) === 1)
 
     // Verify that running a speculative task doesn't affect the target
-    post(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true)))
+    post(SparkListenerTaskStart(1, 0, createTaskInfo(1, 0, "executor-2", true)))
     assert(numExecutorsTarget(manager) === 5)
     assert(addExecutors(manager) === 0)
     assert(numExecutorsToAdd(manager) === 1)
   }
 
-  test("ignore task end events from completed stages") {
+  test("properly handle task end events from completed stages") {
     val manager = createManager(createConf(0, 10, 0))
+
+    // We simulate having a stage fail, but with tasks still running.  Then another attempt for
+    // that stage is started, and we get task completions from the first stage attempt.  Make sure
+    // the value of `totalTasksRunning` is consistent as tasks finish from both attempts (we count
+    // all running tasks, from the zombie & non-zombie attempts)
     val stage = createStageInfo(0, 5)
     post(SparkListenerStageSubmitted(stage))
     val taskInfo1 = createTaskInfo(0, 0, "executor-1")
@@ -269,10 +274,27 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     post(SparkListenerTaskStart(0, 0, taskInfo1))
     post(SparkListenerTaskStart(0, 0, taskInfo2))
 
+    // The tasks in the zombie attempt haven't completed yet, so we still count them
     post(SparkListenerStageCompleted(stage))
+    // There are still two tasks that belong to the zombie stage running.
+    assert(totalRunningTasks(manager) === 2)
+
+    // submit another attempt for the stage.  We count completions from the first zombie attempt
+    val stageAttempt1 = createStageInfo(stage.stageId, 5, attemptId = 1)
+    post(SparkListenerStageSubmitted(stageAttempt1))
     post(SparkListenerTaskEnd(0, 0, null, Success, taskInfo1, null))
-    post(SparkListenerTaskEnd(2, 0, null, Success, taskInfo2, null))
+    assert(totalRunningTasks(manager) === 1)
+    val attemptTaskInfo1 = createTaskInfo(3, 0, "executor-1")
+    val attemptTaskInfo2 = createTaskInfo(4, 1, "executor-1")
+    post(SparkListenerTaskStart(0, 1, attemptTaskInfo1))
+    post(SparkListenerTaskStart(0, 1, attemptTaskInfo2))
+    assert(totalRunningTasks(manager) === 3)
+    post(SparkListenerTaskEnd(0, 1, null, Success, attemptTaskInfo1, null))
+    assert(totalRunningTasks(manager) === 2)
+    post(SparkListenerTaskEnd(0, 0, null, Success, taskInfo2, null))
+    assert(totalRunningTasks(manager) === 1)
+    post(SparkListenerTaskEnd(0, 1, null, Success, attemptTaskInfo2, null))
     assert(totalRunningTasks(manager) === 0)
   }
 
@@ -1033,9 +1055,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   private def createStageInfo(
       stageId: Int,
       numTasks: Int,
-      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
+      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
+      attemptId: Int = 0
     ): StageInfo = {
-    new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details",
+    new StageInfo(stageId, attemptId, "name", numTasks, Seq.empty, Seq.empty, "no details",
       taskLocalityPreferences = taskLocalityPreferences)
   }
 
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 38ec5a0..cb3b803 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -79,6 +79,12 @@ object MimaExcludes {
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.this"),
     ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart$"),
 
+    // [SPARK-27630][CORE] Properly handle task end events from completed stages
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted.apply"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted.copy"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted.this"),
+    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted$"),
+
     // [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.SparkTransportConf.fromSparkConf"),

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
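
For readers who only skim the commit message above, the following is a minimal, standalone Scala sketch of the bookkeeping idea it describes: count running tasks per (stageId, stageAttemptId) key and do not reset the count when a stage completes, so task-end events from a zombie attempt can never drive the total negative. The object and method names (StageAttemptTrackingSketch, onTaskStart, onTaskEnd) are invented here for illustration and are not Spark's actual ExecutorAllocationListener API.

import scala.collection.mutable

object StageAttemptTrackingSketch {
  // Simplified stand-in for the StageAttempt key introduced by the commit.
  final case class StageAttempt(stageId: Int, stageAttemptId: Int)

  // Running-task count per stage attempt (zombie attempts keep their entries).
  private val runningTasks = new mutable.HashMap[StageAttempt, Int]

  def onTaskStart(stageId: Int, stageAttemptId: Int): Unit = {
    val key = StageAttempt(stageId, stageAttemptId)
    runningTasks(key) = runningTasks.getOrElse(key, 0) + 1
  }

  def onTaskEnd(stageId: Int, stageAttemptId: Int): Unit = {
    val key = StageAttempt(stageId, stageAttemptId)
    // Only decrement attempts we know about, and drop the entry once it hits
    // zero, so a late task-end event can never push the total below zero.
    runningTasks.get(key).foreach { n =>
      if (n <= 1) runningTasks -= key else runningTasks(key) = n - 1
    }
  }

  // Nothing resets counts on stage completion: a zombie attempt's entry
  // survives until its last task actually finishes.
  def totalRunningTasks: Int = runningTasks.values.sum

  def main(args: Array[String]): Unit = {
    onTaskStart(0, 0); onTaskStart(0, 0) // two tasks in attempt 0 (later a zombie)
    onTaskStart(0, 1)                    // stage retried, attempt 1 starts a task
    onTaskEnd(0, 0)                      // a zombie-attempt task finishes
    assert(totalRunningTasks == 2)
    onTaskEnd(0, 1); onTaskEnd(0, 0)
    assert(totalRunningTasks == 0)
    println(s"total running tasks: $totalRunningTasks")
  }
}

Dropping an entry once its count reaches zero mirrors the cleanup the patch does to stageAttemptToNumRunningTask in onTaskEnd, since onStageCompleted intentionally leaves that map alone.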