Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22209#discussion_r214187951
  
    --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
    @@ -1190,6 +1190,61 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
         assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
       }
     
    +  test("SPARK-24415: update metrics for tasks that finish late") {
    +    val listener = new AppStatusListener(store, conf, true)
    +
    +    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
    +    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
    +
    +    // Start job
    +    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null))
    +
    +    // Start 2 stages
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))
    +
    +    // Start 2 Tasks
    +    val tasks = createTasks(2, Array("1"))
    +    tasks.foreach { task =>
    +      listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, task))
    +    }
    +
    +    // Task 1 Finished
    +    time += 1
    +    tasks(0).markFinished(TaskState.FINISHED, time)
    +    listener.onTaskEnd(
    +      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
    +
    +    // Stage 1 Completed
    +    stage1.failureReason = Some("Failed")
    +    listener.onStageCompleted(SparkListenerStageCompleted(stage1))
    +
    +    // Stop job 1
    +    time += 1
    +    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
    +
    +    // Task 2 Killed
    +    time += 1
    +    tasks(1).markFinished(TaskState.FINISHED, time)
    +    listener.onTaskEnd(
    +      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
    +        TaskKilled(reason = "Killed"), tasks(1), null))
    +
    +    // Ensure killed task metrics are updated
    +    val allStages = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
    +    val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
    +    assert(failedStages.size == 1)
    +    assert(failedStages.head.numKilledTasks == 1)
    +    assert(failedStages.head.numCompleteTasks == 1)
    +
    +    val allJobs = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
    +    assert(allJobs.size == 1)
    --- End diff --
    
    I was a little puzzled about why this test was working. It turns out that job metrics are updated in `onTaskEnd` based on the jobs that the stage is tracking.
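
    For context, the way I read that path: the live stage keeps references to its jobs, so a late task-end event that still finds the stage can also bump the job counters. A toy sketch of that fan-out (made-up types, not the actual `AppStatusListener` internals):

    ```scala
    object JobMetricsFanOut {
      // Toy model, not the real AppStatusListener types: a live stage keeps
      // references to the jobs it belongs to, so a task-end event that reaches
      // the stage can still bump the job-level counters.
      class LiveJob(val jobId: Int) { var killedTasks = 0 }
      class LiveStage(val stageId: Int, val jobs: Seq[LiveJob]) { var killedTasks = 0 }

      def onTaskKilled(stage: LiveStage): Unit = {
        stage.killedTasks += 1
        stage.jobs.foreach(_.killedTasks += 1)  // job metrics updated from the stage side
      }

      def main(args: Array[String]): Unit = {
        val job = new LiveJob(1)
        val stage = new LiveStage(1, Seq(job))
        onTaskKilled(stage)              // the task ends after the job already finished
        assert(job.killedTasks == 1)     // but the job counter still moves
      }
    }
    ```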
    
    The weirdness is that, in your test above, you end the job before the task end event, yet the job is still being updated. That works because of the behavior described above, and because `LIVE_ENTITY_UPDATE_PERIOD` is `0` (disabled) in the tests.
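
    For anyone reading along: if I remember the suite setup correctly, the shared conf does roughly the following (hedging on the exact imports), which is what makes every `maybeUpdate` call write straight through to the store in the tests:

    ```scala
    // From memory, roughly the relevant part of this suite's setup; with the
    // update period at 0, the "written too recently?" check inside the listener
    // never skips a write, so maybeUpdate effectively behaves like update.
    import org.apache.spark.SparkConf
    import org.apache.spark.status.config._   // assumed location of LIVE_ENTITY_UPDATE_PERIOD

    val conf = new SparkConf().set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
    ```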
    
    So in a real app you could still miss updates to the job metrics: the `maybeUpdate` call in the `onTaskEnd` handler may skip updating the job, and since the job is not tracked anymore, that update will never be flushed.
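
    Here's a toy sketch (made-up names and values, not the real code) of how that last update can be lost once the period is non-zero and the job has already been flushed and dropped:

    ```scala
    object MaybeUpdateSketch {
      // Toy sketch of the rate limiting, not the real maybeUpdate(): with a
      // nonzero update period, the write triggered by the final task-end event
      // can be skipped, and once the job is no longer tracked nothing flushes it.
      final case class JobCounters(var killedTasks: Int = 0, var lastWriteTime: Long = -1L)

      val updatePeriod = 100L      // stands in for the live update period (hypothetical value)
      var storedKilledTasks = 0    // stands in for the copy persisted in the KVStore

      def write(job: JobCounters, now: Long): Unit = {
        storedKilledTasks = job.killedTasks
        job.lastWriteTime = now
      }

      def maybeUpdate(job: JobCounters, now: Long): Unit = {
        // skip the write if the entity was written too recently
        if (updatePeriod <= 0 || now - job.lastWriteTime >= updatePeriod) write(job, now)
      }

      def main(args: Array[String]): Unit = {
        val job = JobCounters()
        write(job, now = 0L)             // job end flushes the entity and stops tracking it
        job.killedTasks += 1             // the late task-end bumps the in-memory counter...
        maybeUpdate(job, now = 50L)      // ...but the write is skipped (50 < 100)
        assert(storedKilledTasks == 0)   // nothing flushes the job later, so the store stays stale
      }
    }
    ```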
    
    Anyway, this is probably minor and could be a separate fix. Could you file a separate bug to audit the remaining event-ordering issues in this code (like the one above), and also to look at what happens when events are dropped?
    
    Unless you want to fix that here.

