Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21114#discussion_r187763308

    --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
    @@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext {
         acc.merge("kindness")
         assert(acc.value === "kindness")
       }
    +
    +  test("updating garbage collected accumulators") {
    +    // Simulate a FetchFailedException in the first attempt to force a retry.
    +    // Then complete the remaining task from the first attempt after the second
    +    // attempt has started, but before it completes. The completion event for
    +    // the first attempt will try to update garbage collected accumulators.
    +    val numPartitions = 2
    +    sc = new SparkContext("local[2]", "test")
    +
    +    val attempt0Latch = new TestLatch("attempt0")
    +    val attempt1Latch = new TestLatch("attempt1")
    +
    +    val x = sc.parallelize(1 to 100, numPartitions).groupBy(identity)
    +    val sid = x.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle.shuffleId
    +    val rdd = x.mapPartitionsWithIndex { case (i, iter) =>
    +      val taskContext = TaskContext.get()
    +      if (taskContext.stageAttemptNumber() == 0) {
    +        if (i == 0) {
    +          // Fail the first task in the first stage attempt to force a retry.
    +          throw new FetchFailedException(
    +            SparkEnv.get.blockManager.blockManagerId,
    +            sid,
    +            taskContext.partitionId(),
    +            taskContext.partitionId(),
    +            "simulated fetch failure")
    +        } else {
    +          // Wait until the second attempt starts.
    +          attempt0Latch.await()
    +          iter
    +        }
    +      } else {
    +        if (i == 0) {
    +          // Wait until the first attempt completes.
    +          attempt1Latch.await()
    +        }
    +        iter
    +      }
    +    }
    +
    +    sc.addSparkListener(new SparkListener {
    +      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    +        if (taskStart.stageId == 1 && taskStart.stageAttemptId == 1) {
    --- End diff --

    Should we add `taskStart.taskInfo.index == 0` here to make sure it's partition 0?
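    For concreteness, a minimal sketch of the suggested guard (assuming the same
    listener wiring as in the diff; the listener body that releases `attempt0Latch`
    is elided in the hunk above, so it is only indicated by a comment here):

        import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

        sc.addSparkListener(new SparkListener {
          override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
            // Only react to partition 0 of the retried stage (stage 1, attempt 1),
            // so a task start for the other partition cannot trip the condition early.
            if (taskStart.stageId == 1 && taskStart.stageAttemptId == 1 &&
                taskStart.taskInfo.index == 0) {
              // ... release attempt0Latch here, as in the original listener body
              // (elided in the diff above) ...
            }
          }
        })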