Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22209#discussion_r214411084 --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala --- @@ -1190,6 +1190,61 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(appStore.asOption(appStore.lastStageAttempt(3)) === None) } + test("SPARK-24415: update metrics for tasks that finish late") { + val listener = new AppStatusListener(store, conf, true) + + val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1") + val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2") + + // Start job + listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null)) + + // Start 2 stages + listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties())) + listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties())) + + // Start 2 Tasks + val tasks = createTasks(2, Array("1")) + tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, task)) + } + + // Task 1 Finished + time += 1 + tasks(0).markFinished(TaskState.FINISHED, time) + listener.onTaskEnd( + SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null)) + + // Stage 1 Completed + stage1.failureReason = Some("Failed") + listener.onStageCompleted(SparkListenerStageCompleted(stage1)) + + // Stop job 1 + time += 1 + listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded)) + + // Task 2 Killed + time += 1 + tasks(1).markFinished(TaskState.FINISHED, time) + listener.onTaskEnd( + SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", + TaskKilled(reason = "Killed"), tasks(1), null)) + + // Ensure killed task metrics are updated + val allStages = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info) + val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED) + assert(failedStages.size == 1) 
+ assert(failedStages.head.numKilledTasks == 1) + assert(failedStages.head.numCompleteTasks == 1) + + val allJobs = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info) + assert(allJobs.size == 1) --- End diff -- > I am only trying to fix any issues which are caused by out-of-order events Well, the job metrics not being updated is an out-of-order issue. The code in `onTaskEnd` that updates the job is this: ``` stage.jobs.foreach { job => ... maybeUpdate(job, now) } ``` That `maybeUpdate` call, with default settings, may not write the job data to the store, and you'll end up with out-of-date info on the UI. This will only happen on live apps, since the SHS flushes everything at the end.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org