Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183774355
  
    --- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For 
this to really happen,
    +    // you'd need the previous stage to also get restarted, and then 
succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    +        // fail attempt
    +        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
    +          FetchFailed(null, 0, 0, 0, "fetch failed"))
    +        // the attempt is a zombie, but the tasks are still running (this 
could be true even if
    +        // we actively killed those tasks, as killing is best-effort)
    +        assert(tsm.isZombie)
    +        assert(tsm.runningTasks === 9)
    +      }
    +      tsm
    +    }
    +
    +    // we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
    +    // the stage, but this time with insufficient resources so not all 
tasks are active.
    +
    +    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    +    taskScheduler.submitTasks(finalAttempt)
    +    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    +    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
    +    val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
    +      finalAttempt.tasks(task.index).partitionId
    +    }.toSet
    +    assert(finalTsm.runningTasks === 5)
    +    assert(!finalTsm.isZombie)
    +
    +    // We simulate late completions from our zombie tasksets, 
corresponding to all the pending
    +    // partitions in our final attempt.  This means we're only waiting on 
the tasks we've already
    +    // launched.
    +    val finalAttemptPendingPartitions = (0 until 
10).toSet.diff(finalAttemptLaunchedPartitions)
    +    finalAttemptPendingPartitions.foreach { partition =>
    +      completeTaskSuccessfully(zombieAttempts(0), partition)
    +    }
    +
    +    // If there is another resource offer, we shouldn't run anything.  
Though our final attempt
    +    // used to have pending tasks, now those tasks have been completed by 
zombie attempts.  The
    +    // remaining tasks to compute are already active in the non-zombie 
attempt.
    +    assert(
    +      taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", 
"host-1", 1))).flatten.isEmpty)
    +
    +    val allTaskSets = zombieAttempts ++ Seq(finalTsm)
    +    val remainingTasks = (0 until 
10).toSet.diff(finalAttemptPendingPartitions)
    +
    +    // finally, if we finish the remaining partitions from a mix of 
tasksets, all attempts should be
    +    // marked as zombie.
    +    // for each of the remaining tasks, find the tasksets with an active 
copy of the task, and
    +    // finish the task.
    +    remainingTasks.foreach { partition =>
    +      val tsm = if (partition == 0) {
    +        // we failed this task on both zombie attempts, this one is only 
present in the latest
    +        // taskset
    +        finalTsm
    +      } else {
    +        // should be active in every taskset.  We choose a zombie taskset 
just to make sure that
    +        // we transition the active taskset correctly even if the final 
completion comes
    +        // from a zombie.
    +        zombieAttempts(partition % 2)
    +      }
    +      completeTaskSuccessfully(tsm, partition)
    +    }
    +
    +    assert(finalTsm.isZombie)
    +
    +    // no taskset has completed all of its tasks, so no updates to the 
blacklist tracker yet
    +    verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(anyInt(), 
anyInt(), anyObject())
    +
    +    // finally, lets complete all the tasks.  We simulate failures in 
attempt 1, but everything
    +    // else succeeds, to make sure we get the right updates to the 
blacklist in all cases.
    +    (zombieAttempts ++ Seq(finalTsm)).foreach { tsm =>
    --- End diff --
    
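    Here you can reuse the val `allTaskSets` defined earlier in this test (`zombieAttempts ++ Seq(finalTsm)`) instead of rebuilding the same sequence.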


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to