Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21114#discussion_r187763308
  
    --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
    @@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
         acc.merge("kindness")
         assert(acc.value === "kindness")
       }
    +
    +  test("updating garbage collected accumulators") {
    +    // Simulate a FetchFailedException in the first attempt to force a retry.
    +    // Then complete the remaining task from the first attempt after the second
    +    // attempt has started, but before it completes. The completion event for the
    +    // first attempt will try to update garbage collected accumulators.
    +    val numPartitions = 2
    +    sc = new SparkContext("local[2]", "test")
    +
    +    val attempt0Latch = new TestLatch("attempt0")
    +    val attempt1Latch = new TestLatch("attempt1")
    +
    +    val x = sc.parallelize(1 to 100, numPartitions).groupBy(identity)
    +    val sid = x.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle.shuffleId
    +    val rdd = x.mapPartitionsWithIndex { case (i, iter) =>
    +      val taskContext = TaskContext.get()
    +      if (taskContext.stageAttemptNumber() == 0) {
    +        if (i == 0) {
    +          // Fail the first task in the first stage attempt to force retry.
    +          throw new FetchFailedException(
    +            SparkEnv.get.blockManager.blockManagerId,
    +            sid,
    +            taskContext.partitionId(),
    +            taskContext.partitionId(),
    +            "simulated fetch failure")
    +        } else {
    +          // Wait till the second attempt starts.
    +          attempt0Latch.await()
    +          iter
    +        }
    +      } else {
    +        if (i == 0) {
    +          // Wait till the first attempt completes.
    +          attempt1Latch.await()
    +        }
    +        iter
    +      }
    +    }
    +
    +    sc.addSparkListener(new SparkListener {
    +      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    +        if (taskStart.stageId == 1 && taskStart.stageAttemptId == 1) {
    --- End diff --
    
    Should we add 'taskStart.taskInfo.index == 0' here to make sure it's partition 0?
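    
    If we do, a standalone sketch of what that guard could look like (purely
    illustrative; the listener class name and the use of a plain
    java.util.concurrent.CountDownLatch in place of TestLatch are my assumptions,
    not code from this PR):
    
        import java.util.concurrent.CountDownLatch
        import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
    
        // Hypothetical listener: only trip the latch when the task that starts
        // is partition 0 of stage 1, attempt 1 (the retried attempt).
        class Attempt0ReleaseListener(latch: CountDownLatch) extends SparkListener {
          override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
            if (taskStart.stageId == 1 &&
                taskStart.stageAttemptId == 1 &&
                taskStart.taskInfo.index == 0) {
              // Partition 0 of the retried attempt has started; let the
              // remaining task from attempt 0 proceed now.
              latch.countDown()
            }
          }
        }
    
    That way a speculative or reordered start of the other task in attempt 1
    couldn't release the latch early.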


---
