Repository: spark
Updated Branches:
  refs/heads/master 21eadd1d8 -> 478b71d02


[SPARK-15591][WEBUI] Paginate Stage Table in Stages tab

## What changes were proposed in this pull request?

This patch adds pagination support for the Stage Tables in the Stage tab. 
Pagination is provided for all four stage tables (active, pending, 
completed, and failed). In addition, the paged stage tables are also used in 
JobPage (the detail page for one job) and PoolPage.

Interactions (jumping, sorting, and setting page size) for paged tables are 
also included.

## How was this patch tested?

Tested manually by checking the Web UI after completing and failing 
hundreds of jobs. This is the same testing approach used for [Paginate Job 
Table in Jobs tab](https://github.com/apache/spark/pull/13620).

This shows the pagination for completed stages:
![paged stage 
table](https://cloud.githubusercontent.com/assets/5558370/16125696/5804e35e-3427-11e6-8923-5c5948982648.png)

Author: Tao Lin <[email protected]>

Closes #13708 from nblintao/stageTable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/478b71d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/478b71d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/478b71d0

Branch: refs/heads/master
Commit: 478b71d028107d42fbb6d1bd300b86efbe0dcf7d
Parents: 21eadd1
Author: Tao Lin <[email protected]>
Authored: Wed Jul 6 10:28:05 2016 -0700
Committer: Shixiong Zhu <[email protected]>
Committed: Wed Jul 6 10:28:05 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/ui/PagedTable.scala  |   1 +
 .../apache/spark/ui/jobs/AllStagesPage.scala    |  25 +-
 .../org/apache/spark/ui/jobs/JobPage.scala      |  24 +-
 .../org/apache/spark/ui/jobs/PoolPage.scala     |  15 +-
 .../org/apache/spark/ui/jobs/StageTable.scala   | 517 +++++++++++++++----
 5 files changed, 441 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/478b71d0/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala 
b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
index 9b6ed8c..2a7c16b 100644
--- a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
@@ -179,6 +179,7 @@ private[ui] trait PagedTable[T] {
           Splitter
             .on('&')
             .trimResults()
+            .omitEmptyStrings()
             .withKeyValueSeparator("=")
             .split(querystring)
             .asScala

http://git-wip-us.apache.org/repos/asf/spark/blob/478b71d0/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
index e75f1c5..cba8f82 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
@@ -38,22 +38,24 @@ private[ui] class AllStagesPage(parent: StagesTab) extends 
WebUIPage("") {
       val numCompletedStages = listener.numCompletedStages
       val failedStages = listener.failedStages.reverse.toSeq
       val numFailedStages = listener.numFailedStages
-      val now = System.currentTimeMillis
+      val subPath = "stages"
 
       val activeStagesTable =
-        new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
-          parent.basePath, parent.progressListener, isFairScheduler = 
parent.isFairScheduler,
-          killEnabled = parent.killEnabled)
+        new StageTableBase(request, activeStages, "activeStage", 
parent.basePath, subPath,
+          parent.progressListener, parent.isFairScheduler,
+          killEnabled = parent.killEnabled, isFailedStage = false)
       val pendingStagesTable =
-        new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse,
-          parent.basePath, parent.progressListener, isFairScheduler = 
parent.isFairScheduler,
-          killEnabled = false)
+        new StageTableBase(request, pendingStages, "pendingStage", 
parent.basePath, subPath,
+          parent.progressListener, parent.isFairScheduler,
+          killEnabled = false, isFailedStage = false)
       val completedStagesTable =
-        new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, 
parent.basePath,
-          parent.progressListener, isFairScheduler = parent.isFairScheduler, 
killEnabled = false)
+        new StageTableBase(request, completedStages, "completedStage", 
parent.basePath, subPath,
+          parent.progressListener, parent.isFairScheduler,
+          killEnabled = false, isFailedStage = false)
       val failedStagesTable =
-        new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, 
parent.basePath,
-          parent.progressListener, isFairScheduler = parent.isFairScheduler)
+        new StageTableBase(request, failedStages, "failedStage", 
parent.basePath, subPath,
+          parent.progressListener, parent.isFairScheduler,
+          killEnabled = false, isFailedStage = true)
 
       // For now, pool information is only accessible in live UIs
       val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable])
@@ -136,3 +138,4 @@ private[ui] class AllStagesPage(parent: StagesTab) extends 
WebUIPage("") {
     }
   }
 }
+

http://git-wip-us.apache.org/repos/asf/spark/blob/478b71d0/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 99f2bd8..0ec42d6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -229,20 +229,24 @@ private[ui] class JobPage(parent: JobsTab) extends 
WebUIPage("job") {
         }
       }
 
+      val basePath = "jobs/job"
+
       val activeStagesTable =
-        new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
-          parent.basePath, parent.jobProgresslistener, isFairScheduler = 
parent.isFairScheduler,
-          killEnabled = parent.killEnabled)
+        new StageTableBase(request, activeStages, "activeStage", 
parent.basePath,
+          basePath, parent.jobProgresslistener, parent.isFairScheduler,
+          killEnabled = parent.killEnabled, isFailedStage = false)
       val pendingOrSkippedStagesTable =
-        new StageTableBase(pendingOrSkippedStages.sortBy(_.stageId).reverse,
-          parent.basePath, parent.jobProgresslistener, isFairScheduler = 
parent.isFairScheduler,
-          killEnabled = false)
+        new StageTableBase(request, pendingOrSkippedStages, "pendingStage", 
parent.basePath,
+          basePath, parent.jobProgresslistener, parent.isFairScheduler,
+          killEnabled = false, isFailedStage = false)
       val completedStagesTable =
-        new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, 
parent.basePath,
-          parent.jobProgresslistener, isFairScheduler = 
parent.isFairScheduler, killEnabled = false)
+        new StageTableBase(request, completedStages, "completedStage", 
parent.basePath,
+          basePath, parent.jobProgresslistener, parent.isFairScheduler,
+          killEnabled = false, isFailedStage = false)
       val failedStagesTable =
-        new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, 
parent.basePath,
-          parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler)
+        new StageTableBase(request, failedStages, "failedStage", 
parent.basePath,
+          basePath, parent.jobProgresslistener, parent.isFairScheduler,
+          killEnabled = false, isFailedStage = true)
 
       val shouldShowActiveStages = activeStages.nonEmpty
       val shouldShowPendingStages = !isComplete && 
pendingOrSkippedStages.nonEmpty

http://git-wip-us.apache.org/repos/asf/spark/blob/478b71d0/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index 6cd2591..f9cb717 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -42,9 +42,11 @@ private[ui] class PoolPage(parent: StagesTab) extends 
WebUIPage("pool") {
         case Some(s) => s.values.toSeq
         case None => Seq[StageInfo]()
       }
-      val activeStagesTable = new 
StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
-        parent.basePath, parent.progressListener, isFairScheduler = 
parent.isFairScheduler,
-        killEnabled = parent.killEnabled)
+      val shouldShowActiveStages = activeStages.nonEmpty
+      val activeStagesTable =
+        new StageTableBase(request, activeStages, "activeStage", 
parent.basePath, "stages/pool",
+          parent.progressListener, parent.isFairScheduler, parent.killEnabled,
+          isFailedStage = false)
 
       // For now, pool information is only accessible in live UIs
       val pools = sc.map(_.getPoolForName(poolName).getOrElse {
@@ -52,9 +54,10 @@ private[ui] class PoolPage(parent: StagesTab) extends 
WebUIPage("pool") {
       }).toSeq
       val poolTable = new PoolTable(pools, parent)
 
-      val content =
-        <h4>Summary </h4> ++ poolTable.toNodeSeq ++
-        <h4>{activeStages.size} Active Stages</h4> ++ 
activeStagesTable.toNodeSeq
+      var content = <h4>Summary </h4> ++ poolTable.toNodeSeq
+      if (shouldShowActiveStages) {
+        content ++= <h4>{activeStages.size} Active Stages</h4> ++ 
activeStagesTable.toNodeSeq
+      }
 
       UIUtils.headerSparkPage("Fair Scheduler Pool: " + poolName, content, 
parent)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/478b71d0/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 0e02015..2a04e8f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -17,61 +17,326 @@
 
 package org.apache.spark.ui.jobs
 
+import java.net.URLEncoder
 import java.util.Date
+import javax.servlet.http.HttpServletRequest
 
-import scala.xml.{Node, Text}
+import scala.collection.JavaConverters._
+import scala.xml._
 
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.ui.{ToolTips, UIUtils}
+import org.apache.spark.ui._
+import org.apache.spark.ui.jobs.UIData.StageUIData
 import org.apache.spark.util.Utils
 
-/** Page showing list of all ongoing and recently finished stages */
 private[ui] class StageTableBase(
+    request: HttpServletRequest,
+    stages: Seq[StageInfo],
+    stageTag: String,
+    basePath: String,
+    subPath: String,
+    progressListener: JobProgressListener,
+    isFairScheduler: Boolean,
+    killEnabled: Boolean,
+    isFailedStage: Boolean) {
+  val allParameters = request.getParameterMap().asScala.toMap
+  val parameterOtherTable = allParameters.filterNot(_._1.startsWith(stageTag))
+    .map(para => para._1 + "=" + para._2(0))
+
+  val parameterStagePage = request.getParameter(stageTag + ".page")
+  val parameterStageSortColumn = request.getParameter(stageTag + ".sort")
+  val parameterStageSortDesc = request.getParameter(stageTag + ".desc")
+  val parameterStagePageSize = request.getParameter(stageTag + ".pageSize")
+  val parameterStagePrevPageSize = request.getParameter(stageTag + 
".prevPageSize")
+
+  val stagePage = Option(parameterStagePage).map(_.toInt).getOrElse(1)
+  val stageSortColumn = Option(parameterStageSortColumn).map { sortColumn =>
+    UIUtils.decodeURLParameter(sortColumn)
+  }.getOrElse("Stage Id")
+  val stageSortDesc = 
Option(parameterStageSortDesc).map(_.toBoolean).getOrElse(
+    // New stages should be shown above old jobs by default.
+    if (stageSortColumn == "Stage Id") true else false
+  )
+  val stagePageSize = 
Option(parameterStagePageSize).map(_.toInt).getOrElse(100)
+  val stagePrevPageSize = Option(parameterStagePrevPageSize).map(_.toInt)
+    .getOrElse(stagePageSize)
+
+  val page: Int = {
+    // If the user has changed to a larger page size, then go to page 1 in 
order to avoid
+    // IndexOutOfBoundsException.
+    if (stagePageSize <= stagePrevPageSize) {
+      stagePage
+    } else {
+      1
+    }
+  }
+  val currentTime = System.currentTimeMillis()
+
+  val toNodeSeq = try {
+    new StagePagedTable(
+      stages,
+      stageTag,
+      basePath,
+      subPath,
+      progressListener,
+      isFairScheduler,
+      killEnabled,
+      currentTime,
+      stagePageSize,
+      stageSortColumn,
+      stageSortDesc,
+      isFailedStage,
+      parameterOtherTable
+    ).table(page)
+  } catch {
+    case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) =>
+      <div class="alert alert-error">
+        <p>Error while rendering stage table:</p>
+        <pre>
+          {Utils.exceptionString(e)}
+        </pre>
+      </div>
+  }
+}
+
+private[ui] class StageTableRowData(
+    val stageInfo: StageInfo,
+    val stageData: Option[StageUIData],
+    val stageId: Int,
+    val attemptId: Int,
+    val schedulingPool: String,
+    val description: String,
+    val descriptionOption: Option[String],
+    val submissionTime: Long,
+    val formattedSubmissionTime: String,
+    val duration: Long,
+    val formattedDuration: String,
+    val inputRead: Long,
+    val inputReadWithUnit: String,
+    val outputWrite: Long,
+    val outputWriteWithUnit: String,
+    val shuffleRead: Long,
+    val shuffleReadWithUnit: String,
+    val shuffleWrite: Long,
+    val shuffleWriteWithUnit: String)
+
+private[ui] class MissingStageTableRowData(
+    stageInfo: StageInfo,
+    stageId: Int,
+    attemptId: Int) extends StageTableRowData(
+  stageInfo, None, stageId, attemptId, "", "", None, 0, "", -1, "", 0, "", 0, 
"", 0, "", 0, "")
+
+/** Page showing list of all ongoing and recently finished stages */
+private[ui] class StagePagedTable(
     stages: Seq[StageInfo],
+    stageTag: String,
     basePath: String,
+    subPath: String,
     listener: JobProgressListener,
     isFairScheduler: Boolean,
-    killEnabled: Boolean) {
-
-  protected def columns: Seq[Node] = {
-    <th>Stage Id</th> ++
-    {if (isFairScheduler) {<th>Pool Name</th>} else Seq.empty} ++
-    <th>Description</th>
-    <th>Submitted</th>
-    <th>Duration</th>
-    <th>Tasks: Succeeded/Total</th>
-    <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
-    <th><span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output</span></th>
-    <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle 
Read</span></th>
-    <th>
-      <!-- Place the shuffle write tooltip on the left (rather than the 
default position
-        of on top) because the shuffle write column is the last column on the 
right side and
-        the tooltip is wider than the column, so it doesn't fit on top. -->
-      <span data-toggle="tooltip" data-placement="left" 
title={ToolTips.SHUFFLE_WRITE}>
-        Shuffle Write
-      </span>
-    </th>
+    killEnabled: Boolean,
+    currentTime: Long,
+    pageSize: Int,
+    sortColumn: String,
+    desc: Boolean,
+    isFailedStage: Boolean,
+    parameterOtherTable: Iterable[String]) extends 
PagedTable[StageTableRowData] {
+
+  override def tableId: String = stageTag + "-table"
+
+  override def tableCssClass: String =
+    "table table-bordered table-condensed table-striped table-head-clickable"
+
+  override def pageSizeFormField: String = stageTag + ".pageSize"
+
+  override def prevPageSizeFormField: String = stageTag + ".prevPageSize"
+
+  override def pageNumberFormField: String = stageTag + ".page"
+
+  val parameterPath = UIUtils.prependBaseUri(basePath) + s"/$subPath/?" +
+    parameterOtherTable.mkString("&")
+
+  override val dataSource = new StageDataSource(
+    stages,
+    listener,
+    currentTime,
+    pageSize,
+    sortColumn,
+    desc
+  )
+
+  override def pageLink(page: Int): String = {
+    val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+    parameterPath +
+      s"&$pageNumberFormField=$page" +
+      s"&$stageTag.sort=$encodedSortColumn" +
+      s"&$stageTag.desc=$desc" +
+      s"&$pageSizeFormField=$pageSize"
+  }
+
+  override def goButtonFormPath: String = {
+    val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+    s"$parameterPath&$stageTag.sort=$encodedSortColumn&$stageTag.desc=$desc"
   }
 
-  def toNodeSeq: Seq[Node] = {
-    listener.synchronized {
-      stageTable(renderStageRow, stages)
+  override def headers: Seq[Node] = {
+    // stageHeadersAndCssClasses has three parts: header title, tooltip 
information, and sortable.
+    // The tooltip information could be None, which indicates it does not have 
a tooltip.
+    // Otherwise, it has two parts: tooltip text, and position (true for left, 
false for default).
+    val stageHeadersAndCssClasses: Seq[(String, Option[(String, Boolean)], 
Boolean)] =
+      Seq(("Stage Id", None, true)) ++
+      {if (isFairScheduler) {Seq(("Pool Name", None, true))} else Seq.empty} ++
+      Seq(
+        ("Description", None, true), ("Submitted", None, true), ("Duration", 
None, true),
+        ("Tasks: Succeeded/Total", None, false),
+        ("Input", Some((ToolTips.INPUT, false)), true),
+        ("Output", Some((ToolTips.OUTPUT, false)), true),
+        ("Shuffle Read", Some((ToolTips.SHUFFLE_READ, false)), true),
+        ("Shuffle Write", Some((ToolTips.SHUFFLE_WRITE, true)), true)
+      ) ++
+      {if (isFailedStage) {Seq(("Failure Reason", None, false))} else 
Seq.empty}
+
+    if 
(!stageHeadersAndCssClasses.filter(_._3).map(_._1).contains(sortColumn)) {
+      throw new IllegalArgumentException(s"Unknown column: $sortColumn")
     }
+
+    val headerRow: Seq[Node] = {
+      stageHeadersAndCssClasses.map { case (header, tooltip, sortable) =>
+        val headerSpan = tooltip.map { case (title, left) =>
+          if (left) {
+            /* Place the shuffle write tooltip on the left (rather than the 
default position
+            of on top) because the shuffle write column is the last column on 
the right side and
+            the tooltip is wider than the column, so it doesn't fit on top. */
+            <span data-toggle="tooltip" data-placement="left" title={title}>
+              {header}
+            </span>
+          } else {
+            <span data-toggle="tooltip" title={title}>
+              {header}
+            </span>
+          }
+        }.getOrElse(
+          {header}
+        )
+
+        if (header == sortColumn) {
+          val headerLink = Unparsed(
+            parameterPath +
+              s"&$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+              s"&$stageTag.desc=${!desc}" +
+              s"&$stageTag.pageSize=$pageSize")
+          val arrow = if (desc) "&#x25BE;" else "&#x25B4;" // UP or DOWN
+
+          <th>
+            <a href={headerLink}>
+              {headerSpan}<span>
+              &nbsp;{Unparsed(arrow)}
+            </span>
+            </a>
+          </th>
+        } else {
+          if (sortable) {
+            val headerLink = Unparsed(
+              parameterPath +
+                s"&$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+                s"&$stageTag.pageSize=$pageSize")
+
+            <th>
+              <a href={headerLink}>
+                {headerSpan}
+              </a>
+            </th>
+          } else {
+            <th>
+              {headerSpan}
+            </th>
+          }
+        }
+      }
+    }
+    <thead>{headerRow}</thead>
+  }
+
+  override def row(data: StageTableRowData): Seq[Node] = {
+    <tr id={"stage-" + data.stageId + "-" + data.attemptId}>
+      {rowContent(data)}
+    </tr>
   }
 
-  /** Special table that merges two header cells. */
-  protected def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): 
Seq[Node] = {
-    <table class="table table-bordered table-striped table-condensed sortable">
-      <thead>{columns}</thead>
-      <tbody>
-        {rows.map(r => makeRow(r))}
-      </tbody>
-    </table>
+  private def rowContent(data: StageTableRowData): Seq[Node] = {
+    data.stageData match {
+      case None => missingStageRow(data.stageId)
+      case Some(stageData) =>
+        val info = data.stageInfo
+
+        {if (data.attemptId > 0) {
+          <td>{data.stageId} (retry {data.attemptId})</td>
+        } else {
+          <td>{data.stageId}</td>
+        }} ++
+        {if (isFairScheduler) {
+          <td>
+            <a href={"%s/stages/pool?poolname=%s"
+              .format(UIUtils.prependBaseUri(basePath), data.schedulingPool)}>
+              {data.schedulingPool}
+            </a>
+          </td>
+        } else {
+          Seq.empty
+        }} ++
+        <td>{makeDescription(info, data.descriptionOption)}</td>
+        <td valign="middle">
+          {data.formattedSubmissionTime}
+        </td>
+        <td>{data.formattedDuration}</td>
+        <td class="progress-cell">
+          {UIUtils.makeProgressBar(started = stageData.numActiveTasks,
+          completed = stageData.completedIndices.size, failed = 
stageData.numFailedTasks,
+          skipped = 0, killed = stageData.numKilledTasks, total = 
info.numTasks)}
+        </td>
+        <td>{data.inputReadWithUnit}</td>
+        <td>{data.outputWriteWithUnit}</td>
+        <td>{data.shuffleReadWithUnit}</td>
+        <td>{data.shuffleWriteWithUnit}</td> ++
+        {
+          if (isFailedStage) {
+            failureReasonHtml(info)
+          } else {
+            Seq.empty
+          }
+        }
+    }
   }
 
-  private def makeDescription(s: StageInfo): Seq[Node] = {
+  private def failureReasonHtml(s: StageInfo): Seq[Node] = {
+    val failureReason = s.failureReason.getOrElse("")
+    val isMultiline = failureReason.indexOf('\n') >= 0
+    // Display the first line by default
+    val failureReasonSummary = StringEscapeUtils.escapeHtml4(
+      if (isMultiline) {
+        failureReason.substring(0, failureReason.indexOf('\n'))
+      } else {
+        failureReason
+      })
+    val details = if (isMultiline) {
+      // scalastyle:off
+      <span 
onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +details
+      </span> ++
+        <div class="stacktrace-details collapsed">
+          <pre>{failureReason}</pre>
+        </div>
+      // scalastyle:on
+    } else {
+      ""
+    }
+    <td valign="middle">{failureReasonSummary}{details}</td>
+  }
+
+  private def makeDescription(s: StageInfo, descriptionOption: 
Option[String]): Seq[Node] = {
     val basePathUri = UIUtils.prependBaseUri(basePath)
 
     val killLink = if (killEnabled) {
@@ -111,12 +376,7 @@ private[ui] class StageTableBase(
       </div>
     }
 
-    val stageDesc = for {
-      stageData <- listener.stageIdToData.get((s.stageId, s.attemptId))
-      desc <- stageData.description
-    } yield {
-      UIUtils.makeDescription(desc, basePathUri)
-    }
+    val stageDesc = descriptionOption.map(UIUtils.makeDescription(_, 
basePathUri))
     <div>{stageDesc.getOrElse("")} {killLink} {nameLink} {details}</div>
   }
 
@@ -132,19 +392,44 @@ private[ui] class StageTableBase(
     <td></td> ++ // Shuffle Read
     <td></td> // Shuffle Write
   }
+}
+
+private[ui] class StageDataSource(
+    stages: Seq[StageInfo],
+    listener: JobProgressListener,
+    currentTime: Long,
+    pageSize: Int,
+    sortColumn: String,
+    desc: Boolean) extends PagedDataSource[StageTableRowData](pageSize) {
+  // Convert StageInfo to StageTableRowData which contains the final contents 
to show in the table
+  // so that we can avoid creating duplicate contents during sorting the data
+  private val data = stages.map(stageRow).sorted(ordering(sortColumn, desc))
+
+  private var _slicedStageIds: Set[Int] = null
 
-  protected def stageRow(s: StageInfo): Seq[Node] = {
+  override def dataSize: Int = data.size
+
+  override def sliceData(from: Int, to: Int): Seq[StageTableRowData] = {
+    val r = data.slice(from, to)
+    _slicedStageIds = r.map(_.stageId).toSet
+    r
+  }
+
+  private def stageRow(s: StageInfo): StageTableRowData = {
     val stageDataOption = listener.stageIdToData.get((s.stageId, s.attemptId))
+
     if (stageDataOption.isEmpty) {
-      return missingStageRow(s.stageId)
+      return new MissingStageTableRowData(s, s.stageId, s.attemptId)
     }
-
     val stageData = stageDataOption.get
-    val submissionTime = s.submissionTime match {
+
+    val description = stageData.description
+
+    val formattedSubmissionTime = s.submissionTime match {
       case Some(t) => UIUtils.formatDate(new Date(t))
       case None => "Unknown"
     }
-    val finishTime = s.completionTime.getOrElse(System.currentTimeMillis)
+    val finishTime = s.completionTime.getOrElse(currentTime)
 
     // The submission time for a stage is misleading because it counts the time
     // the stage waits to be launched. (SPARK-10930)
@@ -156,7 +441,7 @@ private[ui] class StageTableBase(
         if (finishTime > startTime) {
           Some(finishTime - startTime)
         } else {
-          Some(System.currentTimeMillis() - startTime)
+          Some(currentTime - startTime)
         }
       } else {
         None
@@ -172,76 +457,80 @@ private[ui] class StageTableBase(
     val shuffleWrite = stageData.shuffleWriteBytes
     val shuffleWriteWithUnit = if (shuffleWrite > 0) 
Utils.bytesToString(shuffleWrite) else ""
 
-    {if (s.attemptId > 0) {
-      <td>{s.stageId} (retry {s.attemptId})</td>
-    } else {
-      <td>{s.stageId}</td>
-    }} ++
-    {if (isFairScheduler) {
-      <td>
-        <a href={"%s/stages/pool?poolname=%s"
-          .format(UIUtils.prependBaseUri(basePath), stageData.schedulingPool)}>
-          {stageData.schedulingPool}
-        </a>
-      </td>
-    } else {
-      Seq.empty
-    }} ++
-    <td>{makeDescription(s)}</td>
-    <td sorttable_customkey={s.submissionTime.getOrElse(0).toString} 
valign="middle">
-      {submissionTime}
-    </td>
-    <td 
sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
-    <td class="progress-cell">
-      {UIUtils.makeProgressBar(started = stageData.numActiveTasks,
-        completed = stageData.completedIndices.size, failed = 
stageData.numFailedTasks,
-        skipped = 0, killed = stageData.numKilledTasks, total = s.numTasks)}
-    </td>
-    <td sorttable_customkey={inputRead.toString}>{inputReadWithUnit}</td>
-    <td sorttable_customkey={outputWrite.toString}>{outputWriteWithUnit}</td>
-    <td sorttable_customkey={shuffleRead.toString}>{shuffleReadWithUnit}</td>
-    <td sorttable_customkey={shuffleWrite.toString}>{shuffleWriteWithUnit}</td>
-  }
 
-  /** Render an HTML row that represents a stage */
-  private def renderStageRow(s: StageInfo): Seq[Node] =
-    <tr id={"stage-" + s.stageId + "-" + s.attemptId}>{stageRow(s)}</tr>
-}
-
-private[ui] class FailedStageTable(
-    stages: Seq[StageInfo],
-    basePath: String,
-    listener: JobProgressListener,
-    isFairScheduler: Boolean)
-  extends StageTableBase(stages, basePath, listener, isFairScheduler, 
killEnabled = false) {
-
-  override protected def columns: Seq[Node] = super.columns ++ <th>Failure 
Reason</th>
+    new StageTableRowData(
+      s,
+      stageDataOption,
+      s.stageId,
+      s.attemptId,
+      stageData.schedulingPool,
+      description.getOrElse(""),
+      description,
+      s.submissionTime.getOrElse(0),
+      formattedSubmissionTime,
+      duration.getOrElse(-1),
+      formattedDuration,
+      inputRead,
+      inputReadWithUnit,
+      outputWrite,
+      outputWriteWithUnit,
+      shuffleRead,
+      shuffleReadWithUnit,
+      shuffleWrite,
+      shuffleWriteWithUnit
+    )
+  }
 
-  override protected def stageRow(s: StageInfo): Seq[Node] = {
-    val basicColumns = super.stageRow(s)
-    val failureReason = s.failureReason.getOrElse("")
-    val isMultiline = failureReason.indexOf('\n') >= 0
-    // Display the first line by default
-    val failureReasonSummary = StringEscapeUtils.escapeHtml4(
-      if (isMultiline) {
-        failureReason.substring(0, failureReason.indexOf('\n'))
-      } else {
-        failureReason
-      })
-    val details = if (isMultiline) {
-      // scalastyle:off
-      <span 
onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
-            class="expand-details">
-        +details
-      </span> ++
-        <div class="stacktrace-details collapsed">
-          <pre>{failureReason}</pre>
-        </div>
-      // scalastyle:on
+  /**
+   * Return Ordering according to sortColumn and desc
+   */
+  private def ordering(sortColumn: String, desc: Boolean): 
Ordering[StageTableRowData] = {
+    val ordering = sortColumn match {
+      case "Stage Id" => new Ordering[StageTableRowData] {
+        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+          Ordering.Int.compare(x.stageId, y.stageId)
+      }
+      case "Pool Name" => new Ordering[StageTableRowData] {
+        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+          Ordering.String.compare(x.schedulingPool, y.schedulingPool)
+      }
+      case "Description" => new Ordering[StageTableRowData] {
+        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+          Ordering.String.compare(x.description, y.description)
+      }
+      case "Submitted" => new Ordering[StageTableRowData] {
+        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+          Ordering.Long.compare(x.submissionTime, y.submissionTime)
+      }
+      case "Duration" => new Ordering[StageTableRowData] {
+        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+          Ordering.Long.compare(x.duration, y.duration)
+      }
+      case "Input" => new Ordering[StageTableRowData] {
+        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+          Ordering.Long.compare(x.inputRead, y.inputRead)
+      }
+      case "Output" => new Ordering[StageTableRowData] {
+        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+          Ordering.Long.compare(x.outputWrite, y.outputWrite)
+      }
+      case "Shuffle Read" => new Ordering[StageTableRowData] {
+        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+          Ordering.Long.compare(x.shuffleRead, y.shuffleRead)
+      }
+      case "Shuffle Write" => new Ordering[StageTableRowData] {
+        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+          Ordering.Long.compare(x.shuffleWrite, y.shuffleWrite)
+      }
+      case "Tasks: Succeeded/Total" =>
+        throw new IllegalArgumentException(s"Unsortable column: $sortColumn")
+      case unknownColumn => throw new IllegalArgumentException(s"Unknown 
column: $unknownColumn")
+    }
+    if (desc) {
+      ordering.reverse
     } else {
-      ""
+      ordering
     }
-    val failureReasonHtml = <td 
valign="middle">{failureReasonSummary}{details}</td>
-    basicColumns ++ failureReasonHtml
   }
 }
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to