Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183781695
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
          taskScheduler.initialize(new FakeSchedulerBackend)
        }
      }
    +
    +  test("Completions in zombie tasksets update status of non-zombie taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    +        // fail attempt
    +        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
    +          FetchFailed(null, 0, 0, 0, "fetch failed"))
    +        // the attempt is a zombie, but the tasks are still running (this could be true even if
    +        // we actively killed those tasks, as killing is best-effort)
    +        assert(tsm.isZombie)
    +        assert(tsm.runningTasks === 9)
    +      }
    +      tsm
    +    }
    +
    +    // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
    +    // the stage, but this time with insufficient resources so not all tasks are active.
    +
    +    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    +    taskScheduler.submitTasks(finalAttempt)
    +    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    +    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
    --- End diff --
    --- End diff --
    
    we've previously debated what to do with the tasks still running in a zombie 
attempt, and there hasn't been any definitive conclusion.  I'm just trying to do 
a correctness fix here.  Briefly: in general the expectation is that those tasks 
are unlikely to succeed (they won't be able to get their shuffle input, same as 
the original fetch failure), so we don't want to delay starting a new attempt of 
that task.  Perhaps we should even actively kill those tasks (you'll see 
comments about that in various places).  But if they do succeed, we need to 
handle them correctly.  Note that even if we did try to actively kill them, 
you'd still need to handle a late completion, as killing would only be 
"best-effort".
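    To sketch the behavior being discussed (a simplified model, not Spark's actual internals: `StubTaskSetManager` and `markPartitionCompleted` here are illustrative stand-ins), the key idea is that a successful task from a zombie attempt should mark the partition complete in every active attempt for the stage, including the non-zombie one:
    
    ```scala
    // Simplified model of the scheduler behavior under discussion; the class
    // and method names are illustrative, not Spark's real internals.
    import scala.collection.mutable
    
    class StubTaskSetManager(val stageAttemptId: Int, var isZombie: Boolean) {
      val completedPartitions = mutable.Set.empty[Int]
      def markPartitionCompleted(partition: Int): Unit =
        completedPartitions += partition
    }
    
    object ZombieCompletionSketch {
      // When any attempt (zombie or not) reports a successful task, propagate
      // the completion to all attempts of the stage, so the live attempt does
      // not wastefully re-run a partition that has already finished.
      def handleSuccessfulTask(attempts: Seq[StubTaskSetManager], partition: Int): Unit =
        attempts.foreach(_.markPartitionCompleted(partition))
    }
    ```
    
    With two zombie attempts and one live attempt, a late completion from either zombie then shows up in the live attempt's completed partitions, which is what the new test above exercises.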


---
