Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20082#discussion_r158773445
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala ---
    @@ -158,6 +159,28 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
         assert(attemptIdsWithFailedTask.toSet === Set(0, 1))
       }
     
    +  test("TaskContext.stageAttemptId getter") {
    +    sc = new SparkContext("local[1,2]", "test")
    +
    +    // Check stage attemptIds are 0 for initial stage
    +    val stageAttemptIds = sc.parallelize(Seq(1, 2), 2).mapPartitions { _ =>
    +      Seq(TaskContext.get().stageAttemptId()).iterator
    +    }.collect()
    +    assert(stageAttemptIds.toSet === Set(0))
    +
    +    // Check stage attemptIds that are resubmitted when task fails
    +    val stageAttemptIdsWithFailedStage =
    +      sc.parallelize(Seq(1, 2, 3, 4), 4).repartition(1).mapPartitions { _ =>
    +      val stageAttemptId = TaskContext.get().stageAttemptId()
    +      if (stageAttemptId < 2) {
    +        throw new FetchFailedException(null, 0, 0, 0, "Fake")
    --- End diff --
    
    This is related to the repartition part:
    
    I use FetchFailedException to explicitly trigger a stage resubmission. Otherwise, IIRC, the failed task would just be retried within the same stage attempt.
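    
    To make the contrast concrete, here is a minimal sketch (not part of this PR; it assumes a local SparkContext configured like the one in this suite, and uses the stageAttemptId() getter this PR adds). An ordinary exception only bumps TaskContext.attemptNumber() within the same stage, while a FetchFailedException after a shuffle makes the scheduler resubmit the whole stage and increment stageAttemptId():
    
        import org.apache.spark.{SparkContext, TaskContext}
        import org.apache.spark.shuffle.FetchFailedException
    
        // "local[1,2]" = 1 thread, maxTaskFailures = 2, so one retry is allowed.
        val sc = new SparkContext("local[1,2]", "sketch")
    
        // Ordinary failure: the task is retried within the SAME stage, so
        // attemptNumber() goes 0 -> 1 while stageAttemptId() stays 0.
        val taskRetry = sc.parallelize(Seq(1), 1).mapPartitions { _ =>
          val ctx = TaskContext.get()
          if (ctx.attemptNumber() == 0) {
            throw new RuntimeException("fail once; retried in the same stage")
          }
          Iterator((ctx.stageAttemptId(), ctx.attemptNumber()))
        }.collect()
        assert(taskRetry.toSet == Set((0, 1)))
    
        // Fetch failure after a shuffle: the scheduler treats it as lost shuffle
        // output and resubmits the stage, so stageAttemptId() goes 0 -> 1.
        val stageRetry = sc.parallelize(Seq(1, 2), 2).repartition(1).mapPartitions { _ =>
          val ctx = TaskContext.get()
          if (ctx.stageAttemptId() == 0) {
            throw new FetchFailedException(null, 0, 0, 0, "Fake")
          }
          Iterator(ctx.stageAttemptId())
        }.collect()
        assert(stageRetry.toSet == Set(1))
    
        sc.stop()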


---
