[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r203435933 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -764,6 +769,19 @@ private[spark] class TaskSetManager( maybeFinishTaskSet() } + private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = { +partitionToIndex.get(partitionId).foreach { index => + if (!successful(index)) { +tasksSuccessful += 1 +successful(index) = true +if (tasksSuccessful == numTasks) { + isZombie = true +} +maybeFinishTaskSet() --- End diff -- it's too minor. If we touch this file again, let's remove it. Otherwise maybe not bother about it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r203415036 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -764,6 +769,19 @@ private[spark] class TaskSetManager( maybeFinishTaskSet() } + private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = { +partitionToIndex.get(partitionId).foreach { index => + if (!successful(index)) { +tasksSuccessful += 1 +successful(index) = true +if (tasksSuccessful == numTasks) { + isZombie = true +} +maybeFinishTaskSet() --- End diff -- I think you're right, it's not needed. `maybeFinishTaskSet()` is already called when the tasks succeed, fail, or are aborted, and when it is called here while that taskset still has running tasks, it's a no-op, because it fails the `runningTasks == 0` check inside `maybeFinishTaskSet()`. Do you think it's worth removing? I'm fine either way.
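To make the no-op argument concrete, here is a minimal sketch of the state involved. It assumes a heavily simplified model: `SketchTaskSetManager` and its `finished` flag are hypothetical names for illustration, not Spark's `TaskSetManager`, and the real method does more than set a flag.

```scala
// Hypothetical, simplified model of a task set manager; not Spark's actual class.
class SketchTaskSetManager(val partitions: Seq[Int]) {
  val numTasks: Int = partitions.size
  val partitionToIndex: Map[Int, Int] = partitions.zipWithIndex.toMap
  val successful: Array[Boolean] = Array.fill(numTasks)(false)
  var tasksSuccessful = 0
  var runningTasks = 0
  var isZombie = false
  // Assumption: stands in for "the scheduler has been told this task set is done".
  var finished = false

  // Does nothing unless the task set is a zombie with no tasks still running.
  def maybeFinishTaskSet(): Unit = {
    if (isZombie && runningTasks == 0) finished = true
  }

  // Record that another attempt of the same stage completed this partition.
  def markPartitionCompleted(partitionId: Int): Unit = {
    partitionToIndex.get(partitionId).foreach { index =>
      if (!successful(index)) {
        tasksSuccessful += 1
        successful(index) = true
        if (tasksSuccessful == numTasks) isZombie = true
        // No-op while runningTasks > 0; a later task end event triggers it again.
        maybeFinishTaskSet()
      }
    }
  }
}
```

In this model, marking every partition completed while one task is still running turns the task set into a zombie but leaves `finished` false; it only flips once the last running task ends and `maybeFinishTaskSet()` is called again.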
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r203257926 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -764,6 +769,19 @@ private[spark] class TaskSetManager( maybeFinishTaskSet() } + private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = { +partitionToIndex.get(partitionId).foreach { index => + if (!successful(index)) { +tasksSuccessful += 1 +successful(index) = true +if (tasksSuccessful == numTasks) { + isZombie = true +} +maybeFinishTaskSet() --- End diff -- is this line needed? We will call `maybeFinishTaskSet()` at the end of `handleSuccessfulTask`
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user squito closed the pull request at: https://github.com/apache/spark/pull/21131
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183797532 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => --- End diff -- The explanation is quite clear and I understand now. Thank you very much! @squito
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183793745 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => --- End diff -- The assumption is that a fetch failure means that *all* data on that host is unavailable. As shuffles are all-to-all, it's very likely that every task is going to need some piece of data from that host. It's possible that they already grabbed all the data they need before the problem occurred with the host; we don't know. Also, there is no "partial progress" for a task -- tasks don't know how to grab all the shuffle output they can, then just wait until the missing bit becomes available again. They fail as soon as the data they need is unavailable (with some retries, but there is no "pause" nor a check for data on another source).
Also, the DAGScheduler is a little confusing on this -- it does the unregister in two parts (I have no idea why anymore, to be honest): https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1391 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1406
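The "all data on that host is unavailable" assumption can be illustrated with a toy registry. This is only a sketch under stated assumptions: `MapOutputRegistry`, `removeOutputsOnHost`, and `missingMapIds` are hypothetical names and do not mirror the `DAGScheduler` or `MapOutputTrackerMaster` code linked above.

```scala
// Hypothetical registry of shuffle map outputs: mapId -> host holding that output.
final case class MapOutputRegistry(locations: Map[Int, String]) {

  // Treat a fetch failure as "every output on this host is gone".
  def removeOutputsOnHost(host: String): MapOutputRegistry =
    MapOutputRegistry(locations.filter { case (_, h) => h != host })

  // Map outputs that would have to be recomputed before reducers can proceed.
  def missingMapIds(numMaps: Int): Seq[Int] =
    (0 until numMaps).filterNot(locations.contains)
}
```

Because a shuffle is all-to-all, a reducer in this model needs every map output, so after `removeOutputsOnHost` any reducer that has not yet fetched from that host cannot complete until the missing map IDs are regenerated.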
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183790463 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => + finalAttempt.tasks(task.index).partitionId +}.toSet +assert(finalTsm.runningTasks === 5) +assert(!finalTsm.isZombie) + +// We simulate late completions from our zombie tasksets, corresponding to all the pending +// partitions in our final attempt. This means we're only waiting on the tasks we've already +// launched. +val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions) +finalAttemptPendingPartitions.foreach { partition => + completeTaskSuccessfully(zombieAttempts(0), partition) +} + +// If there is another resource offer, we shouldn't run anything. 
Though our final attempt +// used to have pending tasks, now those tasks have been completed by zombie attempts. The +// remaining tasks to compute are already active in the non-zombie attempt. +assert( + taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty) + +val allTaskSets = zombieAttempts ++ Seq(finalTsm) +val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions) + +// finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be +// marked as zombie. +// for each of the remaining tasks, find the tasksets with an active copy of the task, and +// finish the task. +remainingTasks.foreach { partition => + val tsm = if (partition == 0) { +// we failed this task on both zombie attempts, this one is only present in the latest +// taskset +finalTsm + } else { +// should be active in every taskset. We choose a zombie taskset just to make sure that +// we transition the active taskset correctly even if the final completion comes +// from a zombie. +zombieAttempts(partition
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183789814 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => --- End diff -- > because they won't be able to get their shuffle input, same as the original fetch failure Why? In `DAGScheduler`, we only unregister one MapStatus of the parent stage, so other running tasks within the failed (child) stage (the failure being caused by a task with a fetch failure) may still get MapOutputs from `MapOutputTrackerMaster` and fetch data from other `Executor`s. So they can succeed normally. Am I missing something?
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user attilapiros commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183783006 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => + finalAttempt.tasks(task.index).partitionId +}.toSet +assert(finalTsm.runningTasks === 5) +assert(!finalTsm.isZombie) + +// We simulate late completions from our zombie tasksets, corresponding to all the pending +// partitions in our final attempt. This means we're only waiting on the tasks we've already +// launched. +val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions) +finalAttemptPendingPartitions.foreach { partition => + completeTaskSuccessfully(zombieAttempts(0), partition) +} + +// If there is another resource offer, we shouldn't run anything. 
Though our final attempt +// used to have pending tasks, now those tasks have been completed by zombie attempts. The +// remaining tasks to compute are already active in the non-zombie attempt. +assert( + taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty) + +val allTaskSets = zombieAttempts ++ Seq(finalTsm) +val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions) --- End diff -- As far as I can see, `remainingTasks` is always the same as `finalAttemptLaunchedPartitions`. I wonder whether it would be more readable to initialise it from `finalAttemptLaunchedPartitions` here.
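The equivalence is plain set arithmetic: for any subset of the partitions, removing its complement from the full set gives the subset back. A small sketch, where `launched` is a hypothetical stand-in for whatever the resource offers happened to launch in the test:

```scala
object RemainingPartitionsSketch {
  // All partitions of the stage, as in the test's (0 until 10).
  val all: Set[Int] = (0 until 10).toSet

  // Hypothetical example of the partitions launched by the final attempt.
  val launched: Set[Int] = Set(0, 1, 2, 3, 4)

  // Mirrors finalAttemptPendingPartitions in the test.
  val pending: Set[Int] = all.diff(launched)

  // Mirrors remainingTasks in the test; equal to `launched` whenever launched is a subset of all.
  val remaining: Set[Int] = all.diff(pending)
}
```

The identity relies only on `launched` being a subset of `all`, which holds in the test because the launched partitions come from the same ten-task task set.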
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183781695 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => --- End diff -- we've previously debated about what to do with the tasks still running in a zombie attempt, and there hasn't been any definitive conclusion. I'm just trying to do a correctness fix here. Briefly, in general there is an expectation that those tasks are unlikely to succeed (because they won't be able to get their shuffle input, same as the original fetch failure), so we don't want to delay starting a new attempt of that task. And perhaps we should even actively kill those tasks (you'll see comments about that in various places). But if they do succeed, we need to handle them correctly. 
Note that even if we did try to actively kill them, you'd still need to handle a late-completion, as killing would only be "best-effort".
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183779968 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) +val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { --- End diff -- good point, fixed
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183778637 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => + finalAttempt.tasks(task.index).partitionId +}.toSet +assert(finalTsm.runningTasks === 5) +assert(!finalTsm.isZombie) + +// We simulate late completions from our zombie tasksets, corresponding to all the pending +// partitions in our final attempt. This means we're only waiting on the tasks we've already +// launched. +val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions) +finalAttemptPendingPartitions.foreach { partition => + completeTaskSuccessfully(zombieAttempts(0), partition) +} + +// If there is another resource offer, we shouldn't run anything. 
Though our final attempt +// used to have pending tasks, now those tasks have been completed by zombie attempts. The +// remaining tasks to compute are already active in the non-zombie attempt. +assert( + taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty) + +val allTaskSets = zombieAttempts ++ Seq(finalTsm) +val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions) + +// finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be +// marked as zombie. +// for each of the remaining tasks, find the tasksets with an active copy of the task, and +// finish the task. +remainingTasks.foreach { partition => + val tsm = if (partition == 0) { +// we failed this task on both zombie attempts, this one is only present in the latest +// taskset +finalTsm + } else { +// should be active in every taskset. We choose a zombie taskset just to make sure that +// we transition the active taskset correctly even if the final completion comes +// from a zombie. +zombieAttempts(partition %
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183777930 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl( } } + /** + * Marks the task has completed in all TaskSetManagers for the given stage. + * + * After stage failure and retry, there may be multiple active TaskSetManagers for the stage. --- End diff -- Yeah, the terminology is a bit of a mess here ... I dunno if we consistently distinguish the use of "active" for the one taskset which is non-zombie vs. all the tasksets which have some tasks that are running (though all but one must be zombies). @markhamstra @kayousterhout thoughts on naming? In any case, I think you're right, I will remove "active" here.
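The scaladoc under discussion describes propagating one completion to every attempt of a stage. A minimal sketch of that idea follows, assuming a simplified model: `AttemptState`, `StageAttempts`, and `markPartitionCompletedInAllAttempts` are hypothetical names for illustration and are not the methods added to `TaskSchedulerImpl` in this pull request.

```scala
import scala.collection.mutable

// Hypothetical, minimal stand-in for one attempt's task set manager.
class AttemptState(val stageAttemptId: Int, numPartitions: Int) {
  val completed: mutable.Set[Int] = mutable.Set.empty[Int]
  var isZombie = false

  def markPartitionCompleted(partitionId: Int): Unit = {
    completed += partitionId
    // Once every partition is done there is nothing left to launch.
    if (completed.size == numPartitions) isZombie = true
  }
}

// Hypothetical scheduler-side registry: stageId -> (stageAttemptId -> state).
class StageAttempts {
  private val byStage =
    mutable.HashMap.empty[Int, mutable.HashMap[Int, AttemptState]]

  def register(stageId: Int, attempt: AttemptState): Unit = {
    val attempts =
      byStage.getOrElseUpdate(stageId, mutable.HashMap.empty[Int, AttemptState])
    attempts(attempt.stageAttemptId) = attempt
  }

  // A completion reported by any attempt is recorded in every attempt of the stage,
  // zombie or not, so the non-zombie attempt does not relaunch that partition.
  def markPartitionCompletedInAllAttempts(stageId: Int, partitionId: Int): Unit =
    byStage.get(stageId).foreach(_.values.foreach(_.markPartitionCompleted(partitionId)))
}
```

In this model there is no notion of which attempt is "active"; the registry simply fans the completion out to all attempts of the stage, which sidesteps the naming question raised in the comment.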
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183776603 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => + finalAttempt.tasks(task.index).partitionId +}.toSet +assert(finalTsm.runningTasks === 5) +assert(!finalTsm.isZombie) + +// We simulate late completions from our zombie tasksets, corresponding to all the pending +// partitions in our final attempt. This means we're only waiting on the tasks we've already +// launched. +val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions) +finalAttemptPendingPartitions.foreach { partition => + completeTaskSuccessfully(zombieAttempts(0), partition) +} + +// If there is another resource offer, we shouldn't run anything. 
Though our final attempt +// used to have pending tasks, now those tasks have been completed by zombie attempts. The +// remaining tasks to compute are already active in the non-zombie attempt. +assert( + taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty) + +val allTaskSets = zombieAttempts ++ Seq(finalTsm) +val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions) + +// finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be +// marked as zombie. +// for each of the remaining tasks, find the tasksets with an active copy of the task, and +// finish the task. +remainingTasks.foreach { partition => + val tsm = if (partition == 0) { +// we failed this task on both zombie attempts, this one is only present in the latest +// taskset +finalTsm + } else { +// should be active in every taskset. We choose a zombie taskset just to make sure that +// we transition the active taskset correctly even if the final completion comes +// from a zombie. +zombieAttempts(partition %
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user attilapiros commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183775077 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) +val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { --- End diff -- This condition is not needed, as `stageAttempt` only takes the values 0 and 1 (`0 until 2` excludes 2).
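The remark about the redundant guard can be checked without any Spark code. A minimal sketch in plain Scala follows; the object and value names are made up for illustration and do not appear in the pull request.

```scala
object RangeGuardSketch {
  // `0 until 2` is an exclusive range: it yields 0 and 1, never 2.
  val attempts: List[Int] = (0 until 2).toList

  // The guard from the test, evaluated for every value the loop variable can take.
  val guardAlwaysTrue: Boolean = attempts.forall(stageAttempt => stageAttempt < 2)

  def main(args: Array[String]): Unit = {
    println(s"attempts = $attempts, guard always true = $guardAlwaysTrue")
  }
}
```

Since the guard holds for every element of the range, removing the `if` would not change which attempts are failed in the test.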
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user attilapiros commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183774355 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => + finalAttempt.tasks(task.index).partitionId +}.toSet +assert(finalTsm.runningTasks === 5) +assert(!finalTsm.isZombie) + +// We simulate late completions from our zombie tasksets, corresponding to all the pending +// partitions in our final attempt. This means we're only waiting on the tasks we've already +// launched. +val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions) +finalAttemptPendingPartitions.foreach { partition => + completeTaskSuccessfully(zombieAttempts(0), partition) +} + +// If there is another resource offer, we shouldn't run anything. 
Though our final attempt +// used to have pending tasks, now those tasks have been completed by zombie attempts. The +// remaining tasks to compute are already active in the non-zombie attempt. +assert( + taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty) + +val allTaskSets = zombieAttempts ++ Seq(finalTsm) +val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions) + +// finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be +// marked as zombie. +// for each of the remaining tasks, find the tasksets with an active copy of the task, and +// finish the task. +remainingTasks.foreach { partition => + val tsm = if (partition == 0) { +// we failed this task on both zombie attempts, this one is only present in the latest +// taskset +finalTsm + } else { +// should be active in every taskset. We choose a zombie taskset just to make sure that +// we transition the active taskset correctly even if the final completion comes +// from a zombie. +zombieAttempts(partit
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183775539 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => + finalAttempt.tasks(task.index).partitionId +}.toSet +assert(finalTsm.runningTasks === 5) +assert(!finalTsm.isZombie) + +// We simulate late completions from our zombie tasksets, corresponding to all the pending +// partitions in our final attempt. This means we're only waiting on the tasks we've already +// launched. +val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions) +finalAttemptPendingPartitions.foreach { partition => + completeTaskSuccessfully(zombieAttempts(0), partition) +} + +// If there is another resource offer, we shouldn't run anything. 
Though our final attempt +// used to have pending tasks, now those tasks have been completed by zombie attempts. The +// remaining tasks to compute are already active in the non-zombie attempt. +assert( + taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty) + +val allTaskSets = zombieAttempts ++ Seq(finalTsm) +val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions) + +// finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be +// marked as zombie. +// for each of the remaining tasks, find the tasksets with an active copy of the task, and +// finish the task. +remainingTasks.foreach { partition => + val tsm = if (partition == 0) { +// we failed this task on both zombie attempts, this one is only present in the latest +// taskset +finalTsm + } else { +// should be active in every taskset. We choose a zombie taskset just to make sure that +// we transition the active taskset correctly even if the final completion comes +// from a zombie. +zombieAttempts(partition %
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183766277 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => + finalAttempt.tasks(task.index).partitionId +}.toSet +assert(finalTsm.runningTasks === 5) +assert(!finalTsm.isZombie) + +// We simulate late completions from our zombie tasksets, corresponding to all the pending +// partitions in our final attempt. This means we're only waiting on the tasks we've already +// launched. +val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions) +finalAttemptPendingPartitions.foreach { partition => + completeTaskSuccessfully(zombieAttempts(0), partition) +} + +// If there is another resource offer, we shouldn't run anything. 
Though our final attempt +// used to have pending tasks, now those tasks have been completed by zombie attempts. The +// remaining tasks to compute are already active in the non-zombie attempt. +assert( + taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty) + +val allTaskSets = zombieAttempts ++ Seq(finalTsm) +val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions) + +// finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be +// marked as zombie. +// for each of the remaining tasks, find the tasksets with an active copy of the task, and +// finish the task. +remainingTasks.foreach { partition => + val tsm = if (partition == 0) { +// we failed this task on both zombie attempts, this one is only present in the latest +// taskset +finalTsm + } else { +// should be active in every taskset. We choose a zombie taskset just to make sure that +// we transition the active taskset correctly even if the final completion comes +// from a zombie. +zombieAttempts(partit
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183701269 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => --- End diff -- Currently, the launched tasks have nothing to do with the tasks still running in other `TaskSet`s. But would it be possible (in the source code) to take those running tasks into consideration when launching a new task? For example, by first launching the FetchFailed task, or the tasks that do not have a running copy in any `TaskSet`? (It seems we will always have running copies in other `TaskSet`s for our final `TaskSet`, except for the FetchFailed task, right? So it is less like resubmitting a stage and more like resubmitting the tasks that have no running copy in the previous `TaskSet`s.)
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183690133 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => + finalAttempt.tasks(task.index).partitionId +}.toSet +assert(finalTsm.runningTasks === 5) +assert(!finalTsm.isZombie) + +// We simulate late completions from our zombie tasksets, corresponding to all the pending +// partitions in our final attempt. This means we're only waiting on the tasks we've already +// launched. +val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions) +finalAttemptPendingPartitions.foreach { partition => + completeTaskSuccessfully(zombieAttempts(0), partition) +} + +// If there is another resource offer, we shouldn't run anything. 
Though our final attempt +// used to have pending tasks, now those tasks have been completed by zombie attempts. The +// remaining tasks to compute are already active in the non-zombie attempt. +assert( + taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty) + +val allTaskSets = zombieAttempts ++ Seq(finalTsm) +val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions) + +// finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be +// marked as zombie. +// for each of the remaining tasks, find the tasksets with an active copy of the task, and +// finish the task. +remainingTasks.foreach { partition => + val tsm = if (partition == 0) { +// we failed this task on both zombie attempts, this one is only present in the latest +// taskset +finalTsm + } else { +// should be active in every taskset. We choose a zombie taskset just to make sure that +// we transition the active taskset correctly even if the final completion comes +// from a zombie. +zombieAttempts(partition
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183619646 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl( } } + /** + * Marks the task has completed in all TaskSetManagers for the given stage. + * + * After stage failure and retry, there may be multiple active TaskSetManagers for the stage. --- End diff -- IIRC, there is only one active `TaskSetManager` for a given stage, possibly alongside some zombie `TaskSetManager`s, though those zombie `TaskSetManager`s may still have running tasks.
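The two meanings of "active" discussed in this thread can be separated with a small model. This is only a sketch under stated assumptions: `Tsm` is a hypothetical stand-in that keeps just the `isZombie` flag and the running-task count, not Spark's `TaskSetManager`, and the sample state mirrors the regression test (two zombie attempts with 9 running tasks each, one live attempt with 5).

```scala
object TaskSetTerminologySketch {
  // Hypothetical stand-in for TaskSetManager, reduced to the fields under discussion.
  final case class Tsm(stageAttemptId: Int, isZombie: Boolean, runningTasks: Int)

  // "Active" in the narrow sense: tasksets that may still launch new tasks.
  def nonZombie(all: Seq[Tsm]): Seq[Tsm] = all.filterNot(_.isZombie)

  // "Active" in the broad sense: tasksets that still have tasks running.
  def withRunningTasks(all: Seq[Tsm]): Seq[Tsm] = all.filter(_.runningTasks > 0)

  // State mirroring the regression test after the third attempt is submitted.
  val example: Seq[Tsm] = Seq(
    Tsm(stageAttemptId = 0, isZombie = true, runningTasks = 9),
    Tsm(stageAttemptId = 1, isZombie = true, runningTasks = 9),
    Tsm(stageAttemptId = 2, isZombie = false, runningTasks = 5)
  )
}
```

In this state only attempt 2 is non-zombie, while all three attempts still have running tasks, which is the ambiguity the doc comment ran into.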
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183619704 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl( } } + /** + * Marks the task has completed in all TaskSetManagers for the given stage. + * + * After stage failure and retry, there may be multiple active TaskSetManagers for the stage. + * If an earlier attempt of a stage completes a task, we should ensure that the later attempts + * do not also submit those same tasks. That also means that a task completion from an earlier + * attempt can lead to the entire stage getting marked as successful. + */ + private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = { +taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => --- End diff -- Generally, it seems impossible for an unfinished `TaskSet` to get an empty `Map()` from `taskSetsByStageIdAndAttempt`. But if it does, maybe we can tell the caller that the stage has already finished.
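The suggestion above, to let the caller know when no taskset is registered for the stage, could look roughly like the following. This is a sketch of a hypothetical variant, not the code in the pull request: `FakeTsm` is an invented stand-in for `TaskSetManager`, and the `Boolean` return value is an assumption added here to illustrate the idea. Only the shape of the `stageId -> (attempt -> manager)` map is taken from the diff.

```scala
import scala.collection.mutable

object MarkPartitionSketch {
  // Hypothetical stand-in for TaskSetManager: only records which partitions were marked.
  final class FakeTsm {
    val completed: mutable.Set[Int] = mutable.Set.empty[Int]
    def markPartitionCompleted(partitionId: Int): Unit = completed += partitionId
  }

  // stageId -> (stageAttemptId -> manager), the same shape as the map in the diff.
  val taskSetsByStageIdAndAttempt: mutable.Map[Int, mutable.Map[Int, FakeTsm]] =
    mutable.Map.empty

  // Hypothetical variant: returns false when the stage has no registered tasksets,
  // so the caller can tell that the stage has apparently already finished.
  def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int): Boolean =
    taskSetsByStageIdAndAttempt.get(stageId) match {
      case Some(attempts) if attempts.nonEmpty =>
        attempts.values.foreach(_.markPartitionCompleted(partitionId))
        true
      case _ =>
        false
    }
}
```

With `getOrElse(stageId, Map())` as in the diff, the missing-stage case is silently a no-op; the variant above makes that case visible at the cost of a return value the caller has to handle.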
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
GitHub user squito opened a pull request: https://github.com/apache/spark/pull/21131 [SPARK-23433][CORE] Late zombie task completions update all tasksets Fetch failures can lead to multiple tasksets being active for a given stage. A late completion from an earlier attempt of the stage should update the most recent attempt for the stage, so that it does not try to submit another task for the same partition, and so that it knows when it is completed and when it should be marked as a "zombie". Added a regression test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/squito/spark SPARK-23433 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21131.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21131 commit 0720a7cd6826614e516c3d3a51bd4519259cbe3b Author: Imran Rashid Date: 2018-02-21T20:21:14Z [SPARK-23433][CORE] Late zombie task completions update all tasksets After a fetch failure and stage retry, we may have multiple tasksets which are active for a given stage. A late completion from an earlier attempt of the stage should update the most recent attempt for the stage, so it does not try to submit another task for the same partition, and so that it knows when it is completed.
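The behaviour described in the pull request can be illustrated with a toy model. The sketch below is an assumption-laden simplification, not Spark's implementation: `Tsm` is a hypothetical class, partition ids are used directly as task indices (the real code maps them through `partitionToIndex`), and running-task bookkeeping is left out entirely.

```scala
object LateCompletionSketch {
  // Hypothetical, heavily simplified stand-in for TaskSetManager.
  final class Tsm(numTasks: Int) {
    private val successful = Array.fill(numTasks)(false)
    private var tasksSuccessful = 0
    var isZombie = false

    // Partitions this attempt would still need to launch or finish.
    def pendingPartitions: Seq[Int] = (0 until numTasks).filterNot(i => successful(i))

    // Record a completion, whichever attempt it came from; ignore repeats.
    def markPartitionCompleted(partitionId: Int): Unit =
      if (partitionId >= 0 && partitionId < numTasks && !successful(partitionId)) {
        successful(partitionId) = true
        tasksSuccessful += 1
        // Once every partition is done, the attempt has nothing left to launch.
        if (tasksSuccessful == numTasks) isZombie = true
      }
  }

  // A completion reported by any attempt is applied to every attempt of the stage.
  def completeInAll(attempts: Seq[Tsm], partitionId: Int): Unit =
    attempts.foreach(_.markPartitionCompleted(partitionId))
}
```

In this model, a late completion of partition 1 from a zombie attempt removes partition 1 from the latest attempt's pending list, and once all partitions are completed by any mix of attempts the latest attempt is marked as a zombie too.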