Repository: spark
Updated Branches:
  refs/heads/master 8440e3072 -> 39a02d8f7


[SPARK-24415][CORE] Fixed the aggregated stage metrics by retaining stage 
objects in liveStages until all tasks are complete

The problem occurs because the stage object is removed from liveStages in
AppStatusListener.onStageCompleted. Because of this, any onTaskEnd event
received after the onStageCompleted event does not update the stage metrics.

The fix is to retain stage objects in liveStages until all tasks are complete.
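
For readers skimming the patch, here is a minimal, self-contained sketch of the
retention logic, using simplified stand-in types (LiveStage, StageStatus,
RetainUntilTasksFinish are illustrative names, not Spark's actual classes):

import scala.collection.concurrent.TrieMap

// Illustrative stand-ins for the stage status and live-stage bookkeeping.
object StageStatus extends Enumeration {
  val ACTIVE, COMPLETE, FAILED = Value
}

case class LiveStage(
    stageId: Int,
    attemptId: Int,
    var status: StageStatus.Value,
    var activeTasks: Int,
    var completedTasks: Int = 0,
    var killedTasks: Int = 0)

object RetainUntilTasksFinish {
  // Stages stay in this map until they are terminal AND have no active tasks.
  val liveStages = new TrieMap[(Int, Int), LiveStage]()

  def onStageCompleted(stageId: Int, attemptId: Int, failed: Boolean): Unit = {
    liveStages.get((stageId, attemptId)).foreach { stage =>
      stage.status = if (failed) StageStatus.FAILED else StageStatus.COMPLETE
      // Only drop the stage if no tasks are still running; otherwise a late
      // onTaskEnd would find nothing to update and its metrics would be lost.
      if (stage.activeTasks == 0) {
        liveStages.remove((stageId, attemptId))
      }
    }
  }

  def onTaskEnd(stageId: Int, attemptId: Int, killed: Boolean): Unit = {
    liveStages.get((stageId, attemptId)).foreach { stage =>
      stage.activeTasks -= 1
      if (killed) stage.killedTasks += 1 else stage.completedTasks += 1
      // The stage may already be terminal; remove it once its last task reports.
      val terminal =
        stage.status == StageStatus.COMPLETE || stage.status == StageStatus.FAILED
      if (stage.activeTasks == 0 && terminal) {
        liveStages.remove((stageId, attemptId))
      }
    }
  }
}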

1. Verified the fix against the reproducible example posted in the JIRA
2. Added a unit test
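
The late-task scenario that the JIRA example and the new unit test exercise can
be driven against the sketch above roughly as follows (again hypothetical code,
not Spark's test suite):

object RetainUntilTasksFinishDemo {
  def main(args: Array[String]): Unit = {
    import RetainUntilTasksFinish._
    liveStages.put((1, 0), LiveStage(1, 0, StageStatus.ACTIVE, activeTasks = 2))

    onTaskEnd(1, 0, killed = false)        // first task finishes normally
    onStageCompleted(1, 0, failed = true)  // stage ends while one task is still running
    assert(liveStages.contains((1, 0)))    // stage is retained, not dropped

    onTaskEnd(1, 0, killed = true)         // late task-end still updates the counts...
    assert(!liveStages.contains((1, 0)))   // ...and only then is the stage removed
  }
}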

Closes #22209 from ankuriitg/ankurgupta/SPARK-24415.

Authored-by: ankurgupta <ankur.gu...@cloudera.com>
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39a02d8f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39a02d8f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39a02d8f

Branch: refs/heads/master
Commit: 39a02d8f75def7191c66d388729ba1721c92188d
Parents: 8440e30
Author: ankurgupta <ankur.gu...@cloudera.com>
Authored: Wed Sep 5 09:41:05 2018 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Wed Sep 5 09:52:04 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/status/AppStatusListener.scala | 61 +++++++++++++++-----
 .../spark/status/AppStatusListenerSuite.scala   | 55 ++++++++++++++++++
 .../spark/streaming/UISeleniumSuite.scala       |  9 ++-
 3 files changed, 108 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/39a02d8f/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 5ea161c..91b75e4 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -350,11 +350,20 @@ private[spark] class AppStatusListener(
         val e = it.next()
         if (job.stageIds.contains(e.getKey()._1)) {
           val stage = e.getValue()
-          stage.status = v1.StageStatus.SKIPPED
-          job.skippedStages += stage.info.stageId
-          job.skippedTasks += stage.info.numTasks
-          it.remove()
-          update(stage, now)
+          if (v1.StageStatus.PENDING.equals(stage.status)) {
+            stage.status = v1.StageStatus.SKIPPED
+            job.skippedStages += stage.info.stageId
+            job.skippedTasks += stage.info.numTasks
+            job.activeStages -= 1
+
+            pools.get(stage.schedulingPool).foreach { pool =>
+              pool.stageIds = pool.stageIds - stage.info.stageId
+              update(pool, now)
+            }
+
+            it.remove()
+            update(stage, now, last = true)
+          }
         }
       }
 
@@ -506,7 +515,16 @@ private[spark] class AppStatusListener(
       if (killedDelta > 0) {
        stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary)
       }
-      maybeUpdate(stage, now)
+      // [SPARK-24415] Wait for all tasks to finish before removing stage from live list
+      val removeStage =
+        stage.activeTasks == 0 &&
+          (v1.StageStatus.COMPLETE.equals(stage.status) ||
+            v1.StageStatus.FAILED.equals(stage.status))
+      if (removeStage) {
+        update(stage, now, last = true)
+      } else {
+        maybeUpdate(stage, now)
+      }
 
      // Store both stage ID and task index in a single long variable for tracking at job level.
      val taskIndex = (event.stageId.toLong << Integer.SIZE) | event.taskInfo.index
@@ -521,7 +539,7 @@ private[spark] class AppStatusListener(
         if (killedDelta > 0) {
          job.killedSummary = killedTasksSummary(event.reason, job.killedSummary)
         }
-        maybeUpdate(job, now)
+        conditionalLiveUpdate(job, now, removeStage)
       }
 
       val esummary = stage.executorSummary(event.taskInfo.executorId)
@@ -532,7 +550,7 @@ private[spark] class AppStatusListener(
       if (metricsDelta != null) {
        esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta)
       }
-      maybeUpdate(esummary, now)
+      conditionalLiveUpdate(esummary, now, removeStage)
 
       if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
         stage.cleaning = true
@@ -540,6 +558,9 @@ private[spark] class AppStatusListener(
           cleanupTasks(stage)
         }
       }
+      if (removeStage) {
+        liveStages.remove((event.stageId, event.stageAttemptId))
+      }
     }
 
     liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
@@ -564,17 +585,13 @@ private[spark] class AppStatusListener(
 
      // Force an update on live applications when the number of active tasks reaches 0. This is
      // checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date.
-      if (exec.activeTasks == 0) {
-        liveUpdate(exec, now)
-      } else {
-        maybeUpdate(exec, now)
-      }
+      conditionalLiveUpdate(exec, now, exec.activeTasks == 0)
     }
   }
 
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
     val maybeStage =
-      Option(liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
+      Option(liveStages.get((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
     maybeStage.foreach { stage =>
       val now = System.nanoTime()
       stage.info = event.stageInfo
@@ -608,7 +625,6 @@ private[spark] class AppStatusListener(
       }
 
       stage.executorSummaries.values.foreach(update(_, now))
-      update(stage, now, last = true)
 
       val executorIdsForStage = stage.blackListedExecutors
       executorIdsForStage.foreach { executorId =>
@@ -616,6 +632,13 @@ private[spark] class AppStatusListener(
           removeBlackListedStageFrom(exec, event.stageInfo.stageId, now)
         }
       }
+
+      // Remove stage only if there are no active tasks remaining
+      val removeStage = stage.activeTasks == 0
+      update(stage, now, last = removeStage)
+      if (removeStage) {
+        liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber))
+      }
     }
 
    appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
@@ -882,6 +905,14 @@ private[spark] class AppStatusListener(
     }
   }
 
+  private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = {
+    if (condition) {
+      liveUpdate(entity, now)
+    } else {
+      maybeUpdate(entity, now)
+    }
+  }
+
   private def cleanupExecutors(count: Long): Unit = {
    // Because the limit is on the number of *dead* executors, we need to calculate whether
     // there are actually enough dead executors to be deleted.

http://git-wip-us.apache.org/repos/asf/spark/blob/39a02d8f/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 1b3639a..ea80fea 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1190,6 +1190,61 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
   }
 
+  test("SPARK-24415: update metrics for tasks that finish late") {
+    val listener = new AppStatusListener(store, conf, true)
+
+    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+
+    // Start job
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null))
+
+    // Start 2 stages
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))
+
+    // Start 2 Tasks
+    val tasks = createTasks(2, Array("1"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, task))
+    }
+
+    // Task 1 Finished
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
+
+    // Stage 1 Completed
+    stage1.failureReason = Some("Failed")
+    listener.onStageCompleted(SparkListenerStageCompleted(stage1))
+
+    // Stop job 1
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
+
+    // Task 2 Killed
+    time += 1
+    tasks(1).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
+        TaskKilled(reason = "Killed"), tasks(1), null))
+
+    // Ensure killed task metrics are updated
+    val allStages = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
+    val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
+    assert(failedStages.size == 1)
+    assert(failedStages.head.numKilledTasks == 1)
+    assert(failedStages.head.numCompleteTasks == 1)
+
+    val allJobs = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
+    assert(allJobs.size == 1)
+    assert(allJobs.head.numKilledTasks == 1)
+    assert(allJobs.head.numCompletedTasks == 1)
+    assert(allJobs.head.numActiveStages == 1)
+    assert(allJobs.head.numFailedStages == 1)
+  }
+
   test("driver logs") {
     val listener = new AppStatusListener(store, conf, true)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/39a02d8f/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index f2204a1..957feca 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -77,7 +77,12 @@ class UISeleniumSuite
     inputStream.foreachRDD { rdd =>
       rdd.foreach(_ => {})
       try {
-        rdd.foreach(_ => throw new RuntimeException("Oops"))
+        rdd.foreach { _ =>
+          // Failing the task with id 15 to ensure only one task fails
+          if (TaskContext.get.taskAttemptId() % 15 == 0) {
+            throw new RuntimeException("Oops")
+          }
+        }
       } catch {
         case e: SparkException if e.getMessage.contains("Oops") =>
       }
@@ -166,7 +171,7 @@ class UISeleniumSuite
 
         // Check job progress
        findAll(cssSelector(""".progress-cell""")).map(_.text).toList should be (
-          List("4/4", "4/4", "4/4", "0/4 (1 failed)"))
+          List("4/4", "4/4", "4/4", "3/4 (1 failed)"))
 
         // Check stacktrace
        val errorCells = findAll(cssSelector(""".stacktrace-details""")).map(_.underlying).toSeq

