Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22209#discussion_r215343938
  
    --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
    @@ -1190,6 +1190,61 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
         assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
       }
     
    +  test("SPARK-24415: update metrics for tasks that finish late") {
    +    val listener = new AppStatusListener(store, conf, true)
    +
    +    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
    +    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
    +
    +    // Start job
    +    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null))
    +
    +    // Start 2 stages
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))
    +
    +    // Start 2 Tasks
    +    val tasks = createTasks(2, Array("1"))
    +    tasks.foreach { task =>
    +      listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, task))
    +    }
    +
    +    // Task 1 Finished
    +    time += 1
    +    tasks(0).markFinished(TaskState.FINISHED, time)
    +    listener.onTaskEnd(
    +      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
    +
    +    // Stage 1 Completed
    +    stage1.failureReason = Some("Failed")
    +    listener.onStageCompleted(SparkListenerStageCompleted(stage1))
    +
    +    // Stop job 1
    +    time += 1
    +    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
    +
    +    // Task 2 Killed
    +    time += 1
    +    tasks(1).markFinished(TaskState.FINISHED, time)
    +    listener.onTaskEnd(
    +      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
    +        TaskKilled(reason = "Killed"), tasks(1), null))
    +
    +    // Ensure killed task metrics are updated
    +    val allStages = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
    +    val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
    +    assert(failedStages.size == 1)
    +    assert(failedStages.head.numKilledTasks == 1)
    +    assert(failedStages.head.numCompleteTasks == 1)
    +
    +    val allJobs = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
    +    assert(allJobs.size == 1)
    --- End diff ---
    
    Ah, missed that. Took another look and it should be fine.
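    
    (For reference, a minimal sketch of the kind of job-level follow-up check discussed here, assuming `JobDataWrapper.info` exposes the v1 `JobData` counters; this snippet is illustrative only and not part of the diff:)
    
        // Hypothetical extension of the assertions above: the killed task
        // should also be reflected in the job-level counters.
        assert(allJobs.head.numKilledTasks == 1)     // task 2, killed after stage 1 completed
        assert(allJobs.head.numCompletedTasks == 1)  // task 1, which finished normally
        assert(allJobs.head.numActiveTasks == 0)     // nothing should still be running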


---
