This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 877b8db  [SPARK-27065][CORE] avoid more than one active task set 
managers for a stage
877b8db is described below

commit 877b8db25b70ffb0793a619342e7b8edda712b31
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Wed Mar 6 12:00:33 2019 -0600

    [SPARK-27065][CORE] avoid more than one active task set managers for a stage
    
    ## What changes were proposed in this pull request?
    
    This is another attempt to fix the more-than-one-active-task-set-managers 
bug.
    
    https://github.com/apache/spark/pull/17208 is the first attempt. It marks 
the TSM as zombie before sending a task completion event to DAGScheduler. This 
is necessary, because when the DAGScheduler gets the task completion event, and 
it's for the last partition, then the stage is finished. However, if it's a 
shuffle stage and it has missing map outputs, DAGScheduler will resubmit it(see 
the 
[code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/schedule
 [...]
    
    This fix has a hole: Let's say a stage has 10 partitions and 2 task set 
managers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 
10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks it is 
still active because it hasn't finished partition 10 yet. However, DAGScheduler 
gets task completion events for all the 10 partitions and thinks the stage is 
finished. Then the same problem occurs: DAGScheduler may resubmit the stage and 
cause more than one ac [...]
    
    https://github.com/apache/spark/pull/21131 fixed this hole by notifying all 
the task set managers when a task finishes. For the above case, TSM2 will know 
that partition 10 is already completed, so it can mark itself as zombie after 
partitions 1-9 are completed.
    
    However, #21131 still has a hole: TSM2 may be created after the task from 
TSM1 is completed. Then TSM2 can't get notified about the task completion, and 
leads to the more than one active TSM error.
    
    #22806 and #23871 are created to fix this hole. However the fix is 
complicated and there are still ongoing discussions.
    
    This PR proposes a simple fix, which can be easy to backport: mark all 
existing task set managers as zombie when trying to create a new task set 
manager.
    
    After this PR, #21131 is still necessary, to avoid launching unnecessary 
tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250 
). #22806 and #23871 are its followups to fix the hole.
    
    ## How was this patch tested?
    
    existing tests.
    
    Closes #23927 from cloud-fan/scheduler.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Imran Rashid <iras...@cloudera.com>
    (cherry picked from commit cb20fbc43e7f54af1ed30b9eb6d76ca50b4eb750)
    Signed-off-by: Imran Rashid <iras...@cloudera.com>
---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 20 ++++++++-----
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 33 ++++++++++++++--------
 2 files changed, 35 insertions(+), 18 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 4edc6b2..edf79aa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -185,14 +185,20 @@ private[spark] class TaskSchedulerImpl(
       val stage = taskSet.stageId
       val stageTaskSets =
         taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, 
TaskSetManager])
-      stageTaskSets(taskSet.stageAttemptId) = manager
-      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
-        ts.taskSet != taskSet && !ts.isZombie
-      }
-      if (conflictingTaskSet) {
-        throw new IllegalStateException(s"more than one active taskSet for 
stage $stage:" +
-          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
+
+      // Mark all the existing TaskSetManagers of this stage as zombie, as we 
are adding a new one.
+      // This is necessary to handle a corner case. Let's say a stage has 10 
partitions and has 2
+      // TaskSetManagers: TSM1(zombie) and TSM2(active). TSM1 has a running 
task for partition 10
+      // and it completes. TSM2 finishes tasks for partition 1-9, and thinks 
he is still active
+      // because partition 10 is not completed yet. However, DAGScheduler gets 
task completion
+      // events for all the 10 partitions and thinks the stage is finished. If 
it's a shuffle stage
+      // and somehow it has missing map outputs, then DAGScheduler will 
resubmit it and create a
+      // TSM3 for it. As a stage can't have more than one active task set 
managers, we must mark
+      // TSM2 as zombie (it actually is).
+      stageTaskSets.foreach { case (_, ts) =>
+        ts.isZombie = true
       }
+      stageTaskSets(taskSet.stageAttemptId) = manager
       schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
 
       if (!isLocal && !hasReceivedTask) {
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 932bfca..bc9a39c 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -196,28 +196,39 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
     // Even if one of the task sets has not-serializable tasks, the other task 
set should
     // still be processed without error
     taskScheduler.submitTasks(FakeTask.createTaskSet(1))
-    taskScheduler.submitTasks(taskSet)
+    val taskSet2 = new TaskSet(
+      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 
1)), 1, 0, 0, null)
+    taskScheduler.submitTasks(taskSet2)
     taskDescriptions = 
taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
     assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
   }
 
-  test("refuse to schedule concurrent attempts for the same stage 
(SPARK-8103)") {
+  test("concurrent attempts for the same stage only have one active taskset") {
     val taskScheduler = setupScheduler()
+    def isTasksetZombie(taskset: TaskSet): Boolean = {
+      taskScheduler.taskSetManagerForAttempt(taskset.stageId, 
taskset.stageAttemptId).get.isZombie
+    }
+
     val attempt1 = FakeTask.createTaskSet(1, 0)
-    val attempt2 = FakeTask.createTaskSet(1, 1)
     taskScheduler.submitTasks(attempt1)
-    intercept[IllegalStateException] { taskScheduler.submitTasks(attempt2) }
+    // The first submitted taskset is active
+    assert(!isTasksetZombie(attempt1))
 
-    // OK to submit multiple if previous attempts are all zombie
-    taskScheduler.taskSetManagerForAttempt(attempt1.stageId, 
attempt1.stageAttemptId)
-      .get.isZombie = true
+    val attempt2 = FakeTask.createTaskSet(1, 1)
     taskScheduler.submitTasks(attempt2)
+    // The first submitted taskset is zombie now
+    assert(isTasksetZombie(attempt1))
+    // The newly submitted taskset is active
+    assert(!isTasksetZombie(attempt2))
+
     val attempt3 = FakeTask.createTaskSet(1, 2)
-    intercept[IllegalStateException] { taskScheduler.submitTasks(attempt3) }
-    taskScheduler.taskSetManagerForAttempt(attempt2.stageId, 
attempt2.stageAttemptId)
-      .get.isZombie = true
     taskScheduler.submitTasks(attempt3)
-    assert(!failedTaskSet)
+    // The first submitted taskset remains zombie
+    assert(isTasksetZombie(attempt1))
+    // The second submitted taskset is zombie now
+    assert(isTasksetZombie(attempt2))
+    // The newly submitted taskset is active
+    assert(!isTasksetZombie(attempt3))
   }
 
   test("don't schedule more tasks after a taskset is zombie") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to