Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15335#discussion_r84365187 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1255,27 +1255,46 @@ class DAGScheduler( s"longer running") } - if (disallowStageRetryForTest) { - abortStage(failedStage, "Fetch failure will not retry stage due to testing config", - None) - } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) { - abortStage(failedStage, s"$failedStage (${failedStage.name}) " + - s"has failed the maximum allowable number of " + - s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " + - s"Most recent failure reason: ${failureMessage}", None) - } else { - if (failedStages.isEmpty) { - // Don't schedule an event to resubmit failed stages if failed isn't empty, because - // in that case the event will already have been scheduled. - // TODO: Cancel running tasks in the stage - logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + - s"$failedStage (${failedStage.name}) due to fetch failure") - messageScheduler.schedule(new Runnable { - override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + val shouldAbortStage = + failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) || + disallowStageRetryForTest + + if (shouldAbortStage) { + val abortMessage = if (disallowStageRetryForTest) { + "Fetch failure will not retry stage due to testing config" + } else { + s"""$failedStage (${failedStage.name}) + |has failed the maximum allowable number of + |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. + |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ") } + abortStage(failedStage, abortMessage, None) + } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued + // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064 + val noResubmitEnqueued = !failedStages.contains(failedStage) --- End diff -- ok, I agree with your logic ... maybe not the "obviously" part, I see myself pondering this question again when I look at the code in the future, but I can't even think of a succinct comment that would help. Maybe the thing which really bothers me here is that we bother with `mapStage` at all in this error handling. It seems totally unnecessary -- it'll get resubmitted transitivitely by `failedStage`. We need to rely on transitive resubmission anyway, since you might need to also resubmit the parents of `mapStage`. But, also orthogonal to the cleanup you are proposing here.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org