Repository: spark
Updated Branches:
  refs/heads/master bf578deaf -> d1636dd72


[SPARK-2297][UI] Make task attempt and speculation more explicit in UI.

New UI:

![screen shot 2014-06-26 at 1 43 52 pm](https://cloud.githubusercontent.com/assets/323388/3404643/82b9ddc6-fd73-11e3-96f9-f7592a7aee79.png)

Author: Reynold Xin <r...@apache.org>

Closes #1236 from rxin/ui-task-attempt and squashes the following commits:

3b645dd [Reynold Xin] Expose attemptId in Stage.
c0474b1 [Reynold Xin] Beefed up unit test.
c404bdd [Reynold Xin] Fix ReplayListenerSuite.
f56be4b [Reynold Xin] Fixed JsonProtocolSuite.
e29e0f7 [Reynold Xin] Minor update.
5e4354a [Reynold Xin] [SPARK-2297][UI] Make task attempt and speculation more explicit in UI.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1636dd7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1636dd7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1636dd7

Branch: refs/heads/master
Commit: d1636dd72fc4966413baeb97ba55b313dc1da63d
Parents: bf578de
Author: Reynold Xin <r...@apache.org>
Authored: Thu Jun 26 21:13:26 2014 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Thu Jun 26 21:13:26 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/SparkListener.scala  |  2 +
 .../org/apache/spark/scheduler/Stage.scala      |  2 +
 .../org/apache/spark/scheduler/TaskInfo.scala   |  4 +-
 .../apache/spark/scheduler/TaskSetManager.scala | 24 ++++--
 .../org/apache/spark/ui/jobs/StagePage.scala    | 11 ++-
 .../org/apache/spark/util/JsonProtocol.scala    |  9 +-
 .../ui/jobs/JobProgressListenerSuite.scala      | 11 +--
 .../apache/spark/util/JsonProtocolSuite.scala   | 88 +++++++++++++-------
 8 files changed, 102 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d1636dd7/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 378cf1a..82163ea 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -75,9 +75,11 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
 @DeveloperApi
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
+@DeveloperApi
 case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
   extends SparkListenerEvent
 
+@DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
 
 /** An event used in the listener to shutdown the listener daemon thread. */

http://git-wip-us.apache.org/repos/asf/spark/blob/d1636dd7/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 9a4be43..8ec482a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -106,6 +106,8 @@ private[spark] class Stage(
     id
   }
 
+  def attemptId: Int = nextAttemptId
+
   val name = callSite.short
   val details = callSite.long
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d1636dd7/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 4c62e4d..6aecdfe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -27,10 +27,12 @@ import org.apache.spark.annotation.DeveloperApi
 class TaskInfo(
     val taskId: Long,
     val index: Int,
+    val attempt: Int,
     val launchTime: Long,
     val executorId: String,
     val host: String,
-    val taskLocality: TaskLocality.TaskLocality) {
+    val taskLocality: TaskLocality.TaskLocality,
+    val speculative: Boolean) {
 
   /**
    * The time when the task started remotely getting the result. Will not be set if the

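For illustration only (not part of the diff), a minimal sketch of how the two new fields
are filled in; the task IDs, hosts, and timestamps below are made-up values:

    import org.apache.spark.scheduler.{TaskInfo, TaskLocality}

    // First (non-speculative) launch of task index 7; attempt numbering starts at 1.
    val firstAttempt = new TaskInfo(
      100L, 7, 1, System.currentTimeMillis(), "exec-1", "host1",
      TaskLocality.PROCESS_LOCAL, false)

    // A speculative copy of the same task index, launched later as attempt 2.
    val speculativeCopy = new TaskInfo(
      101L, 7, 2, System.currentTimeMillis(), "exec-2", "host2",
      TaskLocality.ANY, true)
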
http://git-wip-us.apache.org/repos/asf/spark/blob/d1636dd7/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index c0898f6..83ff6b8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -335,17 +335,19 @@ private[spark] class TaskSetManager(
   /**
    * Dequeue a pending task for a given node and return its index and locality level.
    * Only search for tasks matching the given locality constraint.
+   *
+   * @return An option containing (task index within the task set, locality, is speculative?)
    */
   private def findTask(execId: String, host: String, locality: TaskLocality.Value)
-    : Option[(Int, TaskLocality.Value)] =
+    : Option[(Int, TaskLocality.Value, Boolean)] =
   {
     for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
-      return Some((index, TaskLocality.PROCESS_LOCAL))
+      return Some((index, TaskLocality.PROCESS_LOCAL, false))
     }
 
     if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
       for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
-        return Some((index, TaskLocality.NODE_LOCAL))
+        return Some((index, TaskLocality.NODE_LOCAL, false))
       }
     }
 
@@ -354,23 +356,25 @@ private[spark] class TaskSetManager(
         rack <- sched.getRackForHost(host)
         index <- findTaskFromList(execId, getPendingTasksForRack(rack))
       } {
-        return Some((index, TaskLocality.RACK_LOCAL))
+        return Some((index, TaskLocality.RACK_LOCAL, false))
       }
     }
 
     // Look for no-pref tasks after rack-local tasks since they can run anywhere.
     for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
-      return Some((index, TaskLocality.PROCESS_LOCAL))
+      return Some((index, TaskLocality.PROCESS_LOCAL, false))
     }
 
     if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
       for (index <- findTaskFromList(execId, allPendingTasks)) {
-        return Some((index, TaskLocality.ANY))
+        return Some((index, TaskLocality.ANY, false))
       }
     }
 
     // Finally, if all else has failed, find a speculative task
-    findSpeculativeTask(execId, host, locality)
+    findSpeculativeTask(execId, host, locality).map { case (taskIndex, allowedLocality) =>
+      (taskIndex, allowedLocality, true)
+    }
   }
 
   /**
@@ -391,7 +395,7 @@ private[spark] class TaskSetManager(
       }
 
       findTask(execId, host, allowedLocality) match {
-        case Some((index, taskLocality)) => {
+        case Some((index, taskLocality, speculative)) => {
           // Found a task; do some bookkeeping and return a task description
           val task = tasks(index)
           val taskId = sched.newTaskId()
@@ -400,7 +404,9 @@ private[spark] class TaskSetManager(
             taskSet.id, index, taskId, execId, host, taskLocality))
           // Do various bookkeeping
           copiesRunning(index) += 1
-          val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
+          val attemptNum = taskAttempts(index).size
+          val info = new TaskInfo(
+            taskId, index, attemptNum + 1, curTime, execId, host, taskLocality, speculative)
           taskInfos(taskId) = info
           taskAttempts(index) = info :: taskAttempts(index)
           // Update our locality level for delay scheduling
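
Since taskAttempts(index) holds a TaskInfo for every prior launch of that task index, the
first launch is reported as attempt 1. A standalone sketch of the numbering rule (the map
and helper here are illustrative stand-ins, not scheduler internals):

    import scala.collection.mutable

    // Stand-in for TaskSetManager.taskAttempts: prior launches recorded per task index.
    val taskAttempts = mutable.Map[Int, List[String]]().withDefaultValue(Nil)

    def nextAttempt(index: Int): Int = taskAttempts(index).size + 1

    assert(nextAttempt(7) == 1)  // never launched before
    taskAttempts(7) = "first-launch" :: taskAttempts(7)
    assert(nextAttempt(7) == 2)  // one prior launch (a retry or a speculative copy)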

http://git-wip-us.apache.org/repos/asf/spark/blob/d1636dd7/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8b65f06..8e3d5d1 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -95,8 +95,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         </div>
         // scalastyle:on
       val taskHeaders: Seq[String] =
-        Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", 
"Launch Time") ++
-        Seq("Duration", "GC Time", "Result Ser Time") ++
+        Seq(
+          "Index", "ID", "Attempt", "Status", "Locality Level", "Executor",
+          "Launch Time", "Duration", "GC Time") ++
         {if (hasShuffleRead) Seq("Shuffle Read")  else Nil} ++
         {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
        {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
@@ -245,6 +246,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       <tr>
         <td>{info.index}</td>
         <td>{info.taskId}</td>
+        <td sorttable_customkey={info.attempt.toString}>{
+          if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString
+        }</td>
         <td>{info.status}</td>
         <td>{info.taskLocality}</td>
         <td>{info.host}</td>
@@ -255,9 +259,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         <td sorttable_customkey={gcTime.toString}>
           {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
         </td>
+        <!--
+        TODO: Add this back after we add support to hide certain columns.
         <td sorttable_customkey={serializationTime.toString}>
          {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
         </td>
+        -->
         {if (shuffleRead) {
            <td sorttable_customkey={shuffleReadSortable}>
              {shuffleReadReadable}
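
The new Attempt column sorts on the bare attempt number (via sorttable_customkey) while
appending "(speculative)" to speculative runs. The formatting rule in isolation, as a
small sketch (the method name is illustrative):

    // Standalone copy of the cell-formatting rule above.
    def attemptCell(attempt: Int, speculative: Boolean): String =
      if (speculative) s"$attempt (speculative)" else attempt.toString

    assert(attemptCell(1, speculative = false) == "1")
    assert(attemptCell(2, speculative = true) == "2 (speculative)")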

http://git-wip-us.apache.org/repos/asf/spark/blob/d1636dd7/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 7cecbfe..6245b4b 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -32,6 +32,8 @@ import org.apache.spark.storage._
 import org.apache.spark._
 
 private[spark] object JsonProtocol {
+  // TODO: Remove this file and put JSON serialization into each individual class.
+
   private implicit val format = DefaultFormats
 
   /** ------------------------------------------------- *
@@ -194,10 +196,12 @@ private[spark] object JsonProtocol {
   def taskInfoToJson(taskInfo: TaskInfo): JValue = {
     ("Task ID" -> taskInfo.taskId) ~
     ("Index" -> taskInfo.index) ~
+    ("Attempt" -> taskInfo.attempt) ~
     ("Launch Time" -> taskInfo.launchTime) ~
     ("Executor ID" -> taskInfo.executorId) ~
     ("Host" -> taskInfo.host) ~
     ("Locality" -> taskInfo.taskLocality.toString) ~
+    ("Speculative" -> taskInfo.speculative) ~
     ("Getting Result Time" -> taskInfo.gettingResultTime) ~
     ("Finish Time" -> taskInfo.finishTime) ~
     ("Failed" -> taskInfo.failed) ~
@@ -487,16 +491,19 @@ private[spark] object JsonProtocol {
   def taskInfoFromJson(json: JValue): TaskInfo = {
     val taskId = (json \ "Task ID").extract[Long]
     val index = (json \ "Index").extract[Int]
+    val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1)
     val launchTime = (json \ "Launch Time").extract[Long]
     val executorId = (json \ "Executor ID").extract[String]
     val host = (json \ "Host").extract[String]
     val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
+    val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false)
     val gettingResultTime = (json \ "Getting Result Time").extract[Long]
     val finishTime = (json \ "Finish Time").extract[Long]
     val failed = (json \ "Failed").extract[Boolean]
     val serializedSize = (json \ "Serialized Size").extract[Int]
 
-    val taskInfo = new TaskInfo(taskId, index, launchTime, executorId, host, taskLocality)
+    val taskInfo =
+      new TaskInfo(taskId, index, attempt, launchTime, executorId, host, taskLocality, speculative)
     taskInfo.gettingResultTime = gettingResultTime
     taskInfo.finishTime = finishTime
     taskInfo.failed = failed
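
Note the extractOpt(...).getOrElse(...) defaults above: event logs written before this
change carry no "Attempt" or "Speculative" fields and replay as attempt 1, non-speculative.
A minimal json4s sketch of that fallback (the JSON fragment is hand-written for
illustration):

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    implicit val format = DefaultFormats

    // Old-format "Task Info" fragment: no "Attempt" or "Speculative" keys.
    val oldJson = parse("""{"Task ID":222,"Index":333,"Locality":"NODE_LOCAL"}""")

    val attempt = (oldJson \ "Attempt").extractOpt[Int].getOrElse(1)
    val speculative = (oldJson \ "Speculative").extractOpt[Boolean].getOrElse(false)
    assert(attempt == 1 && !speculative)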

http://git-wip-us.apache.org/repos/asf/spark/blob/d1636dd7/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index e0fec6a..fa43b66 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -66,7 +66,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     // finish this task, should get updated shuffleRead
     shuffleReadMetrics.remoteBytesRead = 1000
     taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
-    var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     var task = new ShuffleMapTask(0, null, null, 0, null)
     val taskType = Utils.getFormattedClassName(task)
@@ -75,7 +75,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       .shuffleRead == 1000)
 
     // finish a task with unknown executor-id, nothing should happen
-    taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo =
+      new TaskInfo(1234L, 0, 1, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL, true)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -84,7 +85,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     // finish this task, should get updated duration
     shuffleReadMetrics.remoteBytesRead = 1000
     taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
-    taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -94,7 +95,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     // finish this task, should get updated duration
     shuffleReadMetrics.remoteBytesRead = 1000
     taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
-    taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -106,7 +107,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     val conf = new SparkConf()
     val listener = new JobProgressListener(conf)
     val metrics = new TaskMetrics()
-    val taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    val taskInfo = new TaskInfo(1234L, 0, 3, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     val task = new ShuffleMapTask(0, null, null, 0, null)
     val taskType = Utils.getFormattedClassName(task)

http://git-wip-us.apache.org/repos/asf/spark/blob/d1636dd7/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 495e1b7..6c49870 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -35,10 +35,11 @@ class JsonProtocolSuite extends FunSuite {
     val stageSubmitted =
       SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
     val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 201, 301, 401L, 501L))
-    val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 444L))
-    val taskGettingResult = SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 3000L))
+    val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 1, 444L, false))
+    val taskGettingResult =
+      SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
     val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
-      makeTaskInfo(123L, 234, 345L), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
+      makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
     val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
     val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
     val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
@@ -73,7 +74,7 @@ class JsonProtocolSuite extends FunSuite {
   test("Dependent Classes") {
     testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
     testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
-    testTaskInfo(makeTaskInfo(999L, 888, 777L))
+    testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
     testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8))
     testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))
 
@@ -269,10 +270,12 @@ class JsonProtocolSuite extends FunSuite {
   private def assertEquals(info1: TaskInfo, info2: TaskInfo) {
     assert(info1.taskId === info2.taskId)
     assert(info1.index === info2.index)
+    assert(info1.attempt === info2.attempt)
     assert(info1.launchTime === info2.launchTime)
     assert(info1.executorId === info2.executorId)
     assert(info1.host === info2.host)
     assert(info1.taskLocality === info2.taskLocality)
+    assert(info1.speculative === info2.speculative)
     assert(info1.gettingResultTime === info2.gettingResultTime)
     assert(info1.finishTime === info2.finishTime)
     assert(info1.failed === info2.failed)
@@ -453,8 +456,8 @@ class JsonProtocolSuite extends FunSuite {
     new StageInfo(a, "greetings", b, rddInfos, "details")
   }
 
-  private def makeTaskInfo(a: Long, b: Int, c: Long) = {
-    new TaskInfo(a, b, c, "executor", "your kind sir", TaskLocality.NODE_LOCAL)
+  private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = {
+    new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative)
   }
 
   private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = {
@@ -510,37 +513,60 @@ class JsonProtocolSuite extends FunSuite {
 
   private val taskStartJsonString =
     """
-      {"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task 
ID":222,
-      "Index":333,"Launch Time":444,"Executor ID":"executor","Host":"your kind 
sir",
-      "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish 
Time":0,"Failed":false,
-      "Serialized Size":0}}
-    """
+      |{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task 
ID":222,
+      |"Index":333,"Attempt":1,"Launch Time":444,"Executor 
ID":"executor","Host":"your kind sir",
+      |"Locality":"NODE_LOCAL","Speculative":false,"Getting Result 
Time":0,"Finish Time":0,
+      |"Failed":false,"Serialized Size":0}}
+    """.stripMargin
 
   private val taskGettingResultJsonString =
     """
-      {"Event":"SparkListenerTaskGettingResult","Task Info":{"Task 
ID":1000,"Index":
-      2000,"Launch Time":3000,"Executor ID":"executor","Host":"your kind sir",
-      "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish 
Time":0,"Failed":false,
-      "Serialized Size":0}}
-    """
+      |{"Event":"SparkListenerTaskGettingResult","Task Info":
+      |  {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor 
ID":"executor",
+      |   "Host":"your kind 
sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0,
+      |   "Finish Time":0,"Failed":false,"Serialized Size":0
+      |  }
+      |}
+    """.stripMargin
 
   private val taskEndJsonString =
     """
-      {"Event":"SparkListenerTaskEnd","Stage ID":1,"Task 
Type":"ShuffleMapTask",
-      "Task End Reason":{"Reason":"Success"},"Task Info":{"Task 
ID":123,"Index":
-      234,"Launch Time":345,"Executor ID":"executor","Host":"your kind sir",
-      "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":
-      false,"Serialized Size":0},"Task Metrics":{"Host Name":"localhost",
-      "Executor Deserialize Time":300,"Executor Run Time":400,"Result 
Size":500,
-      "JVM GC Time":600,"Result Serialization Time":700,"Memory Bytes Spilled":
-      800,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Shuffle Finish Time":
-      900,"Total Blocks Fetched":1500,"Remote Blocks Fetched":800,"Local 
Blocks Fetched":
-      700,"Fetch Wait Time":900,"Remote Bytes Read":1000},"Shuffle Write 
Metrics":
-      {"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Updated 
Blocks":
-      [{"Block ID":"rdd_0_0","Status":{"Storage Level":{"Use Disk":true,"Use 
Memory":true,
-      "Use Tachyon":false,"Deserialized":false,"Replication":2},"Memory 
Size":0,"Tachyon Size":0,
-      "Disk Size":0}}]}}
-    """
+      |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task 
Type":"ShuffleMapTask",
+      |"Task End Reason":{"Reason":"Success"},
+      |"Task Info":{
+      |  "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor 
ID":"executor",
+      |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
+      |  "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized 
Size":0
+      |},
+      |"Task Metrics":{
+      |  "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run 
Time":400,
+      |  "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
+      |  "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
+      |  "Shuffle Read Metrics":{
+      |    "Shuffle Finish Time":900,
+      |    "Total Blocks Fetched":1500,
+      |    "Remote Blocks Fetched":800,
+      |    "Local Blocks Fetched":700,
+      |    "Fetch Wait Time":900,
+      |    "Remote Bytes Read":1000
+      |  },
+      |  "Shuffle Write Metrics":{
+      |    "Shuffle Bytes Written":1200,
+      |    "Shuffle Write Time":1500},
+      |    "Updated Blocks":[
+      |    {"Block ID":"rdd_0_0",
+      |      "Status":{
+      |        "Storage Level":{
+      |          "Use Disk":true,"Use Memory":true,"Use 
Tachyon":false,"Deserialized":false,
+      |          "Replication":2
+      |        },
+      |        "Memory Size":0,"Tachyon Size":0,"Disk Size":0
+      |      }
+      |    }
+      |    ]
+      |  }
+      |}
+    """.stripMargin
 
   private val jobStartJsonString =
     """
