Repository: spark Updated Branches: refs/heads/branch-1.0 fa167194c -> c445b3af3
[SPARK-2284][UI] Mark all failed tasks as failures. Previously only tasks failed with ExceptionFailure reason was marked as failure. Author: Reynold Xin <r...@apache.org> Closes #1224 from rxin/SPARK-2284 and squashes the following commits: be79dbd [Reynold Xin] [SPARK-2284][UI] Mark all failed tasks as failures. (cherry picked from commit 4a346e242c3f241c575f35536220df01ad724e23) Signed-off-by: Reynold Xin <r...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c445b3af Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c445b3af Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c445b3af Branch: refs/heads/branch-1.0 Commit: c445b3af3b6c524c6113dd036df0ed2f909d184c Parents: fa16719 Author: Reynold Xin <r...@apache.org> Authored: Wed Jun 25 22:35:03 2014 -0700 Committer: Reynold Xin <r...@apache.org> Committed: Wed Jun 25 22:35:17 2014 -0700 ---------------------------------------------------------------------- .../spark/ui/jobs/JobProgressListener.scala | 9 ++++-- .../ui/jobs/JobProgressListenerSuite.scala | 30 +++++++++++++++++++- 2 files changed, 35 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c445b3af/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 396cbcb..bfefe4d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -185,12 +185,15 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = taskEnd.reason match { + case org.apache.spark.Success => + stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1 + (None, Option(taskEnd.taskMetrics)) case e: ExceptionFailure => stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1 (Some(e), e.metrics) - case _ => - stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1 - (None, Option(taskEnd.taskMetrics)) + case e: org.apache.spark.TaskEndReason => + stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1 + (None, None) } stageIdToTime.getOrElseUpdate(sid, 0L) http://git-wip-us.apache.org/repos/asf/spark/blob/c445b3af/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 8c06a2d..86a5cda 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers -import org.apache.spark.{LocalSparkContext, SparkConf, Success} +import org.apache.spark._ import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics} import org.apache.spark.scheduler._ import org.apache.spark.util.Utils @@ -101,4 +101,32 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Shou assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail()) .shuffleRead == 1000) } + + test("test task success vs failure counting for different task end reasons") { + val conf = new SparkConf() + val listener = new JobProgressListener(conf) + val metrics = new TaskMetrics() + val taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL) + taskInfo.finishTime = 1 + val task = new ShuffleMapTask(0, null, null, 0, null) + val taskType = Utils.getFormattedClassName(task) + + // Go through all the failure cases to make sure we are counting them as failures. + val taskFailedReasons = Seq( + Resubmitted, + new FetchFailed(null, 0, 0, 0), + new ExceptionFailure("Exception", "description", null, None), + TaskResultLost, + TaskKilled, + ExecutorLostFailure, + UnknownReason) + for (reason <- taskFailedReasons) { + listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics)) + assert(listener.stageIdToTasksComplete.get(task.stageId) === None) + } + + // Make sure we count success as success. + listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics)) + assert(listener.stageIdToTasksComplete.get(task.stageId) === Some(1)) + } }