Github user kayousterhout commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    tl;dr I don’t think Mark’s change is quite correct, which is why the 
tests were failing.  Instead, I think we need to replace the failedEpoch 
if/else statement and the pendingPartitions update in 
DAGScheduler.handleTaskCompletion with:
    
    ```scala
    if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
      // This task was for the currently running attempt of the stage. Since the task
      // completed successfully from the perspective of the TaskSetManager, mark it as
      // no longer pending (the TaskSetManager may consider the task complete even
      // when the output needs to be ignored because the task's epoch is too small below).
      shuffleStage.pendingPartitions -= task.partitionId
    }

    if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
      logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
    } else {
      // The epoch of the task is acceptable (i.e., the task was launched after the most
      // recent failure we're aware of for the executor), so mark the task's output as
      // available.
      shuffleStage.addOutputLoc(smt.partitionId, status)
      // Remove the task's partition from pending partitions. This may have already been
      // done above, but will not have been done yet in cases where the task attempt was
      // from an earlier attempt of the stage (i.e., not the attempt that's currently
      // running). This allows the DAGScheduler to mark the stage as complete when one
      // copy of each task has finished successfully, even if the currently active stage
      // still has tasks running.
      shuffleStage.pendingPartitions -= task.partitionId
    }
    ```
    
    I submitted #16892 to attempt to clarify the test case where Mark’s 
change originally failed (this PR shouldn't block on that -- that's just to 
clarify things for ourselves in the future), and also wrote a very long 
write-up of what’s going on below.
    
    —————
    
    There are three relevant pieces of state to consider here:
    
    (1) The tasks that the TaskSetManager (TSM) considers currently pending.  
The TSM encodes these pending tasks in its “successful” array.  When a task 
set is launched, all of its tasks are considered pending, and all of the 
entries in the successful array are False.  Tasks are no longer considered 
pending (and are marked as True in the “successful” array) if either (a) a 
copy of the task finishes successfully or (b) a copy of the task fails with a 
fetch failure (in which case the TSM assumes that the task will never complete 
successfully, because the previous stage needs to be re-run).  Additionally, a 
task that previously completed successfully can be re-marked as pending if the 
stage is a shuffle map stage, and the executor where the task ran died (this is 
because the map output needs to be re-generated, and the TSM will re-schedule 
the task).
    
    The TSM notifies the DAGScheduler that the stage has completed if either 
(a) the stage fails (e.g., there’s a fetch failure) or (b) all of the entries 
in “successful” are true (i.e., there are no more pending tasks).
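    
    To make (1) concrete, here is a minimal, self-contained sketch of that 
bookkeeping (the class and method names are made up for illustration; this is 
not the real TaskSetManager API):
    
    ```scala
    // Toy model of the TSM's pending-task bookkeeping described above.
    class SimpleTSM(numTasks: Int, isShuffleMapStage: Boolean) {
      // successful(i) == true means task i is no longer considered pending.
      private val successful = Array.fill(numTasks)(false)
      // Remember where each successful task ran, so executor loss can be handled.
      private val executorForTask = Array.fill[Option[String]](numTasks)(None)

      // (a) A copy of task `index` finished successfully.
      def handleSuccess(index: Int, execId: String): Unit = {
        successful(index) = true
        executorForTask(index) = Some(execId)
      }

      // (b) A copy of task `index` failed with a fetch failure: the task can't
      // complete until the previous stage is re-run, so stop treating it as pending.
      def handleFetchFailed(index: Int): Unit = {
        successful(index) = true
      }

      // A previously successful shuffle map task becomes pending again if the
      // executor holding its map output dies (the output must be regenerated).
      def handleExecutorLost(execId: String): Unit = {
        if (isShuffleMapStage) {
          for (i <- successful.indices
               if successful(i) && executorForTask(i).contains(execId)) {
            successful(i) = false
            executorForTask(i) = None
          }
        }
      }

      // The TSM tells the DAGScheduler the stage is done once nothing is pending.
      def allTasksDone: Boolean = successful.forall(identity)
    }
    ```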
    
    (2)  ShuffleMapStage.pendingPartitions.  This variable is used by the 
DAGScheduler to track the pending tasks for a stage, and is mostly consistent 
with the TSM’s pending tasks (described above).  When a stage begins, the 
DAGScheduler marks all of the partitions that need to be computed as pending, 
and then removes them from pendingPartitions as the TSM notifies the 
DAGScheduler that tasks have successfully completed.  When a TSM determines 
that a task needs to be re-run (because it’s a shuffle map task that ran on a 
now-dead executor), the TSM sends a Resubmitted task completion event to the 
DAGScheduler, which causes the DAGScheduler to re-add the task to 
pendingPartitions (in doing so, the DAGScheduler is keeping pendingPartitions 
consistent with the TSM’s pending tasks).
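    
    And a rough sketch of the pendingPartitions bookkeeping in (2), under a 
simplified event model (just Success and Resubmitted); again the names are 
illustrative rather than the real DAGScheduler code:
    
    ```scala
    import scala.collection.mutable

    object PendingPartitionsSketch {
      sealed trait TaskEndReason
      case object Success extends TaskEndReason
      case object Resubmitted extends TaskEndReason

      // The DAGScheduler's view of which partitions of the stage still need to finish.
      val pendingPartitions = mutable.HashSet[Int]()

      // When the stage starts, every partition that needs to be computed is pending.
      def onStageStart(partitionsToCompute: Seq[Int]): Unit = {
        pendingPartitions.clear()
        pendingPartitions ++= partitionsToCompute
      }

      // Keep pendingPartitions in sync with the TSM: remove a partition when the TSM
      // reports a successful task, and re-add it when the TSM resubmits a task whose
      // map output was lost along with a dead executor.
      def onTaskCompletion(partitionId: Int, reason: TaskEndReason): Unit = reason match {
        case Success     => pendingPartitions -= partitionId
        case Resubmitted => pendingPartitions += partitionId
      }
    }
    ```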
    
    I believe there are two scenarios (currently) where 
ShuffleMapStage.pendingPartitions and the TSM’s pending tasks become 
inconsistent: 
    - Scenario A (performance optimization, as discussed here already): This 
happens if a ShuffleMapStage gets re-run (e.g., because the first time it ran, 
it encountered a fetch failure, so the previous stage needed to be re-run to 
generate the missing output).  Call the original attempt #0 and the currently 
running attempt #1.  If there’s a task from attempt #0 that’s still 
running, and it is running on an executor that *was not* marked as failed (this 
is the condition captured by the failedEpoch if-statement), and it completes 
successfully, this event will be handled by the TSM for attempt #0.  When the 
DAGScheduler hears that the task completed successfully, it will remove it from 
pendingPartitions (even though there’s still a running copy of this task in 
the TSM for attempt #1, which is the currently active attempt).  This allows 
the DAGScheduler to mark the stage as finished earlier than when the TSM 
thinks that the stage is finished.
    - Scenario B (bug, as discussed): This happens in the same case as Scenario 
A, except that it’s when a task from attempt #0 completes successfully, but 
it’s on an executor that *was* marked as failed (again, this is the 
failedEpoch if-statement).  In this case, the DAGScheduler considers the output 
“bogus” (because the executor has since been lost, so the output is 
probably gone), but the DAGScheduler still removes the task from 
pendingPartitions.  This can cause the DAGScheduler to determine that the stage 
is complete (the shuffleStage.pendingPartitions.isEmpty if-statement), even 
though there’s still another running copy of that task (in the TSM for 
attempt #1) that could complete successfully.  The DAGScheduler will notice an 
output is missing (the “if (!shuffleStage.isAvailable)” check) and re-submit 
the stage, leading to an exception being thrown, because there’s still an 
active TaskSetManager.  This is the root cause of the bug here, and is fixed by 
the proposed code above (a toy walk-through of this scenario follows below, 
after this list).
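    
    Here is the promised toy walk-through of Scenario B (illustrative only; it 
just models pendingPartitions, outputLocs, and failedEpoch as plain collections, 
not the real scheduler): a late Success from stage attempt #0 on a failed 
executor drains pendingPartitions even though its output is ignored, so the 
stage looks finished while a map output is still missing.
    
    ```scala
    import scala.collection.mutable

    object ScenarioBSketch extends App {
      // Attempt #1 is currently re-running partitions 0 and 1.
      val pendingPartitions = mutable.HashSet(0, 1)
      // partition -> executors known to hold its map output
      val outputLocs = mutable.HashMap[Int, List[String]]()
      // exec-1 was marked as failed at epoch 2.
      val failedEpoch = mutable.HashMap("exec-1" -> 2L)

      // A late Success arrives for partition 0 from attempt #0, launched at epoch 1 on exec-1.
      val (taskEpoch, execId, partitionId) = (1L, "exec-1", 0)

      // Pre-fix behavior: the partition is removed from pendingPartitions regardless...
      pendingPartitions -= partitionId
      // ...but the output is ignored as bogus because the task's epoch predates the failure.
      if (!(failedEpoch.contains(execId) && taskEpoch <= failedEpoch(execId))) {
        outputLocs(partitionId) = execId :: outputLocs.getOrElse(partitionId, Nil)
      }

      // Meanwhile partition 1 finishes normally in attempt #1.
      pendingPartitions -= 1
      outputLocs(1) = List("exec-2")

      val looksFinished = pendingPartitions.isEmpty
      val isAvailable = (0 to 1).forall(outputLocs.contains)
      println(s"looksFinished=$looksFinished, isAvailable=$isAvailable")
      // Prints looksFinished=true, isAvailable=false: the stage gets resubmitted while
      // attempt #1's TaskSetManager is still active, which is the reported exception.
    }
    ```
    
    With the proposed code at the top, the bogus completion would not touch 
pendingPartitions (the task is neither from the current attempt nor accepted by 
the epoch check), so partition 0 would stay pending until attempt #1’s copy 
finishes.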
    
    (3) ShuffleMapStage.outputLocs.  This tracks the output locations for all of 
the tasks in a stage.  If a stage gets re-run, only the tasks that need to be 
re-run will be in the two variables above, but all of the tasks in the stage 
(including ones that have finished) will always be in outputLocs.  One use of 
this variable that’s different than the others is that outputLocs can be used 
after a stage completes and when no tasks are actively running.  For example, 
if a task in the next stage fails with a fetch failure, the output location for 
the data that triggered the failure will be removed from 
ShuffleMapStage.outputLocs.  outputLocs also may track multiple locations for a 
particular task (e.g., if two copies of the task completed successfully).
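    
    A small toy model of the outputLocs behavior in (3) (again not Spark’s 
actual Stage code, just the properties described above):
    
    ```scala
    // Illustrative model: every partition of the stage has an entry, a partition may
    // have several locations, and locations can be removed after the stage finishes.
    class OutputLocsSketch(numPartitions: Int) {
      // One (possibly empty) list of executor ids per partition.
      private val outputLocs = Array.fill(numPartitions)(List.empty[String])

      def addOutputLoc(partition: Int, execId: String): Unit =
        outputLocs(partition) = execId :: outputLocs(partition)

      // Called, e.g., when a task in the next stage hits a fetch failure against execId,
      // possibly long after this stage completed.
      def removeOutputsOnExecutor(execId: String): Unit =
        for (p <- outputLocs.indices)
          outputLocs(p) = outputLocs(p).filterNot(_ == execId)

      // The stage's output is usable only if every partition still has some location.
      def isAvailable: Boolean = outputLocs.forall(_.nonEmpty)
    }
    ```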
    
    As far as I understand, this will be inconsistent with pendingPartitions in 
two cases:
    - (A) Consider a task X that finishes successfully on an executor E.  Now 
suppose executor E gets marked as lost (e.g., because of another task that 
failed to fetch data from E), causing the DAGScheduler to update the epoch on 
E, and the TaskSetManager to mark X as Resubmitted.  Sometime after executor E 
is marked as lost, the TaskSetManager processes the task successful message for 
X.  The TaskSetManager still considers X to be done and marks it as successful, 
and the DAGScheduler removes the task from ShuffleMapStage.pendingPartitions.  
However, because the epoch on the machine is too old (in other words, because 
the DAGScheduler knows that executor E failed sometime after task X started), 
the DAGScheduler won’t register the output location for the task.  This 
particular functionality is necessary for correctness, and will trigger the 
“if (!shuffleStage.isAvailable)” check.  Task X needs to be re-run, and 
the TSM doesn’t “know” that X needs to be re-run (it thinks X completed 
successfully).  If the DAGScheduler didn’t remove X from pendingPartitions, 
things would end up hanging, as @jinxing64 pointed out (see the sketch after 
this list).  This is the test case I improved in #16892.
    - (B) In buggy scenario B above (which should get fixed by this PR).
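    
    A toy timeline of case (A), under the same simplified model as the Scenario 
B sketch above (illustrative only): it shows why removing X’s partition from 
pendingPartitions turns a would-be hang into a stage resubmission that the 
“if (!shuffleStage.isAvailable)” check catches.
    
    ```scala
    import scala.collection.mutable

    object CaseASketch extends App {
      val pendingPartitions = mutable.HashSet(0)            // only task X (partition 0) remains
      val outputLocs = mutable.HashMap[Int, List[String]]() // partition -> output locations
      val failedEpoch = mutable.HashMap("execE" -> 5L)      // executor E was lost at epoch 5

      // Task X was launched at epoch 4 on executor E, which has since been lost; the
      // TSM already marked X successful, so it will never resubmit it.
      val (taskEpoch, execId, partition) = (4L, "execE", 0)

      // The DAGScheduler removes the partition even though it must ignore the output:
      pendingPartitions -= partition
      if (!(failedEpoch.contains(execId) && taskEpoch <= failedEpoch(execId))) {
        outputLocs(partition) = List(execId)
      }

      if (pendingPartitions.isEmpty) {
        // The stage "finishes", but an output is missing, so the stage is resubmitted
        // and X is re-run by the new attempt.
        println(s"resubmit stage: isAvailable=${outputLocs.contains(partition)}")
      } else {
        // If the partition had been left in pendingPartitions, nothing would ever
        // remove it and the job would hang.
        println("stage never completes")
      }
    }
    ```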
    
    There are some more fixes we could do to clean some of this up and make it 
easier to reason about — but in the immediate future, I think the fix at the 
top is the best way to fix the current bug.


