[SPARK-20648][CORE] Port JobsTab and StageTab to the new UI backend.

This change is a little larger because there is a lot of logic behind these
pages, all closely tied to internal types and listeners; some of that logic
had to be reimplemented in the new listener, and the data it needs exposed
through the API types.

- Added missing StageData and ExecutorStageSummary fields that are used by the
  UI. Some JSON golden files needed to be updated to account for the new fields.

- Save RDD graph data in the store. This reuses the existing types as much as
  possible so that the graph-building code doesn't need to be rewritten, which
  means the stored representation is probably not optimal. A condensed sketch
  of the approach follows.
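
  A minimal sketch of that approach, condensed from the AppStatusListener hunk
  in the diff below. It assumes the RDDOperationGraphWrapper and
  RDDOperationClusterWrapper types added in storeTypes.scala, and assumes the
  sketch lives under an org.apache.spark package so those private[spark] types
  resolve; the container and method names are illustrative only:

    import org.apache.spark.scheduler.SparkListenerJobStart
    import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper}
    import org.apache.spark.ui.scope.{RDDOperationCluster, RDDOperationGraph}
    import org.apache.spark.util.kvstore.KVStore

    // In the change itself these are methods on AppStatusListener (part of
    // onJobStart and newRDDOperationCluster); gathered here for readability.
    object StageGraphSketch {
      def writeStageGraphs(kvstore: KVStore, event: SparkListenerJobStart, maxNodes: Int): Unit = {
        event.stageInfos.foreach { stage =>
          // Reuse the existing graph builder instead of re-implementing it.
          val graph = RDDOperationGraph.makeOperationGraph(stage, maxNodes)
          kvstore.write(new RDDOperationGraphWrapper(
            stage.stageId,
            graph.edges,
            graph.outgoingEdges,
            graph.incomingEdges,
            toClusterWrapper(graph.rootCluster)))
        }
      }

      // Recursively convert the mutable cluster tree into its stored counterpart.
      private def toClusterWrapper(cluster: RDDOperationCluster): RDDOperationClusterWrapper = {
        new RDDOperationClusterWrapper(
          cluster.id,
          cluster.name,
          cluster.childNodes,
          cluster.childClusters.map(toClusterWrapper))
      }
    }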

- Some old classes (e.g. JobProgressListener) still remain, since they're used
  in other parts of the code; they're not used by the UI anymore, though, and
  will be cleaned up in a separate change.

- Save information about active pools in the store. This data isn't really used
  in the SHS, but it's small, so it's still recorded when replaying applications.
  A sketch of the bookkeeping follows.
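
  A minimal sketch of that bookkeeping, condensed from the diff below
  (SchedulerPool in LiveEntity.scala, PoolData in storeTypes.scala, and the
  read path in AppStatusStore); the read helper is shown standalone for
  illustration only:

    import org.apache.spark.status.PoolData
    import org.apache.spark.util.kvstore.KVStore

    // Live entity tracking which stage IDs currently belong to a scheduler pool;
    // doUpdate() produces the PoolData object that gets written to the store.
    private class SchedulerPool(name: String) extends LiveEntity {
      var stageIds = Set[Int]()
      override protected def doUpdate(): Any = new PoolData(name, stageIds)
    }

    // The pool page can later read the recorded data back by pool name.
    def poolData(store: KVStore, name: String): PoolData = {
      store.read(classOf[PoolData], name)
    }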

- Because the new store sorts things slightly differently from the previous
  code, elements within some JSON golden files were reordered.

- The retention unit test in UISeleniumSuite was disabled because the code
  to throw away old stages / tasks hasn't been added yet.

- The job description field in the API tries to follow the old behavior, which
  leaves it empty most of the time even though there is information available
  to fill it in. For stages, a new field was added to hold the description
  (which is basically the job description), so that the UI can be rendered the
  old way; how it's populated is sketched below.
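
  A small sketch of where that description comes from, condensed from the
  onStageSubmitted handler in the diff below; the standalone helper name is
  illustrative:

    import java.util.Properties
    import org.apache.spark.SparkContext

    // The stage description is read from the submitted job's properties; it is
    // usually unset, matching the old UI behavior of an empty description.
    def stageDescription(properties: Properties): Option[String] = {
      Option(properties).flatMap { p =>
        Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
      }
    }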

- A new stage status ("SKIPPED") was added because the API couldn't represent
  that state before. Without it, skipped stages would show up as "PENDING" in
  the UI, which is now based on the API types. The handling is sketched below.

- The API used to expose "executorRunTime" as the value of the task's duration,
  which wasn't really correct (that value is also easily available from the
  metrics object); this change stores the correct duration instead, which means
  a few expectation files needed to be updated for the new durations and for
  sorting differences caused by the changed values. The new calculation is
  sketched below.
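
  A minimal sketch of the new calculation, condensed from LiveTask.doUpdate in
  the diff below; the standalone helper name is illustrative:

    import org.apache.spark.scheduler.TaskInfo

    // Finished tasks report their real duration; tasks still running report the
    // time elapsed so far, measured against "now" for live applications or
    // against the event log's last update time when replaying (SPARK-21922).
    def taskDuration(info: TaskInfo, lastUpdateTime: Option[Long]): Long = {
      if (info.finished) {
        info.duration
      } else {
        info.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis()))
      }
    }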

- Added changes to implement SPARK-20713 and SPARK-21922 in the new code.

Tested with existing unit tests (and by using the UI a lot).

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #19698 from vanzin/SPARK-20648.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4741c078
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4741c078
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4741c078

Branch: refs/heads/master
Commit: 4741c07809393ab85be8b4a169d4ed3da93a4781
Parents: 11b60af
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Tue Nov 14 10:34:32 2017 -0600
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Tue Nov 14 10:34:32 2017 -0600

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |    8 +-
 .../deploy/history/FsHistoryProvider.scala      |   10 +-
 .../apache/spark/status/AppStatusListener.scala |  121 +-
 .../apache/spark/status/AppStatusStore.scala    |  135 ++-
 .../org/apache/spark/status/LiveEntity.scala    |   57 +-
 .../spark/status/api/v1/AllJobsResource.scala   |   70 +-
 .../spark/status/api/v1/AllStagesResource.scala |  290 +----
 .../spark/status/api/v1/OneJobResource.scala    |   15 +-
 .../spark/status/api/v1/OneStageResource.scala  |  112 +-
 .../org/apache/spark/status/api/v1/api.scala    |   21 +-
 .../scala/org/apache/spark/status/config.scala  |    4 +
 .../org/apache/spark/status/storeTypes.scala    |   40 +
 .../scala/org/apache/spark/ui/SparkUI.scala     |   30 +-
 .../org/apache/spark/ui/jobs/AllJobsPage.scala  |  286 +++--
 .../apache/spark/ui/jobs/AllStagesPage.scala    |  189 ++--
 .../apache/spark/ui/jobs/ExecutorTable.scala    |  158 +--
 .../org/apache/spark/ui/jobs/JobPage.scala      |  326 +++---
 .../org/apache/spark/ui/jobs/JobsTab.scala      |   38 +-
 .../org/apache/spark/ui/jobs/PoolPage.scala     |   57 +-
 .../org/apache/spark/ui/jobs/PoolTable.scala    |   34 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    | 1064 +++++++++---------
 .../org/apache/spark/ui/jobs/StageTable.scala   |  100 +-
 .../org/apache/spark/ui/jobs/StagesTab.scala    |   28 +-
 .../spark/ui/scope/RDDOperationGraph.scala      |   10 +-
 .../ui/scope/RDDOperationGraphListener.scala    |  150 ---
 .../complete_stage_list_json_expectation.json   |   21 +-
 .../failed_stage_list_json_expectation.json     |    8 +-
 ...m_multi_attempt_app_json_1__expectation.json |    5 +-
 ...m_multi_attempt_app_json_2__expectation.json |    5 +-
 .../job_list_json_expectation.json              |   15 +-
 .../one_job_json_expectation.json               |    5 +-
 .../one_stage_attempt_json_expectation.json     |  126 ++-
 .../one_stage_json_expectation.json             |  126 ++-
 .../stage_list_json_expectation.json            |   79 +-
 ..._list_with_accumulable_json_expectation.json |    7 +-
 .../stage_task_list_expectation.json            |   60 +-
 ...m_multi_attempt_app_json_1__expectation.json |   24 +-
 ...m_multi_attempt_app_json_2__expectation.json |   24 +-
 ...ask_list_w__offset___length_expectation.json |  150 ++-
 .../stage_task_list_w__sortBy_expectation.json  |  190 ++--
 ...ortBy_short_names___runtime_expectation.json |  190 ++--
 ...sortBy_short_names__runtime_expectation.json |   60 +-
 ...stage_with_accumulable_json_expectation.json |   36 +-
 ...ceeded_failed_job_list_json_expectation.json |   15 +-
 .../succeeded_job_list_json_expectation.json    |   10 +-
 .../deploy/history/HistoryServerSuite.scala     |   13 +-
 .../spark/status/AppStatusListenerSuite.scala   |   80 +-
 .../status/api/v1/AllStagesResourceSuite.scala  |   62 -
 .../org/apache/spark/ui/StagePageSuite.scala    |   64 +-
 .../org/apache/spark/ui/UISeleniumSuite.scala   |   60 +-
 .../scope/RDDOperationGraphListenerSuite.scala  |  226 ----
 project/MimaExcludes.scala                      |    2 +
 52 files changed, 2359 insertions(+), 2657 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e5aaaf6..1d325e6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -426,7 +426,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Initialize the app status store and listener before SparkEnv is created 
so that it gets
     // all events.
-    _statusStore = AppStatusStore.createLiveStore(conf, listenerBus)
+    _statusStore = AppStatusStore.createLiveStore(conf, l => 
listenerBus.addToStatusQueue(l))
 
     // Create the Spark execution environment (cache, map output tracker, etc)
     _env = createSparkEnv(_conf, isLocal, listenerBus)
@@ -449,11 +449,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
     _ui =
       if (conf.getBoolean("spark.ui.enabled", true)) {
-        Some(SparkUI.create(Some(this), _statusStore, _conf,
-          l => listenerBus.addToStatusQueue(l),
-          _env.securityManager,
-          appName,
-          "",
+        Some(SparkUI.create(Some(this), _statusStore, _conf, 
_env.securityManager, appName, "",
           startTime))
       } else {
         // For tests, do not enable the UI

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index f16ddde..a6dc533 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -316,7 +316,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
     }
 
     val listener = if (needReplay) {
-      val _listener = new AppStatusListener(kvstore, conf, false)
+      val _listener = new AppStatusListener(kvstore, conf, false,
+        lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
       replayBus.addListener(_listener)
       Some(_listener)
     } else {
@@ -324,13 +325,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
     }
 
     val loadedUI = {
-      val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf,
-        l => replayBus.addListener(l),
-        secManager,
-        app.info.name,
+      val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf, 
secManager, app.info.name,
         HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
         attempt.info.startTime.getTime(),
-        appSparkVersion = attempt.info.appSparkVersion)
+        attempt.info.appSparkVersion)
       LoadedAppUI(ui)
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 7f2c00c..f2d8e0a 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -28,16 +28,21 @@ import org.apache.spark.scheduler._
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage._
 import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.scope._
 import org.apache.spark.util.kvstore.KVStore
 
 /**
  * A Spark listener that writes application information to a data store. The 
types written to the
  * store are defined in the `storeTypes.scala` file and are based on the 
public REST API.
+ *
+ * @param lastUpdateTime When replaying logs, the log's last update time, so 
that the duration of
+ *                       unfinished tasks can be more accurately calculated 
(see SPARK-21922).
  */
 private[spark] class AppStatusListener(
     kvstore: KVStore,
     conf: SparkConf,
-    live: Boolean) extends SparkListener with Logging {
+    live: Boolean,
+    lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
 
   import config._
 
@@ -50,6 +55,8 @@ private[spark] class AppStatusListener(
   // operations that we can live without when rapidly processing incoming task 
events.
   private val liveUpdatePeriodNs = if (live) 
conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
 
+  private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)
+
   // Keep track of live entities, so that task metrics can be efficiently 
updated (without
   // causing too many writes to the underlying store, and other expensive 
operations).
   private val liveStages = new HashMap[(Int, Int), LiveStage]()
@@ -57,6 +64,7 @@ private[spark] class AppStatusListener(
   private val liveExecutors = new HashMap[String, LiveExecutor]()
   private val liveTasks = new HashMap[Long, LiveTask]()
   private val liveRDDs = new HashMap[Int, LiveRDD]()
+  private val pools = new HashMap[String, SchedulerPool]()
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
     case SparkListenerLogStart(version) => sparkVersion = version
@@ -210,16 +218,15 @@ private[spark] class AppStatusListener(
       missingStages.map(_.numTasks).sum
     }
 
-    val lastStageInfo = event.stageInfos.lastOption
+    val lastStageInfo = event.stageInfos.sortBy(_.stageId).lastOption
     val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage 
Name)")
-
     val jobGroup = Option(event.properties)
       .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }
 
     val job = new LiveJob(
       event.jobId,
       lastStageName,
-      Some(new Date(event.time)),
+      if (event.time > 0) Some(new Date(event.time)) else None,
       event.stageIds,
       jobGroup,
       numTasks)
@@ -234,17 +241,51 @@ private[spark] class AppStatusListener(
       stage.jobIds += event.jobId
       liveUpdate(stage, now)
     }
+
+    // Create the graph data for all the job's stages.
+    event.stageInfos.foreach { stage =>
+      val graph = RDDOperationGraph.makeOperationGraph(stage, 
maxGraphRootNodes)
+      val uigraph = new RDDOperationGraphWrapper(
+        stage.stageId,
+        graph.edges,
+        graph.outgoingEdges,
+        graph.incomingEdges,
+        newRDDOperationCluster(graph.rootCluster))
+      kvstore.write(uigraph)
+    }
+  }
+
+  private def newRDDOperationCluster(cluster: RDDOperationCluster): 
RDDOperationClusterWrapper = {
+    new RDDOperationClusterWrapper(
+      cluster.id,
+      cluster.name,
+      cluster.childNodes,
+      cluster.childClusters.map(newRDDOperationCluster))
   }
 
   override def onJobEnd(event: SparkListenerJobEnd): Unit = {
     liveJobs.remove(event.jobId).foreach { job =>
+      val now = System.nanoTime()
+
+      // Check if there are any pending stages that match this job; mark those 
as skipped.
+      job.stageIds.foreach { sid =>
+        val pending = liveStages.filter { case ((id, _), _) => id == sid }
+        pending.foreach { case (key, stage) =>
+          stage.status = v1.StageStatus.SKIPPED
+          job.skippedStages += stage.info.stageId
+          job.skippedTasks += stage.info.numTasks
+          liveStages.remove(key)
+          update(stage, now)
+        }
+      }
+
       job.status = event.jobResult match {
         case JobSucceeded => JobExecutionStatus.SUCCEEDED
         case JobFailed(_) => JobExecutionStatus.FAILED
       }
 
-      job.completionTime = Some(new Date(event.time))
-      update(job, System.nanoTime())
+      job.completionTime = if (event.time > 0) Some(new Date(event.time)) else 
None
+      update(job, now)
     }
   }
 
@@ -262,12 +303,24 @@ private[spark] class AppStatusListener(
       .toSeq
     stage.jobIds = stage.jobs.map(_.jobId).toSet
 
+    stage.schedulingPool = Option(event.properties).flatMap { p =>
+      Option(p.getProperty("spark.scheduler.pool"))
+    }.getOrElse(SparkUI.DEFAULT_POOL_NAME)
+
+    stage.description = Option(event.properties).flatMap { p =>
+      Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
+    }
+
     stage.jobs.foreach { job =>
       job.completedStages = job.completedStages - event.stageInfo.stageId
       job.activeStages += 1
       liveUpdate(job, now)
     }
 
+    val pool = pools.getOrElseUpdate(stage.schedulingPool, new 
SchedulerPool(stage.schedulingPool))
+    pool.stageIds = pool.stageIds + event.stageInfo.stageId
+    update(pool, now)
+
     event.stageInfo.rddInfos.foreach { info =>
       if (info.storageLevel.isValid) {
         liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info)), now)
@@ -279,7 +332,7 @@ private[spark] class AppStatusListener(
 
   override def onTaskStart(event: SparkListenerTaskStart): Unit = {
     val now = System.nanoTime()
-    val task = new LiveTask(event.taskInfo, event.stageId, 
event.stageAttemptId)
+    val task = new LiveTask(event.taskInfo, event.stageId, 
event.stageAttemptId, lastUpdateTime)
     liveTasks.put(event.taskInfo.taskId, task)
     liveUpdate(task, now)
 
@@ -318,6 +371,8 @@ private[spark] class AppStatusListener(
     val now = System.nanoTime()
 
     val metricsDelta = liveTasks.remove(event.taskInfo.taskId).map { task =>
+      task.info = event.taskInfo
+
       val errorMessage = event.reason match {
         case Success =>
           None
@@ -337,11 +392,15 @@ private[spark] class AppStatusListener(
       delta
     }.orNull
 
-    val (completedDelta, failedDelta) = event.reason match {
+    val (completedDelta, failedDelta, killedDelta) = event.reason match {
       case Success =>
-        (1, 0)
+        (1, 0, 0)
+      case _: TaskKilled =>
+        (0, 0, 1)
+      case _: TaskCommitDenied =>
+        (0, 0, 1)
       case _ =>
-        (0, 1)
+        (0, 1, 0)
     }
 
     liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage =>
@@ -350,13 +409,29 @@ private[spark] class AppStatusListener(
       }
       stage.activeTasks -= 1
       stage.completedTasks += completedDelta
+      if (completedDelta > 0) {
+        stage.completedIndices.add(event.taskInfo.index)
+      }
       stage.failedTasks += failedDelta
+      stage.killedTasks += killedDelta
+      if (killedDelta > 0) {
+        stage.killedSummary = killedTasksSummary(event.reason, 
stage.killedSummary)
+      }
       maybeUpdate(stage, now)
 
+      // Store both stage ID and task index in a single long variable for 
tracking at job level.
+      val taskIndex = (event.stageId.toLong << Integer.SIZE) | 
event.taskInfo.index
       stage.jobs.foreach { job =>
         job.activeTasks -= 1
         job.completedTasks += completedDelta
+        if (completedDelta > 0) {
+          job.completedIndices.add(taskIndex)
+        }
         job.failedTasks += failedDelta
+        job.killedTasks += killedDelta
+        if (killedDelta > 0) {
+          job.killedSummary = killedTasksSummary(event.reason, 
job.killedSummary)
+        }
         maybeUpdate(job, now)
       }
 
@@ -364,6 +439,7 @@ private[spark] class AppStatusListener(
       esummary.taskTime += event.taskInfo.duration
       esummary.succeededTasks += completedDelta
       esummary.failedTasks += failedDelta
+      esummary.killedTasks += killedDelta
       if (metricsDelta != null) {
         esummary.metrics.update(metricsDelta)
       }
@@ -422,6 +498,11 @@ private[spark] class AppStatusListener(
         liveUpdate(job, now)
       }
 
+      pools.get(stage.schedulingPool).foreach { pool =>
+        pool.stageIds = pool.stageIds - event.stageInfo.stageId
+        update(pool, now)
+      }
+
       stage.executorSummaries.values.foreach(update(_, now))
       update(stage, now)
     }
@@ -482,11 +563,15 @@ private[spark] class AppStatusListener(
   /** Flush all live entities' data to the underlying store. */
   def flush(): Unit = {
     val now = System.nanoTime()
-    liveStages.values.foreach(update(_, now))
+    liveStages.values.foreach { stage =>
+      update(stage, now)
+      stage.executorSummaries.values.foreach(update(_, now))
+    }
     liveJobs.values.foreach(update(_, now))
     liveExecutors.values.foreach(update(_, now))
     liveTasks.values.foreach(update(_, now))
     liveRDDs.values.foreach(update(_, now))
+    pools.values.foreach(update(_, now))
   }
 
   private def updateRDDBlock(event: SparkListenerBlockUpdated, block: 
RDDBlockId): Unit = {
@@ -628,6 +713,20 @@ private[spark] class AppStatusListener(
     stage
   }
 
+  private def killedTasksSummary(
+      reason: TaskEndReason,
+      oldSummary: Map[String, Int]): Map[String, Int] = {
+    reason match {
+      case k: TaskKilled =>
+        oldSummary.updated(k.reason, oldSummary.getOrElse(k.reason, 0) + 1)
+      case denied: TaskCommitDenied =>
+        val reason = denied.toErrorString
+        oldSummary.updated(reason, oldSummary.getOrElse(reason, 0) + 1)
+      case _ =>
+        oldSummary
+    }
+  }
+
   private def update(entity: LiveEntity, now: Long): Unit = {
     entity.write(kvstore, now)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 80c8d7d..9b42f55 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -23,8 +23,9 @@ import java.util.{Arrays, List => JList}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{JobExecutionStatus, SparkConf}
-import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.status.api.v1
+import org.apache.spark.ui.scope._
 import org.apache.spark.util.{Distribution, Utils}
 import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
 
@@ -43,8 +44,8 @@ private[spark] class AppStatusStore(store: KVStore) {
   }
 
   def jobsList(statuses: JList[JobExecutionStatus]): Seq[v1.JobData] = {
-    val it = store.view(classOf[JobDataWrapper]).asScala.map(_.info)
-    if (!statuses.isEmpty()) {
+    val it = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
+    if (statuses != null && !statuses.isEmpty()) {
       it.filter { job => statuses.contains(job.status) }.toSeq
     } else {
       it.toSeq
@@ -65,31 +66,40 @@ private[spark] class AppStatusStore(store: KVStore) {
     filtered.asScala.map(_.info).toSeq
   }
 
-  def executorSummary(executorId: String): Option[v1.ExecutorSummary] = {
-    try {
-      Some(store.read(classOf[ExecutorSummaryWrapper], executorId).info)
-    } catch {
-      case _: NoSuchElementException =>
-        None
-    }
+  def executorSummary(executorId: String): v1.ExecutorSummary = {
+    store.read(classOf[ExecutorSummaryWrapper], executorId).info
   }
 
   def stageList(statuses: JList[v1.StageStatus]): Seq[v1.StageData] = {
-    val it = store.view(classOf[StageDataWrapper]).asScala.map(_.info)
-    if (!statuses.isEmpty) {
+    val it = 
store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
+    if (statuses != null && !statuses.isEmpty()) {
       it.filter { s => statuses.contains(s.status) }.toSeq
     } else {
       it.toSeq
     }
   }
 
-  def stageData(stageId: Int): Seq[v1.StageData] = {
+  def stageData(stageId: Int, details: Boolean = false): Seq[v1.StageData] = {
     
store.view(classOf[StageDataWrapper]).index("stageId").first(stageId).last(stageId)
-      .asScala.map(_.info).toSeq
+      .asScala.map { s =>
+        if (details) stageWithDetails(s.info) else s.info
+      }.toSeq
+  }
+
+  def lastStageAttempt(stageId: Int): v1.StageData = {
+    val it = 
store.view(classOf[StageDataWrapper]).index("stageId").reverse().first(stageId)
+      .closeableIterator()
+    try {
+      it.next().info
+    } finally {
+      it.close()
+    }
   }
 
-  def stageAttempt(stageId: Int, stageAttemptId: Int): v1.StageData = {
-    store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).info
+  def stageAttempt(stageId: Int, stageAttemptId: Int, details: Boolean = 
false): v1.StageData = {
+    val stageKey = Array(stageId, stageAttemptId)
+    val stage = store.read(classOf[StageDataWrapper], stageKey).info
+    if (details) stageWithDetails(stage) else stage
   }
 
   def taskSummary(
@@ -189,6 +199,12 @@ private[spark] class AppStatusStore(store: KVStore) {
     )
   }
 
+  def taskList(stageId: Int, stageAttemptId: Int, maxTasks: Int): 
Seq[v1.TaskData] = {
+    val stageKey = Array(stageId, stageAttemptId)
+    
store.view(classOf[TaskDataWrapper]).index("stage").first(stageKey).last(stageKey).reverse()
+      .max(maxTasks).asScala.map(_.info).toSeq.reverse
+  }
+
   def taskList(
       stageId: Int,
       stageAttemptId: Int,
@@ -215,6 +231,66 @@ private[spark] class AppStatusStore(store: KVStore) {
     }.toSeq
   }
 
+  /**
+   * Calls a closure that may throw a NoSuchElementException and returns 
`None` when the exception
+   * is thrown.
+   */
+  def asOption[T](fn: => T): Option[T] = {
+    try {
+      Some(fn)
+    } catch {
+      case _: NoSuchElementException => None
+    }
+  }
+
+  private def stageWithDetails(stage: v1.StageData): v1.StageData = {
+    val tasks = taskList(stage.stageId, stage.attemptId, Int.MaxValue)
+      .map { t => (t.taskId, t) }
+      .toMap
+
+    val stageKey = Array(stage.stageId, stage.attemptId)
+    val execs = 
store.view(classOf[ExecutorStageSummaryWrapper]).index("stage").first(stageKey)
+      .last(stageKey).closeableIterator().asScala
+      .map { exec => (exec.executorId -> exec.info) }
+      .toMap
+
+    new v1.StageData(
+      stage.status,
+      stage.stageId,
+      stage.attemptId,
+      stage.numTasks,
+      stage.numActiveTasks,
+      stage.numCompleteTasks,
+      stage.numFailedTasks,
+      stage.numKilledTasks,
+      stage.numCompletedIndices,
+      stage.executorRunTime,
+      stage.executorCpuTime,
+      stage.submissionTime,
+      stage.firstTaskLaunchedTime,
+      stage.completionTime,
+      stage.failureReason,
+      stage.inputBytes,
+      stage.inputRecords,
+      stage.outputBytes,
+      stage.outputRecords,
+      stage.shuffleReadBytes,
+      stage.shuffleReadRecords,
+      stage.shuffleWriteBytes,
+      stage.shuffleWriteRecords,
+      stage.memoryBytesSpilled,
+      stage.diskBytesSpilled,
+      stage.name,
+      stage.description,
+      stage.details,
+      stage.schedulingPool,
+      stage.rddIds,
+      stage.accumulatorUpdates,
+      Some(tasks),
+      Some(execs),
+      stage.killedTasksSummary)
+  }
+
   def rdd(rddId: Int): v1.RDDStorageInfo = {
     store.read(classOf[RDDStorageInfoWrapper], rddId).info
   }
@@ -223,6 +299,27 @@ private[spark] class AppStatusStore(store: KVStore) {
     store.view(classOf[StreamBlockData]).asScala.toSeq
   }
 
+  def operationGraphForStage(stageId: Int): RDDOperationGraph = {
+    store.read(classOf[RDDOperationGraphWrapper], 
stageId).toRDDOperationGraph()
+  }
+
+  def operationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = {
+    val job = store.read(classOf[JobDataWrapper], jobId)
+    val stages = job.info.stageIds
+
+    stages.map { id =>
+      val g = store.read(classOf[RDDOperationGraphWrapper], 
id).toRDDOperationGraph()
+      if (job.skippedStages.contains(id) && 
!g.rootCluster.name.contains("skipped")) {
+        g.rootCluster.setName(g.rootCluster.name + " (skipped)")
+      }
+      g
+    }
+  }
+
+  def pool(name: String): PoolData = {
+    store.read(classOf[PoolData], name)
+  }
+
   def close(): Unit = {
     store.close()
   }
@@ -237,12 +334,12 @@ private[spark] object AppStatusStore {
    * Create an in-memory store for a live application.
    *
    * @param conf Configuration.
-   * @param bus Where to attach the listener to populate the store.
+   * @param addListenerFn Function to register a listener with a bus.
    */
-  def createLiveStore(conf: SparkConf, bus: LiveListenerBus): AppStatusStore = 
{
+  def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): 
AppStatusStore = {
     val store = new InMemoryStore()
     val stateStore = new AppStatusStore(store)
-    bus.addToStatusQueue(new AppStatusListener(store, conf, true))
+    addListenerFn(new AppStatusListener(store, conf, true))
     stateStore
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala 
b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 706d94c..ef2936c 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -28,6 +28,7 @@ import org.apache.spark.status.api.v1
 import org.apache.spark.storage.RDDInfo
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.AccumulatorContext
+import org.apache.spark.util.collection.OpenHashSet
 import org.apache.spark.util.kvstore.KVStore
 
 /**
@@ -64,6 +65,12 @@ private class LiveJob(
   var completedTasks = 0
   var failedTasks = 0
 
+  // Holds both the stage ID and the task index, packed into a single long 
value.
+  val completedIndices = new OpenHashSet[Long]()
+
+  var killedTasks = 0
+  var killedSummary: Map[String, Int] = Map()
+
   var skippedTasks = 0
   var skippedStages = Set[Int]()
 
@@ -89,19 +96,23 @@ private class LiveJob(
       completedTasks,
       skippedTasks,
       failedTasks,
+      killedTasks,
+      completedIndices.size,
       activeStages,
       completedStages.size,
       skippedStages.size,
-      failedStages)
+      failedStages,
+      killedSummary)
     new JobDataWrapper(info, skippedStages)
   }
 
 }
 
 private class LiveTask(
-    info: TaskInfo,
+    var info: TaskInfo,
     stageId: Int,
-    stageAttemptId: Int) extends LiveEntity {
+    stageAttemptId: Int,
+    lastUpdateTime: Option[Long]) extends LiveEntity {
 
   import LiveEntityHelpers._
 
@@ -126,6 +137,7 @@ private class LiveTask(
         metrics.resultSerializationTime,
         metrics.memoryBytesSpilled,
         metrics.diskBytesSpilled,
+        metrics.peakExecutionMemory,
         new v1.InputMetrics(
           metrics.inputMetrics.bytesRead,
           metrics.inputMetrics.recordsRead),
@@ -186,6 +198,7 @@ private class LiveTask(
       0L, 0L, 0L,
       metrics.memoryBytesSpilled - old.memoryBytesSpilled,
       metrics.diskBytesSpilled - old.diskBytesSpilled,
+      0L,
       inputDelta,
       outputDelta,
       shuffleReadDelta,
@@ -193,12 +206,19 @@ private class LiveTask(
   }
 
   override protected def doUpdate(): Any = {
+    val duration = if (info.finished) {
+      info.duration
+    } else {
+      info.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis()))
+    }
+
     val task = new v1.TaskData(
       info.taskId,
       info.index,
       info.attemptNumber,
       new Date(info.launchTime),
-      if (info.finished) Some(info.duration) else None,
+      if (info.gettingResult) Some(new Date(info.gettingResultTime)) else None,
+      Some(duration),
       info.executorId,
       info.host,
       info.status,
@@ -340,10 +360,15 @@ private class LiveExecutorStageSummary(
       taskTime,
       failedTasks,
       succeededTasks,
+      killedTasks,
       metrics.inputBytes,
+      metrics.inputRecords,
       metrics.outputBytes,
+      metrics.outputRecords,
       metrics.shuffleReadBytes,
+      metrics.shuffleReadRecords,
       metrics.shuffleWriteBytes,
+      metrics.shuffleWriteRecords,
       metrics.memoryBytesSpilled,
       metrics.diskBytesSpilled)
     new ExecutorStageSummaryWrapper(stageId, attemptId, executorId, info)
@@ -361,11 +386,16 @@ private class LiveStage extends LiveEntity {
   var info: StageInfo = null
   var status = v1.StageStatus.PENDING
 
+  var description: Option[String] = None
   var schedulingPool: String = SparkUI.DEFAULT_POOL_NAME
 
   var activeTasks = 0
   var completedTasks = 0
   var failedTasks = 0
+  val completedIndices = new OpenHashSet[Int]()
+
+  var killedTasks = 0
+  var killedSummary: Map[String, Int] = Map()
 
   var firstLaunchTime = Long.MaxValue
 
@@ -384,15 +414,19 @@ private class LiveStage extends LiveEntity {
       info.stageId,
       info.attemptId,
 
+      info.numTasks,
       activeTasks,
       completedTasks,
       failedTasks,
+      killedTasks,
+      completedIndices.size,
 
       metrics.executorRunTime,
       metrics.executorCpuTime,
       info.submissionTime.map(new Date(_)),
       if (firstLaunchTime < Long.MaxValue) Some(new Date(firstLaunchTime)) 
else None,
       info.completionTime.map(new Date(_)),
+      info.failureReason,
 
       metrics.inputBytes,
       metrics.inputRecords,
@@ -406,12 +440,15 @@ private class LiveStage extends LiveEntity {
       metrics.diskBytesSpilled,
 
       info.name,
+      description,
       info.details,
       schedulingPool,
 
+      info.rddInfos.map(_.id),
       newAccumulatorInfos(info.accumulables.values),
       None,
-      None)
+      None,
+      killedSummary)
 
     new StageDataWrapper(update, jobIds)
   }
@@ -535,6 +572,16 @@ private class LiveRDD(val info: RDDInfo) extends 
LiveEntity {
 
 }
 
+private class SchedulerPool(name: String) extends LiveEntity {
+
+  var stageIds = Set[Int]()
+
+  override protected def doUpdate(): Any = {
+    new PoolData(name, stageIds)
+  }
+
+}
+
 private object LiveEntityHelpers {
 
   def newAccumulatorInfos(accums: Iterable[AccumulableInfo]): 
Seq[v1.AccumulableInfo] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala 
b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
index d0d9ef1..b4fa3e6 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
@@ -22,7 +22,6 @@ import javax.ws.rs.core.MediaType
 
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.jobs.JobProgressListener
 import org.apache.spark.ui.jobs.UIData.JobUIData
 
 @Produces(Array(MediaType.APPLICATION_JSON))
@@ -30,74 +29,7 @@ private[v1] class AllJobsResource(ui: SparkUI) {
 
   @GET
   def jobsList(@QueryParam("status") statuses: JList[JobExecutionStatus]): 
Seq[JobData] = {
-    val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
-      AllJobsResource.getStatusToJobs(ui)
-    val adjStatuses: JList[JobExecutionStatus] = {
-      if (statuses.isEmpty) {
-        Arrays.asList(JobExecutionStatus.values(): _*)
-      } else {
-        statuses
-      }
-    }
-    val jobInfos = for {
-      (status, jobs) <- statusToJobs
-      job <- jobs if adjStatuses.contains(status)
-    } yield {
-      AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
-    }
-    jobInfos.sortBy{- _.jobId}
+    ui.store.jobsList(statuses)
   }
 
 }
-
-private[v1] object AllJobsResource {
-
-  def getStatusToJobs(ui: SparkUI): Seq[(JobExecutionStatus, Seq[JobUIData])] 
= {
-    val statusToJobs = ui.jobProgressListener.synchronized {
-      Seq(
-        JobExecutionStatus.RUNNING -> 
ui.jobProgressListener.activeJobs.values.toSeq,
-        JobExecutionStatus.SUCCEEDED -> 
ui.jobProgressListener.completedJobs.toSeq,
-        JobExecutionStatus.FAILED -> 
ui.jobProgressListener.failedJobs.reverse.toSeq
-      )
-    }
-    statusToJobs
-  }
-
-  def convertJobData(
-      job: JobUIData,
-      listener: JobProgressListener,
-      includeStageDetails: Boolean): JobData = {
-    listener.synchronized {
-      val lastStageInfo =
-        if (job.stageIds.isEmpty) {
-          None
-        } else {
-          listener.stageIdToInfo.get(job.stageIds.max)
-        }
-      val lastStageData = lastStageInfo.flatMap { s =>
-        listener.stageIdToData.get((s.stageId, s.attemptId))
-      }
-      val lastStageName = lastStageInfo.map { _.name }.getOrElse("(Unknown 
Stage Name)")
-      val lastStageDescription = lastStageData.flatMap { _.description }
-      new JobData(
-        jobId = job.jobId,
-        name = lastStageName,
-        description = lastStageDescription,
-        submissionTime = job.submissionTime.map{new Date(_)},
-        completionTime = job.completionTime.map{new Date(_)},
-        stageIds = job.stageIds,
-        jobGroup = job.jobGroup,
-        status = job.status,
-        numTasks = job.numTasks,
-        numActiveTasks = job.numActiveTasks,
-        numCompletedTasks = job.numCompletedTasks,
-        numSkippedTasks = job.numSkippedTasks,
-        numFailedTasks = job.numFailedTasks,
-        numActiveStages = job.numActiveStages,
-        numCompletedStages = job.completedStageIndices.size,
-        numSkippedStages = job.numSkippedStages,
-        numFailedStages = job.numFailedStages
-      )
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala 
b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 5f69949..e1c91cb 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -16,304 +16,18 @@
  */
 package org.apache.spark.status.api.v1
 
-import java.util.{Arrays, Date, List => JList}
+import java.util.{List => JList}
 import javax.ws.rs.{GET, Produces, QueryParam}
 import javax.ws.rs.core.MediaType
 
-import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, 
StageInfo}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}
-import org.apache.spark.ui.jobs.UIData.{InputMetricsUIData => 
InternalInputMetrics, OutputMetricsUIData => InternalOutputMetrics, 
ShuffleReadMetricsUIData => InternalShuffleReadMetrics, 
ShuffleWriteMetricsUIData => InternalShuffleWriteMetrics, TaskMetricsUIData => 
InternalTaskMetrics}
-import org.apache.spark.util.Distribution
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class AllStagesResource(ui: SparkUI) {
 
   @GET
   def stageList(@QueryParam("status") statuses: JList[StageStatus]): 
Seq[StageData] = {
-    val listener = ui.jobProgressListener
-    val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
-    val adjStatuses = {
-      if (statuses.isEmpty()) {
-        Arrays.asList(StageStatus.values(): _*)
-      } else {
-        statuses
-      }
-    }
-    for {
-      (status, stageList) <- stageAndStatus
-      stageInfo: StageInfo <- stageList if adjStatuses.contains(status)
-      stageUiData: StageUIData <- listener.synchronized {
-        listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId))
-      }
-    } yield {
-      stageUiData.lastUpdateTime = ui.lastUpdateTime
-      AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, 
includeDetails = false)
-    }
+    ui.store.stageList(statuses)
   }
-}
-
-private[v1] object AllStagesResource {
-  def stageUiToStageData(
-      status: StageStatus,
-      stageInfo: StageInfo,
-      stageUiData: StageUIData,
-      includeDetails: Boolean): StageData = {
-
-    val taskLaunchTimes = 
stageUiData.taskData.values.map(_.taskInfo.launchTime).filter(_ > 0)
-
-    val firstTaskLaunchedTime: Option[Date] =
-      if (taskLaunchTimes.nonEmpty) {
-        Some(new Date(taskLaunchTimes.min))
-      } else {
-        None
-      }
-
-    val taskData = if (includeDetails) {
-      Some(stageUiData.taskData.map { case (k, v) =>
-        k -> convertTaskData(v, stageUiData.lastUpdateTime) }.toMap)
-    } else {
-      None
-    }
-    val executorSummary = if (includeDetails) {
-      Some(stageUiData.executorSummary.map { case (k, summary) =>
-        k -> new ExecutorStageSummary(
-          taskTime = summary.taskTime,
-          failedTasks = summary.failedTasks,
-          succeededTasks = summary.succeededTasks,
-          inputBytes = summary.inputBytes,
-          outputBytes = summary.outputBytes,
-          shuffleRead = summary.shuffleRead,
-          shuffleWrite = summary.shuffleWrite,
-          memoryBytesSpilled = summary.memoryBytesSpilled,
-          diskBytesSpilled = summary.diskBytesSpilled
-        )
-      }.toMap)
-    } else {
-      None
-    }
-
-    val accumulableInfo = stageUiData.accumulables.values.map { 
convertAccumulableInfo }.toSeq
-
-    new StageData(
-      status = status,
-      stageId = stageInfo.stageId,
-      attemptId = stageInfo.attemptId,
-      numActiveTasks = stageUiData.numActiveTasks,
-      numCompleteTasks = stageUiData.numCompleteTasks,
-      numFailedTasks = stageUiData.numFailedTasks,
-      executorRunTime = stageUiData.executorRunTime,
-      executorCpuTime = stageUiData.executorCpuTime,
-      submissionTime = stageInfo.submissionTime.map(new Date(_)),
-      firstTaskLaunchedTime,
-      completionTime = stageInfo.completionTime.map(new Date(_)),
-      inputBytes = stageUiData.inputBytes,
-      inputRecords = stageUiData.inputRecords,
-      outputBytes = stageUiData.outputBytes,
-      outputRecords = stageUiData.outputRecords,
-      shuffleReadBytes = stageUiData.shuffleReadTotalBytes,
-      shuffleReadRecords = stageUiData.shuffleReadRecords,
-      shuffleWriteBytes = stageUiData.shuffleWriteBytes,
-      shuffleWriteRecords = stageUiData.shuffleWriteRecords,
-      memoryBytesSpilled = stageUiData.memoryBytesSpilled,
-      diskBytesSpilled = stageUiData.diskBytesSpilled,
-      schedulingPool = stageUiData.schedulingPool,
-      name = stageInfo.name,
-      details = stageInfo.details,
-      accumulatorUpdates = accumulableInfo,
-      tasks = taskData,
-      executorSummary = executorSummary
-    )
-  }
-
-  def stagesAndStatus(ui: SparkUI): Seq[(StageStatus, Seq[StageInfo])] = {
-    val listener = ui.jobProgressListener
-    listener.synchronized {
-      Seq(
-        StageStatus.ACTIVE -> listener.activeStages.values.toSeq,
-        StageStatus.COMPLETE -> listener.completedStages.reverse.toSeq,
-        StageStatus.FAILED -> listener.failedStages.reverse.toSeq,
-        StageStatus.PENDING -> listener.pendingStages.values.toSeq
-      )
-    }
-  }
-
-  def convertTaskData(uiData: TaskUIData, lastUpdateTime: Option[Long]): 
TaskData = {
-    new TaskData(
-      taskId = uiData.taskInfo.taskId,
-      index = uiData.taskInfo.index,
-      attempt = uiData.taskInfo.attemptNumber,
-      launchTime = new Date(uiData.taskInfo.launchTime),
-      duration = uiData.taskDuration(lastUpdateTime),
-      executorId = uiData.taskInfo.executorId,
-      host = uiData.taskInfo.host,
-      status = uiData.taskInfo.status,
-      taskLocality = uiData.taskInfo.taskLocality.toString(),
-      speculative = uiData.taskInfo.speculative,
-      accumulatorUpdates = uiData.taskInfo.accumulables.map { 
convertAccumulableInfo },
-      errorMessage = uiData.errorMessage,
-      taskMetrics = uiData.metrics.map { convertUiTaskMetrics }
-    )
-  }
-
-  def taskMetricDistributions(
-      allTaskData: Iterable[TaskUIData],
-      quantiles: Array[Double]): TaskMetricDistributions = {
-
-    val rawMetrics = allTaskData.flatMap{_.metrics}.toSeq
-
-    def metricQuantiles(f: InternalTaskMetrics => Double): IndexedSeq[Double] =
-      Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles)
-
-    // We need to do a lot of similar munging to nested metrics here.  For 
each one,
-    // we want (a) extract the values for nested metrics (b) make a 
distribution for each metric
-    // (c) shove the distribution into the right field in our return type and 
(d) only return
-    // a result if the option is defined for any of the tasks.  MetricHelper 
is a little util
-    // to make it a little easier to deal w/ all of the nested options.  
Mostly it lets us just
-    // implement one "build" method, which just builds the quantiles for each 
field.
-
-    val inputMetrics: InputMetricDistributions =
-      new MetricHelper[InternalInputMetrics, 
InputMetricDistributions](rawMetrics, quantiles) {
-        def getSubmetrics(raw: InternalTaskMetrics): InternalInputMetrics = 
raw.inputMetrics
-
-        def build: InputMetricDistributions = new InputMetricDistributions(
-          bytesRead = submetricQuantiles(_.bytesRead),
-          recordsRead = submetricQuantiles(_.recordsRead)
-        )
-      }.build
-
-    val outputMetrics: OutputMetricDistributions =
-      new MetricHelper[InternalOutputMetrics, 
OutputMetricDistributions](rawMetrics, quantiles) {
-        def getSubmetrics(raw: InternalTaskMetrics): InternalOutputMetrics = 
raw.outputMetrics
-
-        def build: OutputMetricDistributions = new OutputMetricDistributions(
-          bytesWritten = submetricQuantiles(_.bytesWritten),
-          recordsWritten = submetricQuantiles(_.recordsWritten)
-        )
-      }.build
-
-    val shuffleReadMetrics: ShuffleReadMetricDistributions =
-      new MetricHelper[InternalShuffleReadMetrics, 
ShuffleReadMetricDistributions](rawMetrics,
-        quantiles) {
-        def getSubmetrics(raw: InternalTaskMetrics): 
InternalShuffleReadMetrics =
-          raw.shuffleReadMetrics
-
-        def build: ShuffleReadMetricDistributions = new 
ShuffleReadMetricDistributions(
-          readBytes = submetricQuantiles(_.totalBytesRead),
-          readRecords = submetricQuantiles(_.recordsRead),
-          remoteBytesRead = submetricQuantiles(_.remoteBytesRead),
-          remoteBytesReadToDisk = submetricQuantiles(_.remoteBytesReadToDisk),
-          remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched),
-          localBlocksFetched = submetricQuantiles(_.localBlocksFetched),
-          totalBlocksFetched = submetricQuantiles(_.totalBlocksFetched),
-          fetchWaitTime = submetricQuantiles(_.fetchWaitTime)
-        )
-      }.build
-
-    val shuffleWriteMetrics: ShuffleWriteMetricDistributions =
-      new MetricHelper[InternalShuffleWriteMetrics, 
ShuffleWriteMetricDistributions](rawMetrics,
-        quantiles) {
-        def getSubmetrics(raw: InternalTaskMetrics): 
InternalShuffleWriteMetrics =
-          raw.shuffleWriteMetrics
 
-        def build: ShuffleWriteMetricDistributions = new 
ShuffleWriteMetricDistributions(
-          writeBytes = submetricQuantiles(_.bytesWritten),
-          writeRecords = submetricQuantiles(_.recordsWritten),
-          writeTime = submetricQuantiles(_.writeTime)
-        )
-      }.build
-
-    new TaskMetricDistributions(
-      quantiles = quantiles,
-      executorDeserializeTime = metricQuantiles(_.executorDeserializeTime),
-      executorDeserializeCpuTime = 
metricQuantiles(_.executorDeserializeCpuTime),
-      executorRunTime = metricQuantiles(_.executorRunTime),
-      executorCpuTime = metricQuantiles(_.executorCpuTime),
-      resultSize = metricQuantiles(_.resultSize),
-      jvmGcTime = metricQuantiles(_.jvmGCTime),
-      resultSerializationTime = metricQuantiles(_.resultSerializationTime),
-      memoryBytesSpilled = metricQuantiles(_.memoryBytesSpilled),
-      diskBytesSpilled = metricQuantiles(_.diskBytesSpilled),
-      inputMetrics = inputMetrics,
-      outputMetrics = outputMetrics,
-      shuffleReadMetrics = shuffleReadMetrics,
-      shuffleWriteMetrics = shuffleWriteMetrics
-    )
-  }
-
-  def convertAccumulableInfo(acc: InternalAccumulableInfo): AccumulableInfo = {
-    new AccumulableInfo(
-      acc.id, acc.name.orNull, acc.update.map(_.toString), 
acc.value.map(_.toString).orNull)
-  }
-
-  def convertUiTaskMetrics(internal: InternalTaskMetrics): TaskMetrics = {
-    new TaskMetrics(
-      executorDeserializeTime = internal.executorDeserializeTime,
-      executorDeserializeCpuTime = internal.executorDeserializeCpuTime,
-      executorRunTime = internal.executorRunTime,
-      executorCpuTime = internal.executorCpuTime,
-      resultSize = internal.resultSize,
-      jvmGcTime = internal.jvmGCTime,
-      resultSerializationTime = internal.resultSerializationTime,
-      memoryBytesSpilled = internal.memoryBytesSpilled,
-      diskBytesSpilled = internal.diskBytesSpilled,
-      inputMetrics = convertInputMetrics(internal.inputMetrics),
-      outputMetrics = convertOutputMetrics(internal.outputMetrics),
-      shuffleReadMetrics = 
convertShuffleReadMetrics(internal.shuffleReadMetrics),
-      shuffleWriteMetrics = 
convertShuffleWriteMetrics(internal.shuffleWriteMetrics)
-    )
-  }
-
-  def convertInputMetrics(internal: InternalInputMetrics): InputMetrics = {
-    new InputMetrics(
-      bytesRead = internal.bytesRead,
-      recordsRead = internal.recordsRead
-    )
-  }
-
-  def convertOutputMetrics(internal: InternalOutputMetrics): OutputMetrics = {
-    new OutputMetrics(
-      bytesWritten = internal.bytesWritten,
-      recordsWritten = internal.recordsWritten
-    )
-  }
-
-  def convertShuffleReadMetrics(internal: InternalShuffleReadMetrics): 
ShuffleReadMetrics = {
-    new ShuffleReadMetrics(
-      remoteBlocksFetched = internal.remoteBlocksFetched,
-      localBlocksFetched = internal.localBlocksFetched,
-      fetchWaitTime = internal.fetchWaitTime,
-      remoteBytesRead = internal.remoteBytesRead,
-      remoteBytesReadToDisk = internal.remoteBytesReadToDisk,
-      localBytesRead = internal.localBytesRead,
-      recordsRead = internal.recordsRead
-    )
-  }
-
-  def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): 
ShuffleWriteMetrics = {
-    new ShuffleWriteMetrics(
-      bytesWritten = internal.bytesWritten,
-      writeTime = internal.writeTime,
-      recordsWritten = internal.recordsWritten
-    )
-  }
-}
-
-/**
- * Helper for getting distributions from nested metric types.
- */
-private[v1] abstract class MetricHelper[I, O](
-    rawMetrics: Seq[InternalTaskMetrics],
-    quantiles: Array[Double]) {
-
-  def getSubmetrics(raw: InternalTaskMetrics): I
-
-  def build: O
-
-  val data: Seq[I] = rawMetrics.map(getSubmetrics)
-
-  /** applies the given function to all input metrics, and returns the 
quantiles */
-  def submetricQuantiles(f: I => Double): IndexedSeq[Double] = {
-    Distribution(data.map { d => f(d) }).get.getQuantiles(quantiles)
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala 
b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
index 6531503..3ee884e 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
@@ -16,25 +16,22 @@
  */
 package org.apache.spark.status.api.v1
 
+import java.util.NoSuchElementException
 import javax.ws.rs.{GET, PathParam, Produces}
 import javax.ws.rs.core.MediaType
 
-import org.apache.spark.JobExecutionStatus
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.jobs.UIData.JobUIData
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class OneJobResource(ui: SparkUI) {
 
   @GET
   def oneJob(@PathParam("jobId") jobId: Int): JobData = {
-    val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
-      AllJobsResource.getStatusToJobs(ui)
-    val jobOpt = statusToJobs.flatMap(_._2).find { jobInfo => jobInfo.jobId == 
jobId}
-    jobOpt.map { job =>
-      AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
-    }.getOrElse {
-      throw new NotFoundException("unknown job: " + jobId)
+    try {
+      ui.store.job(jobId)
+    } catch {
+      case _: NoSuchElementException =>
+        throw new NotFoundException("unknown job: " + jobId)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala 
b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
index f15073b..20dd73e 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
@@ -24,7 +24,6 @@ import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.status.api.v1.StageStatus._
 import org.apache.spark.status.api.v1.TaskSorting._
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.jobs.JobProgressListener
 import org.apache.spark.ui.jobs.UIData.StageUIData
 
 @Produces(Array(MediaType.APPLICATION_JSON))
@@ -32,13 +31,14 @@ private[v1] class OneStageResource(ui: SparkUI) {
 
   @GET
   @Path("")
-  def stageData(@PathParam("stageId") stageId: Int): Seq[StageData] = {
-    withStage(stageId) { stageAttempts =>
-      stageAttempts.map { stage =>
-        stage.ui.lastUpdateTime = ui.lastUpdateTime
-        AllStagesResource.stageUiToStageData(stage.status, stage.info, 
stage.ui,
-          includeDetails = true)
-      }
+  def stageData(
+      @PathParam("stageId") stageId: Int,
+      @QueryParam("details") @DefaultValue("true") details: Boolean): 
Seq[StageData] = {
+    val ret = ui.store.stageData(stageId, details = details)
+    if (ret.nonEmpty) {
+      ret
+    } else {
+      throw new NotFoundException(s"unknown stage: $stageId")
     }
   }
 
@@ -46,11 +46,13 @@ private[v1] class OneStageResource(ui: SparkUI) {
   @Path("/{stageAttemptId: \\d+}")
   def oneAttemptData(
       @PathParam("stageId") stageId: Int,
-      @PathParam("stageAttemptId") stageAttemptId: Int): StageData = {
-    withStageAttempt(stageId, stageAttemptId) { stage =>
-      stage.ui.lastUpdateTime = ui.lastUpdateTime
-      AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui,
-        includeDetails = true)
+      @PathParam("stageAttemptId") stageAttemptId: Int,
+      @QueryParam("details") @DefaultValue("true") details: Boolean): 
StageData = {
+    try {
+      ui.store.stageAttempt(stageId, stageAttemptId, details = details)
+    } catch {
+      case _: NoSuchElementException =>
+        throw new NotFoundException(s"unknown attempt $stageAttemptId for 
stage $stageId.")
     }
   }
 
@@ -61,17 +63,16 @@ private[v1] class OneStageResource(ui: SparkUI) {
       @PathParam("stageAttemptId") stageAttemptId: Int,
       @DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") 
quantileString: String)
   : TaskMetricDistributions = {
-    withStageAttempt(stageId, stageAttemptId) { stage =>
-      val quantiles = quantileString.split(",").map { s =>
-        try {
-          s.toDouble
-        } catch {
-          case nfe: NumberFormatException =>
-            throw new BadParameterException("quantiles", "double", s)
-        }
+    val quantiles = quantileString.split(",").map { s =>
+      try {
+        s.toDouble
+      } catch {
+        case nfe: NumberFormatException =>
+          throw new BadParameterException("quantiles", "double", s)
       }
-      AllStagesResource.taskMetricDistributions(stage.ui.taskData.values, 
quantiles)
     }
+
+    ui.store.taskSummary(stageId, stageAttemptId, quantiles)
   }
 
   @GET
@@ -82,72 +83,7 @@ private[v1] class OneStageResource(ui: SparkUI) {
       @DefaultValue("0") @QueryParam("offset") offset: Int,
       @DefaultValue("20") @QueryParam("length") length: Int,
       @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): 
Seq[TaskData] = {
-    withStageAttempt(stageId, stageAttemptId) { stage =>
-      val tasks = stage.ui.taskData.values
-        .map{ AllStagesResource.convertTaskData(_, 
ui.lastUpdateTime)}.toIndexedSeq
-        .sorted(OneStageResource.ordering(sortBy))
-      tasks.slice(offset, offset + length)
-    }
-  }
-
-  private case class StageStatusInfoUi(status: StageStatus, info: StageInfo, 
ui: StageUIData)
-
-  private def withStage[T](stageId: Int)(f: Seq[StageStatusInfoUi] => T): T = {
-    val stageAttempts = findStageStatusUIData(ui.jobProgressListener, stageId)
-    if (stageAttempts.isEmpty) {
-      throw new NotFoundException("unknown stage: " + stageId)
-    } else {
-      f(stageAttempts)
-    }
+    ui.store.taskList(stageId, stageAttemptId, offset, length, sortBy)
   }
 
-  private def findStageStatusUIData(
-      listener: JobProgressListener,
-      stageId: Int): Seq[StageStatusInfoUi] = {
-    listener.synchronized {
-      def getStatusInfoUi(status: StageStatus, infos: Seq[StageInfo]): 
Seq[StageStatusInfoUi] = {
-        infos.filter { _.stageId == stageId }.map { info =>
-          val ui = listener.stageIdToData.getOrElse((info.stageId, 
info.attemptId),
-            // this is an internal error -- we should always have uiData
-            throw new SparkException(
-              s"no stage ui data found for stage: 
${info.stageId}:${info.attemptId}")
-          )
-          StageStatusInfoUi(status, info, ui)
-        }
-      }
-      getStatusInfoUi(ACTIVE, listener.activeStages.values.toSeq) ++
-        getStatusInfoUi(COMPLETE, listener.completedStages) ++
-        getStatusInfoUi(FAILED, listener.failedStages) ++
-        getStatusInfoUi(PENDING, listener.pendingStages.values.toSeq)
-    }
-  }
-
-  private def withStageAttempt[T](
-      stageId: Int,
-      stageAttemptId: Int)
-      (f: StageStatusInfoUi => T): T = {
-    withStage(stageId) { attempts =>
-        val oneAttempt = attempts.find { stage => stage.info.attemptId == 
stageAttemptId }
-        oneAttempt match {
-          case Some(stage) =>
-            f(stage)
-          case None =>
-            val stageAttempts = attempts.map { _.info.attemptId }
-            throw new NotFoundException(s"unknown attempt for stage $stageId.  " +
-              s"Found attempts: ${stageAttempts.mkString("[", ",", "]")}")
-        }
-    }
-  }
-}
-
-object OneStageResource {
-  def ordering(taskSorting: TaskSorting): Ordering[TaskData] = {
-    val extractor: (TaskData => Long) = td =>
-      taskSorting match {
-        case ID => td.taskId
-        case INCREASING_RUNTIME => td.taskMetrics.map{_.executorRunTime}.getOrElse(-1L)
-        case DECREASING_RUNTIME => -td.taskMetrics.map{_.executorRunTime}.getOrElse(-1L)
-      }
-    Ordering.by(extractor)
-  }
 }
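
The handlers above now read everything from ui.store (the AppStatusStore) instead of
walking listener state. For reference, the task summary endpoint they back can be
exercised directly over the REST API; the sketch below is illustrative only, with an
assumed application id and port, using the existing
/applications/[app-id]/stages/[stage-id]/[attempt-id]/taskSummary route and the same
"quantiles" query parameter parsed above.

    import scala.io.Source

    object TaskSummaryExample {
      def main(args: Array[String]): Unit = {
        val appId = "app-20171114103432-0000"  // assumed application id
        val url = s"http://localhost:4040/api/v1/applications/$appId" +
          "/stages/0/0/taskSummary?quantiles=0.25,0.5,0.75"
        // Prints the JSON form of TaskMetricDistributions, now computed by
        // AppStatusStore.taskSummary rather than the listener-based code.
        println(Source.fromURL(url).mkString)
      }
    }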

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala 
b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index b338b1f..1428009 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -58,10 +58,15 @@ class ExecutorStageSummary private[spark](
     val taskTime : Long,
     val failedTasks : Int,
     val succeededTasks : Int,
+    val killedTasks : Int,
     val inputBytes : Long,
+    val inputRecords : Long,
     val outputBytes : Long,
+    val outputRecords : Long,
     val shuffleRead : Long,
+    val shuffleReadRecords : Long,
     val shuffleWrite : Long,
+    val shuffleWriteRecords : Long,
     val memoryBytesSpilled : Long,
     val diskBytesSpilled : Long)
 
@@ -111,10 +116,13 @@ class JobData private[spark](
     val numCompletedTasks: Int,
     val numSkippedTasks: Int,
     val numFailedTasks: Int,
+    val numKilledTasks: Int,
+    val numCompletedIndices: Int,
     val numActiveStages: Int,
     val numCompletedStages: Int,
     val numSkippedStages: Int,
-    val numFailedStages: Int)
+    val numFailedStages: Int,
+    val killedTasksSummary: Map[String, Int])
 
 class RDDStorageInfo private[spark](
     val id: Int,
@@ -152,15 +160,19 @@ class StageData private[spark](
     val status: StageStatus,
     val stageId: Int,
     val attemptId: Int,
+    val numTasks: Int,
     val numActiveTasks: Int,
     val numCompleteTasks: Int,
     val numFailedTasks: Int,
+    val numKilledTasks: Int,
+    val numCompletedIndices: Int,
 
     val executorRunTime: Long,
     val executorCpuTime: Long,
     val submissionTime: Option[Date],
     val firstTaskLaunchedTime: Option[Date],
     val completionTime: Option[Date],
+    val failureReason: Option[String],
 
     val inputBytes: Long,
     val inputRecords: Long,
@@ -174,18 +186,22 @@ class StageData private[spark](
     val diskBytesSpilled: Long,
 
     val name: String,
+    val description: Option[String],
     val details: String,
     val schedulingPool: String,
 
+    val rddIds: Seq[Int],
     val accumulatorUpdates: Seq[AccumulableInfo],
     val tasks: Option[Map[Long, TaskData]],
-    val executorSummary: Option[Map[String, ExecutorStageSummary]])
+    val executorSummary: Option[Map[String, ExecutorStageSummary]],
+    val killedTasksSummary: Map[String, Int])
 
 class TaskData private[spark](
     val taskId: Long,
     val index: Int,
     val attempt: Int,
     val launchTime: Date,
+    val resultFetchStart: Option[Date],
     @JsonDeserialize(contentAs = classOf[JLong])
     val duration: Option[Long],
     val executorId: String,
@@ -207,6 +223,7 @@ class TaskMetrics private[spark](
     val resultSerializationTime: Long,
     val memoryBytesSpilled: Long,
     val diskBytesSpilled: Long,
+    val peakExecutionMemory: Long,
     val inputMetrics: InputMetrics,
     val outputMetrics: OutputMetrics,
     val shuffleReadMetrics: ShuffleReadMetrics,
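
The added fields surface through the public REST types, so existing clients keep working
while new clients can read the extra counters. A minimal, illustrative sketch (the
application id and port are assumptions, not part of this change):

    import scala.io.Source

    object StageFieldsExample {
      def main(args: Array[String]): Unit = {
        val appId = "app-20171114103432-0000"  // assumed application id
        // Each returned stage now also carries numTasks, numKilledTasks,
        // numCompletedIndices, failureReason, description, rddIds and
        // killedTasksSummary alongside the pre-existing fields.
        val json = Source.fromURL(
          s"http://localhost:4040/api/v1/applications/$appId/stages").mkString
        println(json)
      }
    }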

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/status/config.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/config.scala 
b/core/src/main/scala/org/apache/spark/status/config.scala
index 49144fc..7af9dff 100644
--- a/core/src/main/scala/org/apache/spark/status/config.scala
+++ b/core/src/main/scala/org/apache/spark/status/config.scala
@@ -27,4 +27,8 @@ private[spark] object config {
     .timeConf(TimeUnit.NANOSECONDS)
     .createWithDefaultString("100ms")
 
+  val MAX_RETAINED_ROOT_NODES = ConfigBuilder("spark.ui.dagGraph.retainedRootRDDs")
+    .intConf
+    .createWithDefault(Int.MaxValue)
+
 }
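
A minimal sketch of setting spark.ui.dagGraph.retainedRootRDDs from user code; the
values and application are illustrative only, and the default remains Int.MaxValue
(retain everything):

    import org.apache.spark.{SparkConf, SparkContext}

    object RetainedRootRddsExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[*]")
          .setAppName("dag-graph-retention")
          // Cap how many root RDD graph nodes the status store keeps.
          .set("spark.ui.dagGraph.retainedRootRDDs", "200")
        val sc = new SparkContext(conf)
        sc.parallelize(1 to 100).count()
        sc.stop()
      }
    }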

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/status/storeTypes.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala 
b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index f44b793..c1ea875 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore
 
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.status.api.v1._
+import org.apache.spark.ui.scope._
 import org.apache.spark.util.kvstore.KVIndex
 
 private[spark] case class AppStatusStoreMetadata(version: Long)
@@ -106,6 +107,11 @@ private[spark] class TaskDataWrapper(
     Array(stageId: JInteger, stageAttemptId: JInteger, _runtime: JLong)
   }
 
+  @JsonIgnore @KVIndex("startTime")
+  def startTime: Array[AnyRef] = {
+    Array(stageId: JInteger, stageAttemptId: JInteger, info.launchTime.getTime(): JLong)
+  }
+
 }
 
 private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) {
@@ -147,3 +153,37 @@ private[spark] class StreamBlockData(
   def key: Array[String] = Array(name, executorId)
 
 }
+
+private[spark] class RDDOperationClusterWrapper(
+    val id: String,
+    val name: String,
+    val childNodes: Seq[RDDOperationNode],
+    val childClusters: Seq[RDDOperationClusterWrapper]) {
+
+  def toRDDOperationCluster(): RDDOperationCluster = {
+    val cluster = new RDDOperationCluster(id, name)
+    childNodes.foreach(cluster.attachChildNode)
+    childClusters.foreach { child =>
+      cluster.attachChildCluster(child.toRDDOperationCluster())
+    }
+    cluster
+  }
+
+}
+
+private[spark] class RDDOperationGraphWrapper(
+    @KVIndexParam val stageId: Int,
+    val edges: Seq[RDDOperationEdge],
+    val outgoingEdges: Seq[RDDOperationEdge],
+    val incomingEdges: Seq[RDDOperationEdge],
+    val rootCluster: RDDOperationClusterWrapper) {
+
+  def toRDDOperationGraph(): RDDOperationGraph = {
+    new RDDOperationGraph(edges, outgoingEdges, incomingEdges, rootCluster.toRDDOperationCluster())
+  }
+
+}
+
+private[spark] class PoolData(
+    @KVIndexParam val name: String,
+    val stageIds: Set[Int])
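
The wrapper types are deliberately plain so they can be serialized into the status
store and rebuilt into the mutable ui.scope graph types on demand. A self-contained
sketch of the same recursive pattern, using simplified stand-in types rather than the
real private classes:

    object RddGraphStoreSketch {
      final case class StoredCluster(id: String, name: String, children: Seq[StoredCluster])

      final class UiCluster(val id: String, val name: String) {
        private var children = List.empty[UiCluster]
        def attachChild(c: UiCluster): Unit = children ::= c
        def size: Int = 1 + children.map(_.size).sum
      }

      // Rebuild the mutable tree from the stored, immutable form, mirroring the
      // shape of RDDOperationClusterWrapper.toRDDOperationCluster above.
      def toUiCluster(stored: StoredCluster): UiCluster = {
        val cluster = new UiCluster(stored.id, stored.name)
        stored.children.foreach(child => cluster.attachChild(toUiCluster(child)))
        cluster
      }
    }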

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala 
b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index e93ade0..35da3c3 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.ui
 
-import java.util.{Date, ServiceLoader}
+import java.util.{Date, List => JList, ServiceLoader}
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{JobExecutionStatus, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 import org.apache.spark.status.AppStatusStore
@@ -29,8 +29,7 @@ import org.apache.spark.status.api.v1._
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.env.EnvironmentTab
 import org.apache.spark.ui.exec.ExecutorsTab
-import org.apache.spark.ui.jobs.{JobProgressListener, JobsTab, StagesTab}
-import org.apache.spark.ui.scope.RDDOperationGraphListener
+import org.apache.spark.ui.jobs.{JobsTab, StagesTab}
 import org.apache.spark.ui.storage.StorageTab
 import org.apache.spark.util.Utils
 
@@ -42,11 +41,8 @@ private[spark] class SparkUI private (
     val sc: Option[SparkContext],
     val conf: SparkConf,
     securityManager: SecurityManager,
-    val jobProgressListener: JobProgressListener,
-    val operationGraphListener: RDDOperationGraphListener,
     var appName: String,
     val basePath: String,
-    val lastUpdateTime: Option[Long] = None,
     val startTime: Long,
     val appSparkVersion: String)
  extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf),
@@ -61,8 +57,8 @@ private[spark] class SparkUI private (
   private var streamingJobProgressListener: Option[SparkListener] = None
 
   /** Initialize all components of the server. */
-  def initialize() {
-    val jobsTab = new JobsTab(this)
+  def initialize(): Unit = {
+    val jobsTab = new JobsTab(this, store)
     attachTab(jobsTab)
     val stagesTab = new StagesTab(this, store)
     attachTab(stagesTab)
@@ -72,6 +68,7 @@ private[spark] class SparkUI private (
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
     attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
     attachHandler(ApiRootResource.getServletHandler(this))
+
     // These should be POST only, but, the YARN AM proxy won't proxy POSTs
     attachHandler(createRedirectHandler(
       "/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = 
Set("GET", "POST")))
@@ -79,6 +76,7 @@ private[spark] class SparkUI private (
       "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
       httpMethods = Set("GET", "POST")))
   }
+
   initialize()
 
   def getSparkUser: String = {
@@ -170,25 +168,13 @@ private[spark] object SparkUI {
       sc: Option[SparkContext],
       store: AppStatusStore,
       conf: SparkConf,
-      addListenerFn: SparkListenerInterface => Unit,
       securityManager: SecurityManager,
       appName: String,
       basePath: String,
       startTime: Long,
-      lastUpdateTime: Option[Long] = None,
       appSparkVersion: String = org.apache.spark.SPARK_VERSION): SparkUI = {
 
-    val jobProgressListener = sc.map(_.jobProgressListener).getOrElse {
-      val listener = new JobProgressListener(conf)
-      addListenerFn(listener)
-      listener
-    }
-    val operationGraphListener = new RDDOperationGraphListener(conf)
-
-    addListenerFn(operationGraphListener)
-
-    new SparkUI(store, sc, conf, securityManager, jobProgressListener, operationGraphListener,
-      appName, basePath, lastUpdateTime, startTime, appSparkVersion)
+    new SparkUI(store, sc, conf, securityManager, appName, basePath, startTime, appSparkVersion)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index a647a11..b60d39b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -22,20 +22,20 @@ import java.util.Date
 import javax.servlet.http.HttpServletRequest
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.collection.mutable.ListBuffer
 import scala.xml._
 
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.scheduler._
-import org.apache.spark.status.api.v1.ExecutorSummary
+import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.api.v1
 import org.apache.spark.ui._
-import org.apache.spark.ui.jobs.UIData.{JobUIData, StageUIData}
 import org.apache.spark.util.Utils
 
 /** Page showing list of all ongoing and recently finished jobs */
-private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
+private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends WebUIPage("") {
   private val JOBS_LEGEND =
     <div class="legend-area"><svg width="150px" height="85px">
       <rect class="succeeded-job-legend"
@@ -59,34 +59,20 @@ private[ui] class AllJobsPage(parent: JobsTab) extends 
WebUIPage("") {
       <text x="35px" y="42px">Removed</text>
     </svg></div>.toString.filter(_ != '\n')
 
-  private def getLastStageNameAndDescription(job: JobUIData): (String, String) = {
-    val lastStageInfo = Option(job.stageIds)
-      .filter(_.nonEmpty)
-      .flatMap { ids => parent.jobProgresslistener.stageIdToInfo.get(ids.max)}
-    val lastStageData = lastStageInfo.flatMap { s =>
-      parent.jobProgresslistener.stageIdToData.get((s.stageId, s.attemptId))
-    }
-    val name = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
-    val description = lastStageData.flatMap(_.description).getOrElse("")
-    (name, description)
-  }
-
-  private def makeJobEvent(jobUIDatas: Seq[JobUIData]): Seq[String] = {
-    jobUIDatas.filter { jobUIData =>
-      jobUIData.status != JobExecutionStatus.UNKNOWN && jobUIData.submissionTime.isDefined
-    }.map { jobUIData =>
-      val jobId = jobUIData.jobId
-      val status = jobUIData.status
-      val (jobName, jobDescription) = getLastStageNameAndDescription(jobUIData)
+  private def makeJobEvent(jobs: Seq[v1.JobData]): Seq[String] = {
+    jobs.filter { job =>
+      job.status != JobExecutionStatus.UNKNOWN && job.submissionTime.isDefined
+    }.map { job =>
+      val jobId = job.jobId
+      val status = job.status
       val displayJobDescription =
-        if (jobDescription.isEmpty) {
-          jobName
+        if (job.description.isEmpty) {
+          job.name
         } else {
-          UIUtils.makeDescription(jobDescription, "", plainText = true).text
+          UIUtils.makeDescription(job.description.get, "", plainText = true).text
         }
-      val submissionTime = jobUIData.submissionTime.get
-      val completionTimeOpt = jobUIData.completionTime
-      val completionTime = completionTimeOpt.getOrElse(System.currentTimeMillis())
+      val submissionTime = job.submissionTime.get.getTime()
+      val completionTime = job.completionTime.map(_.getTime()).getOrElse(System.currentTimeMillis())
       val classNameByStatus = status match {
         case JobExecutionStatus.SUCCEEDED => "succeeded"
         case JobExecutionStatus.FAILED => "failed"
@@ -124,7 +110,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends 
WebUIPage("") {
     }
   }
 
-  private def makeExecutorEvent(executors: Seq[ExecutorSummary]):
+  private def makeExecutorEvent(executors: Seq[v1.ExecutorSummary]):
       Seq[String] = {
     val events = ListBuffer[String]()
     executors.foreach { e =>
@@ -169,8 +155,8 @@ private[ui] class AllJobsPage(parent: JobsTab) extends 
WebUIPage("") {
   }
 
   private def makeTimeline(
-      jobs: Seq[JobUIData],
-      executors: Seq[ExecutorSummary],
+      jobs: Seq[v1.JobData],
+      executors: Seq[v1.ExecutorSummary],
       startTime: Long): Seq[Node] = {
 
     val jobEventJsonAsStrSeq = makeJobEvent(jobs)
@@ -217,7 +203,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends 
WebUIPage("") {
       request: HttpServletRequest,
       tableHeaderId: String,
       jobTag: String,
-      jobs: Seq[JobUIData],
+      jobs: Seq[v1.JobData],
       killEnabled: Boolean): Seq[Node] = {
     // stripXSS is called to remove suspicious characters used in XSS attacks
     val allParameters = 
request.getParameterMap.asScala.toMap.mapValues(_.map(UIUtils.stripXSS))
@@ -258,14 +244,13 @@ private[ui] class AllJobsPage(parent: JobsTab) extends 
WebUIPage("") {
 
     try {
       new JobPagedTable(
+        store,
         jobs,
         tableHeaderId,
         jobTag,
         UIUtils.prependBaseUri(parent.basePath),
         "jobs", // subPath
         parameterOtherTable,
-        parent.jobProgresslistener.stageIdToInfo,
-        parent.jobProgresslistener.stageIdToData,
         killEnabled,
         currentTime,
         jobIdTitle,
@@ -285,106 +270,117 @@ private[ui] class AllJobsPage(parent: JobsTab) extends 
WebUIPage("") {
   }
 
   def render(request: HttpServletRequest): Seq[Node] = {
-    val listener = parent.jobProgresslistener
-    listener.synchronized {
-      val startTime = listener.startTime
-      val endTime = listener.endTime
-      val activeJobs = listener.activeJobs.values.toSeq
-      val completedJobs = listener.completedJobs.reverse
-      val failedJobs = listener.failedJobs.reverse
-
-      val activeJobsTable =
-        jobsTable(request, "active", "activeJob", activeJobs, killEnabled = 
parent.killEnabled)
-      val completedJobsTable =
-        jobsTable(request, "completed", "completedJob", completedJobs, 
killEnabled = false)
-      val failedJobsTable =
-        jobsTable(request, "failed", "failedJob", failedJobs, killEnabled = 
false)
-
-      val shouldShowActiveJobs = activeJobs.nonEmpty
-      val shouldShowCompletedJobs = completedJobs.nonEmpty
-      val shouldShowFailedJobs = failedJobs.nonEmpty
-
-      val completedJobNumStr = if (completedJobs.size == listener.numCompletedJobs) {
-        s"${completedJobs.size}"
-      } else {
-        s"${listener.numCompletedJobs}, only showing ${completedJobs.size}"
+    val appInfo = store.applicationInfo()
+    val startTime = appInfo.attempts.head.startTime.getTime()
+    val endTime = appInfo.attempts.head.endTime.getTime()
+
+    val activeJobs = new ListBuffer[v1.JobData]()
+    val completedJobs = new ListBuffer[v1.JobData]()
+    val failedJobs = new ListBuffer[v1.JobData]()
+
+    store.jobsList(null).foreach { job =>
+      job.status match {
+        case JobExecutionStatus.SUCCEEDED =>
+          completedJobs += job
+        case JobExecutionStatus.FAILED =>
+          failedJobs += job
+        case _ =>
+          activeJobs += job
       }
+    }
 
-      val summary: NodeSeq =
-        <div>
-          <ul class="unstyled">
-            <li>
-              <strong>User:</strong>
-              {parent.getSparkUser}
-            </li>
-            <li>
-              <strong>Total Uptime:</strong>
-              {
-                if (endTime < 0 && parent.sc.isDefined) {
-                  UIUtils.formatDuration(System.currentTimeMillis() - startTime)
-                } else if (endTime > 0) {
-                  UIUtils.formatDuration(endTime - startTime)
-                }
-              }
-            </li>
-            <li>
-              <strong>Scheduling Mode: </strong>
-              {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
-            </li>
+    val activeJobsTable =
+      jobsTable(request, "active", "activeJob", activeJobs, killEnabled = 
parent.killEnabled)
+    val completedJobsTable =
+      jobsTable(request, "completed", "completedJob", completedJobs, 
killEnabled = false)
+    val failedJobsTable =
+      jobsTable(request, "failed", "failedJob", failedJobs, killEnabled = 
false)
+
+    val shouldShowActiveJobs = activeJobs.nonEmpty
+    val shouldShowCompletedJobs = completedJobs.nonEmpty
+    val shouldShowFailedJobs = failedJobs.nonEmpty
+
+    val completedJobNumStr = s"${completedJobs.size}"
+    val schedulingMode = store.environmentInfo().sparkProperties.toMap
+      .get("spark.scheduler.mode")
+      .map { mode => SchedulingMode.withName(mode).toString }
+      .getOrElse("Unknown")
+
+    val summary: NodeSeq =
+      <div>
+        <ul class="unstyled">
+          <li>
+            <strong>User:</strong>
+            {parent.getSparkUser}
+          </li>
+          <li>
+            <strong>Total Uptime:</strong>
             {
-              if (shouldShowActiveJobs) {
-                <li>
-                  <a href="#active"><strong>Active Jobs:</strong></a>
-                  {activeJobs.size}
-                </li>
+              if (endTime < 0 && parent.sc.isDefined) {
+                UIUtils.formatDuration(System.currentTimeMillis() - startTime)
+              } else if (endTime > 0) {
+                UIUtils.formatDuration(endTime - startTime)
               }
             }
-            {
-              if (shouldShowCompletedJobs) {
-                <li id="completed-summary">
-                  <a href="#completed"><strong>Completed Jobs:</strong></a>
-                  {completedJobNumStr}
-                </li>
-              }
+          </li>
+          <li>
+            <strong>Scheduling Mode: </strong>
+            {schedulingMode}
+          </li>
+          {
+            if (shouldShowActiveJobs) {
+              <li>
+                <a href="#active"><strong>Active Jobs:</strong></a>
+                {activeJobs.size}
+              </li>
             }
-            {
-              if (shouldShowFailedJobs) {
-                <li>
-                  <a href="#failed"><strong>Failed Jobs:</strong></a>
-                  {listener.numFailedJobs}
-                </li>
-              }
+          }
+          {
+            if (shouldShowCompletedJobs) {
+              <li id="completed-summary">
+                <a href="#completed"><strong>Completed Jobs:</strong></a>
+                {completedJobNumStr}
+              </li>
             }
-          </ul>
-        </div>
+          }
+          {
+            if (shouldShowFailedJobs) {
+              <li>
+                <a href="#failed"><strong>Failed Jobs:</strong></a>
+                {failedJobs.size}
+              </li>
+            }
+          }
+        </ul>
+      </div>
 
-      var content = summary
-      content ++= makeTimeline(activeJobs ++ completedJobs ++ failedJobs,
-          parent.parent.store.executorList(false), startTime)
+    var content = summary
+    content ++= makeTimeline(activeJobs ++ completedJobs ++ failedJobs,
+      store.executorList(false), startTime)
 
-      if (shouldShowActiveJobs) {
-        content ++= <h4 id="active">Active Jobs ({activeJobs.size})</h4> ++
-          activeJobsTable
-      }
-      if (shouldShowCompletedJobs) {
-        content ++= <h4 id="completed">Completed Jobs 
({completedJobNumStr})</h4> ++
-          completedJobsTable
-      }
-      if (shouldShowFailedJobs) {
-        content ++= <h4 id ="failed">Failed Jobs ({failedJobs.size})</h4> ++
-          failedJobsTable
-      }
+    if (shouldShowActiveJobs) {
+      content ++= <h4 id="active">Active Jobs ({activeJobs.size})</h4> ++
+        activeJobsTable
+    }
+    if (shouldShowCompletedJobs) {
+      content ++= <h4 id="completed">Completed Jobs 
({completedJobNumStr})</h4> ++
+        completedJobsTable
+    }
+    if (shouldShowFailedJobs) {
+      content ++= <h4 id ="failed">Failed Jobs ({failedJobs.size})</h4> ++
+        failedJobsTable
+    }
 
-      val helpText = """A job is triggered by an action, like count() or 
saveAsTextFile().""" +
-        " Click on a job to see information about the stages of tasks inside 
it."
+    val helpText = """A job is triggered by an action, like count() or 
saveAsTextFile().""" +
+      " Click on a job to see information about the stages of tasks inside it."
 
-      UIUtils.headerSparkPage("Spark Jobs", content, parent, helpText = 
Some(helpText))
-    }
+    UIUtils.headerSparkPage("Spark Jobs", content, parent, helpText = 
Some(helpText))
   }
+
 }
 
 private[ui] class JobTableRowData(
-    val jobData: JobUIData,
+    val jobData: v1.JobData,
     val lastStageName: String,
     val lastStageDescription: String,
     val duration: Long,
@@ -395,9 +391,8 @@ private[ui] class JobTableRowData(
     val detailUrl: String)
 
 private[ui] class JobDataSource(
-    jobs: Seq[JobUIData],
-    stageIdToInfo: HashMap[Int, StageInfo],
-    stageIdToData: HashMap[(Int, Int), StageUIData],
+    store: AppStatusStore,
+    jobs: Seq[v1.JobData],
     basePath: String,
     currentTime: Long,
     pageSize: Int,
@@ -418,40 +413,28 @@ private[ui] class JobDataSource(
     r
   }
 
-  private def getLastStageNameAndDescription(job: JobUIData): (String, String) = {
-    val lastStageInfo = Option(job.stageIds)
-      .filter(_.nonEmpty)
-      .flatMap { ids => stageIdToInfo.get(ids.max)}
-    val lastStageData = lastStageInfo.flatMap { s =>
-      stageIdToData.get((s.stageId, s.attemptId))
-    }
-    val name = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
-    val description = lastStageData.flatMap(_.description).getOrElse("")
-    (name, description)
-  }
-
-  private def jobRow(jobData: JobUIData): JobTableRowData = {
-    val (lastStageName, lastStageDescription) = getLastStageNameAndDescription(jobData)
+  private def jobRow(jobData: v1.JobData): JobTableRowData = {
     val duration: Option[Long] = {
       jobData.submissionTime.map { start =>
-        val end = jobData.completionTime.getOrElse(System.currentTimeMillis())
-        end - start
+        val end = jobData.completionTime.map(_.getTime()).getOrElse(System.currentTimeMillis())
+        end - start.getTime()
       }
     }
     val formattedDuration = duration.map(d => 
UIUtils.formatDuration(d)).getOrElse("Unknown")
     val submissionTime = jobData.submissionTime
     val formattedSubmissionTime = 
submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
-    val jobDescription = UIUtils.makeDescription(lastStageDescription, basePath, plainText = false)
+    val jobDescription = UIUtils.makeDescription(jobData.description.getOrElse(""),
+      basePath, plainText = false)
 
     val detailUrl = "%s/jobs/job?id=%s".format(basePath, jobData.jobId)
 
-    new JobTableRowData (
+    new JobTableRowData(
       jobData,
-      lastStageName,
-      lastStageDescription,
+      jobData.name,
+      jobData.description.getOrElse(jobData.name),
       duration.getOrElse(-1),
       formattedDuration,
-      submissionTime.getOrElse(-1),
+      submissionTime.map(_.getTime()).getOrElse(-1L),
       formattedSubmissionTime,
       jobDescription,
       detailUrl
@@ -479,15 +462,15 @@ private[ui] class JobDataSource(
   }
 
 }
+
 private[ui] class JobPagedTable(
-    data: Seq[JobUIData],
+    store: AppStatusStore,
+    data: Seq[v1.JobData],
     tableHeaderId: String,
     jobTag: String,
     basePath: String,
     subPath: String,
     parameterOtherTable: Iterable[String],
-    stageIdToInfo: HashMap[Int, StageInfo],
-    stageIdToData: HashMap[(Int, Int), StageUIData],
     killEnabled: Boolean,
     currentTime: Long,
     jobIdTitle: String,
@@ -510,9 +493,8 @@ private[ui] class JobPagedTable(
   override def pageNumberFormField: String = jobTag + ".page"
 
   override val dataSource = new JobDataSource(
+    store,
     data,
-    stageIdToInfo,
-    stageIdToData,
     basePath,
     currentTime,
     pageSize,
@@ -624,15 +606,15 @@ private[ui] class JobPagedTable(
       </td>
       <td>{jobTableRow.formattedDuration}</td>
       <td class="stage-progress-cell">
-        {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
+        {job.numCompletedStages}/{job.stageIds.size - job.numSkippedStages}
         {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
         {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
       </td>
       <td class="progress-cell">
         {UIUtils.makeProgressBar(started = job.numActiveTasks,
-        completed = job.completedIndices.size,
+        completed = job.numCompletedIndices,
         failed = job.numFailedTasks, skipped = job.numSkippedTasks,
-        reasonToNumKilled = job.reasonToNumKilled, total = job.numTasks - job.numSkippedTasks)}
+        reasonToNumKilled = job.killedTasksSummary, total = job.numTasks - job.numSkippedTasks)}
       </td>
     </tr>
   }
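
The duration shown in the jobs table is now derived from the Date fields on v1.JobData
rather than raw timestamps. A small standalone sketch of that computation; the names
here are illustrative stand-ins, not the actual UI classes:

    import java.util.Date

    object JobDurationSketch {
      final case class JobTimes(submissionTime: Option[Date], completionTime: Option[Date])

      // Running jobs fall back to "now", mirroring the table's behaviour.
      def jobDurationMs(job: JobTimes, now: Long = System.currentTimeMillis()): Option[Long] =
        job.submissionTime.map { start =>
          job.completionTime.map(_.getTime()).getOrElse(now) - start.getTime()
        }
    }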

