Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20082#discussion_r158773445

    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala ---
    @@ -158,6 +159,28 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
         assert(attemptIdsWithFailedTask.toSet === Set(0, 1))
       }

    +  test("TaskContext.stageAttemptId getter") {
    +    sc = new SparkContext("local[1,2]", "test")
    +
    +    // Check stage attemptIds are 0 for initial stage
    +    val stageAttemptIds = sc.parallelize(Seq(1, 2), 2).mapPartitions { _ =>
    +      Seq(TaskContext.get().stageAttemptId()).iterator
    +    }.collect()
    +    assert(stageAttemptIds.toSet === Set(0))
    +
    +    // Check stage attemptIds that are resubmitted when task fails
    +    val stageAttemptIdsWithFailedStage =
    +      sc.parallelize(Seq(1, 2, 3, 4), 4).repartition(1).mapPartitions { _ =>
    +        val stageAttemptId = TaskContext.get().stageAttemptId()
    +        if (stageAttemptId < 2) {
    +          throw new FetchFailedException(null, 0, 0, 0, "Fake")
    --- End diff ---

    Regarding the repartition part: I use FetchFailedException to explicitly trigger a stage resubmission. Otherwise, IIRC, the failed task would just be retried within the same stage attempt.
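    A minimal standalone sketch of the behavior described above, assuming the stageAttemptId() accessor this PR adds and the five-argument FetchFailedException constructor shown in the diff (Spark 2.x; later versions changed the signature). A plain exception would only cause a task retry within the same stage attempt; a FetchFailedException makes the DAGScheduler resubmit the whole stage, so stageAttemptId() advances:

        import org.apache.spark.{SparkContext, TaskContext}
        import org.apache.spark.shuffle.FetchFailedException

        object StageResubmissionSketch {
          def main(args: Array[String]): Unit = {
            // "local[1,2]" = one worker thread, at most 2 attempts per task.
            val sc = new SparkContext("local[1,2]", "stage-resubmission-sketch")
            try {
              // A shuffle boundary (repartition) is required: FetchFailedException
              // is only meaningful in a task that reads shuffle output.
              val attemptIds = sc.parallelize(1 to 4, 4).repartition(1).mapPartitions { _ =>
                val ctx = TaskContext.get()
                if (ctx.stageAttemptId() == 0) {
                  // Reported to the scheduler as a fetch failure, which marks the
                  // map output as lost and resubmits the stage instead of merely
                  // retrying this task (fetch failures do not count toward the
                  // per-task failure limit).
                  throw new FetchFailedException(null, 0, 0, 0, "Fake fetch failure")
                }
                Iterator(ctx.stageAttemptId())
              }.collect()
              // Only the resubmitted (second) stage attempt produces output.
              assert(attemptIds.toSet == Set(1))
            } finally {
              sc.stop()
            }
          }
        }

    If the tasks threw a plain RuntimeException instead, "local[1,2]" would retry them within the same stage attempt, so stageAttemptId() would stay 0 until the job failed.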