GitHub user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23038#discussion_r234728748
  
    --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
    @@ -1275,6 +1275,49 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
         assert(allJobs.head.numFailedStages == 1)
       }
     
    +  test("SPARK-25451: total tasks in the executor summary should match 
total stage tasks") {
    +    val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
    +
    +    val listener = new AppStatusListener(store, testConf, true)
    +
    +    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
    +    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
    +
    +    val tasks = createTasks(4, Array("1", "2"))
    +    tasks.foreach { task =>
    +      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
    +    }
    +
    +    time += 1
    +    tasks(0).markFinished(TaskState.FAILED, time)
    +    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
    +      ExecutorLostFailure("1", true, Some("Lost executor")), tasks(0), null))
    +    time += 1
    +    tasks(1).markFinished(TaskState.FAILED, time)
    +    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
    +      ExecutorLostFailure("2", true, Some("Lost executor")), tasks(1), null))
    +
    +    stage.failureReason = Some("Failed")
    +    listener.onStageCompleted(SparkListenerStageCompleted(stage))
    +    time += 1
    +    listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor"))))
    +
    +    time += 1
    +    tasks(2).markFinished(TaskState.FAILED, time)
    +    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
    +      ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
    +    time += 1
    +    tasks(3).markFinished(TaskState.FAILED, time)
    +    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
    +      ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
    +
    +    val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
    +    esummary.foreach { execSummary =>
    +      assert(execSummary.failedTasks == 2)
    --- End diff --
    
    Nit: also check `succeededTasks` and `killedTasks`
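    
    For illustration, a minimal sketch of how the assertion loop could cover those counters as well, assuming the `succeededTasks` and `killedTasks` fields on `ExecutorStageSummary`. Since all four tasks in this test end in `TaskState.FAILED`, both counts should stay at zero for each executor:
    
    ```scala
    // Sketch only: extends the assertion loop from the diff above. Every task
    // in this test fails, so per-executor succeeded/killed counts remain zero.
    esummary.foreach { execSummary =>
      assert(execSummary.failedTasks == 2)
      assert(execSummary.succeededTasks == 0)
      assert(execSummary.killedTasks == 0)
    }
    ```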

