Repository: spark
Updated Branches:
  refs/heads/master 76ef02e49 -> fbf62b710


[SPARK-25451][SPARK-26100][CORE] Aggregated metrics table doesn't show the right number of the total tasks

The total task counts in the aggregated metrics table and in the tasks table 
sometimes do not match in the Web UI.
We need to force-update the executor summary for a particular executorId 
whenever the last active task of that executor finishes. Currently the summary 
is only force-updated for the task that ends the stage, so for any other 
executorId the final tasks can be missing from its summary at stage end.
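
The core of the fix is a per-executor active-task counter on the live stage. 
A minimal sketch of that bookkeeping, simplified from the diff below (the real 
code lives in AppStatusListener and LiveStage):
```
import scala.collection.mutable.HashMap

// Simplified sketch of the bookkeeping this patch adds; see the diff below
// for the actual AppStatusListener/LiveStage changes.
class StageBookkeeping {
  // Active task count per executor; unseen executors default to 0.
  val activeTasksPerExecutor = new HashMap[String, Int]().withDefaultValue(0)

  def onTaskStart(executorId: String): Unit =
    activeTasksPerExecutor(executorId) += 1

  // Returns true when this was the executor's last active task in the stage,
  // i.e. the moment its summary must be force-written instead of being left
  // to the rate-limited maybeUpdate() path.
  def onTaskEnd(executorId: String): Boolean = {
    activeTasksPerExecutor(executorId) -= 1
    activeTasksPerExecutor(executorId) == 0
  }
}
```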

Tests to reproduce:
```
bin/spark-shell --master yarn --conf spark.executor.instances=3
sc.parallelize(1 to 10000, 10).map{ x => throw new RuntimeException("Bad executor")}.collect()
```
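
The mismatch can also be checked outside the UI via the driver's monitoring 
REST API. A sketch assuming the default port 4040 and the v1 endpoints; the 
`executorSummary` field names follow `ExecutorStageSummary` and may vary 
across Spark versions:
```
# Compare the stage's task count with the per-executor totals for stage 0.
APP=$(curl -s http://localhost:4040/api/v1/applications | jq -r '.[0].id')
curl -s "http://localhost:4040/api/v1/applications/$APP/stages/0" | jq '
  .[0] | {numTasks,
          perExecutorTotal: ([.executorSummary[]?
            | .succeededTasks + .failedTasks + .killedTasks] | add)}'
```
Before the patch the two numbers can disagree; after it they should match.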
Before patch:
![screenshot from 2018-11-15 02-24-05](https://user-images.githubusercontent.com/23054875/48511776-b0d36480-e87d-11e8-89a8-ab97216e2c21.png)

After patch:
![screenshot from 2018-11-15 02-32-38](https://user-images.githubusercontent.com/23054875/48512141-c39a6900-e87e-11e8-8535-903e1d11d13e.png)

Closes #23038 from shahidki31/SPARK-25451.

Authored-by: Shahid <shahidk...@gmail.com>
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbf62b71
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbf62b71
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbf62b71

Branch: refs/heads/master
Commit: fbf62b7100be992cbc4eb67e154682db6c91e60e
Parents: 76ef02e
Author: Shahid <shahidk...@gmail.com>
Authored: Mon Nov 26 13:13:06 2018 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Mon Nov 26 13:13:45 2018 -0800

----------------------------------------------------------------------
 .../apache/spark/status/AppStatusListener.scala | 19 ++++++++-
 .../org/apache/spark/status/LiveEntity.scala    |  2 +
 .../spark/status/AppStatusListenerSuite.scala   | 45 ++++++++++++++++++++
 3 files changed, 64 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fbf62b71/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 81d39e0..8e84557 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -473,6 +473,7 @@ private[spark] class AppStatusListener(
       val locality = event.taskInfo.taskLocality.toString()
       val count = stage.localitySummary.getOrElse(locality, 0L) + 1L
       stage.localitySummary = stage.localitySummary ++ Map(locality -> count)
+      stage.activeTasksPerExecutor(event.taskInfo.executorId) += 1
       maybeUpdate(stage, now)
 
       stage.jobs.foreach { job =>
@@ -558,6 +559,7 @@ private[spark] class AppStatusListener(
       if (killedDelta > 0) {
         stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary)
       }
+      stage.activeTasksPerExecutor(event.taskInfo.executorId) -= 1
       // [SPARK-24415] Wait for all tasks to finish before removing stage from live list
       val removeStage =
         stage.activeTasks == 0 &&
@@ -582,7 +584,11 @@ private[spark] class AppStatusListener(
         if (killedDelta > 0) {
           job.killedSummary = killedTasksSummary(event.reason, job.killedSummary)
         }
-        conditionalLiveUpdate(job, now, removeStage)
+        if (removeStage) {
+          update(job, now)
+        } else {
+          maybeUpdate(job, now)
+        }
       }
 
       val esummary = stage.executorSummary(event.taskInfo.executorId)
@@ -593,7 +599,16 @@ private[spark] class AppStatusListener(
       if (metricsDelta != null) {
         esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta)
       }
-      conditionalLiveUpdate(esummary, now, removeStage)
+
+      val isLastTask = stage.activeTasksPerExecutor(event.taskInfo.executorId) == 0
+
+      // If the last task of the executor finished, then update the esummary
+      // for both live and history events.
+      if (isLastTask) {
+        update(esummary, now)
+      } else {
+        maybeUpdate(esummary, now)
+      }
 
       if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
         stage.cleaning = true

http://git-wip-us.apache.org/repos/asf/spark/blob/fbf62b71/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala 
b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 8066331..47e45a6 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -376,6 +376,8 @@ private class LiveStage extends LiveEntity {
 
   val executorSummaries = new HashMap[String, LiveExecutorStageSummary]()
 
+  val activeTasksPerExecutor = new HashMap[String, Int]().withDefaultValue(0)
+
   var blackListedExecutors = new HashSet[String]()
 
   // Used for cleanup of tasks after they reach the configured limit. Not written to the store.

http://git-wip-us.apache.org/repos/asf/spark/blob/fbf62b71/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 5f757b7..1c787ff 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1273,6 +1273,51 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(allJobs.head.numFailedStages == 1)
   }
 
+  test("SPARK-25451: total tasks in the executor summary should match total 
stage tasks") {
+    val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
+
+    val listener = new AppStatusListener(store, testConf, true)
+
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(4, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
+      Success, tasks(0), null))
+    time += 1
+    tasks(1).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
+      Success, tasks(1), null))
+
+    stage.failureReason = Some("Failed")
+    listener.onStageCompleted(SparkListenerStageCompleted(stage))
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor"))))
+
+    time += 1
+    tasks(2).markFinished(TaskState.FAILED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
+      ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
+    time += 1
+    tasks(3).markFinished(TaskState.FAILED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
+      ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
+
+    val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
+    esummary.foreach { execSummary =>
+      assert(execSummary.failedTasks === 1)
+      assert(execSummary.succeededTasks === 1)
+      assert(execSummary.killedTasks === 0)
+    }
+  }
+
   test("driver logs") {
     val listener = new AppStatusListener(store, conf, true)
 

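To run just the new regression test locally, something like the following 
should work (assuming the standard Spark sbt build; ScalaTest's `-z` selects 
tests by substring):
```
build/sbt "core/testOnly org.apache.spark.status.AppStatusListenerSuite -- -z SPARK-25451"
```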
