GitHub user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15335#discussion_r84365187
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1255,27 +1255,46 @@ class DAGScheduler(
                   s"longer running")
               }
     
    -          if (disallowStageRetryForTest) {
    -            abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
    -              None)
    -          } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
    -            abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
    -              s"has failed the maximum allowable number of " +
    -              s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
    -              s"Most recent failure reason: ${failureMessage}", None)
    -          } else {
    -            if (failedStages.isEmpty) {
    -              // Don't schedule an event to resubmit failed stages if failed isn't empty, because
    -              // in that case the event will already have been scheduled.
    -              // TODO: Cancel running tasks in the stage
    -              logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
    -                s"$failedStage (${failedStage.name}) due to fetch failure")
    -              messageScheduler.schedule(new Runnable {
    -                override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
    -              }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
    +          val shouldAbortStage =
    +            failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
    +            disallowStageRetryForTest
    +
    +          if (shouldAbortStage) {
    +            val abortMessage = if (disallowStageRetryForTest) {
    +              "Fetch failure will not retry stage due to testing config"
    +            } else {
    +              s"""$failedStage (${failedStage.name})
    +                 |has failed the maximum allowable number of
    +                 |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
    +                 |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
                 }
    +            abortStage(failedStage, abortMessage, None)
    +          } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
    +            // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
    +            val noResubmitEnqueued = !failedStages.contains(failedStage)
    --- End diff --
    
    ok, I agree with your logic ... maybe not the "obviously" part; I can see myself pondering this question again when I look at this code in the future, but I can't even think of a succinct comment that would help.
    
    Maybe the thing which really bothers me here is that we bother with `mapStage` at all in this error handling.  It seems totally unnecessary -- it'll get resubmitted transitively by `failedStage`.  We need to rely on transitive resubmission anyway, since you might also need to resubmit the parents of `mapStage`.  But that's also orthogonal to the cleanup you are proposing here.
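    
    To make the transitivity concrete, here is a minimal sketch of the parent-walking that stage submission does (the way `submitStage` recurses through missing parents before running anything).  `MiniStage`, `submit`, and the `runnable`/`waiting` sets are hypothetical names for illustration, not the real scheduler types:
    
    ```scala
    // Hypothetical model: submitting only failedStage still reaches mapStage
    // and mapStage's own missing ancestors.
    import scala.collection.mutable
    
    case class MiniStage(id: Int, parents: Seq[MiniStage], available: Boolean)
    
    object TransitiveResubmit {
      val runnable = mutable.LinkedHashSet[Int]()  // all parents ready
      val waiting  = mutable.LinkedHashSet[Int]()  // parked behind parents
    
      def submit(stage: MiniStage): Unit = {
        val missing = stage.parents.filterNot(_.available)
        if (missing.isEmpty) {
          runnable += stage.id
        } else {
          missing.foreach(submit)  // recurse: ancestors get submitted too
          waiting += stage.id
        }
      }
    
      def main(args: Array[String]): Unit = {
        val grandParent = MiniStage(0, Nil, available = false)
        val mapStage    = MiniStage(1, Seq(grandParent), available = false)
        val failedStage = MiniStage(2, Seq(mapStage), available = false)
        submit(failedStage)  // only the failed stage is submitted explicitly
        println(s"runnable=$runnable waiting=$waiting")
        // prints: runnable=LinkedHashSet(0) waiting=LinkedHashSet(1, 2)
      }
    }
    ```
    
    Under this model, `mapStage` (and anything above it) comes back for free once `failedStage` is resubmitted, which is the sense in which adding `mapStage` to `failedStages` explicitly looks redundant.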

