Repository: spark
Updated Branches:
  refs/heads/branch-2.3 d9a973d65 -> b78130123


http://git-wip-us.apache.org/repos/asf/spark/blob/b7813012/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 11a6a34..7c6e06c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -19,6 +19,7 @@ package org.apache.spark.ui.jobs
 
 import java.net.URLEncoder
 import java.util.Date
+import java.util.concurrent.TimeUnit
 import javax.servlet.http.HttpServletRequest
 
 import scala.collection.mutable.{HashMap, HashSet}
@@ -29,15 +30,14 @@ import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.TaskLocality
-import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status._
 import org.apache.spark.status.api.v1._
 import org.apache.spark.ui._
-import org.apache.spark.util.{Distribution, Utils}
+import org.apache.spark.util.Utils
 
 /** Page showing statistics and task list for a given stage */
 private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends WebUIPage("stage") {
   import ApiHelper._
-  import StagePage._
 
   private val TIMELINE_LEGEND = {
     <div class="legend-area">
@@ -67,17 +67,17 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
   // if we find that it's okay.
   private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000)
 
-  private def getLocalitySummaryString(stageData: StageData, taskList: Seq[TaskData]): String = {
-    val localities = taskList.map(_.taskLocality)
-    val localityCounts = localities.groupBy(identity).mapValues(_.size)
+  private def getLocalitySummaryString(localitySummary: Map[String, Long]): String = {
     val names = Map(
       TaskLocality.PROCESS_LOCAL.toString() -> "Process local",
       TaskLocality.NODE_LOCAL.toString() -> "Node local",
       TaskLocality.RACK_LOCAL.toString() -> "Rack local",
       TaskLocality.ANY.toString() -> "Any")
-    val localityNamesAndCounts = localityCounts.toSeq.map { case (locality, count) =>
-      s"${names(locality)}: $count"
-    }
+    val localityNamesAndCounts = names.flatMap { case (key, name) =>
+      localitySummary.get(key).map { count =>
+        s"$name: $count"
+      }
+    }.toSeq
     localityNamesAndCounts.sorted.mkString("; ")
   }
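
For reference, the new lookup can be exercised in isolation. A minimal sketch (the
"names" keys mirror TaskLocality; the sample counts are invented):

    val names = Map(
      "PROCESS_LOCAL" -> "Process local",
      "NODE_LOCAL" -> "Node local",
      "RACK_LOCAL" -> "Rack local",
      "ANY" -> "Any")
    // Shape returned by AppStatusStore.localitySummary: locality name -> task count.
    val localitySummary = Map("PROCESS_LOCAL" -> 8L, "ANY" -> 2L)
    // Iterating over names (not the summary) skips localities with no tasks and
    // never throws on unexpected keys, unlike the old names(locality) lookup.
    val summary = names.flatMap { case (key, name) =>
      localitySummary.get(key).map(count => s"$name: $count")
    }.toSeq.sorted.mkString("; ")
    // => "Any: 2; Process local: 8"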
 
@@ -108,7 +108,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
 
     val stageHeader = s"Details for Stage $stageId (Attempt $stageAttemptId)"
     val stageData = parent.store
-      .asOption(parent.store.stageAttempt(stageId, stageAttemptId, details = true))
+      .asOption(parent.store.stageAttempt(stageId, stageAttemptId, details = false))
       .getOrElse {
         val content =
           <div id="no-info">
@@ -117,8 +117,11 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
         return UIUtils.headerSparkPage(stageHeader, content, parent)
       }
 
-    val tasks = stageData.tasks.getOrElse(Map.empty).values.toSeq
-    if (tasks.isEmpty) {
+    val localitySummary = store.localitySummary(stageData.stageId, stageData.attemptId)
+
+    val totalTasks = stageData.numActiveTasks + stageData.numCompleteTasks +
+      stageData.numFailedTasks + stageData.numKilledTasks
+    if (totalTasks == 0) {
       val content =
         <div>
           <h4>Summary Metrics</h4> No tasks have started yet
@@ -127,18 +130,14 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
       return UIUtils.headerSparkPage(stageHeader, content, parent)
     }
 
+    val storedTasks = store.taskCount(stageData.stageId, stageData.attemptId)
     val numCompleted = stageData.numCompleteTasks
-    val totalTasks = stageData.numActiveTasks + stageData.numCompleteTasks +
-      stageData.numFailedTasks + stageData.numKilledTasks
-    val totalTasksNumStr = if (totalTasks == tasks.size) {
+    val totalTasksNumStr = if (totalTasks == storedTasks) {
       s"$totalTasks"
     } else {
-      s"$totalTasks, showing ${tasks.size}"
+      s"$totalTasks, showing ${storedTasks}"
     }
 
-    val externalAccumulables = stageData.accumulatorUpdates
-    val hasAccumulators = externalAccumulables.size > 0
-
     val summary =
       <div>
         <ul class="unstyled">
@@ -148,7 +147,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
           </li>
           <li>
             <strong>Locality Level Summary: </strong>
-            {getLocalitySummaryString(stageData, tasks)}
+            {getLocalitySummaryString(localitySummary)}
           </li>
           {if (hasInput(stageData)) {
             <li>
@@ -266,7 +265,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
     val accumulableTable = UIUtils.listingTable(
       accumulableHeaders,
       accumulableRow,
-      externalAccumulables.toSeq)
+      stageData.accumulatorUpdates.toSeq)
 
     val page: Int = {
      // If the user has changed to a larger page size, then go to page 1 in order to avoid
@@ -280,16 +279,9 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
     val currentTime = System.currentTimeMillis()
     val (taskTable, taskTableHTML) = try {
       val _taskTable = new TaskPagedTable(
-        parent.conf,
+        stageData,
         UIUtils.prependBaseUri(parent.basePath) +
           s"/stages/stage?id=${stageId}&attempt=${stageAttemptId}",
-        tasks,
-        hasAccumulators,
-        hasInput(stageData),
-        hasOutput(stageData),
-        hasShuffleRead(stageData),
-        hasShuffleWrite(stageData),
-        hasBytesSpilled(stageData),
         currentTime,
         pageSize = taskPageSize,
         sortColumn = taskSortColumn,
@@ -320,217 +312,155 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
             |  }
             |});
           """.stripMargin
-         }
+          }
         }
       </script>
 
-    val taskIdsInPage = if (taskTable == null) Set.empty[Long]
-      else taskTable.dataSource.slicedTaskIds
+    val metricsSummary = store.taskSummary(stageData.stageId, stageData.attemptId,
+      Array(0, 0.25, 0.5, 0.75, 1.0))
 
-    // Excludes tasks which failed and have incomplete metrics
-    val validTasks = tasks.filter(t => t.status == "SUCCESS" && t.taskMetrics.isDefined)
-
-    val summaryTable: Option[Seq[Node]] =
-      if (validTasks.size == 0) {
-        None
-      } else {
-        def getDistributionQuantiles(data: Seq[Double]): IndexedSeq[Double] = {
-          Distribution(data).get.getQuantiles()
-        }
-        def getFormattedTimeQuantiles(times: Seq[Double]): Seq[Node] = {
-          getDistributionQuantiles(times).map { millis =>
-            <td>{UIUtils.formatDuration(millis.toLong)}</td>
-          }
-        }
-        def getFormattedSizeQuantiles(data: Seq[Double]): Seq[Elem] = {
-          getDistributionQuantiles(data).map(d => <td>{Utils.bytesToString(d.toLong)}</td>)
+    val summaryTable = metricsSummary.map { metrics =>
+      def timeQuantiles(data: IndexedSeq[Double]): Seq[Node] = {
+        data.map { millis =>
+          <td>{UIUtils.formatDuration(millis.toLong)}</td>
         }
+      }
 
-        val deserializationTimes = validTasks.map { task =>
-          task.taskMetrics.get.executorDeserializeTime.toDouble
-        }
-        val deserializationQuantiles =
-          <td>
-            <span data-toggle="tooltip" title={ToolTips.TASK_DESERIALIZATION_TIME}
-                  data-placement="right">
-              Task Deserialization Time
-            </span>
-          </td> +: getFormattedTimeQuantiles(deserializationTimes)
-
-        val serviceTimes = validTasks.map(_.taskMetrics.get.executorRunTime.toDouble)
-        val serviceQuantiles = <td>Duration</td> +: getFormattedTimeQuantiles(serviceTimes)
-
-        val gcTimes = validTasks.map(_.taskMetrics.get.jvmGcTime.toDouble)
-        val gcQuantiles =
-          <td>
-            <span data-toggle="tooltip"
-                title={ToolTips.GC_TIME} data-placement="right">GC Time
-            </span>
-          </td> +: getFormattedTimeQuantiles(gcTimes)
-
-        val serializationTimes = validTasks.map(_.taskMetrics.get.resultSerializationTime.toDouble)
-        val serializationQuantiles =
-          <td>
-            <span data-toggle="tooltip"
-                  title={ToolTips.RESULT_SERIALIZATION_TIME} data-placement="right">
-              Result Serialization Time
-            </span>
-          </td> +: getFormattedTimeQuantiles(serializationTimes)
-
-        val gettingResultTimes = validTasks.map(getGettingResultTime(_, currentTime).toDouble)
-        val gettingResultQuantiles =
-          <td>
-            <span data-toggle="tooltip"
-                title={ToolTips.GETTING_RESULT_TIME} data-placement="right">
-              Getting Result Time
-            </span>
-          </td> +:
-          getFormattedTimeQuantiles(gettingResultTimes)
-
-        val peakExecutionMemory = validTasks.map(_.taskMetrics.get.peakExecutionMemory.toDouble)
-        val peakExecutionMemoryQuantiles = {
-          <td>
-            <span data-toggle="tooltip"
-                  title={ToolTips.PEAK_EXECUTION_MEMORY} data-placement="right">
-              Peak Execution Memory
-            </span>
-          </td> +: getFormattedSizeQuantiles(peakExecutionMemory)
+      def sizeQuantiles(data: IndexedSeq[Double]): Seq[Node] = {
+        data.map { size =>
+          <td>{Utils.bytesToString(size.toLong)}</td>
         }
+      }
 
-        // The scheduler delay includes the network delay to send the task to the worker
-        // machine and to send back the result (but not the time to fetch the task result,
-        // if it needed to be fetched from the block manager on the worker).
-        val schedulerDelays = validTasks.map { task =>
-          getSchedulerDelay(task, task.taskMetrics.get, currentTime).toDouble
-        }
-        val schedulerDelayTitle = <td><span data-toggle="tooltip"
-          title={ToolTips.SCHEDULER_DELAY} data-placement="right">Scheduler Delay</span></td>
-        val schedulerDelayQuantiles = schedulerDelayTitle +:
-          getFormattedTimeQuantiles(schedulerDelays)
-        def getFormattedSizeQuantilesWithRecords(data: Seq[Double], records: Seq[Double])
-          : Seq[Elem] = {
-          val recordDist = getDistributionQuantiles(records).iterator
-          getDistributionQuantiles(data).map(d =>
-            <td>{s"${Utils.bytesToString(d.toLong)} / ${recordDist.next().toLong}"}</td>
-          )
+      def sizeQuantilesWithRecords(
+          data: IndexedSeq[Double],
+          records: IndexedSeq[Double]) : Seq[Node] = {
+        data.zip(records).map { case (d, r) =>
+          <td>{s"${Utils.bytesToString(d.toLong)} / ${r.toLong}"}</td>
         }
+      }
 
-        val inputSizes = validTasks.map(_.taskMetrics.get.inputMetrics.bytesRead.toDouble)
-        val inputRecords = validTasks.map(_.taskMetrics.get.inputMetrics.recordsRead.toDouble)
-        val inputQuantiles = <td>Input Size / Records</td> +:
-          getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords)
+      def titleCell(title: String, tooltip: String): Seq[Node] = {
+        <td>
+          <span data-toggle="tooltip" title={tooltip} data-placement="right">
+            {title}
+          </span>
+        </td>
+      }
 
-        val outputSizes = validTasks.map(_.taskMetrics.get.outputMetrics.bytesWritten.toDouble)
-        val outputRecords = validTasks.map(_.taskMetrics.get.outputMetrics.recordsWritten.toDouble)
-        val outputQuantiles = <td>Output Size / Records</td> +:
-          getFormattedSizeQuantilesWithRecords(outputSizes, outputRecords)
+      def simpleTitleCell(title: String): Seq[Node] = <td>{title}</td>
 
-        val shuffleReadBlockedTimes = validTasks.map { task =>
-          task.taskMetrics.get.shuffleReadMetrics.fetchWaitTime.toDouble
-        }
-        val shuffleReadBlockedQuantiles =
-          <td>
-            <span data-toggle="tooltip"
-                  title={ToolTips.SHUFFLE_READ_BLOCKED_TIME} data-placement="right">
-              Shuffle Read Blocked Time
-            </span>
-          </td> +:
-          getFormattedTimeQuantiles(shuffleReadBlockedTimes)
-
-        val shuffleReadTotalSizes = validTasks.map { task =>
-          totalBytesRead(task.taskMetrics.get.shuffleReadMetrics).toDouble
-        }
-        val shuffleReadTotalRecords = validTasks.map { task =>
-          task.taskMetrics.get.shuffleReadMetrics.recordsRead.toDouble
-        }
-        val shuffleReadTotalQuantiles =
-          <td>
-            <span data-toggle="tooltip"
-                  title={ToolTips.SHUFFLE_READ} data-placement="right">
-              Shuffle Read Size / Records
-            </span>
-          </td> +:
-          getFormattedSizeQuantilesWithRecords(shuffleReadTotalSizes, shuffleReadTotalRecords)
-
-        val shuffleReadRemoteSizes = validTasks.map { task =>
-          task.taskMetrics.get.shuffleReadMetrics.remoteBytesRead.toDouble
-        }
-        val shuffleReadRemoteQuantiles =
-          <td>
-            <span data-toggle="tooltip"
-                  title={ToolTips.SHUFFLE_READ_REMOTE_SIZE} data-placement="right">
-              Shuffle Remote Reads
-            </span>
-          </td> +:
-          getFormattedSizeQuantiles(shuffleReadRemoteSizes)
-
-        val shuffleWriteSizes = validTasks.map { task =>
-          task.taskMetrics.get.shuffleWriteMetrics.bytesWritten.toDouble
-        }
+      val deserializationQuantiles = titleCell("Task Deserialization Time",
+        ToolTips.TASK_DESERIALIZATION_TIME) ++ timeQuantiles(metrics.executorDeserializeTime)
 
-        val shuffleWriteRecords = validTasks.map { task =>
-          task.taskMetrics.get.shuffleWriteMetrics.recordsWritten.toDouble
-        }
+      val serviceQuantiles = simpleTitleCell("Duration") ++ timeQuantiles(metrics.executorRunTime)
 
-        val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +:
-          getFormattedSizeQuantilesWithRecords(shuffleWriteSizes, shuffleWriteRecords)
+      val gcQuantiles = titleCell("GC Time", ToolTips.GC_TIME) ++ timeQuantiles(metrics.jvmGcTime)
 
-        val memoryBytesSpilledSizes = validTasks.map(_.taskMetrics.get.memoryBytesSpilled.toDouble)
-        val memoryBytesSpilledQuantiles = <td>Shuffle spill (memory)</td> +:
-          getFormattedSizeQuantiles(memoryBytesSpilledSizes)
+      val serializationQuantiles = titleCell("Result Serialization Time",
+        ToolTips.RESULT_SERIALIZATION_TIME) ++ timeQuantiles(metrics.resultSerializationTime)
 
-        val diskBytesSpilledSizes = validTasks.map(_.taskMetrics.get.diskBytesSpilled.toDouble)
-        val diskBytesSpilledQuantiles = <td>Shuffle spill (disk)</td> +:
-          getFormattedSizeQuantiles(diskBytesSpilledSizes)
+      val gettingResultQuantiles = titleCell("Getting Result Time", ToolTips.GETTING_RESULT_TIME) ++
+        timeQuantiles(metrics.gettingResultTime)
 
-        val listings: Seq[Seq[Node]] = Seq(
-          <tr>{serviceQuantiles}</tr>,
-          <tr class={TaskDetailsClassNames.SCHEDULER_DELAY}>{schedulerDelayQuantiles}</tr>,
-          <tr class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
-            {deserializationQuantiles}
-          </tr>
-          <tr>{gcQuantiles}</tr>,
-          <tr class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
-            {serializationQuantiles}
-          </tr>,
-          <tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>,
-          <tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
-            {peakExecutionMemoryQuantiles}
-          </tr>,
-          if (hasInput(stageData)) <tr>{inputQuantiles}</tr> else Nil,
-          if (hasOutput(stageData)) <tr>{outputQuantiles}</tr> else Nil,
-          if (hasShuffleRead(stageData)) {
-            <tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
-              {shuffleReadBlockedQuantiles}
-            </tr>
-            <tr>{shuffleReadTotalQuantiles}</tr>
-            <tr class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
-              {shuffleReadRemoteQuantiles}
-            </tr>
-          } else {
-            Nil
-          },
-          if (hasShuffleWrite(stageData)) <tr>{shuffleWriteQuantiles}</tr> else Nil,
-          if (hasBytesSpilled(stageData)) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil,
-          if (hasBytesSpilled(stageData)) <tr>{diskBytesSpilledQuantiles}</tr> else Nil)
-
-        val quantileHeaders = Seq("Metric", "Min", "25th percentile",
-          "Median", "75th percentile", "Max")
-        // The summary table does not use CSS to stripe rows, which doesn't work with hidden
-        // rows (instead, JavaScript in table.js is used to stripe the non-hidden rows).
-        Some(UIUtils.listingTable(
-          quantileHeaders,
-          identity[Seq[Node]],
-          listings,
-          fixedWidth = true,
-          id = Some("task-summary-table"),
-          stripeRowsWithCss = false))
+      val peakExecutionMemoryQuantiles = titleCell("Peak Execution Memory",
+        ToolTips.PEAK_EXECUTION_MEMORY) ++ sizeQuantiles(metrics.peakExecutionMemory)
+
+      // The scheduler delay includes the network delay to send the task to the worker
+      // machine and to send back the result (but not the time to fetch the task result,
+      // if it needed to be fetched from the block manager on the worker).
+      val schedulerDelayQuantiles = titleCell("Scheduler Delay", ToolTips.SCHEDULER_DELAY) ++
+        timeQuantiles(metrics.schedulerDelay)
+
+      def inputQuantiles: Seq[Node] = {
+        simpleTitleCell("Input Size / Records") ++
+          sizeQuantilesWithRecords(metrics.inputMetrics.bytesRead, metrics.inputMetrics.recordsRead)
+      }
+
+      def outputQuantiles: Seq[Node] = {
+        simpleTitleCell("Output Size / Records") ++
+          sizeQuantilesWithRecords(metrics.outputMetrics.bytesWritten,
+            metrics.outputMetrics.recordsWritten)
       }
 
+      def shuffleReadBlockedQuantiles: Seq[Node] = {
+        titleCell("Shuffle Read Blocked Time", ToolTips.SHUFFLE_READ_BLOCKED_TIME) ++
+          timeQuantiles(metrics.shuffleReadMetrics.fetchWaitTime)
+      }
+
+      def shuffleReadTotalQuantiles: Seq[Node] = {
+        titleCell("Shuffle Read Size / Records", ToolTips.SHUFFLE_READ) ++
+          sizeQuantilesWithRecords(metrics.shuffleReadMetrics.readBytes,
+            metrics.shuffleReadMetrics.readRecords)
+      }
+
+      def shuffleReadRemoteQuantiles: Seq[Node] = {
+        titleCell("Shuffle Remote Reads", ToolTips.SHUFFLE_READ_REMOTE_SIZE) ++
+          sizeQuantiles(metrics.shuffleReadMetrics.remoteBytesRead)
+      }
+
+      def shuffleWriteQuantiles: Seq[Node] = {
+        simpleTitleCell("Shuffle Write Size / Records") ++
+          sizeQuantilesWithRecords(metrics.shuffleWriteMetrics.writeBytes,
+            metrics.shuffleWriteMetrics.writeRecords)
+      }
+
+      def memoryBytesSpilledQuantiles: Seq[Node] = {
+        simpleTitleCell("Shuffle spill (memory)") ++ sizeQuantiles(metrics.memoryBytesSpilled)
+      }
+
+      def diskBytesSpilledQuantiles: Seq[Node] = {
+        simpleTitleCell("Shuffle spill (disk)") ++ sizeQuantiles(metrics.diskBytesSpilled)
+      }
+
+      val listings: Seq[Seq[Node]] = Seq(
+        <tr>{serviceQuantiles}</tr>,
+        <tr class={TaskDetailsClassNames.SCHEDULER_DELAY}>{schedulerDelayQuantiles}</tr>,
+        <tr class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
+          {deserializationQuantiles}
+        </tr>
+        <tr>{gcQuantiles}</tr>,
+        <tr class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
+          {serializationQuantiles}
+        </tr>,
+        <tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>,
+        <tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
+          {peakExecutionMemoryQuantiles}
+        </tr>,
+        if (hasInput(stageData)) <tr>{inputQuantiles}</tr> else Nil,
+        if (hasOutput(stageData)) <tr>{outputQuantiles}</tr> else Nil,
+        if (hasShuffleRead(stageData)) {
+          <tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
+            {shuffleReadBlockedQuantiles}
+          </tr>
+          <tr>{shuffleReadTotalQuantiles}</tr>
+          <tr class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
+            {shuffleReadRemoteQuantiles}
+          </tr>
+        } else {
+          Nil
+        },
+        if (hasShuffleWrite(stageData)) <tr>{shuffleWriteQuantiles}</tr> else Nil,
+        if (hasBytesSpilled(stageData)) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil,
+        if (hasBytesSpilled(stageData)) <tr>{diskBytesSpilledQuantiles}</tr> else Nil)
+
+      val quantileHeaders = Seq("Metric", "Min", "25th percentile", "Median", "75th percentile",
+        "Max")
+      // The summary table does not use CSS to stripe rows, which doesn't work with hidden
+      // rows (instead, JavaScript in table.js is used to stripe the non-hidden rows).
+      UIUtils.listingTable(
+        quantileHeaders,
+        identity[Seq[Node]],
+        listings,
+        fixedWidth = true,
+        id = Some("task-summary-table"),
+        stripeRowsWithCss = false)
+    }
+
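
The summary table is now built from AppStatusStore.taskSummary instead of
iterating over every task held in memory. A hedged sketch of the caller side
(store, stageId and stageAttemptId stand in for the values used above):

    val quantiles = Array(0.0, 0.25, 0.5, 0.75, 1.0)  // min, quartiles, max
    val summary = store.taskSummary(stageId, stageAttemptId, quantiles)
    summary.foreach { metrics =>
      // Each field is an IndexedSeq[Double] with one entry per requested quantile.
      val medianRunTimeMillis = metrics.executorRunTime(2)  // index 2 = the 0.5 quantile
    }
    // None means no succeeded task has reported metrics yet, which render()
    // shows as "No tasks have reported metrics yet."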
     val executorTable = new ExecutorTable(stageData, parent.store)
 
     val maybeAccumulableTable: Seq[Node] =
-      if (hasAccumulators) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()
+      if (hasAccumulators(stageData)) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()
 
     val aggMetrics =
       <span class="collapse-aggregated-metrics collapse-table"
@@ -550,7 +480,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
       showAdditionalMetrics ++
       makeTimeline(
         // Only show the tasks in the table
-        tasks.filter { t => taskIdsInPage.contains(t.taskId) },
+        Option(taskTable).map(_.dataSource.tasks).getOrElse(Nil),
         currentTime) ++
      <h4>Summary Metrics for <a href="#tasks-section">{numCompleted} Completed Tasks</a></h4> ++
      <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
@@ -593,10 +523,9 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
         val serializationTimeProportion = toProportion(serializationTime)
        val deserializationTime = metricsOpt.map(_.executorDeserializeTime).getOrElse(0L)
         val deserializationTimeProportion = toProportion(deserializationTime)
-        val gettingResultTime = getGettingResultTime(taskInfo, currentTime)
+        val gettingResultTime = AppStatusUtils.gettingResultTime(taskInfo)
         val gettingResultTimeProportion = toProportion(gettingResultTime)
-        val schedulerDelay =
-          metricsOpt.map(getSchedulerDelay(taskInfo, _, currentTime)).getOrElse(0L)
+        val schedulerDelay = AppStatusUtils.schedulerDelay(taskInfo)
         val schedulerDelayProportion = toProportion(schedulerDelay)
 
         val executorOverhead = serializationTime + deserializationTime
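
The delay math itself moved to AppStatusUtils; the helpers removed from the
StagePage companion object further down preserve the original definitions. A
restated sketch (not the AppStatusUtils source):

    // schedulerDelay = everything in a finished task's wall time that was not
    // execution, (de)serialization, or fetching the result; 0 while still running.
    def schedulerDelay(duration: Long, runTime: Long, deserTime: Long,
        serTime: Long, gettingResultTime: Long): Long =
      math.max(0L, duration - runTime - deserTime - serTime - gettingResultTime)

    // Example: 100 ms total, 80 ms run, 5 ms deserialize, 3 ms serialize and
    // 2 ms fetching the result leaves a 10 ms scheduler delay.
    assert(schedulerDelay(100L, 80L, 5L, 3L, 2L) == 10L)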
@@ -708,7 +637,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
       {
         if (MAX_TIMELINE_TASKS < tasks.size) {
           <strong>
-            This stage has more than the maximum number of tasks that can be shown in the
+            This page has more than the maximum number of tasks that can be shown in the
             visualization! Only the most recent {MAX_TIMELINE_TASKS} tasks
             (of {tasks.size} total) are shown.
           </strong>
@@ -733,402 +662,49 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
 
 }
 
-private[ui] object StagePage {
-  private[ui] def getGettingResultTime(info: TaskData, currentTime: Long): Long = {
-    info.resultFetchStart match {
-      case Some(start) =>
-        info.duration match {
-          case Some(duration) =>
-            info.launchTime.getTime() + duration - start.getTime()
-
-          case _ =>
-            currentTime - start.getTime()
-        }
-
-      case _ =>
-        0L
-    }
-  }
-
-  private[ui] def getSchedulerDelay(
-      info: TaskData,
-      metrics: TaskMetrics,
-      currentTime: Long): Long = {
-    info.duration match {
-      case Some(duration) =>
-        val executorOverhead = metrics.executorDeserializeTime + metrics.resultSerializationTime
-        math.max(
-          0,
-          duration - metrics.executorRunTime - executorOverhead -
-            getGettingResultTime(info, currentTime))
-
-      case _ =>
-        // The task is still running and the metrics like executorRunTime are not available.
-        0L
-    }
-  }
-
-}
-
-private[ui] case class TaskTableRowInputData(inputSortable: Long, inputReadable: String)
-
-private[ui] case class TaskTableRowOutputData(outputSortable: Long, outputReadable: String)
-
-private[ui] case class TaskTableRowShuffleReadData(
-    shuffleReadBlockedTimeSortable: Long,
-    shuffleReadBlockedTimeReadable: String,
-    shuffleReadSortable: Long,
-    shuffleReadReadable: String,
-    shuffleReadRemoteSortable: Long,
-    shuffleReadRemoteReadable: String)
-
-private[ui] case class TaskTableRowShuffleWriteData(
-    writeTimeSortable: Long,
-    writeTimeReadable: String,
-    shuffleWriteSortable: Long,
-    shuffleWriteReadable: String)
-
-private[ui] case class TaskTableRowBytesSpilledData(
-    memoryBytesSpilledSortable: Long,
-    memoryBytesSpilledReadable: String,
-    diskBytesSpilledSortable: Long,
-    diskBytesSpilledReadable: String)
-
-/**
- * Contains all data that needs for sorting and generating HTML. Using this one rather than
- * TaskData to avoid creating duplicate contents during sorting the data.
- */
-private[ui] class TaskTableRowData(
-    val index: Int,
-    val taskId: Long,
-    val attempt: Int,
-    val speculative: Boolean,
-    val status: String,
-    val taskLocality: String,
-    val executorId: String,
-    val host: String,
-    val launchTime: Long,
-    val duration: Long,
-    val formatDuration: String,
-    val schedulerDelay: Long,
-    val taskDeserializationTime: Long,
-    val gcTime: Long,
-    val serializationTime: Long,
-    val gettingResultTime: Long,
-    val peakExecutionMemoryUsed: Long,
-    val accumulators: Option[String], // HTML
-    val input: Option[TaskTableRowInputData],
-    val output: Option[TaskTableRowOutputData],
-    val shuffleRead: Option[TaskTableRowShuffleReadData],
-    val shuffleWrite: Option[TaskTableRowShuffleWriteData],
-    val bytesSpilled: Option[TaskTableRowBytesSpilledData],
-    val error: String,
-    val logs: Map[String, String])
-
 private[ui] class TaskDataSource(
-    tasks: Seq[TaskData],
-    hasAccumulators: Boolean,
-    hasInput: Boolean,
-    hasOutput: Boolean,
-    hasShuffleRead: Boolean,
-    hasShuffleWrite: Boolean,
-    hasBytesSpilled: Boolean,
+    stage: StageData,
     currentTime: Long,
     pageSize: Int,
     sortColumn: String,
     desc: Boolean,
-    store: AppStatusStore) extends PagedDataSource[TaskTableRowData](pageSize) {
-  import StagePage._
+    store: AppStatusStore) extends PagedDataSource[TaskData](pageSize) {
+  import ApiHelper._
 
   // Keep an internal cache of executor log maps so that long task lists render faster.
   private val executorIdToLogs = new HashMap[String, Map[String, String]]()
 
-  // Convert TaskData to TaskTableRowData which contains the final contents to show in the table
-  // so that we can avoid creating duplicate contents during sorting the data
-  private val data = tasks.map(taskRow).sorted(ordering(sortColumn, desc))
-
-  private var _slicedTaskIds: Set[Long] = _
+  private var _tasksToShow: Seq[TaskData] = null
 
-  override def dataSize: Int = data.size
+  override def dataSize: Int = stage.numCompleteTasks + stage.numFailedTasks + stage.numKilledTasks
 
-  override def sliceData(from: Int, to: Int): Seq[TaskTableRowData] = {
-    val r = data.slice(from, to)
-    _slicedTaskIds = r.map(_.taskId).toSet
-    r
-  }
-
-  def slicedTaskIds: Set[Long] = _slicedTaskIds
-
-  private def taskRow(info: TaskData): TaskTableRowData = {
-    val metrics = info.taskMetrics
-    val duration = info.duration.getOrElse(1L)
-    val formatDuration = info.duration.map(d => UIUtils.formatDuration(d)).getOrElse("")
-    val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L)
-    val gcTime = metrics.map(_.jvmGcTime).getOrElse(0L)
-    val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
-    val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
-    val gettingResultTime = getGettingResultTime(info, currentTime)
-
-    val externalAccumulableReadable = info.accumulatorUpdates.map { acc =>
-      StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update}")
+  override def sliceData(from: Int, to: Int): Seq[TaskData] = {
+    if (_tasksToShow == null) {
+      _tasksToShow = store.taskList(stage.stageId, stage.attemptId, from, to - from,
+        indexName(sortColumn), !desc)
     }
-    val peakExecutionMemoryUsed = metrics.map(_.peakExecutionMemory).getOrElse(0L)
-
-    val maybeInput = metrics.map(_.inputMetrics)
-    val inputSortable = maybeInput.map(_.bytesRead).getOrElse(0L)
-    val inputReadable = maybeInput
-      .map(m => s"${Utils.bytesToString(m.bytesRead)}")
-      .getOrElse("")
-    val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("")
-
-    val maybeOutput = metrics.map(_.outputMetrics)
-    val outputSortable = maybeOutput.map(_.bytesWritten).getOrElse(0L)
-    val outputReadable = maybeOutput
-      .map(m => s"${Utils.bytesToString(m.bytesWritten)}")
-      .getOrElse("")
-    val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("")
-
-    val maybeShuffleRead = metrics.map(_.shuffleReadMetrics)
-    val shuffleReadBlockedTimeSortable = maybeShuffleRead.map(_.fetchWaitTime).getOrElse(0L)
-    val shuffleReadBlockedTimeReadable =
-      maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
-
-    val totalShuffleBytes = maybeShuffleRead.map(ApiHelper.totalBytesRead)
-    val shuffleReadSortable = totalShuffleBytes.getOrElse(0L)
-    val shuffleReadReadable = totalShuffleBytes.map(Utils.bytesToString).getOrElse("")
-    val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")
-
-    val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead)
-    val shuffleReadRemoteSortable = remoteShuffleBytes.getOrElse(0L)
-    val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")
-
-    val maybeShuffleWrite = metrics.map(_.shuffleWriteMetrics)
-    val shuffleWriteSortable = maybeShuffleWrite.map(_.bytesWritten).getOrElse(0L)
-    val shuffleWriteReadable = maybeShuffleWrite
-      .map(m => s"${Utils.bytesToString(m.bytesWritten)}").getOrElse("")
-    val shuffleWriteRecords = maybeShuffleWrite
-      .map(_.recordsWritten.toString).getOrElse("")
-
-    val maybeWriteTime = metrics.map(_.shuffleWriteMetrics.writeTime)
-    val writeTimeSortable = maybeWriteTime.getOrElse(0L)
-    val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
-      if (ms == 0) "" else UIUtils.formatDuration(ms)
-    }.getOrElse("")
-
-    val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
-    val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.getOrElse(0L)
-    val memoryBytesSpilledReadable =
-      maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("")
-
-    val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled)
-    val diskBytesSpilledSortable = maybeDiskBytesSpilled.getOrElse(0L)
-    val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("")
-
-    val input =
-      if (hasInput) {
-        Some(TaskTableRowInputData(inputSortable, s"$inputReadable / $inputRecords"))
-      } else {
-        None
-      }
-
-    val output =
-      if (hasOutput) {
-        Some(TaskTableRowOutputData(outputSortable, s"$outputReadable / $outputRecords"))
-      } else {
-        None
-      }
-
-    val shuffleRead =
-      if (hasShuffleRead) {
-        Some(TaskTableRowShuffleReadData(
-          shuffleReadBlockedTimeSortable,
-          shuffleReadBlockedTimeReadable,
-          shuffleReadSortable,
-          s"$shuffleReadReadable / $shuffleReadRecords",
-          shuffleReadRemoteSortable,
-          shuffleReadRemoteReadable
-        ))
-      } else {
-        None
-      }
-
-    val shuffleWrite =
-      if (hasShuffleWrite) {
-        Some(TaskTableRowShuffleWriteData(
-          writeTimeSortable,
-          writeTimeReadable,
-          shuffleWriteSortable,
-          s"$shuffleWriteReadable / $shuffleWriteRecords"
-        ))
-      } else {
-        None
-      }
-
-    val bytesSpilled =
-      if (hasBytesSpilled) {
-        Some(TaskTableRowBytesSpilledData(
-          memoryBytesSpilledSortable,
-          memoryBytesSpilledReadable,
-          diskBytesSpilledSortable,
-          diskBytesSpilledReadable
-        ))
-      } else {
-        None
-      }
-
-    new TaskTableRowData(
-      info.index,
-      info.taskId,
-      info.attempt,
-      info.speculative,
-      info.status,
-      info.taskLocality.toString,
-      info.executorId,
-      info.host,
-      info.launchTime.getTime(),
-      duration,
-      formatDuration,
-      schedulerDelay,
-      taskDeserializationTime,
-      gcTime,
-      serializationTime,
-      gettingResultTime,
-      peakExecutionMemoryUsed,
-      if (hasAccumulators) Some(externalAccumulableReadable.mkString("<br/>")) else None,
-      input,
-      output,
-      shuffleRead,
-      shuffleWrite,
-      bytesSpilled,
-      info.errorMessage.getOrElse(""),
-      executorLogs(info.executorId))
+    _tasksToShow
   }
 
-  private def executorLogs(id: String): Map[String, String] = {
+  def tasks: Seq[TaskData] = _tasksToShow
+
+  def executorLogs(id: String): Map[String, String] = {
     executorIdToLogs.getOrElseUpdate(id,
       store.asOption(store.executorSummary(id)).map(_.executorLogs).getOrElse(Map.empty))
   }
 
-  /**
-   * Return Ordering according to sortColumn and desc
-   */
-  private def ordering(sortColumn: String, desc: Boolean): Ordering[TaskTableRowData] = {
-    val ordering: Ordering[TaskTableRowData] = sortColumn match {
-      case "Index" => Ordering.by(_.index)
-      case "ID" => Ordering.by(_.taskId)
-      case "Attempt" => Ordering.by(_.attempt)
-      case "Status" => Ordering.by(_.status)
-      case "Locality Level" => Ordering.by(_.taskLocality)
-      case "Executor ID" => Ordering.by(_.executorId)
-      case "Host" => Ordering.by(_.host)
-      case "Launch Time" => Ordering.by(_.launchTime)
-      case "Duration" => Ordering.by(_.duration)
-      case "Scheduler Delay" => Ordering.by(_.schedulerDelay)
-      case "Task Deserialization Time" => Ordering.by(_.taskDeserializationTime)
-      case "GC Time" => Ordering.by(_.gcTime)
-      case "Result Serialization Time" => Ordering.by(_.serializationTime)
-      case "Getting Result Time" => Ordering.by(_.gettingResultTime)
-      case "Peak Execution Memory" => Ordering.by(_.peakExecutionMemoryUsed)
-      case "Accumulators" =>
-        if (hasAccumulators) {
-          Ordering.by(_.accumulators.get)
-        } else {
-          throw new IllegalArgumentException(
-            "Cannot sort by Accumulators because of no accumulators")
-        }
-      case "Input Size / Records" =>
-        if (hasInput) {
-          Ordering.by(_.input.get.inputSortable)
-        } else {
-          throw new IllegalArgumentException(
-            "Cannot sort by Input Size / Records because of no inputs")
-        }
-      case "Output Size / Records" =>
-        if (hasOutput) {
-          Ordering.by(_.output.get.outputSortable)
-        } else {
-          throw new IllegalArgumentException(
-            "Cannot sort by Output Size / Records because of no outputs")
-        }
-      // ShuffleRead
-      case "Shuffle Read Blocked Time" =>
-        if (hasShuffleRead) {
-          Ordering.by(_.shuffleRead.get.shuffleReadBlockedTimeSortable)
-        } else {
-          throw new IllegalArgumentException(
-            "Cannot sort by Shuffle Read Blocked Time because of no shuffle reads")
-        }
-      case "Shuffle Read Size / Records" =>
-        if (hasShuffleRead) {
-          Ordering.by(_.shuffleRead.get.shuffleReadSortable)
-        } else {
-          throw new IllegalArgumentException(
-            "Cannot sort by Shuffle Read Size / Records because of no shuffle reads")
-        }
-      case "Shuffle Remote Reads" =>
-        if (hasShuffleRead) {
-          Ordering.by(_.shuffleRead.get.shuffleReadRemoteSortable)
-        } else {
-          throw new IllegalArgumentException(
-            "Cannot sort by Shuffle Remote Reads because of no shuffle reads")
-        }
-      // ShuffleWrite
-      case "Write Time" =>
-        if (hasShuffleWrite) {
-          Ordering.by(_.shuffleWrite.get.writeTimeSortable)
-        } else {
-          throw new IllegalArgumentException(
-            "Cannot sort by Write Time because of no shuffle writes")
-        }
-      case "Shuffle Write Size / Records" =>
-        if (hasShuffleWrite) {
-          Ordering.by(_.shuffleWrite.get.shuffleWriteSortable)
-        } else {
-          throw new IllegalArgumentException(
-            "Cannot sort by Shuffle Write Size / Records because of no shuffle writes")
-        }
-      // BytesSpilled
-      case "Shuffle Spill (Memory)" =>
-        if (hasBytesSpilled) {
-          Ordering.by(_.bytesSpilled.get.memoryBytesSpilledSortable)
-        } else {
-          throw new IllegalArgumentException(
-            "Cannot sort by Shuffle Spill (Memory) because of no spills")
-        }
-      case "Shuffle Spill (Disk)" =>
-        if (hasBytesSpilled) {
-          Ordering.by(_.bytesSpilled.get.diskBytesSpilledSortable)
-        } else {
-          throw new IllegalArgumentException(
-            "Cannot sort by Shuffle Spill (Disk) because of no spills")
-        }
-      case "Errors" => Ordering.by(_.error)
-      case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
-    }
-    if (desc) {
-      ordering.reverse
-    } else {
-      ordering
-    }
-  }
-
 }
 
 private[ui] class TaskPagedTable(
-    conf: SparkConf,
+    stage: StageData,
     basePath: String,
-    data: Seq[TaskData],
-    hasAccumulators: Boolean,
-    hasInput: Boolean,
-    hasOutput: Boolean,
-    hasShuffleRead: Boolean,
-    hasShuffleWrite: Boolean,
-    hasBytesSpilled: Boolean,
     currentTime: Long,
     pageSize: Int,
     sortColumn: String,
     desc: Boolean,
-    store: AppStatusStore) extends PagedTable[TaskTableRowData] {
+    store: AppStatusStore) extends PagedTable[TaskData] {
+
+  import ApiHelper._
 
   override def tableId: String = "task-table"
 
@@ -1142,13 +718,7 @@ private[ui] class TaskPagedTable(
   override def pageNumberFormField: String = "task.page"
 
   override val dataSource: TaskDataSource = new TaskDataSource(
-    data,
-    hasAccumulators,
-    hasInput,
-    hasOutput,
-    hasShuffleRead,
-    hasShuffleWrite,
-    hasBytesSpilled,
+    stage,
     currentTime,
     pageSize,
     sortColumn,
@@ -1180,22 +750,22 @@ private[ui] class TaskPagedTable(
        ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
         ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME),
        ("Peak Execution Memory", TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++
-        {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
-        {if (hasInput) Seq(("Input Size / Records", "")) else Nil} ++
-        {if (hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
-        {if (hasShuffleRead) {
+        {if (hasAccumulators(stage)) Seq(("Accumulators", "")) else Nil} ++
+        {if (hasInput(stage)) Seq(("Input Size / Records", "")) else Nil} ++
+        {if (hasOutput(stage)) Seq(("Output Size / Records", "")) else Nil} ++
+        {if (hasShuffleRead(stage)) {
          Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
             ("Shuffle Read Size / Records", ""),
            ("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
         } else {
           Nil
         }} ++
-        {if (hasShuffleWrite) {
+        {if (hasShuffleWrite(stage)) {
           Seq(("Write Time", ""), ("Shuffle Write Size / Records", ""))
         } else {
           Nil
         }} ++
-        {if (hasBytesSpilled) {
+        {if (hasBytesSpilled(stage)) {
           Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
         } else {
           Nil
@@ -1237,7 +807,17 @@ private[ui] class TaskPagedTable(
     <thead>{headerRow}</thead>
   }
 
-  def row(task: TaskTableRowData): Seq[Node] = {
+  def row(task: TaskData): Seq[Node] = {
+    def formatDuration(value: Option[Long], hideZero: Boolean = false): String = {
+      value.map { v =>
+        if (v > 0 || !hideZero) UIUtils.formatDuration(v) else ""
+      }.getOrElse("")
+    }
+
+    def formatBytes(value: Option[Long]): String = {
+      Utils.bytesToString(value.getOrElse(0L))
+    }
+
     <tr>
       <td>{task.index}</td>
       <td>{task.taskId}</td>
@@ -1249,62 +829,98 @@ private[ui] class TaskPagedTable(
         <div style="float: left">{task.host}</div>
         <div style="float: right">
         {
-          task.logs.map {
+          dataSource.executorLogs(task.executorId).map {
             case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div>
           }
         }
         </div>
       </td>
-      <td>{UIUtils.formatDate(new Date(task.launchTime))}</td>
-      <td>{task.formatDuration}</td>
+      <td>{UIUtils.formatDate(task.launchTime)}</td>
+      <td>{formatDuration(task.duration)}</td>
       <td class={TaskDetailsClassNames.SCHEDULER_DELAY}>
-        {UIUtils.formatDuration(task.schedulerDelay)}
+        {UIUtils.formatDuration(AppStatusUtils.schedulerDelay(task))}
       </td>
       <td class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
-        {UIUtils.formatDuration(task.taskDeserializationTime)}
+        {formatDuration(task.taskMetrics.map(_.executorDeserializeTime))}
       </td>
       <td>
-        {if (task.gcTime > 0) UIUtils.formatDuration(task.gcTime) else ""}
+        {formatDuration(task.taskMetrics.map(_.jvmGcTime), hideZero = true)}
       </td>
       <td class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
-        {UIUtils.formatDuration(task.serializationTime)}
+        {formatDuration(task.taskMetrics.map(_.resultSerializationTime))}
       </td>
       <td class={TaskDetailsClassNames.GETTING_RESULT_TIME}>
-        {UIUtils.formatDuration(task.gettingResultTime)}
+        {UIUtils.formatDuration(AppStatusUtils.gettingResultTime(task))}
       </td>
       <td class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
-        {Utils.bytesToString(task.peakExecutionMemoryUsed)}
+        {formatBytes(task.taskMetrics.map(_.peakExecutionMemory))}
       </td>
-      {if (task.accumulators.nonEmpty) {
-        <td>{Unparsed(task.accumulators.get)}</td>
+      {if (hasAccumulators(stage)) {
+        accumulatorsInfo(task)
       }}
-      {if (task.input.nonEmpty) {
-        <td>{task.input.get.inputReadable}</td>
+      {if (hasInput(stage)) {
+        metricInfo(task) { m =>
+          val bytesRead = Utils.bytesToString(m.inputMetrics.bytesRead)
+          val records = m.inputMetrics.recordsRead
+          <td>{bytesRead} / {records}</td>
+        }
       }}
-      {if (task.output.nonEmpty) {
-        <td>{task.output.get.outputReadable}</td>
+      {if (hasOutput(stage)) {
+        metricInfo(task) { m =>
+          val bytesWritten = Utils.bytesToString(m.outputMetrics.bytesWritten)
+          val records = m.outputMetrics.recordsWritten
+          <td>{bytesWritten} / {records}</td>
+        }
       }}
-      {if (task.shuffleRead.nonEmpty) {
+      {if (hasShuffleRead(stage)) {
         <td class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
-          {task.shuffleRead.get.shuffleReadBlockedTimeReadable}
+          {formatDuration(task.taskMetrics.map(_.shuffleReadMetrics.fetchWaitTime))}
         </td>
-        <td>{task.shuffleRead.get.shuffleReadReadable}</td>
+        <td>{
+          metricInfo(task) { m =>
+            val bytesRead = Utils.bytesToString(totalBytesRead(m.shuffleReadMetrics))
+            val records = m.shuffleReadMetrics.recordsRead
+            Unparsed(s"$bytesRead / $records")
+          }
+        }</td>
         <td class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
-          {task.shuffleRead.get.shuffleReadRemoteReadable}
+          {formatBytes(task.taskMetrics.map(_.shuffleReadMetrics.remoteBytesRead))}
         </td>
       }}
-      {if (task.shuffleWrite.nonEmpty) {
-        <td>{task.shuffleWrite.get.writeTimeReadable}</td>
-        <td>{task.shuffleWrite.get.shuffleWriteReadable}</td>
+      {if (hasShuffleWrite(stage)) {
+        <td>{
+          formatDuration(
+            task.taskMetrics.map { m =>
+              TimeUnit.NANOSECONDS.toMillis(m.shuffleWriteMetrics.writeTime)
+            },
+            hideZero = true)
+        }</td>
+        <td>{
+          metricInfo(task) { m =>
+            val bytesWritten = Utils.bytesToString(m.shuffleWriteMetrics.bytesWritten)
+            val records = m.shuffleWriteMetrics.recordsWritten
+            Unparsed(s"$bytesWritten / $records")
+          }
+        }</td>
       }}
-      {if (task.bytesSpilled.nonEmpty) {
-        <td>{task.bytesSpilled.get.memoryBytesSpilledReadable}</td>
-        <td>{task.bytesSpilled.get.diskBytesSpilledReadable}</td>
+      {if (hasBytesSpilled(stage)) {
+        <td>{formatBytes(task.taskMetrics.map(_.memoryBytesSpilled))}</td>
+        <td>{formatBytes(task.taskMetrics.map(_.diskBytesSpilled))}</td>
       }}
-      {errorMessageCell(task.error)}
+      {errorMessageCell(task.errorMessage.getOrElse(""))}
     </tr>
   }
 
+  private def accumulatorsInfo(task: TaskData): Seq[Node] = {
+    task.accumulatorUpdates.map { acc =>
+      Unparsed(StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update}"))
+    }
+  }
+
+  private def metricInfo(task: TaskData)(fn: TaskMetrics => Seq[Node]): Seq[Node] = {
+    task.taskMetrics.map(fn).getOrElse(Nil)
+  }
+
   private def errorMessageCell(error: String): Seq[Node] = {
     val isMultiline = error.indexOf('\n') >= 0
     // Display the first line by default
@@ -1333,6 +949,36 @@ private[ui] class TaskPagedTable(
 
 private object ApiHelper {
 
+
+  private val COLUMN_TO_INDEX = Map(
+    "ID" -> null.asInstanceOf[String],
+    "Index" -> TaskIndexNames.TASK_INDEX,
+    "Attempt" -> TaskIndexNames.ATTEMPT,
+    "Status" -> TaskIndexNames.STATUS,
+    "Locality Level" -> TaskIndexNames.LOCALITY,
+    "Executor ID / Host" -> TaskIndexNames.EXECUTOR,
+    "Launch Time" -> TaskIndexNames.LAUNCH_TIME,
+    "Duration" -> TaskIndexNames.DURATION,
+    "Scheduler Delay" -> TaskIndexNames.SCHEDULER_DELAY,
+    "Task Deserialization Time" -> TaskIndexNames.DESER_TIME,
+    "GC Time" -> TaskIndexNames.GC_TIME,
+    "Result Serialization Time" -> TaskIndexNames.SER_TIME,
+    "Getting Result Time" -> TaskIndexNames.GETTING_RESULT_TIME,
+    "Peak Execution Memory" -> TaskIndexNames.PEAK_MEM,
+    "Accumulators" -> TaskIndexNames.ACCUMULATORS,
+    "Input Size / Records" -> TaskIndexNames.INPUT_SIZE,
+    "Output Size / Records" -> TaskIndexNames.OUTPUT_SIZE,
+    "Shuffle Read Blocked Time" -> TaskIndexNames.SHUFFLE_READ_TIME,
+    "Shuffle Read Size / Records" -> TaskIndexNames.SHUFFLE_TOTAL_READS,
+    "Shuffle Remote Reads" -> TaskIndexNames.SHUFFLE_REMOTE_READS,
+    "Write Time" -> TaskIndexNames.SHUFFLE_WRITE_TIME,
+    "Shuffle Write Size / Records" -> TaskIndexNames.SHUFFLE_WRITE_SIZE,
+    "Shuffle Spill (Memory)" -> TaskIndexNames.MEM_SPILL,
+    "Shuffle Spill (Disk)" -> TaskIndexNames.DISK_SPILL,
+    "Errors" -> TaskIndexNames.ERROR)
+
+  def hasAccumulators(stageData: StageData): Boolean = stageData.accumulatorUpdates.size > 0
+
   def hasInput(stageData: StageData): Boolean = stageData.inputBytes > 0
 
   def hasOutput(stageData: StageData): Boolean = stageData.outputBytes > 0
@@ -1349,4 +995,11 @@ private object ApiHelper {
     metrics.localBytesRead + metrics.remoteBytesRead
   }
 
+  def indexName(sortColumn: String): Option[String] = {
+    COLUMN_TO_INDEX.get(sortColumn) match {
+      case Some(v) => Option(v)
+      case _ => throw new IllegalArgumentException(s"Invalid sort column: $sortColumn")
+    }
+  }
+
 }
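
One subtlety in indexName: COLUMN_TO_INDEX maps "ID" to an explicit null, which
Option(v) turns into None, meaning "sort by the natural task-id key, no separate
index needed"; unknown columns throw. A sketch of the three cases:

    indexName("ID")        // None: natural ordering by task id
    indexName("Duration")  // Some(TaskIndexNames.DURATION): use the on-disk index
    indexName("Bogus")     // throws IllegalArgumentException("Invalid sort column: Bogus")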

http://git-wip-us.apache.org/repos/asf/spark/blob/b7813012/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w__custom_quantiles_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w__custom_quantiles_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w__custom_quantiles_expectation.json
index f8e2770..5c42ac1 100644
--- a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w__custom_quantiles_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w__custom_quantiles_expectation.json
@@ -7,6 +7,9 @@
   "resultSize" : [ 2010.0, 2065.0, 2065.0 ],
   "jvmGcTime" : [ 0.0, 0.0, 7.0 ],
   "resultSerializationTime" : [ 0.0, 0.0, 2.0 ],
+  "gettingResultTime" : [ 0.0, 0.0, 0.0 ],
+  "schedulerDelay" : [ 2.0, 6.0, 53.0 ],
+  "peakExecutionMemory" : [ 0.0, 0.0, 0.0 ],
   "memoryBytesSpilled" : [ 0.0, 0.0, 0.0 ],
   "diskBytesSpilled" : [ 0.0, 0.0, 0.0 ],
   "inputMetrics" : {

http://git-wip-us.apache.org/repos/asf/spark/blob/b7813012/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_read_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_read_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_read_expectation.json
index a28bda1..e6b7059 100644
--- a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_read_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_read_expectation.json
@@ -7,6 +7,9 @@
   "resultSize" : [ 1034.0, 1034.0, 1034.0, 1034.0, 1034.0 ],
   "jvmGcTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
   "resultSerializationTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+  "gettingResultTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+  "schedulerDelay" : [ 4.0, 4.0, 6.0, 7.0, 9.0 ],
+  "peakExecutionMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
   "memoryBytesSpilled" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
   "diskBytesSpilled" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
   "inputMetrics" : {

http://git-wip-us.apache.org/repos/asf/spark/blob/b7813012/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_write_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_write_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_write_expectation.json
index ede3eae..788f28c 100644
--- a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_write_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_write_expectation.json
@@ -7,6 +7,9 @@
   "resultSize" : [ 2010.0, 2065.0, 2065.0, 2065.0, 2065.0 ],
   "jvmGcTime" : [ 0.0, 0.0, 0.0, 5.0, 7.0 ],
   "resultSerializationTime" : [ 0.0, 0.0, 0.0, 0.0, 1.0 ],
+  "gettingResultTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+  "schedulerDelay" : [ 2.0, 4.0, 6.0, 13.0, 40.0 ],
+  "peakExecutionMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
   "memoryBytesSpilled" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
   "diskBytesSpilled" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
   "inputMetrics" : {

http://git-wip-us.apache.org/repos/asf/spark/blob/b7813012/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index b8c84e2..ca66b6b 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -213,45 +213,42 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
 
     s1Tasks.foreach { task =>
       check[TaskDataWrapper](task.taskId) { wrapper =>
-        assert(wrapper.info.taskId === task.taskId)
+        assert(wrapper.taskId === task.taskId)
         assert(wrapper.stageId === stages.head.stageId)
-        assert(wrapper.stageAttemptId === stages.head.attemptNumber)
-        assert(Arrays.equals(wrapper.stage, Array(stages.head.stageId, stages.head.attemptNumber)))
-
-        val runtime = Array[AnyRef](stages.head.stageId: JInteger,
-          stages.head.attemptNumber: JInteger,
-          -1L: JLong)
-        assert(Arrays.equals(wrapper.runtime, runtime))
-
-        assert(wrapper.info.index === task.index)
-        assert(wrapper.info.attempt === task.attemptNumber)
-        assert(wrapper.info.launchTime === new Date(task.launchTime))
-        assert(wrapper.info.executorId === task.executorId)
-        assert(wrapper.info.host === task.host)
-        assert(wrapper.info.status === task.status)
-        assert(wrapper.info.taskLocality === task.taskLocality.toString())
-        assert(wrapper.info.speculative === task.speculative)
+        assert(wrapper.stageAttemptId === stages.head.attemptId)
+        assert(wrapper.index === task.index)
+        assert(wrapper.attempt === task.attemptNumber)
+        assert(wrapper.launchTime === task.launchTime)
+        assert(wrapper.executorId === task.executorId)
+        assert(wrapper.host === task.host)
+        assert(wrapper.status === task.status)
+        assert(wrapper.taskLocality === task.taskLocality.toString())
+        assert(wrapper.speculative === task.speculative)
       }
     }
 
-    // Send executor metrics update. Only update one metric to avoid a lot of boilerplate code.
-    s1Tasks.foreach { task =>
-      val accum = new AccumulableInfo(1L, Some(InternalAccumulator.MEMORY_BYTES_SPILLED),
-        Some(1L), None, true, false, None)
-      listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(
-        task.executorId,
-        Seq((task.taskId, stages.head.stageId, stages.head.attemptNumber, Seq(accum)))))
-    }
+    // Send two executor metrics updates. Only update one metric to avoid a lot of boilerplate
+    // code. The tasks are distributed among the two executors, so the executor-level metrics
+    // should hold half of the cumulative value of the metric being updated.
+    Seq(1L, 2L).foreach { value =>
+      s1Tasks.foreach { task =>
+        val accum = new AccumulableInfo(1L, Some(InternalAccumulator.MEMORY_BYTES_SPILLED),
+          Some(value), None, true, false, None)
+        listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(
+          task.executorId,
+          Seq((task.taskId, stages.head.stageId, stages.head.attemptNumber, Seq(accum)))))
+      }
 
-    check[StageDataWrapper](key(stages.head)) { stage =>
-      assert(stage.info.memoryBytesSpilled === s1Tasks.size)
-    }
+      check[StageDataWrapper](key(stages.head)) { stage =>
+        assert(stage.info.memoryBytesSpilled === s1Tasks.size * value)
+      }
 
-    val execs = store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
-      .first(key(stages.head)).last(key(stages.head)).asScala.toSeq
-    assert(execs.size > 0)
-    execs.foreach { exec =>
-      assert(exec.info.memoryBytesSpilled === s1Tasks.size / 2)
+      val execs = store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
+        .first(key(stages.head)).last(key(stages.head)).asScala.toSeq
+      assert(execs.size > 0)
+      execs.foreach { exec =>
+        assert(exec.info.memoryBytesSpilled === s1Tasks.size * value / 2)
+      }
     }
 
     // Fail one of the tasks, re-start it.
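The arithmetic behind the updated assertions above is worth spelling out: each round of updates overwrites the per-task accumulator with `value`, the stage-level metric sums it over every task, and the two executors each own half of the tasks. A standalone sketch of that bookkeeping, assuming four tasks split evenly across two executors (the actual count comes from createTasks earlier in the suite):

    // Hedged sketch; numTasks = 4 and the even two-executor split are
    // assumptions for illustration, not values taken from this diff.
    val numTasks = 4
    Seq(1L, 2L).foreach { value =>
      val stageTotal = numTasks * value   // every task reports `value` spilled bytes
      val perExecutor = stageTotal / 2    // each executor ran half of the tasks
      assert(stageTotal == numTasks * value && perExecutor == numTasks * value / 2)
    }
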
@@ -278,13 +275,13 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
 
     check[TaskDataWrapper](s1Tasks.head.taskId) { task =>
-      assert(task.info.status === s1Tasks.head.status)
-      assert(task.info.errorMessage == Some(TaskResultLost.toErrorString))
+      assert(task.status === s1Tasks.head.status)
+      assert(task.errorMessage == Some(TaskResultLost.toErrorString))
     }
 
     check[TaskDataWrapper](reattempt.taskId) { task =>
-      assert(task.info.index === s1Tasks.head.index)
-      assert(task.info.attempt === reattempt.attemptNumber)
+      assert(task.index === s1Tasks.head.index)
+      assert(task.attempt === reattempt.attemptNumber)
     }
 
     // Kill one task, restart it.
@@ -306,8 +303,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
 
     check[TaskDataWrapper](killed.taskId) { task =>
-      assert(task.info.index === killed.index)
-      assert(task.info.errorMessage === Some("killed"))
+      assert(task.index === killed.index)
+      assert(task.errorMessage === Some("killed"))
     }
 
     // Start a new attempt and finish it with TaskCommitDenied, make sure it's handled like a kill.
@@ -334,8 +331,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
 
     check[TaskDataWrapper](denied.taskId) { task =>
-      assert(task.info.index === killed.index)
-      assert(task.info.errorMessage === Some(denyReason.toErrorString))
+      assert(task.index === killed.index)
+      assert(task.errorMessage === Some(denyReason.toErrorString))
     }
 
     // Start a new attempt.
@@ -373,10 +370,10 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
 
     pending.foreach { task =>
       check[TaskDataWrapper](task.taskId) { wrapper =>
-        assert(wrapper.info.errorMessage === None)
-        assert(wrapper.info.taskMetrics.get.executorCpuTime === 2L)
-        assert(wrapper.info.taskMetrics.get.executorRunTime === 4L)
-        assert(wrapper.info.duration === Some(task.duration))
+        assert(wrapper.errorMessage === None)
+        assert(wrapper.executorCpuTime === 2L)
+        assert(wrapper.executorRunTime === 4L)
+        assert(wrapper.duration === task.duration)
       }
     }
 
@@ -894,6 +891,23 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(store.count(classOf[StageDataWrapper]) === 3)
     assert(store.count(classOf[RDDOperationGraphWrapper]) === 3)
 
+    val dropped = stages.drop(1).head
+
+    // Cache some quantiles by calling AppStatusStore.taskSummary(). For quantiles to be
+    // calculated, we need at least one finished task.
+    time += 1
+    val task = createTasks(1, Array("1")).head
+    listener.onTaskStart(SparkListenerTaskStart(dropped.stageId, dropped.attemptId, task))
+
+    time += 1
+    task.markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(dropped.stageId, dropped.attemptId,
+      "taskType", Success, task, null))
+
+    new AppStatusStore(store)
+      .taskSummary(dropped.stageId, dropped.attemptId, Array(0.25d, 0.50d, 0.75d))
+    assert(store.count(classOf[CachedQuantile], "stage", key(dropped)) === 3)
+
     stages.drop(1).foreach { s =>
       time += 1
       s.completionTime = Some(time)
@@ -905,6 +919,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     intercept[NoSuchElementException] {
       store.read(classOf[StageDataWrapper], Array(2, 0))
     }
+    assert(store.count(classOf[CachedQuantile], "stage", key(dropped)) === 0)
 
     val attempt2 = new StageInfo(3, 1, "stage3", 4, Nil, Nil, "details3")
     time += 1
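
The two new CachedQuantile assertions pin down the cache's lifecycle: requesting three quantiles through taskSummary writes one cached row per quantile (hence the count of 3), and cleaning up the stage must drop those rows with it (hence the later count of 0). A hedged sketch of the keying this implies, assuming the last key component is the quantile rendered as a percentile string, consistent with the reads in AppStatusStoreSuite below:

    // Hypothetical helper mirroring the "25"/"50"/"75" style keys; the real
    // derivation lives inside AppStatusStore and may differ in detail.
    def quantileKey(q: Double): String = math.round(q * 100).toString
    Seq(0.25d, 0.50d, 0.75d).map(quantileKey)   // => List("25", "50", "75")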

http://git-wip-us.apache.org/repos/asf/spark/blob/b7813012/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
new file mode 100644
index 0000000..92f90f3
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.status.api.v1.TaskMetricDistributions
+import org.apache.spark.util.Distribution
+import org.apache.spark.util.kvstore._
+
+class AppStatusStoreSuite extends SparkFunSuite {
+
+  private val uiQuantiles = Array(0.0, 0.25, 0.5, 0.75, 1.0)
+  private val stageId = 1
+  private val attemptId = 1
+
+  test("quantile calculation: 1 task") {
+    compareQuantiles(1, uiQuantiles)
+  }
+
+  test("quantile calculation: few tasks") {
+    compareQuantiles(4, uiQuantiles)
+  }
+
+  test("quantile calculation: more tasks") {
+    compareQuantiles(100, uiQuantiles)
+  }
+
+  test("quantile calculation: lots of tasks") {
+    compareQuantiles(4096, uiQuantiles)
+  }
+
+  test("quantile calculation: custom quantiles") {
+    compareQuantiles(4096, Array(0.01, 0.33, 0.5, 0.42, 0.69, 0.99))
+  }
+
+  test("quantile cache") {
+    val store = new InMemoryStore()
+    (0 until 4096).foreach { i => store.write(newTaskData(i)) }
+
+    val appStore = new AppStatusStore(store)
+
+    appStore.taskSummary(stageId, attemptId, Array(0.13d))
+    intercept[NoSuchElementException] {
+      store.read(classOf[CachedQuantile], Array(stageId, attemptId, "13"))
+    }
+
+    appStore.taskSummary(stageId, attemptId, Array(0.25d))
+    val d1 = store.read(classOf[CachedQuantile], Array(stageId, attemptId, "25"))
+
+    // Add a new task to force the cached quantile to be evicted, and make sure it's updated.
+    store.write(newTaskData(4096))
+    appStore.taskSummary(stageId, attemptId, Array(0.25d, 0.50d, 0.73d))
+
+    val d2 = store.read(classOf[CachedQuantile], Array(stageId, attemptId, "25"))
+    assert(d1.taskCount != d2.taskCount)
+
+    store.read(classOf[CachedQuantile], Array(stageId, attemptId, "50"))
+    intercept[NoSuchElementException] {
+      store.read(classOf[CachedQuantile], Array(stageId, attemptId, "73"))
+    }
+
+    assert(store.count(classOf[CachedQuantile]) === 2)
+  }
+
+  private def compareQuantiles(count: Int, quantiles: Array[Double]): Unit = {
+    val store = new InMemoryStore()
+    val values = (0 until count).map { i =>
+      val task = newTaskData(i)
+      store.write(task)
+      i.toDouble
+    }.toArray
+
+    val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, quantiles).get
+    val dist = new Distribution(values, 0, values.length).getQuantiles(quantiles.sorted)
+
+    dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
+      assert(expected === actual)
+    }
+  }
+
+  private def newTaskData(i: Int): TaskDataWrapper = {
+    new TaskDataWrapper(
+      i, i, i, i, i, i, i.toString, i.toString, i.toString, i.toString, false, Nil, None,
+      i, i, i, i, i, i, i, i, i, i,
+      i, i, i, i, i, i, i, i, i, i,
+      i, i, i, i, stageId, attemptId)
+  }
+
+}
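
compareQuantiles checks AppStatusStore.taskSummary against org.apache.spark.util.Distribution computed over the same values, so any drift in the store-side quantile math fails the suite immediately. A self-contained sketch of that oracle (note Distribution is private[spark], so this only compiles inside an org.apache.spark package), assuming task i recorded i.toDouble for the metric under test:

    import org.apache.spark.util.Distribution

    // 100 synthetic metric values, 0.0 through 99.0, one per task.
    val values = (0 until 100).map(_.toDouble).toArray
    val expected = new Distribution(values, 0, values.length)
      .getQuantiles(Array(0.0, 0.25, 0.5, 0.75, 1.0))
    // expected(2) is the median; taskSummary should report the same numbers.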

http://git-wip-us.apache.org/repos/asf/spark/blob/b7813012/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 661d0d4..0aeddf7 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.config._
 import org.apache.spark.ui.jobs.{StagePage, StagesTab}
 
 class StagePageSuite extends SparkFunSuite with LocalSparkContext {
@@ -35,15 +36,13 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
   private val peakExecutionMemory = 10
 
   test("peak execution memory should displayed") {
-    val conf = new SparkConf(false)
-    val html = renderStagePage(conf).toString().toLowerCase(Locale.ROOT)
+    val html = renderStagePage().toString().toLowerCase(Locale.ROOT)
     val targetString = "peak execution memory"
     assert(html.contains(targetString))
   }
 
   test("SPARK-10543: peak execution memory should be per-task rather than 
cumulative") {
-    val conf = new SparkConf(false)
-    val html = renderStagePage(conf).toString().toLowerCase(Locale.ROOT)
+    val html = renderStagePage().toString().toLowerCase(Locale.ROOT)
     // verify min/25/50/75/max show task value not cumulative values
     assert(html.contains(s"<td>$peakExecutionMemory.0 b</td>" * 5))
   }
@@ -52,7 +51,8 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
    * Render a stage page started with the given conf and return the HTML.
    * This also runs a dummy stage to populate the page with useful content.
    */
-  private def renderStagePage(conf: SparkConf): Seq[Node] = {
+  private def renderStagePage(): Seq[Node] = {
+    val conf = new SparkConf(false).set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
     val statusStore = AppStatusStore.createLiveStore(conf)
     val listener = statusStore.listener.get
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b7813012/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 7bdd3fa..e2fa575 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -93,7 +93,7 @@ This file is divided into 3 sections:
     <parameters><parameter 
name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter></parameters>
   </check>
 
-  <check level="error" 
class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
+  <check customId="argcount" level="error" 
class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
     <parameters><parameter 
name="maxParameters"><![CDATA[10]]></parameter></parameters>
   </check>
 

