Github user Ngone51 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183797532
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    +        // fail attempt
    +        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
    +          FetchFailed(null, 0, 0, 0, "fetch failed"))
    +        // the attempt is a zombie, but the tasks are still running (this could be true even if
    +        // we actively killed those tasks, as killing is best-effort)
    +        assert(tsm.isZombie)
    +        assert(tsm.runningTasks === 9)
    +      }
    +      tsm
    +    }
    +
    +    // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
    +    // the stage, but this time with insufficient resources so not all tasks are active.
    +
    +    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    +    taskScheduler.submitTasks(finalAttempt)
    +    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    +    val offers = (0 until 5).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
    --- End diff --
    
    The explanation is quite clear and I understand it now. Thank you very much! @squito
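    
    For anyone else reading along, here is a minimal sketch of the behaviour this test pins down. It is my own illustration, not code from the PR; it reuses only the completeTaskSuccessfully helper and the zombieAttempts / finalTsm values from the diff above, and the partition number is arbitrary:
    
        // A late success reported through one of the *zombie* attempts...
        completeTaskSuccessfully(zombieAttempts(0), partition = 7)
        // ...should also count for the live (non-zombie) attempt, so finalTsm
        // no longer has to launch or wait on its own task for partition 7.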

