Repository: spark
Updated Branches:
  refs/heads/branch-1.0 fa167194c -> c445b3af3


[SPARK-2284][UI] Mark all failed tasks as failures.

Previously, only tasks that failed with an ExceptionFailure reason were marked as failures.
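
To see the miscounting concretely, here is a minimal, self-contained Scala
sketch (the Reason hierarchy below is a simplified stand-in, not Spark's real
TaskEndReason types): under the old match, any non-exception failure such as a
killed task fell through to the catch-all and was tallied as a success.

    // Simplified stand-in types, not Spark's actual TaskEndReason hierarchy.
    sealed trait Reason
    case object Success extends Reason
    final case class ExceptionFailure(description: String) extends Reason
    case object TaskKilled extends Reason  // any non-exception failure

    object CountingDemo extends App {
      // Old logic: only ExceptionFailure counted as a failure; everything
      // else -- including TaskKilled -- hit the catch-all success branch.
      def oldCount(r: Reason): (Int, Int) = r match {
        case _: ExceptionFailure => (0, 1)  // (completed, failed)
        case _                   => (1, 0)
      }

      // New logic: match Success explicitly; every other reason is a failure.
      def newCount(r: Reason): (Int, Int) = r match {
        case Success             => (1, 0)
        case _: ExceptionFailure => (0, 1)
        case _: Reason           => (0, 1)
      }

      println(oldCount(TaskKilled))  // (1,0): wrongly counted as complete
      println(newCount(TaskKilled))  // (0,1): correctly counted as failed
    }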

Author: Reynold Xin <r...@apache.org>

Closes #1224 from rxin/SPARK-2284 and squashes the following commits:

be79dbd [Reynold Xin] [SPARK-2284][UI] Mark all failed tasks as failures.

(cherry picked from commit 4a346e242c3f241c575f35536220df01ad724e23)
Signed-off-by: Reynold Xin <r...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c445b3af
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c445b3af
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c445b3af

Branch: refs/heads/branch-1.0
Commit: c445b3af3b6c524c6113dd036df0ed2f909d184c
Parents: fa16719
Author: Reynold Xin <r...@apache.org>
Authored: Wed Jun 25 22:35:03 2014 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Wed Jun 25 22:35:17 2014 -0700

----------------------------------------------------------------------
 .../spark/ui/jobs/JobProgressListener.scala     |  9 ++++--
 .../ui/jobs/JobProgressListenerSuite.scala      | 30 +++++++++++++++++++-
 2 files changed, 35 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c445b3af/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 396cbcb..bfefe4d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -185,12 +185,15 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
 
       val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
         taskEnd.reason match {
+          case org.apache.spark.Success =>
+            stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
+            (None, Option(taskEnd.taskMetrics))
           case e: ExceptionFailure =>
             stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
             (Some(e), e.metrics)
-          case _ =>
-            stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
-            (None, Option(taskEnd.taskMetrics))
+          case e: org.apache.spark.TaskEndReason =>
+            stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
+            (None, None)
         }
 
       stageIdToTime.getOrElseUpdate(sid, 0L)

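(Aside: the stageIdToTasksComplete / stageIdToTasksFailed maps above are
mutable per-stage counters, and the getOrElse(sid, 0) + 1 idiom creates a
counter on its first update. A minimal sketch of that pattern, with an
illustrative map and method name:)

    import scala.collection.mutable

    // Illustrative per-stage failure counter using the same
    // getOrElse(sid, 0) + 1 idiom as the hunk above.
    val stageIdToTasksFailed = mutable.HashMap[Int, Int]()

    def recordFailure(sid: Int): Unit =
      stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1

    recordFailure(0)
    recordFailure(0)
    assert(stageIdToTasksFailed(0) == 2)  // counter created on first use
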
http://git-wip-us.apache.org/repos/asf/spark/blob/c445b3af/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 8c06a2d..86a5cda 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs
 import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
 
-import org.apache.spark.{LocalSparkContext, SparkConf, Success}
+import org.apache.spark._
 import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
@@ -101,4 +101,32 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Shou
     assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
       .shuffleRead == 1000)
   }
+
+  test("test task success vs failure counting for different task end reasons") {
+    val conf = new SparkConf()
+    val listener = new JobProgressListener(conf)
+    val metrics = new TaskMetrics()
+    val taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo.finishTime = 1
+    val task = new ShuffleMapTask(0, null, null, 0, null)
+    val taskType = Utils.getFormattedClassName(task)
+
+    // Go through all the failure cases to make sure we are counting them as failures.
+    val taskFailedReasons = Seq(
+      Resubmitted,
+      new FetchFailed(null, 0, 0, 0),
+      new ExceptionFailure("Exception", "description", null, None),
+      TaskResultLost,
+      TaskKilled,
+      ExecutorLostFailure,
+      UnknownReason)
+    for (reason <- taskFailedReasons) {
+      listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics))
+      assert(listener.stageIdToTasksComplete.get(task.stageId) === None)
+    }
+
+    // Make sure we count success as success.
+    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics))
+    assert(listener.stageIdToTasksComplete.get(task.stageId) === Some(1))
+  }
 }
