[SPARK-20650][CORE] Remove JobProgressListener.

The only remaining use of this class was the SparkStatusTracker, which
was modified to use the new status store. The test code to wait for
executors was moved to TestUtils and now uses the SparkStatusTracker API.

Indirectly, ConsoleProgressBar also uses this data. Because it has
lower-latency requirements, a shortcut to efficiently get the active
stages from the live listener was added to the AppStatusStore.

Now that all UI code goes through the status store to get its data,
the FsHistoryProvider can be cleaned up to only replay event logs
when needed - that is, when there is no pre-existing disk store for
the application.

As part of this change I also modified the streaming UI to read the needed
data from the store, which was missed in the previous patch that made
JobProgressListener redundant.
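
For illustration, the tracker's public API is unchanged; only its backing
data source moved to the status store. A minimal usage sketch (assuming an
already-running SparkContext `sc`; the job group name is made up):

    // Run a job under a named group, then query it through the
    // store-backed SparkStatusTracker.
    sc.setJobGroup("example-group", "illustration only", interruptOnCancel = false)
    sc.parallelize(1 to 100, 4).count()
    val tracker = sc.statusTracker
    tracker.getJobIdsForGroup("example-group")
      .flatMap(tracker.getJobInfo(_))
      .foreach { job => println(s"job ${job.jobId()}: ${job.status()}") }
    tracker.getActiveStageIds()
      .flatMap(tracker.getStageInfo(_))
      .foreach { s => println(s"stage ${s.stageId()}: ${s.numCompletedTasks()}/${s.numTasks()}") }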

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #19750 from vanzin/SPARK-20650.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ff474f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ff474f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ff474f6

Branch: refs/heads/master
Commit: 8ff474f6e543203fac5d49af7fbe98a8a98da567
Parents: 193555f
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Wed Nov 29 14:34:41 2017 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Wed Nov 29 14:34:41 2017 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  11 +-
 .../org/apache/spark/SparkStatusTracker.scala   |  76 ++-
 .../main/scala/org/apache/spark/TestUtils.scala |  26 +-
 .../deploy/history/FsHistoryProvider.scala      |  65 +-
 .../apache/spark/status/AppStatusListener.scala |  51 +-
 .../apache/spark/status/AppStatusStore.scala    |  17 +-
 .../org/apache/spark/status/LiveEntity.scala    |   8 +-
 .../spark/status/api/v1/StagesResource.scala    |   1 -
 .../apache/spark/ui/ConsoleProgressBar.scala    |  18 +-
 .../spark/ui/jobs/JobProgressListener.scala     | 612 -------------------
 .../org/apache/spark/ui/jobs/StagePage.scala    |   1 -
 .../scala/org/apache/spark/ui/jobs/UIData.scala | 311 ----------
 .../org/apache/spark/DistributedSuite.scala     |   2 +-
 .../spark/ExternalShuffleServiceSuite.scala     |   2 +-
 .../org/apache/spark/StatusTrackerSuite.scala   |   6 +-
 .../apache/spark/broadcast/BroadcastSuite.scala |   2 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |   4 +-
 .../SparkListenerWithClusterSuite.scala         |   4 +-
 .../ui/jobs/JobProgressListenerSuite.scala      | 442 --------------
 project/MimaExcludes.scala                      |   2 +
 .../apache/spark/streaming/ui/BatchPage.scala   |  75 +--
 21 files changed, 208 insertions(+), 1528 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 23fd54f..984dd0a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -58,7 +58,6 @@ import org.apache.spark.status.{AppStatusPlugin, AppStatusStore}
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
 import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
-import org.apache.spark.ui.jobs.JobProgressListener
 import org.apache.spark.util._
 
 /**
@@ -195,7 +194,6 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _eventLogCodec: Option[String] = None
   private var _listenerBus: LiveListenerBus = _
   private var _env: SparkEnv = _
-  private var _jobProgressListener: JobProgressListener = _
   private var _statusTracker: SparkStatusTracker = _
   private var _progressBar: Option[ConsoleProgressBar] = None
   private var _ui: Option[SparkUI] = None
@@ -270,8 +268,6 @@ class SparkContext(config: SparkConf) extends Logging {
     val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
     map.asScala
   }
-  private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener
-
   def statusTracker: SparkStatusTracker = _statusTracker
 
   private[spark] def progressBar: Option[ConsoleProgressBar] = _progressBar
@@ -421,11 +417,6 @@ class SparkContext(config: SparkConf) extends Logging {
 
     _listenerBus = new LiveListenerBus(_conf)
 
-    // "_jobProgressListener" should be set up before creating SparkEnv 
because when creating
-    // "SparkEnv", some messages will be posted to "listenerBus" and we should 
not miss them.
-    _jobProgressListener = new JobProgressListener(_conf)
-    listenerBus.addToStatusQueue(jobProgressListener)
-
     // Initialize the app status store and listener before SparkEnv is created so that it gets
     // all events.
     _statusStore = AppStatusStore.createLiveStore(conf, l => listenerBus.addToStatusQueue(l))
@@ -440,7 +431,7 @@ class SparkContext(config: SparkConf) extends Logging {
       _conf.set("spark.repl.class.uri", replUri)
     }
 
-    _statusTracker = new SparkStatusTracker(this)
+    _statusTracker = new SparkStatusTracker(this, _statusStore)
 
     _progressBar =
       if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala 
b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index 22a553e..70865cb 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark
 
-import org.apache.spark.scheduler.TaskSchedulerImpl
+import java.util.Arrays
+
+import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.api.v1.StageStatus
 
 /**
  * Low-level status reporting APIs for monitoring job and stage progress.
@@ -33,9 +36,7 @@ import org.apache.spark.scheduler.TaskSchedulerImpl
  *
  * NOTE: this class's constructor should be considered private and may be subject to change.
  */
-class SparkStatusTracker private[spark] (sc: SparkContext) {
-
-  private val jobProgressListener = sc.jobProgressListener
+class SparkStatusTracker private[spark] (sc: SparkContext, store: AppStatusStore) {
 
   /**
   * Return a list of all known jobs in a particular job group.  If `jobGroup` is `null`, then
@@ -46,9 +47,8 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    * its result.
    */
   def getJobIdsForGroup(jobGroup: String): Array[Int] = {
-    jobProgressListener.synchronized {
-      jobProgressListener.jobGroupToJobIds.getOrElse(jobGroup, Seq.empty).toArray
-    }
+    val expected = Option(jobGroup)
+    store.jobsList(null).filter(_.jobGroup == expected).map(_.jobId).toArray
   }
 
   /**
@@ -57,9 +57,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    * This method does not guarantee the order of the elements in its result.
    */
   def getActiveStageIds(): Array[Int] = {
-    jobProgressListener.synchronized {
-      jobProgressListener.activeStages.values.map(_.stageId).toArray
-    }
+    store.stageList(Arrays.asList(StageStatus.ACTIVE)).map(_.stageId).toArray
   }
 
   /**
@@ -68,19 +66,15 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    * This method does not guarantee the order of the elements in its result.
    */
   def getActiveJobIds(): Array[Int] = {
-    jobProgressListener.synchronized {
-      jobProgressListener.activeJobs.values.map(_.jobId).toArray
-    }
+    store.jobsList(Arrays.asList(JobExecutionStatus.RUNNING)).map(_.jobId).toArray
   }
 
   /**
   * Returns job information, or `None` if the job info could not be found or was garbage collected.
    */
   def getJobInfo(jobId: Int): Option[SparkJobInfo] = {
-    jobProgressListener.synchronized {
-      jobProgressListener.jobIdToData.get(jobId).map { data =>
-        new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status)
-      }
+    store.asOption(store.job(jobId)).map { job =>
+      new SparkJobInfoImpl(jobId, job.stageIds.toArray, job.status)
     }
   }
 
@@ -89,21 +83,16 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    * garbage collected.
    */
   def getStageInfo(stageId: Int): Option[SparkStageInfo] = {
-    jobProgressListener.synchronized {
-      for (
-        info <- jobProgressListener.stageIdToInfo.get(stageId);
-        data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId))
-      ) yield {
-        new SparkStageInfoImpl(
-          stageId,
-          info.attemptId,
-          info.submissionTime.getOrElse(0),
-          info.name,
-          info.numTasks,
-          data.numActiveTasks,
-          data.numCompleteTasks,
-          data.numFailedTasks)
-      }
+    store.asOption(store.lastStageAttempt(stageId)).map { stage =>
+      new SparkStageInfoImpl(
+        stageId,
+        stage.attemptId,
+        stage.submissionTime.map(_.getTime()).getOrElse(0L),
+        stage.name,
+        stage.numTasks,
+        stage.numActiveTasks,
+        stage.numCompleteTasks,
+        stage.numFailedTasks)
     }
   }
 
@@ -111,17 +100,20 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
   * Returns information of all known executors, including host, port, cacheSize, numRunningTasks.
    */
   def getExecutorInfos: Array[SparkExecutorInfo] = {
-    val executorIdToRunningTasks: Map[String, Int] =
-      sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors
+    store.executorList(true).map { exec =>
+      val (host, port) = exec.hostPort.split(":", 2) match {
+        case Array(h, p) => (h, p.toInt)
+        case Array(h) => (h, -1)
+      }
+      val cachedMem = exec.memoryMetrics.map { mem =>
+        mem.usedOnHeapStorageMemory + mem.usedOffHeapStorageMemory
+      }.getOrElse(0L)
 
-    sc.getExecutorStorageStatus.map { status =>
-      val bmId = status.blockManagerId
       new SparkExecutorInfoImpl(
-        bmId.host,
-        bmId.port,
-        status.cacheSize,
-        executorIdToRunningTasks.getOrElse(bmId.executorId, 0)
-      )
-    }
+        host,
+        port,
+        cachedMem,
+        exec.activeTasks)
+    }.toArray
   }
 }
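
The executor listing above now comes from the status store's executor data
rather than the block manager; a hedged sketch of reading it back through the
unchanged facade (field accessors per the SparkExecutorInfo API):

    // Illustrative only: summarize executors the way the tracker now reports them.
    sc.statusTracker.getExecutorInfos.foreach { exec =>
      println(s"${exec.host()}:${exec.port()} " +
        s"cached=${exec.cacheSize()}B active=${exec.numRunningTasks()}")
    }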

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/main/scala/org/apache/spark/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala 
b/core/src/main/scala/org/apache/spark/TestUtils.scala
index a80016d..93e7ee3 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.security.SecureRandom
 import java.security.cert.X509Certificate
 import java.util.Arrays
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
 import java.util.jar.{JarEntry, JarOutputStream}
 import javax.net.ssl._
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
@@ -232,6 +232,30 @@ private[spark] object TestUtils {
     }
   }
 
+  /**
+   * Wait until at least `numExecutors` executors are up, or throw `TimeoutException` if the waiting
+   * time elapsed before `numExecutors` executors up. Exposed for testing.
+   *
+   * @param numExecutors the number of executors to wait at least
+   * @param timeout time to wait in milliseconds
+   */
+  private[spark] def waitUntilExecutorsUp(
+      sc: SparkContext,
+      numExecutors: Int,
+      timeout: Long): Unit = {
+    val finishTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout)
+    while (System.nanoTime() < finishTime) {
+      if (sc.statusTracker.getExecutorInfos.length > numExecutors) {
+        return
+      }
+      // Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+      // add overhead in the general case.
+      Thread.sleep(10)
+    }
+    throw new TimeoutException(
+      s"Can't find $numExecutors executors before $timeout milliseconds 
elapsed")
+  }
+
 }
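
A sketch of how a suite might call the relocated helper (the cluster master
string and timeout below are illustrative, not taken from this patch):

    // Illustrative test fragment: wait for 2 executors to register before
    // running a job that needs both of them.
    val sc = new SparkContext("local-cluster[2, 1, 1024]", "wait-for-executors-demo")
    try {
      TestUtils.waitUntilExecutorsUp(sc, numExecutors = 2, timeout = 60000)
      assert(sc.parallelize(1 to 10, 2).count() == 10)
    } finally {
      sc.stop()
    }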
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 69ccde3..6a83c10 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -299,8 +299,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       attempt.adminAclsGroups.getOrElse(""))
     secManager.setViewAclsGroups(attempt.viewAclsGroups.getOrElse(""))
 
-    val replayBus = new ReplayListenerBus()
-
     val uiStorePath = storePath.map { path => getStorePath(path, appId, attemptId) }
 
     val (kvstore, needReplay) = uiStorePath match {
@@ -320,48 +318,43 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         (new InMemoryStore(), true)
     }
 
-    val listener = if (needReplay) {
-      val _listener = new AppStatusListener(kvstore, conf, false,
+    if (needReplay) {
+      val replayBus = new ReplayListenerBus()
+      val listener = new AppStatusListener(kvstore, conf, false,
         lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
-      replayBus.addListener(_listener)
+      replayBus.addListener(listener)
       AppStatusPlugin.loadPlugins().foreach { plugin =>
         plugin.setupListeners(conf, kvstore, l => replayBus.addListener(l), false)
       }
-      Some(_listener)
-    } else {
-      None
+      try {
+        val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
+        replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
+        listener.flush()
+      } catch {
+        case e: Exception =>
+          try {
+            kvstore.close()
+          } catch {
+            case _e: Exception => logInfo("Error closing store.", _e)
+          }
+          uiStorePath.foreach(Utils.deleteRecursively)
+          if (e.isInstanceOf[FileNotFoundException]) {
+            return None
+          } else {
+            throw e
+          }
+      }
     }
 
-    val loadedUI = {
-      val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf, secManager, app.info.name,
-        HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
-        attempt.info.startTime.getTime(),
-        attempt.info.appSparkVersion)
-      LoadedAppUI(ui)
+    val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf, secManager, app.info.name,
+      HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
+      attempt.info.startTime.getTime(),
+      attempt.info.appSparkVersion)
+    AppStatusPlugin.loadPlugins().foreach { plugin =>
+      plugin.setupUI(ui)
     }
 
-    try {
-      AppStatusPlugin.loadPlugins().foreach { plugin =>
-        plugin.setupUI(loadedUI.ui)
-      }
-
-      val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
-      replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
-      listener.foreach(_.flush())
-    } catch {
-      case e: Exception =>
-        try {
-          kvstore.close()
-        } catch {
-          case _e: Exception => logInfo("Error closing store.", _e)
-        }
-        uiStorePath.foreach(Utils.deleteRecursively)
-        if (e.isInstanceOf[FileNotFoundException]) {
-          return None
-        } else {
-          throw e
-        }
-    }
+    val loadedUI = LoadedAppUI(ui)
 
     synchronized {
       activeUIs((appId, attemptId)) = loadedUI
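
The `(kvstore, needReplay)` pair used above is derived from the optional
on-disk store path; the hunk elides that match, but conceptually it behaves
like the sketch below (the `openDiskStore` helper is hypothetical, standing
in for the provider's real disk-store loading code):

    import java.io.File
    import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}

    object ReplayDecisionSketch {
      // Hypothetical helper standing in for "load or create the disk-backed store".
      def openDiskStore(path: File): KVStore = new InMemoryStore()

      // Replay the event log only when no usable disk store already exists.
      def chooseStore(uiStorePath: Option[File]): (KVStore, Boolean) = uiStorePath match {
        case Some(path) if path.isDirectory() => (openDiskStore(path), false) // reuse, skip replay
        case Some(path) => (openDiskStore(path), true)                        // new store, replay
        case None => (new InMemoryStore(), true)                              // in-memory, replay
      }
    }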

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index f2d8e0a..9c23d9d 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -18,7 +18,10 @@
 package org.apache.spark.status
 
 import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+import java.util.function.Function
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark._
@@ -59,7 +62,7 @@ private[spark] class AppStatusListener(
 
   // Keep track of live entities, so that task metrics can be efficiently updated (without
   // causing too many writes to the underlying store, and other expensive operations).
-  private val liveStages = new HashMap[(Int, Int), LiveStage]()
+  private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()
   private val liveJobs = new HashMap[Int, LiveJob]()
   private val liveExecutors = new HashMap[String, LiveExecutor]()
   private val liveTasks = new HashMap[Long, LiveTask]()
@@ -268,13 +271,15 @@ private[spark] class AppStatusListener(
       val now = System.nanoTime()
 
       // Check if there are any pending stages that match this job; mark those as skipped.
-      job.stageIds.foreach { sid =>
-        val pending = liveStages.filter { case ((id, _), _) => id == sid }
-        pending.foreach { case (key, stage) =>
+      val it = liveStages.entrySet.iterator()
+      while (it.hasNext()) {
+        val e = it.next()
+        if (job.stageIds.contains(e.getKey()._1)) {
+          val stage = e.getValue()
           stage.status = v1.StageStatus.SKIPPED
           job.skippedStages += stage.info.stageId
           job.skippedTasks += stage.info.numTasks
-          liveStages.remove(key)
+          it.remove()
           update(stage, now)
         }
       }
@@ -336,7 +341,7 @@ private[spark] class AppStatusListener(
     liveTasks.put(event.taskInfo.taskId, task)
     liveUpdate(task, now)
 
-    liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage =>
+    Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
       stage.activeTasks += 1
       stage.firstLaunchTime = math.min(stage.firstLaunchTime, event.taskInfo.launchTime)
       maybeUpdate(stage, now)
@@ -403,7 +408,7 @@ private[spark] class AppStatusListener(
         (0, 1, 0)
     }
 
-    liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage =>
+    Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
       if (metricsDelta != null) {
         stage.metrics.update(metricsDelta)
       }
@@ -466,12 +471,19 @@ private[spark] class AppStatusListener(
         }
       }
 
-      maybeUpdate(exec, now)
+      // Force an update on live applications when the number of active tasks reaches 0. This is
+      // checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date.
+      if (exec.activeTasks == 0) {
+        liveUpdate(exec, now)
+      } else {
+        maybeUpdate(exec, now)
+      }
     }
   }
 
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
-    liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptId)).foreach { stage =>
+    val maybeStage = Option(liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptId)))
+    maybeStage.foreach { stage =>
       val now = System.nanoTime()
       stage.info = event.stageInfo
 
@@ -540,7 +552,7 @@ private[spark] class AppStatusListener(
         val delta = task.updateMetrics(metrics)
         maybeUpdate(task, now)
 
-        liveStages.get((sid, sAttempt)).foreach { stage =>
+        Option(liveStages.get((sid, sAttempt))).foreach { stage =>
           stage.metrics.update(delta)
           maybeUpdate(stage, now)
 
@@ -563,7 +575,7 @@ private[spark] class AppStatusListener(
   /** Flush all live entities' data to the underlying store. */
   def flush(): Unit = {
     val now = System.nanoTime()
-    liveStages.values.foreach { stage =>
+    liveStages.values.asScala.foreach { stage =>
       update(stage, now)
       stage.executorSummaries.values.foreach(update(_, now))
     }
@@ -574,6 +586,18 @@ private[spark] class AppStatusListener(
     pools.values.foreach(update(_, now))
   }
 
+  /**
+   * Shortcut to get active stages quickly in a live application, for use by the console
+   * progress bar.
+   */
+  def activeStages(): Seq[v1.StageData] = {
+    liveStages.values.asScala
+      .filter(_.info.submissionTime.isDefined)
+      .map(_.toApi())
+      .toList
+      .sortBy(_.stageId)
+  }
+
   private def updateRDDBlock(event: SparkListenerBlockUpdated, block: RDDBlockId): Unit = {
     val now = System.nanoTime()
     val executorId = event.blockUpdatedInfo.blockManagerId.executorId
@@ -708,7 +732,10 @@ private[spark] class AppStatusListener(
   }
 
   private def getOrCreateStage(info: StageInfo): LiveStage = {
-    val stage = liveStages.getOrElseUpdate((info.stageId, info.attemptId), new LiveStage())
+    val stage = liveStages.computeIfAbsent((info.stageId, info.attemptId),
+      new Function[(Int, Int), LiveStage]() {
+        override def apply(key: (Int, Int)): LiveStage = new LiveStage()
+      })
     stage.info = info
     stage
   }
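
The switch from `getOrElseUpdate` on a mutable HashMap to `computeIfAbsent`
on a ConcurrentHashMap makes stage creation safe against the console progress
bar reading `liveStages` from another thread. The anonymous
`java.util.function.Function` is spelled out rather than written as a Scala
lambda, presumably to stay compatible with Scala 2.11, which does not convert
lambdas to Java functional interfaces by default. A standalone sketch of the
same pattern (the `Entry` type here is illustrative):

    import java.util.concurrent.ConcurrentHashMap
    import java.util.function.Function

    object ComputeIfAbsentSketch {
      class Entry                            // stand-in for LiveStage
      private val cache = new ConcurrentHashMap[(Int, Int), Entry]()

      // Atomically create the value for a key only if it is absent; concurrent
      // callers racing on the same key all observe the same Entry instance.
      def getOrCreate(key: (Int, Int)): Entry =
        cache.computeIfAbsent(key, new Function[(Int, Int), Entry] {
          override def apply(k: (Int, Int)): Entry = new Entry()
        })
    }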

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index d0615e5..22d768b 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -32,7 +32,9 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
 /**
  * A wrapper around a KVStore that provides methods for accessing the API data stored within.
  */
-private[spark] class AppStatusStore(val store: KVStore) {
+private[spark] class AppStatusStore(
+    val store: KVStore,
+    listener: Option[AppStatusListener] = None) {
 
   def applicationInfo(): v1.ApplicationInfo = {
     store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info
@@ -70,6 +72,14 @@ private[spark] class AppStatusStore(val store: KVStore) {
     store.read(classOf[ExecutorSummaryWrapper], executorId).info
   }
 
+  /**
+   * This is used by ConsoleProgressBar to quickly fetch active stages for drawing the progress
+   * bar. It will only return anything useful when called from a live application.
+   */
+  def activeStages(): Seq[v1.StageData] = {
+    listener.map(_.activeStages()).getOrElse(Nil)
+  }
+
   def stageList(statuses: JList[v1.StageStatus]): Seq[v1.StageData] = {
     val it = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
     if (statuses != null && !statuses.isEmpty()) {
@@ -338,11 +348,12 @@ private[spark] object AppStatusStore {
    */
   def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): AppStatusStore = {
     val store = new InMemoryStore()
-    addListenerFn(new AppStatusListener(store, conf, true))
+    val listener = new AppStatusListener(store, conf, true)
+    addListenerFn(listener)
     AppStatusPlugin.loadPlugins().foreach { p =>
       p.setupListeners(conf, store, addListenerFn, true)
     }
-    new AppStatusStore(store)
+    new AppStatusStore(store, listener = Some(listener))
   }
 
 }
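
The optional listener reference is what lets ConsoleProgressBar bypass the
KVStore entirely; a hedged sketch of the two paths (variable names below are
illustrative, `conf`, `listenerBus` and `kvstore` are assumed to exist):

    // Live application: createLiveStore keeps the AppStatusListener, so
    // activeStages() is an in-memory read with no KVStore round trip.
    val liveStore = AppStatusStore.createLiveStore(conf, l => listenerBus.addToStatusQueue(l))
    val quick = liveStore.activeStages()        // served by the listener

    // History server / replayed application: no live listener is available,
    // so the shortcut degrades to an empty result.
    val replayedStore = new AppStatusStore(kvstore)
    val none = replayedStore.activeStages()     // Nil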

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala 
b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index ef2936c..983c58a 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -408,8 +408,8 @@ private class LiveStage extends LiveEntity {
       new LiveExecutorStageSummary(info.stageId, info.attemptId, executorId))
   }
 
-  override protected def doUpdate(): Any = {
-    val update = new v1.StageData(
+  def toApi(): v1.StageData = {
+    new v1.StageData(
       status,
       info.stageId,
       info.attemptId,
@@ -449,8 +449,10 @@ private class LiveStage extends LiveEntity {
       None,
       None,
       killedSummary)
+  }
 
-    new StageDataWrapper(update, jobIds)
+  override protected def doUpdate(): Any = {
+    new StageDataWrapper(toApi(), jobIds)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala 
b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
index bd4dfe3..b356110 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
@@ -25,7 +25,6 @@ import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.status.api.v1.StageStatus._
 import org.apache.spark.status.api.v1.TaskSorting._
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.jobs.UIData.StageUIData
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class StagesResource extends BaseAppResource {

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala 
b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
index 3ae80ec..3c4ee4e 100644
--- a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
@@ -21,10 +21,11 @@ import java.util.{Timer, TimerTask}
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
+import org.apache.spark.status.api.v1.StageData
 
 /**
  * ConsoleProgressBar shows the progress of stages in the next line of the console. It poll the
- * status of active stages from `sc.statusTracker` periodically, the progress bar will be showed
+ * status of active stages from the app state store periodically, the progress bar will be showed
  * up after the stage has ran at least 500ms. If multiple stages run in the same time, the status
  * of them will be combined together, showed in one line.
  */
@@ -64,9 +65,8 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
     if (now - lastFinishTime < firstDelayMSec) {
       return
     }
-    val stageIds = sc.statusTracker.getActiveStageIds()
-    val stages = stageIds.flatMap(sc.statusTracker.getStageInfo).filter(_.numTasks() > 1)
-      .filter(now - _.submissionTime() > firstDelayMSec).sortBy(_.stageId())
+    val stages = sc.statusStore.activeStages()
+      .filter { s => now - s.submissionTime.get.getTime() > firstDelayMSec }
     if (stages.length > 0) {
       show(now, stages.take(3))  // display at most 3 stages in same time
     }
@@ -77,15 +77,15 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
    * after your last output, keeps overwriting itself to hold in one line. The logging will follow
    * the progress bar, then progress bar will be showed in next line without overwrite logs.
    */
-  private def show(now: Long, stages: Seq[SparkStageInfo]) {
+  private def show(now: Long, stages: Seq[StageData]) {
     val width = TerminalWidth / stages.size
     val bar = stages.map { s =>
-      val total = s.numTasks()
-      val header = s"[Stage ${s.stageId()}:"
-      val tailer = s"(${s.numCompletedTasks()} + ${s.numActiveTasks()}) / $total]"
+      val total = s.numTasks
+      val header = s"[Stage ${s.stageId}:"
+      val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]"
       val w = width - header.length - tailer.length
       val bar = if (w > 0) {
-        val percent = w * s.numCompletedTasks() / total
+        val percent = w * s.numCompleteTasks / total
         (0 until w).map { i =>
           if (i < percent) "=" else if (i == percent) ">" else " "
         }.mkString("")
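
The rendering logic above is unchanged apart from the accessor names;
isolated as a plain function it looks roughly like this (all names and
values below are illustrative):

    // Standalone sketch mirroring the bar layout above.
    def renderBar(stageId: Int, numTasks: Int, complete: Int, active: Int, width: Int): String = {
      val header = s"[Stage $stageId:"
      val tailer = s"($complete + $active) / $numTasks]"
      val w = width - header.length - tailer.length
      val bar = if (w > 0) {
        val percent = w * complete / numTasks
        (0 until w).map { i =>
          if (i < percent) "=" else if (i == percent) ">" else " "
        }.mkString("")
      } else {
        ""
      }
      header + bar + tailer
    }

    // e.g. renderBar(3, 200, 50, 8, 60) yields a 60-character line:
    // "[Stage 3:" + nine '=' + '>' + padding + "(50 + 8) / 200]"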

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
deleted file mode 100644
index a18e86e..0000000
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ /dev/null
@@ -1,612 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.jobs
-
-import java.util.concurrent.TimeoutException
-
-import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap, ListBuffer}
-
-import org.apache.spark._
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
-import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
-import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.jobs.UIData._
-
-/**
- * :: DeveloperApi ::
- * Tracks task-level information to be displayed in the UI.
- *
- * All access to the data structures in this class must be synchronized on the
- * class, since the UI thread and the EventBus loop may otherwise be reading 
and
- * updating the internal data structures concurrently.
- */
-@DeveloperApi
-@deprecated("This class will be removed in a future release.", "2.2.0")
-class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
-
-  // Define a handful of type aliases so that data structures' types can serve 
as documentation.
-  // These type aliases are public because they're used in the types of public 
fields:
-
-  type JobId = Int
-  type JobGroupId = String
-  type StageId = Int
-  type StageAttemptId = Int
-  type PoolName = String
-  type ExecutorId = String
-
-  // Application:
-  @volatile var startTime = -1L
-  @volatile var endTime = -1L
-
-  // Jobs:
-  val activeJobs = new HashMap[JobId, JobUIData]
-  val completedJobs = ListBuffer[JobUIData]()
-  val failedJobs = ListBuffer[JobUIData]()
-  val jobIdToData = new HashMap[JobId, JobUIData]
-  val jobGroupToJobIds = new HashMap[JobGroupId, HashSet[JobId]]
-
-  // Stages:
-  val pendingStages = new HashMap[StageId, StageInfo]
-  val activeStages = new HashMap[StageId, StageInfo]
-  val completedStages = ListBuffer[StageInfo]()
-  val skippedStages = ListBuffer[StageInfo]()
-  val failedStages = ListBuffer[StageInfo]()
-  val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
-  val stageIdToInfo = new HashMap[StageId, StageInfo]
-  val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]]
-  val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
-  // Total of completed and failed stages that have ever been run.  These may 
be greater than
-  // `completedStages.size` and `failedStages.size` if we have run more stages 
or jobs than
-  // JobProgressListener's retention limits.
-  var numCompletedStages = 0
-  var numFailedStages = 0
-  var numCompletedJobs = 0
-  var numFailedJobs = 0
-
-  // Misc:
-  val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
-
-  def blockManagerIds: Seq[BlockManagerId] = 
executorIdToBlockManagerId.values.toSeq
-
-  var schedulingMode: Option[SchedulingMode] = None
-
-  // To limit the total memory usage of JobProgressListener, we only track 
information for a fixed
-  // number of non-active jobs and stages (there is no limit for active jobs 
and stages):
-
-  val retainedStages = conf.getInt("spark.ui.retainedStages", 
SparkUI.DEFAULT_RETAINED_STAGES)
-  val retainedJobs = conf.getInt("spark.ui.retainedJobs", 
SparkUI.DEFAULT_RETAINED_JOBS)
-  val retainedTasks = conf.get(UI_RETAINED_TASKS)
-
-  // We can test for memory leaks by ensuring that collections that track 
non-active jobs and
-  // stages do not grow without bound and that collections for active 
jobs/stages eventually become
-  // empty once Spark is idle.  Let's partition our collections into ones that 
should be empty
-  // once Spark is idle and ones that should have a hard- or soft-limited 
sizes.
-  // These methods are used by unit tests, but they're defined here so that 
people don't forget to
-  // update the tests when adding new collections.  Some collections have 
multiple levels of
-  // nesting, etc, so this lets us customize our notion of "size" for each 
structure:
-
-  // These collections should all be empty once Spark is idle (no active 
stages / jobs):
-  private[spark] def getSizesOfActiveStateTrackingCollections: Map[String, 
Int] = {
-    Map(
-      "activeStages" -> activeStages.size,
-      "activeJobs" -> activeJobs.size,
-      "poolToActiveStages" -> poolToActiveStages.values.map(_.size).sum,
-      "stageIdToActiveJobIds" -> stageIdToActiveJobIds.values.map(_.size).sum
-    )
-  }
-
-  // These collections should stop growing once we have run at least 
`spark.ui.retainedStages`
-  // stages and `spark.ui.retainedJobs` jobs:
-  private[spark] def getSizesOfHardSizeLimitedCollections: Map[String, Int] = {
-    Map(
-      "completedJobs" -> completedJobs.size,
-      "failedJobs" -> failedJobs.size,
-      "completedStages" -> completedStages.size,
-      "skippedStages" -> skippedStages.size,
-      "failedStages" -> failedStages.size
-    )
-  }
-
-  // These collections may grow arbitrarily, but once Spark becomes idle they 
should shrink back to
-  // some bound based on the `spark.ui.retainedStages` and 
`spark.ui.retainedJobs` settings:
-  private[spark] def getSizesOfSoftSizeLimitedCollections: Map[String, Int] = {
-    Map(
-      "jobIdToData" -> jobIdToData.size,
-      "stageIdToData" -> stageIdToData.size,
-      "stageIdToStageInfo" -> stageIdToInfo.size,
-      "jobGroupToJobIds" -> jobGroupToJobIds.values.map(_.size).sum,
-      // Since jobGroupToJobIds is map of sets, check that we don't leak keys 
with empty values:
-      "jobGroupToJobIds keySet" -> jobGroupToJobIds.keys.size
-    )
-  }
-
-  /** If stages is too large, remove and garbage collect old stages */
-  private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = 
synchronized {
-    if (stages.size > retainedStages) {
-      val toRemove = calculateNumberToRemove(stages.size, retainedStages)
-      stages.take(toRemove).foreach { s =>
-        stageIdToData.remove((s.stageId, s.attemptId))
-        stageIdToInfo.remove(s.stageId)
-      }
-      stages.trimStart(toRemove)
-    }
-  }
-
-  /** If jobs is too large, remove and garbage collect old jobs */
-  private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
-    if (jobs.size > retainedJobs) {
-      val toRemove = calculateNumberToRemove(jobs.size, retainedJobs)
-      jobs.take(toRemove).foreach { job =>
-        // Remove the job's UI data, if it exists
-        jobIdToData.remove(job.jobId).foreach { removedJob =>
-          // A null jobGroupId is used for jobs that are run without a job 
group
-          val jobGroupId = removedJob.jobGroup.orNull
-          // Remove the job group -> job mapping entry, if it exists
-          jobGroupToJobIds.get(jobGroupId).foreach { jobsInGroup =>
-            jobsInGroup.remove(job.jobId)
-            // If this was the last job in this job group, remove the map 
entry for the job group
-            if (jobsInGroup.isEmpty) {
-              jobGroupToJobIds.remove(jobGroupId)
-            }
-          }
-        }
-      }
-      jobs.trimStart(toRemove)
-    }
-  }
-
-  override def onJobStart(jobStart: SparkListenerJobStart): Unit = 
synchronized {
-    val jobGroup = for (
-      props <- Option(jobStart.properties);
-      group <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
-    ) yield group
-    val jobData: JobUIData =
-      new JobUIData(
-        jobId = jobStart.jobId,
-        submissionTime = Option(jobStart.time).filter(_ >= 0),
-        stageIds = jobStart.stageIds,
-        jobGroup = jobGroup,
-        status = JobExecutionStatus.RUNNING)
-    // A null jobGroupId is used for jobs that are run without a job group
-    jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new 
HashSet[JobId]).add(jobStart.jobId)
-    jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
-    // Compute (a potential underestimate of) the number of tasks that will be 
run by this job.
-    // This may be an underestimate because the job start event references all 
of the result
-    // stages' transitive stage dependencies, but some of these stages might 
be skipped if their
-    // output is available from earlier runs.
-    // See https://github.com/apache/spark/pull/3009 for a more extensive 
discussion.
-    jobData.numTasks = {
-      val allStages = jobStart.stageInfos
-      val missingStages = allStages.filter(_.completionTime.isEmpty)
-      missingStages.map(_.numTasks).sum
-    }
-    jobIdToData(jobStart.jobId) = jobData
-    activeJobs(jobStart.jobId) = jobData
-    for (stageId <- jobStart.stageIds) {
-      stageIdToActiveJobIds.getOrElseUpdate(stageId, new 
HashSet[StageId]).add(jobStart.jobId)
-    }
-    // If there's no information for a stage, store the StageInfo received 
from the scheduler
-    // so that we can display stage descriptions for pending stages:
-    for (stageInfo <- jobStart.stageInfos) {
-      stageIdToInfo.getOrElseUpdate(stageInfo.stageId, stageInfo)
-      stageIdToData.getOrElseUpdate((stageInfo.stageId, stageInfo.attemptId), 
new StageUIData)
-    }
-  }
-
-  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
-    val jobData = activeJobs.remove(jobEnd.jobId).getOrElse {
-      logWarning(s"Job completed for unknown job ${jobEnd.jobId}")
-      new JobUIData(jobId = jobEnd.jobId)
-    }
-    jobData.completionTime = Option(jobEnd.time).filter(_ >= 0)
-
-    jobData.stageIds.foreach(pendingStages.remove)
-    jobEnd.jobResult match {
-      case JobSucceeded =>
-        completedJobs += jobData
-        trimJobsIfNecessary(completedJobs)
-        jobData.status = JobExecutionStatus.SUCCEEDED
-        numCompletedJobs += 1
-      case JobFailed(_) =>
-        failedJobs += jobData
-        trimJobsIfNecessary(failedJobs)
-        jobData.status = JobExecutionStatus.FAILED
-        numFailedJobs += 1
-    }
-    for (stageId <- jobData.stageIds) {
-      stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>
-        jobsUsingStage.remove(jobEnd.jobId)
-        if (jobsUsingStage.isEmpty) {
-          stageIdToActiveJobIds.remove(stageId)
-        }
-        stageIdToInfo.get(stageId).foreach { stageInfo =>
-          if (stageInfo.submissionTime.isEmpty) {
-            // if this stage is pending, it won't complete, so mark it as 
"skipped":
-            skippedStages += stageInfo
-            trimStagesIfNecessary(skippedStages)
-            jobData.numSkippedStages += 1
-            jobData.numSkippedTasks += stageInfo.numTasks
-          }
-        }
-      }
-    }
-  }
-
-  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): 
Unit = synchronized {
-    val stage = stageCompleted.stageInfo
-    stageIdToInfo(stage.stageId) = stage
-    val stageData = stageIdToData.getOrElseUpdate((stage.stageId, 
stage.attemptId), {
-      logWarning("Stage completed for unknown stage " + stage.stageId)
-      new StageUIData
-    })
-
-    for ((id, info) <- stageCompleted.stageInfo.accumulables) {
-      stageData.accumulables(id) = info
-    }
-
-    poolToActiveStages.get(stageData.schedulingPool).foreach { hashMap =>
-      hashMap.remove(stage.stageId)
-    }
-    activeStages.remove(stage.stageId)
-    if (stage.failureReason.isEmpty) {
-      completedStages += stage
-      numCompletedStages += 1
-      trimStagesIfNecessary(completedStages)
-    } else {
-      failedStages += stage
-      numFailedStages += 1
-      trimStagesIfNecessary(failedStages)
-    }
-
-    for (
-      activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId);
-      jobId <- activeJobsDependentOnStage;
-      jobData <- jobIdToData.get(jobId)
-    ) {
-      jobData.numActiveStages -= 1
-      if (stage.failureReason.isEmpty) {
-        if (stage.submissionTime.isDefined) {
-          jobData.completedStageIndices.add(stage.stageId)
-        }
-      } else {
-        jobData.numFailedStages += 1
-      }
-    }
-  }
-
-  /** For FIFO, all stages are contained by "default" pool but "default" pool 
here is meaningless */
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): 
Unit = synchronized {
-    val stage = stageSubmitted.stageInfo
-    activeStages(stage.stageId) = stage
-    pendingStages.remove(stage.stageId)
-    val poolName = Option(stageSubmitted.properties).map {
-      p => p.getProperty("spark.scheduler.pool", SparkUI.DEFAULT_POOL_NAME)
-    }.getOrElse(SparkUI.DEFAULT_POOL_NAME)
-
-    stageIdToInfo(stage.stageId) = stage
-    val stageData = stageIdToData.getOrElseUpdate((stage.stageId, 
stage.attemptId), new StageUIData)
-    stageData.schedulingPool = poolName
-
-    stageData.description = Option(stageSubmitted.properties).flatMap {
-      p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
-    }
-
-    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, 
StageInfo])
-    stages(stage.stageId) = stage
-
-    for (
-      activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId);
-      jobId <- activeJobsDependentOnStage;
-      jobData <- jobIdToData.get(jobId)
-    ) {
-      jobData.numActiveStages += 1
-
-      // If a stage retries again, it should be removed from 
completedStageIndices set
-      jobData.completedStageIndices.remove(stage.stageId)
-    }
-  }
-
-  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = 
synchronized {
-    val taskInfo = taskStart.taskInfo
-    if (taskInfo != null) {
-      val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, 
taskStart.stageAttemptId), {
-        logWarning("Task start for unknown stage " + taskStart.stageId)
-        new StageUIData
-      })
-      stageData.numActiveTasks += 1
-      stageData.taskData.put(taskInfo.taskId, TaskUIData(taskInfo))
-    }
-    for (
-      activeJobsDependentOnStage <- 
stageIdToActiveJobIds.get(taskStart.stageId);
-      jobId <- activeJobsDependentOnStage;
-      jobData <- jobIdToData.get(jobId)
-    ) {
-      jobData.numActiveTasks += 1
-    }
-  }
-
-  override def onTaskGettingResult(taskGettingResult: 
SparkListenerTaskGettingResult) {
-    // Do nothing: because we don't do a deep copy of the TaskInfo, the 
TaskInfo in
-    // stageToTaskInfos already has the updated status.
-  }
-
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
-    val info = taskEnd.taskInfo
-    // If stage attempt id is -1, it means the DAGScheduler had no idea which 
attempt this task
-    // completion event is for. Let's just drop it here. This means we might 
have some speculation
-    // tasks on the web ui that's never marked as complete.
-    if (info != null && taskEnd.stageAttemptId != -1) {
-      val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, 
taskEnd.stageAttemptId), {
-        logWarning("Task end for unknown stage " + taskEnd.stageId)
-        new StageUIData
-      })
-
-      for (accumulableInfo <- info.accumulables) {
-        stageData.accumulables(accumulableInfo.id) = accumulableInfo
-      }
-
-      val execSummaryMap = stageData.executorSummary
-      val execSummary = execSummaryMap.getOrElseUpdate(info.executorId, new 
ExecutorSummary)
-
-      taskEnd.reason match {
-        case Success =>
-          execSummary.succeededTasks += 1
-        case kill: TaskKilled =>
-          execSummary.reasonToNumKilled = 
execSummary.reasonToNumKilled.updated(
-            kill.reason, execSummary.reasonToNumKilled.getOrElse(kill.reason, 
0) + 1)
-        case commitDenied: TaskCommitDenied =>
-          execSummary.reasonToNumKilled = 
execSummary.reasonToNumKilled.updated(
-            commitDenied.toErrorString, 
execSummary.reasonToNumKilled.getOrElse(
-              commitDenied.toErrorString, 0) + 1)
-        case _ =>
-          execSummary.failedTasks += 1
-      }
-      execSummary.taskTime += info.duration
-      stageData.numActiveTasks -= 1
-
-      val errorMessage: Option[String] =
-        taskEnd.reason match {
-          case org.apache.spark.Success =>
-            stageData.completedIndices.add(info.index)
-            stageData.numCompleteTasks += 1
-            None
-          case kill: TaskKilled =>
-            stageData.reasonToNumKilled = stageData.reasonToNumKilled.updated(
-              kill.reason, stageData.reasonToNumKilled.getOrElse(kill.reason, 
0) + 1)
-            Some(kill.toErrorString)
-          case commitDenied: TaskCommitDenied =>
-            stageData.reasonToNumKilled = stageData.reasonToNumKilled.updated(
-              commitDenied.toErrorString, 
stageData.reasonToNumKilled.getOrElse(
-                commitDenied.toErrorString, 0) + 1)
-            Some(commitDenied.toErrorString)
-          case e: ExceptionFailure => // Handle ExceptionFailure because we 
might have accumUpdates
-            stageData.numFailedTasks += 1
-            Some(e.toErrorString)
-          case e: TaskFailedReason => // All other failure cases
-            stageData.numFailedTasks += 1
-            Some(e.toErrorString)
-        }
-
-      val taskMetrics = Option(taskEnd.taskMetrics)
-      taskMetrics.foreach { m =>
-        val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.metrics)
-        updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
-      }
-
-      val taskData = stageData.taskData.getOrElseUpdate(info.taskId, 
TaskUIData(info))
-      taskData.updateTaskInfo(info)
-      taskData.updateTaskMetrics(taskMetrics)
-      taskData.errorMessage = errorMessage
-
-      // If Tasks is too large, remove and garbage collect old tasks
-      if (stageData.taskData.size > retainedTasks) {
-        stageData.taskData = stageData.taskData.drop(
-          calculateNumberToRemove(stageData.taskData.size, retainedTasks))
-      }
-
-      for (
-        activeJobsDependentOnStage <- 
stageIdToActiveJobIds.get(taskEnd.stageId);
-        jobId <- activeJobsDependentOnStage;
-        jobData <- jobIdToData.get(jobId)
-      ) {
-        jobData.numActiveTasks -= 1
-        taskEnd.reason match {
-          case Success =>
-            jobData.completedIndices.add((taskEnd.stageId, info.index))
-            jobData.numCompletedTasks += 1
-          case kill: TaskKilled =>
-            jobData.reasonToNumKilled = jobData.reasonToNumKilled.updated(
-              kill.reason, jobData.reasonToNumKilled.getOrElse(kill.reason, 0) 
+ 1)
-          case commitDenied: TaskCommitDenied =>
-            jobData.reasonToNumKilled = jobData.reasonToNumKilled.updated(
-              commitDenied.toErrorString, jobData.reasonToNumKilled.getOrElse(
-                commitDenied.toErrorString, 0) + 1)
-          case _ =>
-            jobData.numFailedTasks += 1
-        }
-      }
-    }
-  }
-
-  /**
-   * Remove at least (maxRetained / 10) items to reduce friction.
-   */
-  private def calculateNumberToRemove(dataSize: Int, retainedSize: Int): Int = 
{
-    math.max(retainedSize / 10, dataSize - retainedSize)
-  }
-
-  /**
-   * Upon receiving new metrics for a task, updates the per-stage and 
per-executor-per-stage
-   * aggregate metrics by calculating deltas between the currently recorded 
metrics and the new
-   * metrics.
-   */
-  def updateAggregateMetrics(
-      stageData: StageUIData,
-      execId: String,
-      taskMetrics: TaskMetrics,
-      oldMetrics: Option[TaskMetricsUIData]) {
-    val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new 
ExecutorSummary)
-
-    val shuffleWriteDelta =
-      taskMetrics.shuffleWriteMetrics.bytesWritten -
-        oldMetrics.map(_.shuffleWriteMetrics.bytesWritten).getOrElse(0L)
-    stageData.shuffleWriteBytes += shuffleWriteDelta
-    execSummary.shuffleWrite += shuffleWriteDelta
-
-    val shuffleWriteRecordsDelta =
-      taskMetrics.shuffleWriteMetrics.recordsWritten -
-        oldMetrics.map(_.shuffleWriteMetrics.recordsWritten).getOrElse(0L)
-    stageData.shuffleWriteRecords += shuffleWriteRecordsDelta
-    execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta
-
-    val shuffleReadDelta =
-      taskMetrics.shuffleReadMetrics.totalBytesRead -
-        oldMetrics.map(_.shuffleReadMetrics.totalBytesRead).getOrElse(0L)
-    stageData.shuffleReadTotalBytes += shuffleReadDelta
-    execSummary.shuffleRead += shuffleReadDelta
-
-    val shuffleReadRecordsDelta =
-      taskMetrics.shuffleReadMetrics.recordsRead -
-        oldMetrics.map(_.shuffleReadMetrics.recordsRead).getOrElse(0L)
-    stageData.shuffleReadRecords += shuffleReadRecordsDelta
-    execSummary.shuffleReadRecords += shuffleReadRecordsDelta
-
-    val inputBytesDelta =
-      taskMetrics.inputMetrics.bytesRead -
-        oldMetrics.map(_.inputMetrics.bytesRead).getOrElse(0L)
-    stageData.inputBytes += inputBytesDelta
-    execSummary.inputBytes += inputBytesDelta
-
-    val inputRecordsDelta =
-      taskMetrics.inputMetrics.recordsRead -
-        oldMetrics.map(_.inputMetrics.recordsRead).getOrElse(0L)
-    stageData.inputRecords += inputRecordsDelta
-    execSummary.inputRecords += inputRecordsDelta
-
-    val outputBytesDelta =
-      taskMetrics.outputMetrics.bytesWritten -
-        oldMetrics.map(_.outputMetrics.bytesWritten).getOrElse(0L)
-    stageData.outputBytes += outputBytesDelta
-    execSummary.outputBytes += outputBytesDelta
-
-    val outputRecordsDelta =
-      taskMetrics.outputMetrics.recordsWritten -
-        oldMetrics.map(_.outputMetrics.recordsWritten).getOrElse(0L)
-    stageData.outputRecords += outputRecordsDelta
-    execSummary.outputRecords += outputRecordsDelta
-
-    val diskSpillDelta =
-      taskMetrics.diskBytesSpilled - 
oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
-    stageData.diskBytesSpilled += diskSpillDelta
-    execSummary.diskBytesSpilled += diskSpillDelta
-
-    val memorySpillDelta =
-      taskMetrics.memoryBytesSpilled - 
oldMetrics.map(_.memoryBytesSpilled).getOrElse(0L)
-    stageData.memoryBytesSpilled += memorySpillDelta
-    execSummary.memoryBytesSpilled += memorySpillDelta
-
-    val timeDelta =
-      taskMetrics.executorRunTime - 
oldMetrics.map(_.executorRunTime).getOrElse(0L)
-    stageData.executorRunTime += timeDelta
-
-    val cpuTimeDelta =
-      taskMetrics.executorCpuTime - 
oldMetrics.map(_.executorCpuTime).getOrElse(0L)
-    stageData.executorCpuTime += cpuTimeDelta
-  }
-
-  override def onExecutorMetricsUpdate(executorMetricsUpdate: 
SparkListenerExecutorMetricsUpdate) {
-    for ((taskId, sid, sAttempt, accumUpdates) <- 
executorMetricsUpdate.accumUpdates) {
-      val stageData = stageIdToData.getOrElseUpdate((sid, sAttempt), {
-        logWarning("Metrics update for task in unknown stage " + sid)
-        new StageUIData
-      })
-      val taskData = stageData.taskData.get(taskId)
-      val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates)
-      taskData.foreach { t =>
-        if (!t.taskInfo.finished) {
-          updateAggregateMetrics(stageData, executorMetricsUpdate.execId, 
metrics, t.metrics)
-          // Overwrite task metrics
-          t.updateTaskMetrics(Some(metrics))
-        }
-      }
-    }
-  }
-
-  override def onEnvironmentUpdate(environmentUpdate: 
SparkListenerEnvironmentUpdate) {
-    synchronized {
-      schedulingMode = environmentUpdate
-        .environmentDetails("Spark Properties").toMap
-        .get("spark.scheduler.mode")
-        .map(SchedulingMode.withName)
-    }
-  }
-
-  override def onBlockManagerAdded(blockManagerAdded: 
SparkListenerBlockManagerAdded) {
-    synchronized {
-      val blockManagerId = blockManagerAdded.blockManagerId
-      val executorId = blockManagerId.executorId
-      executorIdToBlockManagerId(executorId) = blockManagerId
-    }
-  }
-
-  override def onBlockManagerRemoved(blockManagerRemoved: 
SparkListenerBlockManagerRemoved) {
-    synchronized {
-      val executorId = blockManagerRemoved.blockManagerId.executorId
-      executorIdToBlockManagerId.remove(executorId)
-    }
-  }
-
-  override def onApplicationStart(appStarted: SparkListenerApplicationStart) {
-    startTime = appStarted.time
-  }
-
-  override def onApplicationEnd(appEnded: SparkListenerApplicationEnd) {
-    endTime = appEnded.time
-  }
-
-  /**
-   * For testing only. Wait until at least `numExecutors` executors are up, or 
throw
-   * `TimeoutException` if the waiting time elapsed before `numExecutors` 
executors up.
-   * Exposed for testing.
-   *
-   * @param numExecutors the number of executors to wait at least
-   * @param timeout time to wait in milliseconds
-   */
-  private[spark] def waitUntilExecutorsUp(numExecutors: Int, timeout: Long): 
Unit = {
-    val finishTime = System.currentTimeMillis() + timeout
-    while (System.currentTimeMillis() < finishTime) {
-      val numBlockManagers = synchronized {
-        blockManagerIds.size
-      }
-      if (numBlockManagers >= numExecutors + 1) {
-        // Need to count the block manager in driver
-        return
-      }
-      // Sleep rather than using wait/notify, because this is used only for 
testing and wait/notify
-      // add overhead in the general case.
-      Thread.sleep(10)
-    }
-    throw new TimeoutException(
-      s"Can't find $numExecutors executors before $timeout milliseconds 
elapsed")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 5f93f2f..11a6a34 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -32,7 +32,6 @@ import org.apache.spark.scheduler.TaskLocality
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.api.v1._
 import org.apache.spark.ui._
-import org.apache.spark.ui.jobs.UIData._
 import org.apache.spark.util.{Distribution, Utils}
 
 /** Page showing statistics and task list for a given stage */

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
deleted file mode 100644
index 5acec0d..0000000
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.jobs
-
-import scala.collection.mutable
-import scala.collection.mutable.{HashMap, LinkedHashMap}
-
-import com.google.common.collect.Interners
-
-import org.apache.spark.JobExecutionStatus
-import org.apache.spark.executor._
-import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
-import org.apache.spark.util.AccumulatorContext
-import org.apache.spark.util.collection.OpenHashSet
-
-private[spark] object UIData {
-
-  class ExecutorSummary {
-    var taskTime : Long = 0
-    var failedTasks : Int = 0
-    var succeededTasks : Int = 0
-    var reasonToNumKilled : Map[String, Int] = Map.empty
-    var inputBytes : Long = 0
-    var inputRecords : Long = 0
-    var outputBytes : Long = 0
-    var outputRecords : Long = 0
-    var shuffleRead : Long = 0
-    var shuffleReadRecords : Long = 0
-    var shuffleWrite : Long = 0
-    var shuffleWriteRecords : Long = 0
-    var memoryBytesSpilled : Long = 0
-    var diskBytesSpilled : Long = 0
-    var isBlacklisted : Int = 0
-  }
-
-  class JobUIData(
-    var jobId: Int = -1,
-    var submissionTime: Option[Long] = None,
-    var completionTime: Option[Long] = None,
-    var stageIds: Seq[Int] = Seq.empty,
-    var jobGroup: Option[String] = None,
-    var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN,
-    /* Tasks */
-    // `numTasks` is a potential underestimate of the true number of tasks that this job will run.
-    // This may be an underestimate because the job start event references all of the result
-    // stages' transitive stage dependencies, but some of these stages might be skipped if their
-    // output is available from earlier runs.
-    // See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
-    var numTasks: Int = 0,
-    var numActiveTasks: Int = 0,
-    var numCompletedTasks: Int = 0,
-    var completedIndices: OpenHashSet[(Int, Int)] = new OpenHashSet[(Int, Int)](),
-    var numSkippedTasks: Int = 0,
-    var numFailedTasks: Int = 0,
-    var reasonToNumKilled: Map[String, Int] = Map.empty,
-    /* Stages */
-    var numActiveStages: Int = 0,
-    // This needs to be a set instead of a simple count to prevent double-counting of rerun stages:
-    var completedStageIndices: mutable.HashSet[Int] = new mutable.HashSet[Int](),
-    var numSkippedStages: Int = 0,
-    var numFailedStages: Int = 0
-  )
-
-  class StageUIData {
-    var numActiveTasks: Int = _
-    var numCompleteTasks: Int = _
-    var completedIndices = new OpenHashSet[Int]()
-    var numFailedTasks: Int = _
-    var reasonToNumKilled: Map[String, Int] = Map.empty
-
-    var executorRunTime: Long = _
-    var executorCpuTime: Long = _
-
-    var inputBytes: Long = _
-    var inputRecords: Long = _
-    var outputBytes: Long = _
-    var outputRecords: Long = _
-    var shuffleReadTotalBytes: Long = _
-    var shuffleReadRecords : Long = _
-    var shuffleWriteBytes: Long = _
-    var shuffleWriteRecords: Long = _
-    var memoryBytesSpilled: Long = _
-    var diskBytesSpilled: Long = _
-    var isBlacklisted: Int = _
-    var lastUpdateTime: Option[Long] = None
-
-    var schedulingPool: String = ""
-    var description: Option[String] = None
-
-    var accumulables = new HashMap[Long, AccumulableInfo]
-    var taskData = new LinkedHashMap[Long, TaskUIData]
-    var executorSummary = new HashMap[String, ExecutorSummary]
-
-    def hasInput: Boolean = inputBytes > 0
-    def hasOutput: Boolean = outputBytes > 0
-    def hasShuffleRead: Boolean = shuffleReadTotalBytes > 0
-    def hasShuffleWrite: Boolean = shuffleWriteBytes > 0
-    def hasBytesSpilled: Boolean = memoryBytesSpilled > 0 || diskBytesSpilled > 0
-  }
-
-  /**
-   * These are kept mutable and reused throughout a task's lifetime to avoid excessive reallocation.
-   */
-  class TaskUIData private(private var _taskInfo: TaskInfo) {
-
-    private[this] var _metrics: Option[TaskMetricsUIData] = Some(TaskMetricsUIData.EMPTY)
-
-    var errorMessage: Option[String] = None
-
-    def taskInfo: TaskInfo = _taskInfo
-
-    def metrics: Option[TaskMetricsUIData] = _metrics
-
-    def updateTaskInfo(taskInfo: TaskInfo): Unit = {
-      _taskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
-    }
-
-    def updateTaskMetrics(metrics: Option[TaskMetrics]): Unit = {
-      _metrics = metrics.map(TaskMetricsUIData.fromTaskMetrics)
-    }
-
-    def taskDuration(lastUpdateTime: Option[Long] = None): Option[Long] = {
-      if (taskInfo.status == "RUNNING") {
-        Some(_taskInfo.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis)))
-      } else {
-        _metrics.map(_.executorRunTime)
-      }
-    }
-  }
-
-  object TaskUIData {
-
-    private val stringInterner = Interners.newWeakInterner[String]()
-
-    /** String interning to reduce the memory usage. */
-    private def weakIntern(s: String): String = {
-      stringInterner.intern(s)
-    }
-
-    def apply(taskInfo: TaskInfo): TaskUIData = {
-      new TaskUIData(dropInternalAndSQLAccumulables(taskInfo))
-    }
-
-    /**
-     * We don't need to store internal or SQL accumulables as their values will be shown in other
-     * places, so drop them to reduce the memory usage.
-     */
-    private[spark] def dropInternalAndSQLAccumulables(taskInfo: TaskInfo): TaskInfo = {
-      val newTaskInfo = new TaskInfo(
-        taskId = taskInfo.taskId,
-        index = taskInfo.index,
-        attemptNumber = taskInfo.attemptNumber,
-        launchTime = taskInfo.launchTime,
-        executorId = weakIntern(taskInfo.executorId),
-        host = weakIntern(taskInfo.host),
-        taskLocality = taskInfo.taskLocality,
-        speculative = taskInfo.speculative
-      )
-      newTaskInfo.gettingResultTime = taskInfo.gettingResultTime
-      newTaskInfo.setAccumulables(taskInfo.accumulables.filter {
-        accum => !accum.internal && accum.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)
-      })
-      newTaskInfo.finishTime = taskInfo.finishTime
-      newTaskInfo.failed = taskInfo.failed
-      newTaskInfo.killed = taskInfo.killed
-      newTaskInfo
-    }
-  }
-
-  case class TaskMetricsUIData(
-      executorDeserializeTime: Long,
-      executorDeserializeCpuTime: Long,
-      executorRunTime: Long,
-      executorCpuTime: Long,
-      resultSize: Long,
-      jvmGCTime: Long,
-      resultSerializationTime: Long,
-      memoryBytesSpilled: Long,
-      diskBytesSpilled: Long,
-      peakExecutionMemory: Long,
-      inputMetrics: InputMetricsUIData,
-      outputMetrics: OutputMetricsUIData,
-      shuffleReadMetrics: ShuffleReadMetricsUIData,
-      shuffleWriteMetrics: ShuffleWriteMetricsUIData)
-
-  object TaskMetricsUIData {
-    def fromTaskMetrics(m: TaskMetrics): TaskMetricsUIData = {
-      TaskMetricsUIData(
-        executorDeserializeTime = m.executorDeserializeTime,
-        executorDeserializeCpuTime = m.executorDeserializeCpuTime,
-        executorRunTime = m.executorRunTime,
-        executorCpuTime = m.executorCpuTime,
-        resultSize = m.resultSize,
-        jvmGCTime = m.jvmGCTime,
-        resultSerializationTime = m.resultSerializationTime,
-        memoryBytesSpilled = m.memoryBytesSpilled,
-        diskBytesSpilled = m.diskBytesSpilled,
-        peakExecutionMemory = m.peakExecutionMemory,
-        inputMetrics = InputMetricsUIData(m.inputMetrics),
-        outputMetrics = OutputMetricsUIData(m.outputMetrics),
-        shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
-        shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
-    }
-
-    val EMPTY: TaskMetricsUIData = fromTaskMetrics(TaskMetrics.empty)
-  }
-
-  case class InputMetricsUIData(bytesRead: Long, recordsRead: Long)
-  object InputMetricsUIData {
-    def apply(metrics: InputMetrics): InputMetricsUIData = {
-      if (metrics.bytesRead == 0 && metrics.recordsRead == 0) {
-        EMPTY
-      } else {
-        new InputMetricsUIData(
-          bytesRead = metrics.bytesRead,
-          recordsRead = metrics.recordsRead)
-      }
-    }
-    private val EMPTY = InputMetricsUIData(0, 0)
-  }
-
-  case class OutputMetricsUIData(bytesWritten: Long, recordsWritten: Long)
-  object OutputMetricsUIData {
-    def apply(metrics: OutputMetrics): OutputMetricsUIData = {
-      if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0) {
-        EMPTY
-      } else {
-        new OutputMetricsUIData(
-          bytesWritten = metrics.bytesWritten,
-          recordsWritten = metrics.recordsWritten)
-      }
-    }
-    private val EMPTY = OutputMetricsUIData(0, 0)
-  }
-
-  case class ShuffleReadMetricsUIData(
-      remoteBlocksFetched: Long,
-      localBlocksFetched: Long,
-      remoteBytesRead: Long,
-      remoteBytesReadToDisk: Long,
-      localBytesRead: Long,
-      fetchWaitTime: Long,
-      recordsRead: Long,
-      totalBytesRead: Long,
-      totalBlocksFetched: Long)
-
-  object ShuffleReadMetricsUIData {
-    def apply(metrics: ShuffleReadMetrics): ShuffleReadMetricsUIData = {
-      if (
-          metrics.remoteBlocksFetched == 0 &&
-          metrics.localBlocksFetched == 0 &&
-          metrics.remoteBytesRead == 0 &&
-          metrics.localBytesRead == 0 &&
-          metrics.fetchWaitTime == 0 &&
-          metrics.recordsRead == 0 &&
-          metrics.totalBytesRead == 0 &&
-          metrics.totalBlocksFetched == 0) {
-        EMPTY
-      } else {
-        new ShuffleReadMetricsUIData(
-          remoteBlocksFetched = metrics.remoteBlocksFetched,
-          localBlocksFetched = metrics.localBlocksFetched,
-          remoteBytesRead = metrics.remoteBytesRead,
-          remoteBytesReadToDisk = metrics.remoteBytesReadToDisk,
-          localBytesRead = metrics.localBytesRead,
-          fetchWaitTime = metrics.fetchWaitTime,
-          recordsRead = metrics.recordsRead,
-          totalBytesRead = metrics.totalBytesRead,
-          totalBlocksFetched = metrics.totalBlocksFetched
-        )
-      }
-    }
-    private val EMPTY = ShuffleReadMetricsUIData(0, 0, 0, 0, 0, 0, 0, 0, 0)
-  }
-
-  case class ShuffleWriteMetricsUIData(
-      bytesWritten: Long,
-      recordsWritten: Long,
-      writeTime: Long)
-
-  object ShuffleWriteMetricsUIData {
-    def apply(metrics: ShuffleWriteMetrics): ShuffleWriteMetricsUIData = {
-      if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0 && metrics.writeTime == 0) {
-        EMPTY
-      } else {
-        new ShuffleWriteMetricsUIData(
-          bytesWritten = metrics.bytesWritten,
-          recordsWritten = metrics.recordsWritten,
-          writeTime = metrics.writeTime
-        )
-      }
-    }
-    private val EMPTY = ShuffleWriteMetricsUIData(0, 0, 0)
-  }
-
-}
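
The deleted UIData above leaned on Guava's weak interner to deduplicate the executor id and host strings copied into every TaskInfo. For reference, the same trick in isolation (a standalone sketch, not tied to any remaining Spark class; names here are illustrative only):

    import com.google.common.collect.Interners

    // Weak interner: equal strings collapse to one shared instance, and entries
    // can still be garbage-collected once nothing else references them.
    val interner = Interners.newWeakInterner[String]()
    def weakIntern(s: String): String = interner.intern(s)

    val a = weakIntern(new String("executor-1"))
    val b = weakIntern(new String("executor-1"))
    assert(a eq b)  // both references point at the single interned instance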

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index ea9f6d2..e09d5f5 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -156,7 +156,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
 
   private def testCaching(conf: SparkConf, storageLevel: StorageLevel): Unit = {
     sc = new SparkContext(conf.setMaster(clusterUrl).setAppName("test"))
-    sc.jobProgressListener.waitUntilExecutorsUp(2, 30000)
+    TestUtils.waitUntilExecutorsUp(sc, 2, 30000)
     val data = sc.parallelize(1 to 1000, 10)
     val cachedData = data.persist(storageLevel)
     assert(cachedData.count === 1000)

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index fe94403..472952a 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -66,7 +66,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
     // local blocks from the local BlockManager and won't send requests to ExternalShuffleService.
     // In this case, we won't receive FetchFailed. And it will make this test fail.
     // Therefore, we should wait until all slaves are up
-    sc.jobProgressListener.waitUntilExecutorsUp(2, 60000)
+    TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
 
     val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
index 5483f2b..a15ae04 100644
--- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
@@ -44,13 +44,13 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
     stageIds.size should be(2)
 
     val firstStageInfo = eventually(timeout(10 seconds)) {
-      sc.statusTracker.getStageInfo(stageIds(0)).get
+      sc.statusTracker.getStageInfo(stageIds.min).get
     }
-    firstStageInfo.stageId() should be(stageIds(0))
+    firstStageInfo.stageId() should be(stageIds.min)
     firstStageInfo.currentAttemptId() should be(0)
     firstStageInfo.numTasks() should be(2)
     eventually(timeout(10 seconds)) {
-      val updatedFirstStageInfo = sc.statusTracker.getStageInfo(stageIds(0)).get
+      val updatedFirstStageInfo = sc.statusTracker.getStageInfo(stageIds.min).get
       updatedFirstStageInfo.numCompletedTasks() should be(2)
       updatedFirstStageInfo.numActiveTasks() should be(0)
       updatedFirstStageInfo.numFailedTasks() should be(0)
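
The switch from stageIds(0) to stageIds.min above avoids assuming that the ids returned for a job come back in submission order; the smallest id still picks out the job's first stage. For reference, a usage sketch of the same public SparkStatusTracker API (`sc` is an active SparkContext and `jobId` a known job id, both placeholders here):

    // Sketch: inspect the first stage of a job through the public status tracker.
    val stageIds: Array[Int] = sc.statusTracker.getJobInfo(jobId).get.stageIds()
    val firstStageId = stageIds.min  // do not rely on the array's ordering
    for (stage <- sc.statusTracker.getStageInfo(firstStageId)) {
      println(s"stage ${stage.stageId()} attempt ${stage.currentAttemptId()}: " +
        s"${stage.numCompletedTasks()}/${stage.numTasks()} tasks completed, " +
        s"${stage.numActiveTasks()} active, ${stage.numFailedTasks()} failed")
    }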

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 46f9ac6..1596298 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -224,7 +224,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
         new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test")
       // Wait until all salves are up
       try {
-        _sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 60000)
+        TestUtils.waitUntilExecutorsUp(_sc, numSlaves, 60000)
         _sc
       } catch {
         case e: Throwable =>

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d395e09..feefb6a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2406,13 +2406,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
   // OutputCommitCoordinator requires the task info itself to not be null.
   private def createFakeTaskInfo(): TaskInfo = {
     val info = new TaskInfo(0, 0, 0, 0L, "", "", TaskLocality.ANY, false)
-    info.finishTime = 1  // to prevent spurious errors in JobProgressListener
+    info.finishTime = 1
     info
   }
 
   private def createFakeTaskInfoWithId(taskId: Long): TaskInfo = {
     val info = new TaskInfo(taskId, 0, 0, 0L, "", "", TaskLocality.ANY, false)
-    info.finishTime = 1  // to prevent spurious errors in JobProgressListener
+    info.finishTime = 1
     info
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
index 9fa8859..123f7f4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 
-import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 
 /**
@@ -43,7 +43,7 @@ class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext
 
     // This test will check if the number of executors received by "SparkListener" is same as the
     // number of all executors, so we need to wait until all executors are up
-    sc.jobProgressListener.waitUntilExecutorsUp(2, 60000)
+    TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
 
     val rdd1 = sc.parallelize(1 to 100, 4)
     val rdd2 = rdd1.map(_.toString)


