Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15335#discussion_r83710471
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
    @@ -1255,27 +1255,46 @@ class DAGScheduler(
                   s"longer running")
               }
     
    -          if (disallowStageRetryForTest) {
    -            abortStage(failedStage, "Fetch failure will not retry stage 
due to testing config",
    -              None)
    -          } else if 
(failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
    -            abortStage(failedStage, s"$failedStage (${failedStage.name}) " 
+
    -              s"has failed the maximum allowable number of " +
    -              s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
    -              s"Most recent failure reason: ${failureMessage}", None)
    -          } else {
    -            if (failedStages.isEmpty) {
    -              // Don't schedule an event to resubmit failed stages if 
failed isn't empty, because
    -              // in that case the event will already have been scheduled.
    -              // TODO: Cancel running tasks in the stage
    -              logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
    -                s"$failedStage (${failedStage.name}) due to fetch failure")
    -              messageScheduler.schedule(new Runnable {
    -                override def run(): Unit = 
eventProcessLoop.post(ResubmitFailedStages)
    -              }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
    +          val shouldAbortStage =
    +            failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
    +            disallowStageRetryForTest
    +
    +          if (shouldAbortStage) {
    +            val abortMessage = if (disallowStageRetryForTest) {
    +              "Fetch failure will not retry stage due to testing config"
    +            } else {
    +              s"""$failedStage (${failedStage.name})
    +                 |has failed the maximum allowable number of
    +                 |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
    +                 |Most recent failure reason: 
$failureMessage""".stripMargin.replaceAll("\n", " ")
                 }
    +            abortStage(failedStage, abortMessage, None)
    +          } else { // update failedStages and make sure a 
ResubmitFailedStages event is enqueued
    +            // TODO: Cancel running tasks in the failed stage -- cf. 
SPARK-17064
    +            val noResubmitEnqueued = !failedStages.contains(failedStage)
    --- End diff --
    
    Ok, but it's really not that complicated or difficult to understand.
    
    There is only one way to add stages to `failedStages`: within the 
`FetchFailed` case.  When a `failedStage` is added to `failedStages`, it is 
always accompanied by the parent `mapStage`.
    
    There are only two ways to remove stages from `failedStages`: 1) within the 
handling of a `ResubmitFailedStages` event, when the entire `failedStages` is 
cleared; 2) within `cleanupStateForJobAndIndependentStages` when we call 
`removeStage`.  Obviously, 1) can't produce a state where `mapStage` is not in 
`failedStage` while a corresponding `failedStage` is, so the only logic we need 
to concern ourselves with is in 2).
    
    In order for 2) to produce a state where `mapStage` is absent from 
`failedStages` while an associated `failedStage` is present, `removeStage` 
would need to have been called on the `mapStage` while not being called on the 
`failedStage`.  But that can't happen because `removeStage` will not be called 
on a stage unless no Job needs that stage anymore.  If no job needs the 
`mapStage`, then no job can need a `failedStage` that uses the output of that 
`mapStage` -- i.e. it is not possible that a `mapStage` will be removed in 
`cleanupStateForJobAndIndependentStages` unless every associated `failedStage` 
will also be removed.
    
    Conclusion: It is never possible for `mapStage` to be absent from 
`failedStages` at the same time that `failedStages` is present, so the proposed 
`|| !failedStages.contains(mapStage)` condition will never be checked -- it 
would just be unreachable and misleading code.
    
    There also isn't really any need for concern over lack of tests.  There is 
no need to prove correctness of the current code for something that can't 
happen presently, so the only point of such a test would be to guard against 
some future mistaken change making it possible to remove a failed `mapStage` 
while some `failedStage` still needs it.  If that happens, then we've got far 
bigger problems than checking whether we need to issue a new 
`ResubmitFailedStages` event, and checks for that kind of broken removal of 
parents while their children are still depending onthem should be covered in 
the tests of `cleanupStateForJobAndIndependentStages`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to