This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 9c7aa90c771 [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry
9c7aa90c771 is described below

commit 9c7aa90c771868da727073f9941b8b2c4b856946
Author: Tengfei Huang <tengfe...@gmail.com>
AuthorDate: Mon Mar 13 02:22:49 2023 -0500

    [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry

    ### What changes were proposed in this pull request?
    Currently a stage can be resubmitted in a few scenarios:
    1. A task fails with `FetchFailed`, which triggers a stage re-submit;
    2. A barrier task fails;
    3. Shuffle data is lost because an executor/host was decommissioned.

    For the first two scenarios, the config `spark.stage.maxConsecutiveAttempts` limits the number of retries. For the third scenario, there is a potential risk of infinite retry: if the executors hosting the shuffle data from successful tasks keep getting killed/lost, the stage will be re-run again and again. To avoid this risk, this PR adds a new config `spark.stage.maxAttempts` to limit the overall number of attempts for each stage; the stage will be aborted once its attempt count goes beyond the limit.

    ### Why are the changes needed?
    To avoid the potential risk of infinite stage retry.

    ### Does this PR introduce _any_ user-facing change?
    Added a limit on stage retries, so jobs may fail if a stage needs to be retried more times than the limit allows.

    ### How was this patch tested?
    Added a new UT.

    Closes #40286 from ivoson/SPARK-42577.

    Authored-by: Tengfei Huang <tengfe...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
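For context (not part of this commit): the effective cap is the larger of the two configs, so a user who wants a hard limit would typically set both. A minimal sketch, assuming plain SparkConf usage; the app name and the value 8 are made up:

    import org.apache.spark.SparkConf

    // Cap stage attempts at 8. The scheduler uses
    // max(spark.stage.maxAttempts, spark.stage.maxConsecutiveAttempts),
    // so both configs are set to the same value here.
    val conf = new SparkConf()
      .setAppName("stage-retry-cap-demo") // hypothetical app name
      .set("spark.stage.maxAttempts", "8")
      .set("spark.stage.maxConsecutiveAttempts", "8")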
---
 .../org/apache/spark/internal/config/package.scala | 10 ++++
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 30 ++++++++----
 .../scala/org/apache/spark/scheduler/Stage.scala   |  1 +
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 56 +++++++++++++++++++++-
 4 files changed, 87 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 99f9b78c09b..7f93bf76216 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2479,4 +2479,14 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val STAGE_MAX_ATTEMPTS =
+    ConfigBuilder("spark.stage.maxAttempts")
+      .doc("Specify the max attempts for a stage - the spark job will be aborted if any of its " +
+        "stages is resubmitted multiple times beyond the max retries limitation. The maximum " +
+        "number of stage retries is the maximum of `spark.stage.maxAttempts` and " +
+        "`spark.stage.maxConsecutiveAttempts`.")
+      .version("3.5.0")
+      .intConf
+      .createWithDefault(Int.MaxValue)
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 1a1f0cbba7f..cc018ac6aec 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -232,6 +232,13 @@ private[spark] class DAGScheduler(
     sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
       DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
 
+  /**
+   * Max stage attempts allowed before a stage is aborted.
+   */
+  private[scheduler] val maxStageAttempts: Int = {
+    Math.max(maxConsecutiveStageAttempts, sc.getConf.get(config.STAGE_MAX_ATTEMPTS))
+  }
+
   /**
    * Whether ignore stage fetch failure caused by executor decommission when
    * count spark.stage.maxConsecutiveAttempts
@@ -1355,16 +1362,23 @@ private[spark] class DAGScheduler(
     logDebug(s"submitStage($stage (name=${stage.name};" +
       s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
     if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
-      val missing = getMissingParentStages(stage).sortBy(_.id)
-      logDebug("missing: " + missing)
-      if (missing.isEmpty) {
-        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
-        submitMissingTasks(stage, jobId.get)
+      if (stage.getNextAttemptId >= maxStageAttempts) {
+        val reason = s"$stage (name=${stage.name}) has been resubmitted for the maximum " +
+          s"allowable number of times: ${maxStageAttempts}, which is the max value of " +
+          s"config `spark.stage.maxAttempts` and `spark.stage.maxConsecutiveAttempts`."
+        abortStage(stage, reason, None)
       } else {
-        for (parent <- missing) {
-          submitStage(parent)
+        val missing = getMissingParentStages(stage).sortBy(_.id)
+        logDebug("missing: " + missing)
+        if (missing.isEmpty) {
+          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
+          submitMissingTasks(stage, jobId.get)
+        } else {
+          for (parent <- missing) {
+            submitStage(parent)
+          }
+          waitingStages += stage
         }
-        waitingStages += stage
       }
     }
   } else {
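For illustration only (not part of this commit): when submitStage hits the cap, abortStage fails the running job, and on the driver this surfaces as a SparkException carrying the abort reason built above. A hypothetical sketch of observing it, assuming `rdd` is an already-defined RDD and the cluster keeps losing the executors that host its shuffle output:

    import org.apache.spark.SparkException

    try {
      // repartition() introduces a shuffle map stage that may be resubmitted.
      rdd.repartition(10).count()
    } catch {
      case e: SparkException =>
        // Expected to contain "has been resubmitted for the maximum
        // allowable number of times" once the attempt limit is exceeded.
        println(e.getMessage)
    }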
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 97072115ff8..f35beafd874 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -70,6 +70,7 @@ private[scheduler] abstract class Stage(
   /** The ID to use for the next new attempt for this stage. */
   private var nextAttemptId: Int = 0
+  private[scheduler] def getNextAttemptId: Int = nextAttemptId
 
   val name: String = callSite.shortForm
   val details: String = callSite.longForm
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index cc6562ef017..d441abe2233 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -405,13 +405,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     mapOutputTracker = spy(new MyMapOutputTrackerMaster(sc.getConf, broadcastManager))
     blockManagerMaster = spy(new MyBlockManagerMaster(sc.getConf))
     doNothing().when(blockManagerMaster).updateRDDBlockVisibility(any(), any())
-    scheduler = new MyDAGScheduler(
+    scheduler = spy(new MyDAGScheduler(
       sc,
       taskScheduler,
       sc.listenerBus,
       mapOutputTracker,
       blockManagerMaster,
-      sc.env)
+      sc.env))
     dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler)
   }
 
@@ -4595,6 +4595,58 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-42577: fail the job if a shuffle map stage attempts beyond the limitation") {
+    setupStageAbortTest(sc)
+    doAnswer(_ => 2).when(scheduler).maxStageAttempts
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
+    submit(reduceRdd, Array(0))
+
+    // Stage 0 got scheduled with 2 tasks.
+    assert(taskSets.size === 1 && taskSets(0).tasks.length === 2)
+    assert(taskSets(0).stageId === 0)
+    val stage0 = scheduler.stageIdToStage(0)
+
+    // Task 0 of stage 0 finished successfully on hostA, then the executor on hostA got killed
+    // and its shuffle data got lost. Then task 1 of stage 0 finished successfully on hostB.
+    // Stage 0 will be resubmitted due to the shuffle data loss.
+    runEvent(makeCompletionEvent(taskSets(0).tasks(0), Success,
+      makeMapStatus("hostA", reduces = 1, mapTaskId = 0),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+    runEvent(ExecutorLost("hostA-exec", ExecutorKilled))
+    runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success,
+      makeMapStatus("hostB", reduces = 1, mapTaskId = 1),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+    assert(taskSets.size === 2 && taskSets(1).tasks.length === 1)
+    assert(taskSets(1).stageId === 0 && taskSets(1).stageAttemptId === 1)
+
+    // The executor on hostB got killed, so the shuffle data from task 1 will be lost; after the
+    // resubmitted task completes, stage 0 would be resubmitted again due to missing shuffle
+    // data. But because the stage max attempts limit is 2, the job should be aborted.
+    runEvent(ExecutorLost("hostB-exec", ExecutorKilled))
+    runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success,
+      makeMapStatus("hostC", reduces = 1, mapTaskId = 2),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(2)))
+
+    // Stage should have been aborted and removed from running stages.
+    assertDataStructuresEmpty()
+    sc.listenerBus.waitUntilEmpty()
+    assert(ended)
+
+    val expectedMsg = s"$stage0 (name=${stage0.name}) has been resubmitted for the maximum " +
+      s"allowable number of times: 2, which is the max value of " +
+      s"config `spark.stage.maxAttempts` and `spark.stage.maxConsecutiveAttempts`."
+
+    jobResult match {
+      case JobFailed(reason) =>
+        assert(reason.getMessage.contains(expectedMsg))
+      case other => fail(s"expected JobFailed, not $other")
+    }
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org