This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b219f27586b [SPARK-41469][CORE] Avoid unnecessary task rerun on decommissioned executor lost if shuffle data migrated b219f27586b is described below commit b219f27586b80bb552f0ae3a4121c7f12ed1f970 Author: Yi Wu <yi...@databricks.com> AuthorDate: Tue Dec 27 15:37:45 2022 -0600 [SPARK-41469][CORE] Avoid unnecessary task rerun on decommissioned executor lost if shuffle data migrated ### What changes were proposed in this pull request? This PR proposes to avoid rerunning the finished shuffle map task in `TaskSetManager.executorLost()` if the executor lost is caused by decommission and the shuffle data has been successfully migrated. ### Why are the changes needed? To avoid unnecessary task recomputation. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added UT Closes #39011 from Ngone51/decom-executor-lost. Authored-by: Yi Wu <yi...@databricks.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../scala/org/apache/spark/MapOutputTracker.scala | 11 ++++++ .../org/apache/spark/scheduler/DAGScheduler.scala | 7 +++- .../scala/org/apache/spark/scheduler/TaskSet.scala | 3 +- .../apache/spark/scheduler/TaskSetManager.scala | 42 ++++++++++++++++++--- .../spark/deploy/DecommissionWorkerSuite.scala | 3 ++ .../org/apache/spark/scheduler/FakeTask.scala | 6 +-- .../org/apache/spark/scheduler/PoolSuite.scala | 2 +- .../spark/scheduler/TaskSchedulerImplSuite.scala | 6 +-- .../spark/scheduler/TaskSetManagerSuite.scala | 44 ++++++++++++++++++++-- 9 files changed, 105 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 3b5a21df4d6..a163fef693e 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -1128,6 +1128,17 @@ private[spark] class 
MapOutputTrackerMaster( } } + /** + * Get map output location by (shuffleId, mapId) + */ + def getMapOutputLocation(shuffleId: Int, mapId: Long): Option[BlockManagerId] = { + shuffleStatuses.get(shuffleId).flatMap { shuffleStatus => + shuffleStatus.withMapStatuses { mapStatues => + mapStatues.filter(_ != null).find(_.mapId == mapId).map(_.location) + } + } + } + def incrementEpoch(): Unit = { epochLock.synchronized { epoch += 1 diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index bb17a987717..cc991178481 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1590,9 +1590,14 @@ private[spark] class DAGScheduler( if (tasks.nonEmpty) { logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " + s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})") + val shuffleId = stage match { + case s: ShuffleMapStage => Some(s.shuffleDep.shuffleId) + case _: ResultStage => None + } + taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties, - stage.resourceProfileId)) + stage.resourceProfileId, shuffleId)) } else { // Because we posted SparkListenerStageSubmitted earlier, we should mark // the stage as completed here in case there are no tasks to run diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala index 7a8ed16f6eb..6411757313e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala @@ -29,7 +29,8 @@ private[spark] class TaskSet( val stageAttemptId: Int, val priority: Int, val properties: Properties, - val resourceProfileId: Int) { + val resourceProfileId: Int, + val shuffleId: Option[Int]) { val id: String = 
stageId + "." + stageAttemptId override def toString: String = "TaskSet " + id diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index cbb8fd0a334..124a27502fe 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -76,6 +76,8 @@ private[spark] class TaskSetManager( val tasks = taskSet.tasks private val isShuffleMapTasks = tasks(0).isInstanceOf[ShuffleMapTask] + // shuffleId is only available when isShuffleMapTasks=true + private val shuffleId = taskSet.shuffleId private[scheduler] val partitionToIndex = tasks.zipWithIndex .map { case (t, idx) => t.partitionId -> idx }.toMap val numTasks = tasks.length @@ -1046,17 +1048,45 @@ private[spark] class TaskSetManager( /** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */ override def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = { - // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage, - // and we are not using an external shuffle server which could serve the shuffle outputs. - // The reason is the next stage wouldn't be able to fetch the data from this dead executor - // so we would need to rerun these tasks on other executors. - if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) { + // Re-enqueue any tasks with potential shuffle data loss that ran on the failed executor + // if this is a shuffle map stage, and we are not using an external shuffle server which + // could serve the shuffle outputs or the executor lost is caused by decommission (which + // can destroy the whole host). The reason is the next stage wouldn't be able to fetch the + // data from this dead executor so we would need to rerun these tasks on other executors. 
+ val maybeShuffleMapOutputLoss = isShuffleMapTasks && + (reason.isInstanceOf[ExecutorDecommission] || !env.blockManager.externalShuffleServiceEnabled) + if (maybeShuffleMapOutputLoss && !isZombie) { for ((tid, info) <- taskInfos if info.executorId == execId) { val index = info.index + lazy val isShuffleMapOutputAvailable = reason match { + case ExecutorDecommission(_, _) => + val mapId = if (conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) { + info.partitionId + } else { + tid + } + val locationOpt = env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + .getMapOutputLocation(shuffleId.get, mapId) + // There are 3 cases of locationOpt: + // 1) locationOpt.isDefined && locationOpt.get.host == host: + // this case implies that the shuffle map output is still on the lost executor. The + // map output file is supposed to be lost so we should rerun this task; + // 2) locationOpt.isDefined && locationOpt.get.host != host: + // this case implies that the shuffle map output has been migrated to another + // host. The task doesn't need to rerun; + // 3) locationOpt.isEmpty: + // This shouldn't happen ideally since TaskSetManager handles executor lost first + // before DAGScheduler. So the map statuses for the successful task must be available + // at this moment. Keep it here in case the handling order changes. + locationOpt.exists(_.host != host) + + case _ => false + } // We may have a running task whose partition has been marked as successful, // this partition has another task completed in another stage attempt. // We treat it as a running task and will call handleFailedTask later.
- if (successful(index) && !info.running && !killedByOtherAttempt.contains(tid)) { + if (successful(index) && !info.running && !killedByOtherAttempt.contains(tid) && + !isShuffleMapOutputAvailable) { successful(index) = false copiesRunning(index) -= 1 tasksSuccessful -= 1 diff --git a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala index c2486b9650d..fe9bce770f5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala @@ -321,6 +321,9 @@ class DecommissionWorkerSuite } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + // Task resubmit is a signal to DAGScheduler not a real task end event. Ignore it here + // to avoid over count. + if (taskEnd.reason == Resubmitted) return val taskSignature = getSignature(taskEnd.taskInfo, taskEnd.stageId, taskEnd.stageAttemptId) logInfo(s"Task End $taskSignature") tasksFinished.add(taskSignature) diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala index fdd89378927..6ab56d3fffe 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala @@ -73,7 +73,7 @@ object FakeTask { val tasks = Array.tabulate[Task[_]](numTasks) { i => new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil) } - new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, rpId) + new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, rpId, None) } def createShuffleMapTaskSet( @@ -100,7 +100,7 @@ object FakeTask { SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array()) } new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, - ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + 
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, Some(0)) } def createBarrierTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = { @@ -129,6 +129,6 @@ object FakeTask { val tasks = Array.tabulate[Task[_]](numTasks) { i => new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil, isBarrier = true) } - new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, rpId) + new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, rpId, None) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index fa2c5eaee8b..85ade97eb92 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -45,7 +45,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { new FakeTask(stageId, i, Nil) } new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null, - ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID), 0) + ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None), 0) } def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int): Unit = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index eec5449bc72..af4cf8731b6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -531,7 +531,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext val numFreeCores = 1 val taskSet = new TaskSet( Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), - 0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + 0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None) val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", taskCpus), 
new WorkerOffer("executor1", "host1", numFreeCores)) taskScheduler.submitTasks(taskSet) @@ -546,7 +546,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskScheduler.submitTasks(FakeTask.createTaskSet(1)) val taskSet2 = new TaskSet( Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), - 1, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + 1, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None) taskScheduler.submitTasks(taskSet2) taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten assert(taskDescriptions.map(_.executorId) === Seq("executor0")) @@ -2160,7 +2160,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext override def index: Int = 1 }, 1, Seq(TaskLocation("host1", "executor1")), new Properties, null) - val taskSet = new TaskSet(Array(task1, task2), 0, 0, 0, null, 0) + val taskSet = new TaskSet(Array(task1, task2), 0, 0, 0, null, 0, Some(0)) taskScheduler.submitTasks(taskSet) val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index a3b9eff8084..45360f486ed 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -670,6 +670,42 @@ class TaskSetManagerSuite assert(manager.resourceOffer("execA", "host1", ANY)._1.isDefined) } + test("SPARK-41469: task doesn't need to rerun on executor lost if shuffle data has migrated") { + sc = new SparkContext("local", "test") + sched = new FakeTaskScheduler(sc) + val backend = mock(classOf[SchedulerBackend]) + doNothing().when(backend).reviveOffers() + sched.initialize(backend) + + sched.addExecutor("exec0", "host0") + + val mapOutputTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + 
mapOutputTracker.registerShuffle(0, 2, 0) + + val taskSet = FakeTask.createShuffleMapTaskSet(2, 0, 0, + Seq(TaskLocation("host0", "exec0")), Seq(TaskLocation("host1", "exec1"))) + sched.submitTasks(taskSet) + val manager = sched.taskSetManagerForAttempt(0, 0).get + + // Schedule task 0 and mark it as completed with shuffle map output registered + val taskDesc = manager.resourceOffer("exec0", "host0", PROCESS_LOCAL)._1 + assert(taskDesc.isDefined) + val taskIndex = taskDesc.get.index + val taskId = taskDesc.get.taskId + manager.handleSuccessfulTask(taskId, createTaskResult(taskId.toInt)) + mapOutputTracker.registerMapOutput(0, taskIndex, + MapStatus(BlockManagerId("exec0", "host0", 8848), Array(1024), taskId)) + + // Mock executor "exec0" decommission and migrate shuffle map output of task 0 + manager.executorDecommission("exec0") + mapOutputTracker.updateMapOutput(0, taskId, BlockManagerId("exec1", "host1", 8848)) + + // Trigger executor "exec0" lost. Since the map output of task 0 has been migrated, it doesn't + // need to rerun. So task 0 should still remain in the successful status. 
+ manager.executorLost("exec0", "host0", ExecutorDecommission()) + assert(manager.successful(taskIndex)) + } + test("SPARK-32653: Decommissioned host should not be used to calculate locality levels") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc) @@ -770,7 +806,7 @@ class TaskSetManagerSuite sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, - null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) assert(!manager.emittedTaskSizeWarning) @@ -786,7 +822,7 @@ class TaskSetManagerSuite val taskSet = new TaskSet( Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), - 0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + 0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) intercept[TaskNotSerializableException] { @@ -866,7 +902,7 @@ class TaskSetManagerSuite override def index: Int = 0 }, 1, Seq(TaskLocation("host1", "execA")), new Properties, null) val taskSet = new TaskSet(Array(singleTask), 0, 0, 0, - null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, Some(0)) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) // Offer host1, which should be accepted as a PROCESS_LOCAL location @@ -2192,7 +2228,7 @@ class TaskSetManagerSuite val tasks = Array.tabulate[Task[_]](2)(partition => new FakeLongTasks(stageId = 0, partition)) val taskSet: TaskSet = new TaskSet(tasks, stageId = 0, stageAttemptId = 0, priority = 0, null, - ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None) val stageId = taskSet.stageId val stageAttemptId = taskSet.stageAttemptId sched.submitTasks(taskSet) --------------------------------------------------------------------- 
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org