Repository: spark
Updated Branches:
  refs/heads/branch-2.3 61e7bc0c1 -> 8509284e1


[SPARK-23433][CORE] Late zombie task completions update all tasksets

A fetch failure leads to multiple tasksets being active for a given
stage.  While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully.  So a task completion needs to update every taskset,
so that each of them knows the partition is completed.  That way the
final active taskset does not try to submit another task for the same
partition, and each taskset knows when it is complete and when it
should be marked as a "zombie".
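
Below is a minimal, self-contained sketch of the idea, using hypothetical
simplified classes (SimpleTaskSet, SimpleScheduler) rather than Spark's real
TaskSetManager and TaskSchedulerImpl; in Spark itself the broadcast lives in
the new TaskSchedulerImpl.markPartitionCompletedInAllTaskSets shown in the
diff below.

// Hypothetical simplified model, not Spark's real classes.
import scala.collection.mutable

class SimpleTaskSet(numPartitions: Int) {
  private val completed = new Array[Boolean](numPartitions)
  var isZombie = false
  def markPartitionCompleted(partition: Int): Unit = {
    if (!completed(partition)) {
      completed(partition) = true
      // once every partition is accounted for, there is nothing left to run
      if (completed.forall(identity)) isZombie = true
    }
  }
  def pendingPartitions: Seq[Int] = completed.indices.filterNot(i => completed(i))
}

class SimpleScheduler(numPartitions: Int) {
  val attempts = mutable.ArrayBuffer.empty[SimpleTaskSet]
  def submitAttempt(): SimpleTaskSet = {
    val ts = new SimpleTaskSet(numPartitions)
    attempts += ts
    ts
  }
  // The core of the fix: a successful task updates *every* attempt of the
  // stage, not only the attempt that launched it.
  def taskSucceeded(partition: Int): Unit =
    attempts.foreach(_.markPartitionCompleted(partition))
}

object LateZombieCompletionDemo {
  def main(args: Array[String]): Unit = {
    val sched = new SimpleScheduler(numPartitions = 3)
    val zombie = sched.submitAttempt()
    zombie.isZombie = true              // fetch failure: attempt retired, tasks keep running
    val active = sched.submitAttempt()  // retried attempt for the same stage
    sched.taskSucceeded(partition = 1)  // late completion from the zombie attempt
    assert(!active.pendingPartitions.contains(1))  // partition 1 won't be resubmitted
  }
}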

Added a regression test.

Author: Imran Rashid <iras...@cloudera.com>

Closes #21131 from squito/SPARK-23433.

(cherry picked from commit 94641fe6cc68e5977dd8663b8f232a287a783acb)
Signed-off-by: Imran Rashid <iras...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8509284e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8509284e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8509284e

Branch: refs/heads/branch-2.3
Commit: 8509284e1ec048d5afa87d41071c0429924e45c9
Parents: 61e7bc0
Author: Imran Rashid <iras...@cloudera.com>
Authored: Thu May 3 10:59:18 2018 -0500
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Thu May 3 10:59:30 2018 -0500

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  14 +++
 .../apache/spark/scheduler/TaskSetManager.scala |  20 +++-
 .../scheduler/TaskSchedulerImplSuite.scala      | 104 +++++++++++++++++++
 3 files changed, 137 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8509284e/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 0c11806..8e97b3d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
+  /**
+   * Marks the task as completed in all TaskSetManagers for the given stage.
+   *
+   * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
+   * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
+   * do not also submit those same tasks.  That also means that a task completion from an earlier
+   * attempt can lead to the entire stage getting marked as successful.
+   */
+  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
+    taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
+      tsm.markPartitionCompleted(partitionId)
+    }
+  }
+
 }
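
A note on the lookup above: taskSetsByStageIdAndAttempt maps a stage id to the
TaskSetManagers of all its attempts, so the new helper is a broadcast over every
attempt of one stage, and getOrElse(..., Map()) makes it a no-op for a stage with
no registered task sets.  A standalone sketch of that pattern, with a String
standing in for TaskSetManager (paste into a Scala REPL):

import scala.collection.mutable.HashMap

val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, String]]()
taskSetsByStageIdAndAttempt(0) = HashMap(0 -> "tsm for attempt 0 (zombie)", 1 -> "tsm for attempt 1")

def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int): Unit = {
  // unknown stage id => empty map => nothing to notify
  taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
    println(s"$tsm: mark partition $partitionId completed")  // stand-in for tsm.markPartitionCompleted
  }
}

markPartitionCompletedInAllTaskSets(stageId = 0, partitionId = 3)  // notifies both attempts
markPartitionCompletedInAllTaskSets(stageId = 9, partitionId = 3)  // no-op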
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8509284e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index c3ed11b..b52e376 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -74,6 +74,8 @@ private[spark] class TaskSetManager(
   val ser = env.closureSerializer.newInstance()
 
   val tasks = taskSet.tasks
+  private[scheduler] val partitionToIndex = tasks.zipWithIndex
+    .map { case (t, idx) => t.partitionId -> idx }.toMap
   val numTasks = tasks.length
   val copiesRunning = new Array[Int](numTasks)
 
@@ -154,7 +156,7 @@ private[spark] class TaskSetManager(
   private[scheduler] val speculatableTasks = new HashSet[Int]
 
   // Task index, start and finish time for each task attempt (indexed by task ID)
-  private val taskInfos = new HashMap[Long, TaskInfo]
+  private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]
 
   // Use a MedianHeap to record durations of successful tasks so we know when to launch
   // speculative tasks. This is only used when speculation is enabled, to avoid the overhead
@@ -755,6 +757,9 @@ private[spark] class TaskSetManager(
      logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
         " because task " + index + " has already completed successfully")
     }
+    // There may be multiple tasksets for this stage -- we let all of them know that the partition
+    // was completed.  This may result in some of the tasksets getting completed.
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -765,6 +770,19 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
+  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
+    partitionToIndex.get(partitionId).foreach { index =>
+      if (!successful(index)) {
+        tasksSuccessful += 1
+        successful(index) = true
+        if (tasksSuccessful == numTasks) {
+          isZombie = true
+        }
+        maybeFinishTaskSet()
+      }
+    }
+  }
+
   /**
   * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
    * DAG Scheduler.
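
One detail worth noting about markPartitionCompleted above: it is keyed by
partition id, not by the task's index within this task set, and the two need
not match, because a retried task set may contain only the partitions that are
still missing.  That is what the new partitionToIndex map is for, and why the
lookup uses .get(...).foreach, so partitions this attempt doesn't own are simply
ignored.  A small illustration with a made-up FakeTaskStub standing in for
Spark's Task (paste into a Scala REPL):

case class FakeTaskStub(partitionId: Int)

// a retried attempt containing only the still-missing partitions 3, 7 and 9
val tasks = Array(FakeTaskStub(3), FakeTaskStub(7), FakeTaskStub(9))
val partitionToIndex = tasks.zipWithIndex.map { case (t, idx) => t.partitionId -> idx }.toMap

assert(partitionToIndex(7) == 1)         // partition 7 is task index 1 in this attempt
assert(partitionToIndex.get(5).isEmpty)  // a partition not in this attempt is ignored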

http://git-wip-us.apache.org/repos/asf/spark/blob/8509284e/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 6003899..33f2ea1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -917,4 +917,108 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       taskScheduler.initialize(new FakeSchedulerBackend)
     }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie taskset") {
+    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+    val valueSer = SparkEnv.get.serializer.newInstance()
+
+    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
+      val indexInTsm = tsm.partitionToIndex(partition)
+      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
+      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+    }
+
+    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
+    // two times, so we have three active task sets for one stage.  (For this to really happen,
+    // you'd need the previous stage to also get restarted, and then succeed, in between each
+    // attempt, but that happens outside what we're mocking here.)
+    val zombieAttempts = (0 until 2).map { stageAttempt =>
+      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
+      taskScheduler.submitTasks(attempt)
+      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
+      taskScheduler.resourceOffers(offers)
+      assert(tsm.runningTasks === 10)
+      // fail attempt
+      tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
+        FetchFailed(null, 0, 0, 0, "fetch failed"))
+      // the attempt is a zombie, but the tasks are still running (this could be true even if
+      // we actively killed those tasks, as killing is best-effort)
+      assert(tsm.isZombie)
+      assert(tsm.runningTasks === 9)
+      tsm
+    }
+
+    // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
+    // the stage, but this time with insufficient resources so not all tasks are active.
+
+    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+    taskScheduler.submitTasks(finalAttempt)
+    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
+    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
+      finalAttempt.tasks(task.index).partitionId
+    }.toSet
+    assert(finalTsm.runningTasks === 5)
+    assert(!finalTsm.isZombie)
+
+    // We simulate late completions from our zombie tasksets, corresponding to all the pending
+    // partitions in our final attempt.  This means we're only waiting on the tasks we've already
+    // launched.
+    val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
+    finalAttemptPendingPartitions.foreach { partition =>
+      completeTaskSuccessfully(zombieAttempts(0), partition)
+    }
+
+    // If there is another resource offer, we shouldn't run anything.  Though our final attempt
+    // used to have pending tasks, now those tasks have been completed by zombie attempts.  The
+    // remaining tasks to compute are already active in the non-zombie attempt.
+    assert(
+      taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
+
+    val remainingTasks = finalAttemptLaunchedPartitions.toIndexedSeq.sorted
+
+    // finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be
+    // marked as zombie.
+    // for each of the remaining tasks, find the tasksets with an active copy of the task, and
+    // finish the task.
+    remainingTasks.foreach { partition =>
+      val tsm = if (partition == 0) {
+        // we failed this task on both zombie attempts, this one is only present in the latest
+        // taskset
+        finalTsm
+      } else {
+        // should be active in every taskset.  We choose a zombie taskset just to make sure that
+        // we transition the active taskset correctly even if the final completion comes
+        // from a zombie.
+        zombieAttempts(partition % 2)
+      }
+      completeTaskSuccessfully(tsm, partition)
+    }
+
+    assert(finalTsm.isZombie)
+
+    // no taskset has completed all of its tasks, so no updates to the blacklist tracker yet
+    verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(anyInt(), anyInt(), anyObject())
+
+    // finally, let's complete all the tasks.  We simulate failures in attempt 1, but everything
+    // else succeeds, to make sure we get the right updates to the blacklist in all cases.
+    (zombieAttempts ++ Seq(finalTsm)).foreach { tsm =>
+      val stageAttempt = tsm.taskSet.stageAttemptId
+      tsm.runningTasksSet.foreach { index =>
+        if (stageAttempt == 1) {
+          tsm.handleFailedTask(tsm.taskInfos(index).taskId, TaskState.FAILED, TaskResultLost)
+        } else {
+          val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+          tsm.handleSuccessfulTask(tsm.taskInfos(index).taskId, result)
+        }
+      }
+
+      // we update the blacklist for the stage attempts with all successful tasks.  Even though
+      // some tasksets had failures, we still consider them all successful from a blacklisting
+      // perspective, as the failures weren't from a problem w/ the tasks themselves.
+      verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject())
+    }
+  }
 }

