This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b219f27586b [SPARK-41469][CORE] Avoid unnecessary task rerun on 
decommissioned executor lost if shuffle data migrated
b219f27586b is described below

commit b219f27586b80bb552f0ae3a4121c7f12ed1f970
Author: Yi Wu <yi...@databricks.com>
AuthorDate: Tue Dec 27 15:37:45 2022 -0600

    [SPARK-41469][CORE] Avoid unnecessary task rerun on decommissioned executor 
lost if shuffle data migrated
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to avoid rerunning finished shuffle map tasks in 
`TaskSetManager.executorLost()` if the executor loss is caused by decommission 
and the shuffle data has been successfully migrated.
    
    ### Why are the changes needed?
    
    To avoid unnecessary task recomputation.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added a unit test (UT) in `TaskSetManagerSuite`.
    
    Closes #39011 from Ngone51/decom-executor-lost.
    
    Authored-by: Yi Wu <yi...@databricks.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../scala/org/apache/spark/MapOutputTracker.scala  | 11 ++++++
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  7 +++-
 .../scala/org/apache/spark/scheduler/TaskSet.scala |  3 +-
 .../apache/spark/scheduler/TaskSetManager.scala    | 42 ++++++++++++++++++---
 .../spark/deploy/DecommissionWorkerSuite.scala     |  3 ++
 .../org/apache/spark/scheduler/FakeTask.scala      |  6 +--
 .../org/apache/spark/scheduler/PoolSuite.scala     |  2 +-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   |  6 +--
 .../spark/scheduler/TaskSetManagerSuite.scala      | 44 ++++++++++++++++++++--
 9 files changed, 105 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 3b5a21df4d6..a163fef693e 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -1128,6 +1128,17 @@ private[spark] class MapOutputTrackerMaster(
     }
   }
 
+  /**
+   * Get map output location by (shuffleId, mapId)
+   */
+  def getMapOutputLocation(shuffleId: Int, mapId: Long): 
Option[BlockManagerId] = {
+    shuffleStatuses.get(shuffleId).flatMap { shuffleStatus =>
+      shuffleStatus.withMapStatuses { mapStatues =>
+        mapStatues.filter(_ != null).find(_.mapId == mapId).map(_.location)
+      }
+    }
+  }
+
   def incrementEpoch(): Unit = {
     epochLock.synchronized {
       epoch += 1
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index bb17a987717..cc991178481 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1590,9 +1590,14 @@ private[spark] class DAGScheduler(
     if (tasks.nonEmpty) {
       logInfo(s"Submitting ${tasks.size} missing tasks from $stage 
(${stage.rdd}) (first 15 " +
         s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
+      val shuffleId = stage match {
+        case s: ShuffleMapStage => Some(s.shuffleDep.shuffleId)
+        case _: ResultStage => None
+      }
+
       taskScheduler.submitTasks(new TaskSet(
         tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, 
properties,
-        stage.resourceProfileId))
+        stage.resourceProfileId, shuffleId))
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should mark
       // the stage as completed here in case there are no tasks to run
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index 7a8ed16f6eb..6411757313e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -29,7 +29,8 @@ private[spark] class TaskSet(
     val stageAttemptId: Int,
     val priority: Int,
     val properties: Properties,
-    val resourceProfileId: Int) {
+    val resourceProfileId: Int,
+    val shuffleId: Option[Int]) {
   val id: String = stageId + "." + stageAttemptId
 
   override def toString: String = "TaskSet " + id
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index cbb8fd0a334..124a27502fe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -76,6 +76,8 @@ private[spark] class TaskSetManager(
 
   val tasks = taskSet.tasks
   private val isShuffleMapTasks = tasks(0).isInstanceOf[ShuffleMapTask]
+  // shuffleId is only available when isShuffleMapTasks=true
+  private val shuffleId = taskSet.shuffleId
   private[scheduler] val partitionToIndex = tasks.zipWithIndex
     .map { case (t, idx) => t.partitionId -> idx }.toMap
   val numTasks = tasks.length
@@ -1046,17 +1048,45 @@ private[spark] class TaskSetManager(
 
   /** Called by TaskScheduler when an executor is lost so we can re-enqueue 
our tasks */
   override def executorLost(execId: String, host: String, reason: 
ExecutorLossReason): Unit = {
-    // Re-enqueue any tasks that ran on the failed executor if this is a 
shuffle map stage,
-    // and we are not using an external shuffle server which could serve the 
shuffle outputs.
-    // The reason is the next stage wouldn't be able to fetch the data from 
this dead executor
-    // so we would need to rerun these tasks on other executors.
-    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled 
&& !isZombie) {
+    // Re-enqueue any tasks with potential shuffle data loss that ran on the 
failed executor
+    // if this is a shuffle map stage, and either we are not using an external 
shuffle server
+    // which could serve the shuffle outputs, or the executor loss is caused by 
decommission
+    // (which can destroy the whole host). The reason is the next stage wouldn't be 
able to fetch the
+    // data from this dead executor so we would need to rerun these tasks on 
other executors.
+    val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
+      (reason.isInstanceOf[ExecutorDecommission] || 
!env.blockManager.externalShuffleServiceEnabled)
+    if (maybeShuffleMapOutputLoss && !isZombie) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = info.index
+        lazy val isShuffleMapOutputAvailable = reason match {
+          case ExecutorDecommission(_, _) =>
+            val mapId = if (conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
+              info.partitionId
+            } else {
+              tid
+            }
+            val locationOpt = 
env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+              .getMapOutputLocation(shuffleId.get, mapId)
+            // There are 3 cases of locationOpt:
+            // 1) locationOpt.isDefined && locationOpt.get.host == host:
+            //    this case implies that the shuffle map output is still on 
the lost executor. The
+            //    map output file is expected to be lost, so we should rerun this 
task;
+            // 2) locationOpt.isDefined && locationOpt.get.host != host:
+            //    this case implies that the shuffle map output has been 
migrated to another
+            //    host. The task doesn't need to rerun;
+            // 3) locationOpt.isEmpty:
+            //    This shouldn't happen ideally since TaskSetManager 
handles executor lost first
+            //    before DAGScheduler. So the map statuses for the successful 
task must be available
+            //    at this moment. Keep it here in case the handling order 
changes.
+            locationOpt.exists(_.host != host)
+
+          case _ => false
+        }
         // We may have a running task whose partition has been marked as 
successful,
         // this partition has another task completed in another stage attempt.
         // We treat it as a running task and will call handleFailedTask later.
-        if (successful(index) && !info.running && 
!killedByOtherAttempt.contains(tid)) {
+        if (successful(index) && !info.running && 
!killedByOtherAttempt.contains(tid) &&
+            !isShuffleMapOutputAvailable) {
           successful(index) = false
           copiesRunning(index) -= 1
           tasksSuccessful -= 1
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
index c2486b9650d..fe9bce770f5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
@@ -321,6 +321,9 @@ class DecommissionWorkerSuite
     }
 
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+      // Task resubmit is a signal to DAGScheduler, not a real task end event. 
Ignore it here
+      // to avoid over-counting.
+      if (taskEnd.reason == Resubmitted) return
       val taskSignature = getSignature(taskEnd.taskInfo, taskEnd.stageId, 
taskEnd.stageAttemptId)
       logInfo(s"Task End $taskSignature")
       tasksFinished.add(taskSignature)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala 
b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index fdd89378927..6ab56d3fffe 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -73,7 +73,7 @@ object FakeTask {
     val tasks = Array.tabulate[Task[_]](numTasks) { i =>
       new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil)
     }
-    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, 
rpId)
+    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, 
rpId, None)
   }
 
   def createShuffleMapTaskSet(
@@ -100,7 +100,7 @@ object FakeTask {
         
SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array())
     }
     new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null,
-      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, Some(0))
   }
 
   def createBarrierTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): 
TaskSet = {
@@ -129,6 +129,6 @@ object FakeTask {
     val tasks = Array.tabulate[Task[_]](numTasks) { i =>
       new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil, 
isBarrier = true)
     }
-    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, 
rpId)
+    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, 
rpId, None)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
index fa2c5eaee8b..85ade97eb92 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -45,7 +45,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
       new FakeTask(stageId, i, Nil)
     }
     new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null,
-      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID), 0)
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None), 0)
   }
 
   def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: 
Int): Unit = {
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index eec5449bc72..af4cf8731b6 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -531,7 +531,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     val numFreeCores = 1
     val taskSet = new TaskSet(
       Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 
1)),
-      0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+      0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None)
     val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", 
"host0", taskCpus),
       new WorkerOffer("executor1", "host1", numFreeCores))
     taskScheduler.submitTasks(taskSet)
@@ -546,7 +546,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     taskScheduler.submitTasks(FakeTask.createTaskSet(1))
     val taskSet2 = new TaskSet(
       Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 
1)),
-      1, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+      1, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None)
     taskScheduler.submitTasks(taskSet2)
     taskDescriptions = 
taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
     assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
@@ -2160,7 +2160,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
       override def index: Int = 1
     }, 1, Seq(TaskLocation("host1", "executor1")), new Properties, null)
 
-    val taskSet = new TaskSet(Array(task1, task2), 0, 0, 0, null, 0)
+    val taskSet = new TaskSet(Array(task1, task2), 0, 0, 0, null, 0, Some(0))
 
     taskScheduler.submitTasks(taskSet)
     val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index a3b9eff8084..45360f486ed 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -670,6 +670,42 @@ class TaskSetManagerSuite
     assert(manager.resourceOffer("execA", "host1", ANY)._1.isDefined)
   }
 
+  test("SPARK-41469: task doesn't need to rerun on executor lost if shuffle 
data has migrated") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc)
+    val backend = mock(classOf[SchedulerBackend])
+    doNothing().when(backend).reviveOffers()
+    sched.initialize(backend)
+
+    sched.addExecutor("exec0", "host0")
+
+    val mapOutputTracker = 
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+    mapOutputTracker.registerShuffle(0, 2, 0)
+
+    val taskSet = FakeTask.createShuffleMapTaskSet(2, 0, 0,
+      Seq(TaskLocation("host0", "exec0")), Seq(TaskLocation("host1", "exec1")))
+    sched.submitTasks(taskSet)
+    val manager = sched.taskSetManagerForAttempt(0, 0).get
+
+    // Schedule task 0 and mark it as completed with shuffle map output 
registered
+    val taskDesc = manager.resourceOffer("exec0", "host0", PROCESS_LOCAL)._1
+    assert(taskDesc.isDefined)
+    val taskIndex = taskDesc.get.index
+    val taskId = taskDesc.get.taskId
+    manager.handleSuccessfulTask(taskId, createTaskResult(taskId.toInt))
+    mapOutputTracker.registerMapOutput(0, taskIndex,
+      MapStatus(BlockManagerId("exec0", "host0", 8848), Array(1024), taskId))
+
+    // Mock executor "exec0" decommission and migrate shuffle map output of 
task 0
+    manager.executorDecommission("exec0")
+    mapOutputTracker.updateMapOutput(0, taskId, BlockManagerId("exec1", 
"host1", 8848))
+
+    // Trigger executor "exec0" lost. Since the map output of task 0 has been 
migrated, it doesn't
+    // need to rerun. So task 0 should still remain in the successful status.
+    manager.executorLost("exec0", "host0", ExecutorDecommission())
+    assert(manager.successful(taskIndex))
+  }
+
   test("SPARK-32653: Decommissioned host should not be used to calculate 
locality levels") {
     sc = new SparkContext("local", "test")
     sched = new FakeTaskScheduler(sc)
@@ -770,7 +806,7 @@ class TaskSetManagerSuite
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
 
     val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0,
-      null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+      null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
     assert(!manager.emittedTaskSizeWarning)
@@ -786,7 +822,7 @@ class TaskSetManagerSuite
 
     val taskSet = new TaskSet(
       Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 
1)),
-      0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+      0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
     intercept[TaskNotSerializableException] {
@@ -866,7 +902,7 @@ class TaskSetManagerSuite
         override def index: Int = 0
       }, 1, Seq(TaskLocation("host1", "execA")), new Properties, null)
     val taskSet = new TaskSet(Array(singleTask), 0, 0, 0,
-      null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+      null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, Some(0))
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
     // Offer host1, which should be accepted as a PROCESS_LOCAL location
@@ -2192,7 +2228,7 @@ class TaskSetManagerSuite
 
     val tasks = Array.tabulate[Task[_]](2)(partition => new 
FakeLongTasks(stageId = 0, partition))
     val taskSet: TaskSet = new TaskSet(tasks, stageId = 0, stageAttemptId = 0, 
priority = 0, null,
-      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None)
     val stageId = taskSet.stageId
     val stageAttemptId = taskSet.stageAttemptId
     sched.submitTasks(taskSet)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to