Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/16620

tl;dr I don't think Mark's change is quite correct, which is why the tests were failing. Instead, I think we need to replace the failedEpoch if/else statement and the pendingPartitions update in DAGScheduler.handleTaskCompletion with:

```scala
if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
  // This task was for the currently running attempt of the stage. Since the task
  // completed successfully from the perspective of the TaskSetManager, mark it as
  // no longer pending (the TaskSetManager may consider the task complete even
  // when the output needs to be ignored because the task's epoch is too small; see below).
  shuffleStage.pendingPartitions -= task.partitionId
}

if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
  logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
  // The epoch of the task is acceptable (i.e., the task was launched after the most
  // recent failure we're aware of for the executor), so mark the task's output as
  // available.
  shuffleStage.addOutputLoc(smt.partitionId, status)
  // Remove the task's partition from pending partitions. This may have already been
  // done above, but will not have been done yet in cases where the task attempt was
  // from an earlier attempt of the stage (i.e., not the attempt that's currently
  // running). This allows the DAGScheduler to mark the stage as complete when one
  // copy of each task has finished successfully, even if the currently active stage
  // still has tasks running.
  shuffleStage.pendingPartitions -= task.partitionId
}
```

I submitted #16892 to attempt to clarify the test case where Mark's change originally failed (this PR shouldn't block on that -- that's just to clarify things for ourselves in the future), and also wrote a very long write-up of what's going on below.

-----

There are three relevant pieces of state to consider here:

(1) The tasks that the TaskSetManager (TSM) considers currently pending.

The TSM encodes these pending tasks in its "successful" array. When a task set is launched, all of its tasks are considered pending, and all of the entries in the successful array are false. Tasks are no longer considered pending (and are marked as true in the "successful" array) if either (a) a copy of the task finishes successfully or (b) a copy of the task fails with a fetch failure (in which case the TSM assumes that the task will never complete successfully, because the previous stage needs to be re-run). Additionally, a task that previously completed successfully can be re-marked as pending if the stage is a shuffle map stage and the executor where the task ran died (this is because the map output needs to be re-generated, and the TSM will re-schedule the task). The TSM notifies the DAGScheduler that the stage has completed if either (a) the stage fails (e.g., there's a fetch failure) or (b) all of the entries in "successful" are true (i.e., there are no more pending tasks).

(2) ShuffleMapStage.pendingPartitions.

This variable is used by the DAGScheduler to track the pending tasks for a stage, and is mostly consistent with the TSM's pending tasks described above. When a stage begins, the DAGScheduler marks all of the partitions that need to be computed as pending, and then removes them from pendingPartitions as the TSM notifies the DAGScheduler that tasks have completed successfully. When a TSM determines that a task needs to be re-run (because it's a shuffle map task that ran on a now-dead executor), the TSM sends a Resubmitted task-completion event to the DAGScheduler, which causes the DAGScheduler to re-add the task to pendingPartitions (in doing so, the DAGScheduler keeps pendingPartitions consistent with the TSM's pending tasks).
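To make the intended relationship between these two pieces of state concrete, here is a minimal, self-contained Scala sketch. ToyTaskSetManager, ToyShuffleMapStage, and all of the method names are made up for illustration; they are not the real Spark classes, just a toy model of the bookkeeping described in (1) and (2):

```scala
// Hypothetical, heavily simplified model of the bookkeeping in (1) and (2).
object PendingBookkeepingSketch {

  // TSM side: one flag per task; a task is "pending" while its flag is false.
  class ToyTaskSetManager(numTasks: Int) {
    val successful: Array[Boolean] = Array.fill(numTasks)(false)

    // (a) a copy of the task finished successfully, or (b) it failed with a fetch failure.
    def markDone(partition: Int): Unit = successful(partition) = true

    // A shuffle map task whose executor died must be re-run, so it becomes pending again.
    def resubmit(partition: Int): Unit = successful(partition) = false

    def allDone: Boolean = successful.forall(identity)
  }

  // DAGScheduler side: the partitions it still expects output for.
  class ToyShuffleMapStage(numPartitions: Int) {
    val pendingPartitions: scala.collection.mutable.Set[Int] =
      scala.collection.mutable.HashSet(0 until numPartitions: _*)
  }

  def main(args: Array[String]): Unit = {
    val tsm = new ToyTaskSetManager(numTasks = 2)
    val stage = new ToyShuffleMapStage(numPartitions = 2)

    // Task 0 completes successfully: both views drop it.
    tsm.markDone(0)
    stage.pendingPartitions -= 0

    // Task 0's executor is later lost: the TSM resubmits it and the DAGScheduler,
    // on seeing the Resubmitted event, re-adds the partition, keeping the views in sync.
    tsm.resubmit(0)
    stage.pendingPartitions += 0

    println(s"DAGScheduler pending: ${stage.pendingPartitions.toList.sorted}")
    println(s"TSM pending: ${tsm.successful.indices.filterNot(i => tsm.successful(i)).toList}")
    println(s"TSM thinks the stage is done: ${tsm.allDone}")
  }
}
```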
I believe there are two scenarios (currently) where ShuffleMapStage.pendingPartitions and the TSM's pending tasks become inconsistent:

- Scenario A (performance optimization, as discussed here already): This happens if a ShuffleMapStage gets re-run (e.g., because the first time it ran, it encountered a fetch failure, so the previous stage needed to be re-run to generate the missing output). Call the original attempt #0 and the currently running attempt #1. If there's a task from attempt #0 that's still running, and it is running on an executor that *was not* marked as failed (this is the condition captured by the failedEpoch if-statement), and it completes successfully, this event will be handled by the TSM for attempt #0. When the DAGScheduler hears that the task completed successfully, it will remove it from pendingPartitions (even though there's still a running copy of this task in the TSM for attempt #1, which is the currently active attempt). This allows the DAGScheduler to mark the stage as finished earlier than the TSM would consider it finished.

- Scenario B (bug, as discussed): This happens in the same situation as Scenario A, except that here the task from attempt #0 completes successfully on an executor that *was* marked as failed (again, this is the failedEpoch if-statement). In this case, the DAGScheduler considers the output "bogus" (because the executor has since been lost, so the output is probably gone), but the DAGScheduler still removes the task from pendingPartitions. This can cause the DAGScheduler to determine that the stage is complete (the shuffleStage.pendingPartitions.isEmpty if-statement), even though there's still another running copy of that task (in the TSM for attempt #1) that could complete successfully. The DAGScheduler will then notice that an output is missing (the "if (!shuffleStage.isAvailable)" check) and re-submit the stage, leading to an exception being thrown, because there's still an active TaskSetManager. This is the root cause of the bug here, and is fixed by the proposed code above.
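Here is a small, hypothetical Scala sketch of Scenario B under the current (pre-fix) behavior. The concrete values and names (exec-1, the epoch numbers, etc.) are made up; only the shape of the logic mirrors the handleTaskCompletion code discussed above:

```scala
// Hypothetical sketch of Scenario B with the current (pre-fix) behavior.
object ScenarioBSketch {
  import scala.collection.mutable

  def main(args: Array[String]): Unit = {
    val failedEpoch = Map("exec-1" -> 2L)  // exec-1 was marked failed at epoch 2
    val taskEpoch = 1L                     // the finishing task was launched before that
    val execId = "exec-1"
    val partition = 0

    val pendingPartitions = mutable.HashSet(partition)
    val outputLocs = mutable.HashMap.empty[Int, String]

    // Current behavior: the partition is removed from pendingPartitions unconditionally...
    pendingPartitions -= partition
    // ...but the output is ignored because the task's epoch predates the executor failure.
    if (failedEpoch.get(execId).exists(taskEpoch <= _)) {
      println(s"Ignoring possibly bogus completion of partition $partition from $execId")
    } else {
      outputLocs(partition) = execId
    }

    // The stage now looks finished but is not available, so the DAGScheduler resubmits it
    // even though attempt #1's TaskSetManager still has a copy of this task running.
    println(s"looks finished: ${pendingPartitions.isEmpty}, available: ${outputLocs.contains(partition)}")
    // With the fix proposed at the top, the removal would be skipped here (the task is from
    // an old attempt and its output was ignored), so the partition would stay pending.
  }
}
```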
(3) ShuffleMapStage.outputLocs.

This tracks the output locations for all of the tasks in a stage. If a stage gets re-run, only the tasks that need to be re-run will be in the two variables above, but all of the tasks in the stage (including ones that have already finished) will always be in outputLocs. One use of this variable that's different from the others is that outputLocs can be consulted after a stage completes, when no tasks are actively running. For example, if a task in the next stage fails with a fetch failure, the output location for the data that triggered the failure will be removed from ShuffleMapStage.outputLocs. outputLocs may also track multiple locations for a particular task (e.g., if two copies of the task completed successfully).

As far as I understand, this will be inconsistent with pendingPartitions in two cases:

- (A) Consider a task X that finishes successfully on an executor E. Now suppose executor E gets marked as lost (e.g., because of another task that failed to fetch data from E), causing the DAGScheduler to update the epoch on E and the TaskSetManager to mark X as Resubmitted. Sometime after executor E is marked as lost, the TaskSetManager processes the task-successful message for X. The TaskSetManager still considers X to be done and marks it as successful, and the DAGScheduler removes the task from ShuffleMapStage.pendingPartitions. However, because the epoch on the machine is too old (in other words, because the DAGScheduler knows that executor E failed sometime after task X started), the DAGScheduler won't register the output location for the task. This particular behavior is necessary for correctness, and will trigger the "if (!shuffleStage.isAvailable)" statement: task X needs to be re-run, but the TSM doesn't "know" that X needs to be re-run (it thinks X completed successfully). If the DAGScheduler didn't remove X from pendingPartitions, things would end up hanging, as @jinxing64 pointed out. This is the test case I improved in #16892.

- (B) In buggy Scenario B above (which should get fixed by this PR).

There are some more fixes we could do to clean some of this up and make it easier to reason about -- but in the immediate future, I think the fix at the top is the best way to fix the current bug.
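For completeness, here is a minimal, hypothetical Scala sketch of the multi-location tracking described in (3). The names are made up (this is not the real ShuffleMapStage code); it only illustrates why losing one executor does not necessarily make the stage unavailable:

```scala
// Hypothetical sketch of outputLocs-style tracking: each partition can have several
// registered locations, and losing an executor removes only the locations hosted there.
object OutputLocsSketch {
  import scala.collection.mutable

  val outputLocs = mutable.HashMap.empty[Int, List[String]].withDefaultValue(Nil)

  def addOutputLoc(partition: Int, execId: String): Unit =
    outputLocs(partition) = execId :: outputLocs(partition)

  // e.g. triggered by a fetch failure that is blamed on execId
  def removeOutputsOnExecutor(execId: String): Unit =
    outputLocs.keys.toList.foreach { p =>
      outputLocs(p) = outputLocs(p).filterNot(_ == execId)
    }

  def isAvailable(numPartitions: Int): Boolean =
    (0 until numPartitions).forall(p => outputLocs(p).nonEmpty)

  def main(args: Array[String]): Unit = {
    addOutputLoc(0, "exec-1")
    addOutputLoc(0, "exec-2")  // a second successful copy of the same task
    addOutputLoc(1, "exec-1")

    removeOutputsOnExecutor("exec-1")
    // Partition 0 still has a location on exec-2, but partition 1 does not, so the stage
    // is no longer available and needs to be (partially) re-run.
    println(s"available = ${isAvailable(2)}")
  }
}
```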