Github user shahidki31 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23038#discussion_r234399022
  
    --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
    @@ -1275,6 +1275,49 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
         assert(allJobs.head.numFailedStages == 1)
       }
     
    +  test("SPARK-25451: total tasks in the executor summary should match 
total stage tasks") {
    +    val testConf = conf.clone()
    +      .set("spark.ui.liveUpdate.period", s"${Int.MaxValue}s")
    +
    +    val listener = new AppStatusListener(store, testConf, true)
    +
    +    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
    +    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new 
Properties()))
    +
    +    val tasks = createTasks(4, Array("1", "2"))
    +    tasks.foreach { task =>
    +      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, 
stage.attemptNumber, task))
    +    }
    +
    +    tasks.filter(_.index < 2).foreach { task =>
    +        time += 1
    +        var execId = (task.index % 2 + 1).toString
    +        tasks(task.index).markFinished(TaskState.FAILED, time)
    +        listener.onTaskEnd(
    +          SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
    +            ExecutorLostFailure(execId, true, Some("Lost executor")), 
tasks(task.index), null))
    +    }
    +
    +    stage.failureReason = Some("Failed")
    +    listener.onStageCompleted(SparkListenerStageCompleted(stage))
    +    time += 1
    +    listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new 
RuntimeException("Bad Executor"))))
    +
    +    tasks.filter(_.index >= 2).foreach { task =>
    +        time += 1
    +        var execId = (task.index % 2 + 1).toString
    +        tasks(task.index).markFinished(TaskState.FAILED, time)
    +        listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, 
stage.attemptId, "taskType",
    +          ExecutorLostFailure(execId, true, Some("Lost executor")), 
tasks(task.index), null))
    +    }
    +
    +    val esummary = 
store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
    +    esummary.foreach {
    +      execSummary => assert(execSummary.failedTasks == 2)
    --- End diff --
    
    Done


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to