Repository: spark
Updated Branches:
  refs/heads/master 647ee05e5 -> ad79fc0a8


[SPARK-17406][WEB UI] limit timeline executor events

## What changes were proposed in this pull request?
The job page is too slow to open when there are thousands of executor events 
(added or removed). I found that in the ExecutorsTab file, executorIdToData 
never removes elements, so it grows indefinitely. Before this PR, it looks like 
[timeline1.png](https://issues.apache.org/jira/secure/attachment/12827112/timeline1.png).
After this PR, it looks like 
[timeline2.png](https://issues.apache.org/jira/secure/attachment/12827113/timeline2.png)
(the number of executor events displayed can now be configured).
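In short, the listener now appends each executor added/removed event to a
bounded buffer and evicts the oldest entry once the cap is exceeded. A minimal
sketch of that capping pattern, using a String payload instead of the patch's
SparkListenerEvent to stay self-contained (the buffer and cap names mirror the
patch; the standalone object is illustrative only):

    import scala.collection.mutable.ListBuffer

    // Sketch of the bounded timeline buffer kept by ExecutorsListener.
    // The cap mirrors spark.ui.timeline.executors.maximum (default 1000).
    object TimelineEventBuffer {
      private val maxTimelineExecutors = 1000
      private val executorEvents = new ListBuffer[String]()

      def record(event: String): Unit = synchronized {
        executorEvents += event
        // Evict the oldest event so the buffer, and hence the rendered
        // timeline, never grows past the configured maximum.
        if (executorEvents.size > maxTimelineExecutors) {
          executorEvents.remove(0)
        }
      }

      def events: Seq[String] = synchronized { executorEvents.toList }
    }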

Author: cenyuhai <cenyu...@didichuxing.com>

Closes #14969 from cenyuhai/SPARK-17406.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad79fc0a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad79fc0a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad79fc0a

Branch: refs/heads/master
Commit: ad79fc0a8407a950a03869f2f8cdc3ed0bf13875
Parents: 647ee05
Author: cenyuhai <cenyu...@didichuxing.com>
Authored: Thu Sep 15 09:58:53 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Sep 15 09:58:53 2016 +0100

----------------------------------------------------------------------
 .../apache/spark/ui/exec/ExecutorsPage.scala    |  41 +++----
 .../org/apache/spark/ui/exec/ExecutorsTab.scala | 112 +++++++++++--------
 .../org/apache/spark/ui/jobs/AllJobsPage.scala  |  66 +++++------
 .../apache/spark/ui/jobs/ExecutorTable.scala    |   3 +-
 .../org/apache/spark/ui/jobs/JobPage.scala      |  67 ++++++-----
 .../org/apache/spark/ui/jobs/StagePage.scala    |   4 +-
 .../scala/org/apache/spark/ui/jobs/UIData.scala |   5 -
 project/MimaExcludes.scala                      |  12 ++
 8 files changed, 162 insertions(+), 148 deletions(-)
----------------------------------------------------------------------
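Both settings introduced by this change appear in the ExecutorsTab diff below:
spark.ui.timeline.executors.maximum (default 1000) bounds the timeline event
buffer, and spark.ui.retainedDeadExecutors (default 100) bounds how many dead
executors' task summaries are kept. For example, assuming the application
builds its SparkConf programmatically (the values here are illustrative only):

    import org.apache.spark.SparkConf

    // Illustrative values: cap the timeline at 500 executor events and
    // retain summaries for at most 50 dead executors.
    val conf = new SparkConf()
      .set("spark.ui.timeline.executors.maximum", "500")
      .set("spark.ui.retainedDeadExecutors", "50")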


http://git-wip-us.apache.org/repos/asf/spark/blob/ad79fc0a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 982e891..7953d77 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -17,14 +17,12 @@
 
 package org.apache.spark.ui.exec
 
-import java.net.URLEncoder
 import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
 import org.apache.spark.status.api.v1.ExecutorSummary
-import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
-import org.apache.spark.util.Utils
+import org.apache.spark.ui.{UIUtils, WebUIPage}
 
 // This isn't even used anymore -- but we need to keep it b/c of a MiMa false positive
 private[ui] case class ExecutorSummaryInfo(
@@ -83,18 +81,7 @@ private[spark] object ExecutorsPage {
     val memUsed = status.memUsed
     val maxMem = status.maxMem
     val diskUsed = status.diskUsed
-    val totalCores = listener.executorToTotalCores.getOrElse(execId, 0)
-    val maxTasks = listener.executorToTasksMax.getOrElse(execId, 0)
-    val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
-    val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
-    val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
-    val totalTasks = activeTasks + failedTasks + completedTasks
-    val totalDuration = listener.executorToDuration.getOrElse(execId, 0L)
-    val totalGCTime = listener.executorToJvmGCTime.getOrElse(execId, 0L)
-    val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
-    val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
-    val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
-    val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)
+    val taskSummary = listener.executorToTaskSummary.getOrElse(execId, ExecutorTaskSummary(execId))
 
     new ExecutorSummary(
       execId,
@@ -103,19 +90,19 @@ private[spark] object ExecutorsPage {
       rddBlocks,
       memUsed,
       diskUsed,
-      totalCores,
-      maxTasks,
-      activeTasks,
-      failedTasks,
-      completedTasks,
-      totalTasks,
-      totalDuration,
-      totalGCTime,
-      totalInputBytes,
-      totalShuffleRead,
-      totalShuffleWrite,
+      taskSummary.totalCores,
+      taskSummary.tasksMax,
+      taskSummary.tasksActive,
+      taskSummary.tasksFailed,
+      taskSummary.tasksComplete,
+      taskSummary.tasksActive + taskSummary.tasksFailed + taskSummary.tasksComplete,
+      taskSummary.duration,
+      taskSummary.jvmGCTime,
+      taskSummary.inputBytes,
+      taskSummary.shuffleRead,
+      taskSummary.shuffleWrite,
       maxMem,
-      executorLogs
+      taskSummary.executorLogs
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad79fc0a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 676f445..678571f 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark.ui.exec
 
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{LinkedHashMap, ListBuffer}
 
 import org.apache.spark.{ExceptionFailure, Resubmitted, SparkConf, SparkContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.{StorageStatus, StorageStatusListener}
 import org.apache.spark.ui.{SparkUI, SparkUITab}
-import org.apache.spark.ui.jobs.UIData.ExecutorUIData
 
 private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
   val listener = parent.executorsListener
@@ -38,6 +37,25 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
   }
 }
 
+private[ui] case class ExecutorTaskSummary(
+    var executorId: String,
+    var totalCores: Int = 0,
+    var tasksMax: Int = 0,
+    var tasksActive: Int = 0,
+    var tasksFailed: Int = 0,
+    var tasksComplete: Int = 0,
+    var duration: Long = 0L,
+    var jvmGCTime: Long = 0L,
+    var inputBytes: Long = 0L,
+    var inputRecords: Long = 0L,
+    var outputBytes: Long = 0L,
+    var outputRecords: Long = 0L,
+    var shuffleRead: Long = 0L,
+    var shuffleWrite: Long = 0L,
+    var executorLogs: Map[String, String] = Map.empty,
+    var isAlive: Boolean = true
+)
+
 /**
  * :: DeveloperApi ::
  * A SparkListener that prepares information to be displayed on the ExecutorsTab
@@ -45,21 +63,11 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
 @DeveloperApi
 class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
     extends SparkListener {
-  val executorToTotalCores = HashMap[String, Int]()
-  val executorToTasksMax = HashMap[String, Int]()
-  val executorToTasksActive = HashMap[String, Int]()
-  val executorToTasksComplete = HashMap[String, Int]()
-  val executorToTasksFailed = HashMap[String, Int]()
-  val executorToDuration = HashMap[String, Long]()
-  val executorToJvmGCTime = HashMap[String, Long]()
-  val executorToInputBytes = HashMap[String, Long]()
-  val executorToInputRecords = HashMap[String, Long]()
-  val executorToOutputBytes = HashMap[String, Long]()
-  val executorToOutputRecords = HashMap[String, Long]()
-  val executorToShuffleRead = HashMap[String, Long]()
-  val executorToShuffleWrite = HashMap[String, Long]()
-  val executorToLogUrls = HashMap[String, Map[String, String]]()
-  val executorIdToData = HashMap[String, ExecutorUIData]()
+  var executorToTaskSummary = LinkedHashMap[String, ExecutorTaskSummary]()
+  var executorEvents = new ListBuffer[SparkListenerEvent]()
+
+  private val maxTimelineExecutors = conf.getInt("spark.ui.timeline.executors.maximum", 1000)
+  private val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100)
 
   def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
 
@@ -67,18 +75,29 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
 
   override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
     val eid = executorAdded.executorId
-    executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
-    executorToTotalCores(eid) = executorAdded.executorInfo.totalCores
-    executorToTasksMax(eid) = executorToTotalCores(eid) / conf.getInt("spark.task.cpus", 1)
-    executorIdToData(eid) = new ExecutorUIData(executorAdded.time)
+    val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
+    taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap
+    taskSummary.totalCores = executorAdded.executorInfo.totalCores
+    taskSummary.tasksMax = taskSummary.totalCores / conf.getInt("spark.task.cpus", 1)
+    executorEvents += executorAdded
+    if (executorEvents.size > maxTimelineExecutors) {
+      executorEvents.remove(0)
+    }
+
+    val deadExecutors = executorToTaskSummary.filter(e => !e._2.isAlive)
+    if (deadExecutors.size > retainedDeadExecutors) {
+      val head = deadExecutors.head
+      executorToTaskSummary.remove(head._1)
+    }
   }
 
   override def onExecutorRemoved(
       executorRemoved: SparkListenerExecutorRemoved): Unit = synchronized {
-    val eid = executorRemoved.executorId
-    val uiData = executorIdToData(eid)
-    uiData.finishTime = Some(executorRemoved.time)
-    uiData.finishReason = Some(executorRemoved.reason)
+    executorEvents += executorRemoved
+    if (executorEvents.size > maxTimelineExecutors) {
+      executorEvents.remove(0)
+    }
+    executorToTaskSummary.get(executorRemoved.executorId).foreach(e => e.isAlive = false)
   }
 
   override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
@@ -87,19 +106,25 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
         s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
         s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
       }
-      storageStatus.foreach { s => executorToLogUrls(s.blockManagerId.executorId) = logs.toMap }
+      storageStatus.foreach { s =>
+        val eid = s.blockManagerId.executorId
+        val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
+        taskSummary.executorLogs = logs.toMap
+      }
     }
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
     val eid = taskStart.taskInfo.executorId
-    executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
+    val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
+    taskSummary.tasksActive += 1
   }
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
     val info = taskEnd.taskInfo
     if (info != null) {
       val eid = info.executorId
+      val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
       taskEnd.reason match {
         case Resubmitted =>
           // Note: For resubmitted tasks, we continue to use the metrics that belong to the
@@ -108,31 +133,26 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
           // metrics added by each attempt, but this is much more complicated.
           return
         case e: ExceptionFailure =>
-          executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
+          taskSummary.tasksFailed += 1
         case _ =>
-          executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
+          taskSummary.tasksComplete += 1
       }
-
-      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
-      executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
+      if (taskSummary.tasksActive >= 1) {
+        taskSummary.tasksActive -= 1
+      }
+      taskSummary.duration += info.duration
 
       // Update shuffle read/write
       val metrics = taskEnd.taskMetrics
       if (metrics != null) {
-        executorToInputBytes(eid) =
-          executorToInputBytes.getOrElse(eid, 0L) + metrics.inputMetrics.bytesRead
-        executorToInputRecords(eid) =
-          executorToInputRecords.getOrElse(eid, 0L) + metrics.inputMetrics.recordsRead
-        executorToOutputBytes(eid) =
-          executorToOutputBytes.getOrElse(eid, 0L) + metrics.outputMetrics.bytesWritten
-        executorToOutputRecords(eid) =
-          executorToOutputRecords.getOrElse(eid, 0L) + metrics.outputMetrics.recordsWritten
-
-        executorToShuffleRead(eid) =
-          executorToShuffleRead.getOrElse(eid, 0L) + metrics.shuffleReadMetrics.remoteBytesRead
-        executorToShuffleWrite(eid) =
-          executorToShuffleWrite.getOrElse(eid, 0L) + metrics.shuffleWriteMetrics.bytesWritten
-        executorToJvmGCTime(eid) = executorToJvmGCTime.getOrElse(eid, 0L) + metrics.jvmGCTime
+        taskSummary.inputBytes += metrics.inputMetrics.bytesRead
+        taskSummary.inputRecords += metrics.inputMetrics.recordsRead
+        taskSummary.outputBytes += metrics.outputMetrics.bytesWritten
+        taskSummary.outputRecords += metrics.outputMetrics.recordsWritten
+
+        taskSummary.shuffleRead += metrics.shuffleReadMetrics.remoteBytesRead
+        taskSummary.shuffleWrite += metrics.shuffleWriteMetrics.bytesWritten
+        taskSummary.jvmGCTime += metrics.jvmGCTime
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad79fc0a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index e5363ce..c04964e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -28,9 +28,9 @@ import scala.xml._
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.scheduler._
 import org.apache.spark.ui._
-import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData, StageUIData}
+import org.apache.spark.ui.jobs.UIData.{JobUIData, StageUIData}
 import org.apache.spark.util.Utils
 
 /** Page showing list of all ongoing and recently finished jobs */
@@ -123,55 +123,55 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
     }
   }
 
-  private def makeExecutorEvent(executorUIDatas: HashMap[String, ExecutorUIData]): Seq[String] = {
+  private def makeExecutorEvent(executorUIDatas: Seq[SparkListenerEvent]):
+      Seq[String] = {
     val events = ListBuffer[String]()
     executorUIDatas.foreach {
-      case (executorId, event) =>
+      case a: SparkListenerExecutorAdded =>
         val addedEvent =
           s"""
              |{
              |  'className': 'executor added',
              |  'group': 'executors',
-             |  'start': new Date(${event.startTime}),
+             |  'start': new Date(${a.time}),
              |  'content': '<div class="executor-event-content"' +
              |    'data-toggle="tooltip" data-placement="bottom"' +
-             |    'data-title="Executor ${executorId}<br>' +
-             |    'Added at ${UIUtils.formatDate(new Date(event.startTime))}"' +
-             |    'data-html="true">Executor ${executorId} added</div>'
+             |    'data-title="Executor ${a.executorId}<br>' +
+             |    'Added at ${UIUtils.formatDate(new Date(a.time))}"' +
+             |    'data-html="true">Executor ${a.executorId} added</div>'
              |}
            """.stripMargin
         events += addedEvent
+      case e: SparkListenerExecutorRemoved =>
+        val removedEvent =
+          s"""
+             |{
+             |  'className': 'executor removed',
+             |  'group': 'executors',
+             |  'start': new Date(${e.time}),
+             |  'content': '<div class="executor-event-content"' +
+             |    'data-toggle="tooltip" data-placement="bottom"' +
+             |    'data-title="Executor ${e.executorId}<br>' +
+             |    'Removed at ${UIUtils.formatDate(new Date(e.time))}' +
+             |    '${
+                      if (e.reason != null) {
+                        s"""<br>Reason: ${e.reason.replace("\n", " ")}"""
+                      } else {
+                        ""
+                      }
+                   }"' +
+             |    'data-html="true">Executor ${e.executorId} removed</div>'
+             |}
+           """.stripMargin
+        events += removedEvent
 
-        if (event.finishTime.isDefined) {
-          val removedEvent =
-            s"""
-               |{
-               |  'className': 'executor removed',
-               |  'group': 'executors',
-               |  'start': new Date(${event.finishTime.get}),
-               |  'content': '<div class="executor-event-content"' +
-               |    'data-toggle="tooltip" data-placement="bottom"' +
-               |    'data-title="Executor ${executorId}<br>' +
-               |    'Removed at ${UIUtils.formatDate(new Date(event.finishTime.get))}' +
-               |    '${
-                        if (event.finishReason.isDefined) {
-                          s"""<br>Reason: 
${event.finishReason.get.replace("\n", " ")}"""
-                        } else {
-                          ""
-                        }
-                     }"' +
-               |    'data-html="true">Executor ${executorId} removed</div>'
-               |}
-             """.stripMargin
-          events += removedEvent
-        }
     }
     events.toSeq
   }
 
   private def makeTimeline(
       jobs: Seq[JobUIData],
-      executors: HashMap[String, ExecutorUIData],
+      executors: Seq[SparkListenerEvent],
       startTime: Long): Seq[Node] = {
 
     val jobEventJsonAsStrSeq = makeJobEvent(jobs)
@@ -353,7 +353,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
       var content = summary
       val executorListener = parent.executorListener
       content ++= makeTimeline(activeJobs ++ completedJobs ++ failedJobs,
-          executorListener.executorIdToData, startTime)
+          executorListener.executorEvents, startTime)
 
       if (shouldShowActiveJobs) {
         content ++= <h4 id="active">Active Jobs ({activeJobs.size})</h4> ++

http://git-wip-us.apache.org/repos/asf/spark/blob/ad79fc0a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 133c3b1..9fb3f35 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -118,7 +118,8 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
               <div style="float: left">{k}</div>
               <div style="float: right">
               {
-                val logs = parent.executorsListener.executorToLogUrls.getOrElse(k, Map.empty)
+                val logs = parent.executorsListener.executorToTaskSummary.get(k)
+                  .map(_.executorLogs).getOrElse(Map.empty)
                 logs.map {
                  case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div>
                 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad79fc0a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 0ec42d6..2f7f897 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -20,15 +20,14 @@ package org.apache.spark.ui.jobs
 import java.util.Date
 import javax.servlet.http.HttpServletRequest
 
-import scala.collection.mutable.{Buffer, HashMap, ListBuffer}
+import scala.collection.mutable.{Buffer, ListBuffer}
 import scala.xml.{Node, NodeSeq, Unparsed, Utility}
 
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.scheduler._
 import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
-import org.apache.spark.ui.jobs.UIData.ExecutorUIData
 
 /** Page showing statistics and stage list for a given job */
 private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
@@ -93,55 +92,55 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
     }
   }
 
-  def makeExecutorEvent(executorUIDatas: HashMap[String, ExecutorUIData]): Seq[String] = {
+  def makeExecutorEvent(executorUIDatas: Seq[SparkListenerEvent]): Seq[String] = {
     val events = ListBuffer[String]()
     executorUIDatas.foreach {
-      case (executorId, event) =>
+      case a: SparkListenerExecutorAdded =>
         val addedEvent =
           s"""
              |{
              |  'className': 'executor added',
              |  'group': 'executors',
-             |  'start': new Date(${event.startTime}),
+             |  'start': new Date(${a.time}),
              |  'content': '<div class="executor-event-content"' +
              |    'data-toggle="tooltip" data-placement="bottom"' +
-             |    'data-title="Executor ${executorId}<br>' +
-             |    'Added at ${UIUtils.formatDate(new Date(event.startTime))}"' +
-             |    'data-html="true">Executor ${executorId} added</div>'
+             |    'data-title="Executor ${a.executorId}<br>' +
+             |    'Added at ${UIUtils.formatDate(new Date(a.time))}"' +
+             |    'data-html="true">Executor ${a.executorId} added</div>'
              |}
            """.stripMargin
         events += addedEvent
 
-        if (event.finishTime.isDefined) {
-          val removedEvent =
-            s"""
-               |{
-               |  'className': 'executor removed',
-               |  'group': 'executors',
-               |  'start': new Date(${event.finishTime.get}),
-               |  'content': '<div class="executor-event-content"' +
-               |    'data-toggle="tooltip" data-placement="bottom"' +
-               |    'data-title="Executor ${executorId}<br>' +
-               |    'Removed at ${UIUtils.formatDate(new Date(event.finishTime.get))}' +
-               |    '${
-                        if (event.finishReason.isDefined) {
-                          s"""<br>Reason: 
${event.finishReason.get.replace("\n", " ")}"""
-                        } else {
-                          ""
-                        }
-                     }"' +
-               |    'data-html="true">Executor ${executorId} removed</div>'
-               |}
-             """.stripMargin
-            events += removedEvent
-        }
+      case e: SparkListenerExecutorRemoved =>
+        val removedEvent =
+          s"""
+             |{
+             |  'className': 'executor removed',
+             |  'group': 'executors',
+             |  'start': new Date(${e.time}),
+             |  'content': '<div class="executor-event-content"' +
+             |    'data-toggle="tooltip" data-placement="bottom"' +
+             |    'data-title="Executor ${e.executorId}<br>' +
+             |    'Removed at ${UIUtils.formatDate(new Date(e.time))}' +
+             |    '${
+                      if (e.reason != null) {
+                        s"""<br>Reason: ${e.reason.replace("\n", " ")}"""
+                      } else {
+                        ""
+                      }
+                   }"' +
+             |    'data-html="true">Executor ${e.executorId} removed</div>'
+             |}
+           """.stripMargin
+          events += removedEvent
+
     }
     events.toSeq
   }
 
   private def makeTimeline(
       stages: Seq[StageInfo],
-      executors: HashMap[String, ExecutorUIData],
+      executors: Seq[SparkListenerEvent],
       appStartTime: Long): Seq[Node] = {
 
     val stageEventJsonAsStrSeq = makeStageEvent(stages)
@@ -319,7 +318,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
       val operationGraphListener = parent.operationGraphListener
 
       content ++= makeTimeline(activeStages ++ completedStages ++ failedStages,
-          executorListener.executorIdToData, appStartTime)
+          executorListener.executorEvents, appStartTime)
 
       content ++= UIUtils.showDagVizForJob(
         jobId, operationGraphListener.getOperationGraphForJob(jobId))

http://git-wip-us.apache.org/repos/asf/spark/blob/ad79fc0a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index de787f2..c322ae0 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -1017,8 +1017,8 @@ private[ui] class TaskDataSource(
         None
       }
 
-    val logs = executorsListener.executorToLogUrls.getOrElse(info.executorId, Map.empty)
-
+    val logs = executorsListener.executorToTaskSummary.get(info.executorId)
+      .map(_.executorLogs).getOrElse(Map.empty)
     new TaskTableRowData(
       info.index,
       info.taskId,

http://git-wip-us.apache.org/repos/asf/spark/blob/ad79fc0a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 74bca99..c729f03 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -177,11 +177,6 @@ private[spark] object UIData {
     }
   }
 
-  class ExecutorUIData(
-      val startTime: Long,
-      var finishTime: Option[Long] = None,
-      var finishReason: Option[String] = None)
-
   case class TaskMetricsUIData(
       executorDeserializeTime: Long,
       executorRunTime: Long,

http://git-wip-us.apache.org/repos/asf/spark/blob/ad79fc0a/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fbd78ae..37fff2e 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -426,6 +426,18 @@ object MimaExcludes {
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatusListener.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.streaming.scheduler.BatchInfo.streamIdToNumRecords"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.storageStatusList"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorIdToData"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksActive"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksComplete"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToInputRecords"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToShuffleRead"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksFailed"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToShuffleWrite"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToDuration"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToInputBytes"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToLogUrls"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToOutputBytes"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToOutputRecords"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.storage.StorageListener.storageStatusList"),
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ExceptionFailure.apply"),

