Repository: spark Updated Branches: refs/heads/master 21eadd1d8 -> 478b71d02
[SPARK-15591][WEBUI] Paginate Stage Table in Stages tab ## What changes were proposed in this pull request? This patch adds pagination support for the Stage Tables in the Stage tab. Pagination is provided for all of the four Job Tables (active, pending, completed, and failed). Besides, the paged stage tables are also used in JobPage (the detail page for one job) and PoolPage. Interactions (jumping, sorting, and setting page size) for paged tables are also included. ## How was this patch tested? Tested manually by using checking the Web UI after completing and failing hundreds of jobs. Same as the testings for [Paginate Job Table in Jobs tab](https://github.com/apache/spark/pull/13620). This shows the pagination for completed stages:  Author: Tao Lin <[email protected]> Closes #13708 from nblintao/stageTable. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/478b71d0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/478b71d0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/478b71d0 Branch: refs/heads/master Commit: 478b71d028107d42fbb6d1bd300b86efbe0dcf7d Parents: 21eadd1 Author: Tao Lin <[email protected]> Authored: Wed Jul 6 10:28:05 2016 -0700 Committer: Shixiong Zhu <[email protected]> Committed: Wed Jul 6 10:28:05 2016 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/ui/PagedTable.scala | 1 + .../apache/spark/ui/jobs/AllStagesPage.scala | 25 +- .../org/apache/spark/ui/jobs/JobPage.scala | 24 +- .../org/apache/spark/ui/jobs/PoolPage.scala | 15 +- .../org/apache/spark/ui/jobs/StageTable.scala | 517 +++++++++++++++---- 5 files changed, 441 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/478b71d0/core/src/main/scala/org/apache/spark/ui/PagedTable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala index 9b6ed8c..2a7c16b 100644 --- a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala @@ -179,6 +179,7 @@ private[ui] trait PagedTable[T] { Splitter .on('&') .trimResults() + .omitEmptyStrings() .withKeyValueSeparator("=") .split(querystring) .asScala http://git-wip-us.apache.org/repos/asf/spark/blob/478b71d0/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index e75f1c5..cba8f82 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -38,22 +38,24 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { val numCompletedStages = listener.numCompletedStages val failedStages = listener.failedStages.reverse.toSeq val numFailedStages = listener.numFailedStages - val now = System.currentTimeMillis + val subPath = "stages" val activeStagesTable = - new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, - parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler, - killEnabled = parent.killEnabled) + new StageTableBase(request, activeStages, "activeStage", parent.basePath, subPath, + parent.progressListener, parent.isFairScheduler, + killEnabled = parent.killEnabled, isFailedStage = false) val pendingStagesTable = - new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse, - parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler, - killEnabled = false) + new StageTableBase(request, pendingStages, "pendingStage", parent.basePath, subPath, + parent.progressListener, parent.isFairScheduler, + killEnabled = false, isFailedStage = false) val completedStagesTable = - new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath, - parent.progressListener, isFairScheduler = parent.isFairScheduler, killEnabled = false) + new StageTableBase(request, completedStages, "completedStage", parent.basePath, subPath, + parent.progressListener, parent.isFairScheduler, + killEnabled = false, isFailedStage = false) val failedStagesTable = - new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath, - parent.progressListener, isFairScheduler = parent.isFairScheduler) + new StageTableBase(request, failedStages, "failedStage", parent.basePath, subPath, + parent.progressListener, parent.isFairScheduler, + killEnabled = false, isFailedStage = true) // For now, pool information is only accessible in live UIs val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable]) @@ -136,3 +138,4 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { } } } + http://git-wip-us.apache.org/repos/asf/spark/blob/478b71d0/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index 99f2bd8..0ec42d6 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -229,20 +229,24 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { } } + val basePath = "jobs/job" + val activeStagesTable = - new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, - parent.basePath, parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler, - killEnabled = parent.killEnabled) + new StageTableBase(request, activeStages, "activeStage", parent.basePath, + basePath, parent.jobProgresslistener, parent.isFairScheduler, + killEnabled = parent.killEnabled, isFailedStage = false) val pendingOrSkippedStagesTable = - new StageTableBase(pendingOrSkippedStages.sortBy(_.stageId).reverse, - parent.basePath, parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler, - killEnabled = false) + new StageTableBase(request, pendingOrSkippedStages, "pendingStage", parent.basePath, + basePath, parent.jobProgresslistener, parent.isFairScheduler, + killEnabled = false, isFailedStage = false) val completedStagesTable = - new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath, - parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler, killEnabled = false) + new StageTableBase(request, completedStages, "completedStage", parent.basePath, + basePath, parent.jobProgresslistener, parent.isFairScheduler, + killEnabled = false, isFailedStage = false) val failedStagesTable = - new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath, - parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler) + new StageTableBase(request, failedStages, "failedStage", parent.basePath, + basePath, parent.jobProgresslistener, parent.isFairScheduler, + killEnabled = false, isFailedStage = true) val shouldShowActiveStages = activeStages.nonEmpty val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty http://git-wip-us.apache.org/repos/asf/spark/blob/478b71d0/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 6cd2591..f9cb717 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -42,9 +42,11 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") { case Some(s) => s.values.toSeq case None => Seq[StageInfo]() } - val activeStagesTable = new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, - parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler, - killEnabled = parent.killEnabled) + val shouldShowActiveStages = activeStages.nonEmpty + val activeStagesTable = + new StageTableBase(request, activeStages, "activeStage", parent.basePath, "stages/pool", + parent.progressListener, parent.isFairScheduler, parent.killEnabled, + isFailedStage = false) // For now, pool information is only accessible in live UIs val pools = sc.map(_.getPoolForName(poolName).getOrElse { @@ -52,9 +54,10 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") { }).toSeq val poolTable = new PoolTable(pools, parent) - val content = - <h4>Summary </h4> ++ poolTable.toNodeSeq ++ - <h4>{activeStages.size} Active Stages</h4> ++ activeStagesTable.toNodeSeq + var content = <h4>Summary </h4> ++ poolTable.toNodeSeq + if (shouldShowActiveStages) { + content ++= <h4>{activeStages.size} Active Stages</h4> ++ activeStagesTable.toNodeSeq + } UIUtils.headerSparkPage("Fair Scheduler Pool: " + poolName, content, parent) } http://git-wip-us.apache.org/repos/asf/spark/blob/478b71d0/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 0e02015..2a04e8f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -17,61 +17,326 @@ package org.apache.spark.ui.jobs +import java.net.URLEncoder import java.util.Date +import javax.servlet.http.HttpServletRequest -import scala.xml.{Node, Text} +import scala.collection.JavaConverters._ +import scala.xml._ import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.scheduler.StageInfo -import org.apache.spark.ui.{ToolTips, UIUtils} +import org.apache.spark.ui._ +import org.apache.spark.ui.jobs.UIData.StageUIData import org.apache.spark.util.Utils -/** Page showing list of all ongoing and recently finished stages */ private[ui] class StageTableBase( + request: HttpServletRequest, + stages: Seq[StageInfo], + stageTag: String, + basePath: String, + subPath: String, + progressListener: JobProgressListener, + isFairScheduler: Boolean, + killEnabled: Boolean, + isFailedStage: Boolean) { + val allParameters = request.getParameterMap().asScala.toMap + val parameterOtherTable = allParameters.filterNot(_._1.startsWith(stageTag)) + .map(para => para._1 + "=" + para._2(0)) + + val parameterStagePage = request.getParameter(stageTag + ".page") + val parameterStageSortColumn = request.getParameter(stageTag + ".sort") + val parameterStageSortDesc = request.getParameter(stageTag + ".desc") + val parameterStagePageSize = request.getParameter(stageTag + ".pageSize") + val parameterStagePrevPageSize = request.getParameter(stageTag + ".prevPageSize") + + val stagePage = Option(parameterStagePage).map(_.toInt).getOrElse(1) + val stageSortColumn = Option(parameterStageSortColumn).map { sortColumn => + UIUtils.decodeURLParameter(sortColumn) + }.getOrElse("Stage Id") + val stageSortDesc = Option(parameterStageSortDesc).map(_.toBoolean).getOrElse( + // New stages should be shown above old jobs by default. + if (stageSortColumn == "Stage Id") true else false + ) + val stagePageSize = Option(parameterStagePageSize).map(_.toInt).getOrElse(100) + val stagePrevPageSize = Option(parameterStagePrevPageSize).map(_.toInt) + .getOrElse(stagePageSize) + + val page: Int = { + // If the user has changed to a larger page size, then go to page 1 in order to avoid + // IndexOutOfBoundsException. + if (stagePageSize <= stagePrevPageSize) { + stagePage + } else { + 1 + } + } + val currentTime = System.currentTimeMillis() + + val toNodeSeq = try { + new StagePagedTable( + stages, + stageTag, + basePath, + subPath, + progressListener, + isFairScheduler, + killEnabled, + currentTime, + stagePageSize, + stageSortColumn, + stageSortDesc, + isFailedStage, + parameterOtherTable + ).table(page) + } catch { + case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) => + <div class="alert alert-error"> + <p>Error while rendering stage table:</p> + <pre> + {Utils.exceptionString(e)} + </pre> + </div> + } +} + +private[ui] class StageTableRowData( + val stageInfo: StageInfo, + val stageData: Option[StageUIData], + val stageId: Int, + val attemptId: Int, + val schedulingPool: String, + val description: String, + val descriptionOption: Option[String], + val submissionTime: Long, + val formattedSubmissionTime: String, + val duration: Long, + val formattedDuration: String, + val inputRead: Long, + val inputReadWithUnit: String, + val outputWrite: Long, + val outputWriteWithUnit: String, + val shuffleRead: Long, + val shuffleReadWithUnit: String, + val shuffleWrite: Long, + val shuffleWriteWithUnit: String) + +private[ui] class MissingStageTableRowData( + stageInfo: StageInfo, + stageId: Int, + attemptId: Int) extends StageTableRowData( + stageInfo, None, stageId, attemptId, "", "", None, 0, "", -1, "", 0, "", 0, "", 0, "", 0, "") + +/** Page showing list of all ongoing and recently finished stages */ +private[ui] class StagePagedTable( stages: Seq[StageInfo], + stageTag: String, basePath: String, + subPath: String, listener: JobProgressListener, isFairScheduler: Boolean, - killEnabled: Boolean) { - - protected def columns: Seq[Node] = { - <th>Stage Id</th> ++ - {if (isFairScheduler) {<th>Pool Name</th>} else Seq.empty} ++ - <th>Description</th> - <th>Submitted</th> - <th>Duration</th> - <th>Tasks: Succeeded/Total</th> - <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th> - <th><span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output</span></th> - <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th> - <th> - <!-- Place the shuffle write tooltip on the left (rather than the default position - of on top) because the shuffle write column is the last column on the right side and - the tooltip is wider than the column, so it doesn't fit on top. --> - <span data-toggle="tooltip" data-placement="left" title={ToolTips.SHUFFLE_WRITE}> - Shuffle Write - </span> - </th> + killEnabled: Boolean, + currentTime: Long, + pageSize: Int, + sortColumn: String, + desc: Boolean, + isFailedStage: Boolean, + parameterOtherTable: Iterable[String]) extends PagedTable[StageTableRowData] { + + override def tableId: String = stageTag + "-table" + + override def tableCssClass: String = + "table table-bordered table-condensed table-striped table-head-clickable" + + override def pageSizeFormField: String = stageTag + ".pageSize" + + override def prevPageSizeFormField: String = stageTag + ".prevPageSize" + + override def pageNumberFormField: String = stageTag + ".page" + + val parameterPath = UIUtils.prependBaseUri(basePath) + s"/$subPath/?" + + parameterOtherTable.mkString("&") + + override val dataSource = new StageDataSource( + stages, + listener, + currentTime, + pageSize, + sortColumn, + desc + ) + + override def pageLink(page: Int): String = { + val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") + parameterPath + + s"&$pageNumberFormField=$page" + + s"&$stageTag.sort=$encodedSortColumn" + + s"&$stageTag.desc=$desc" + + s"&$pageSizeFormField=$pageSize" + } + + override def goButtonFormPath: String = { + val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") + s"$parameterPath&$stageTag.sort=$encodedSortColumn&$stageTag.desc=$desc" } - def toNodeSeq: Seq[Node] = { - listener.synchronized { - stageTable(renderStageRow, stages) + override def headers: Seq[Node] = { + // stageHeadersAndCssClasses has three parts: header title, tooltip information, and sortable. + // The tooltip information could be None, which indicates it does not have a tooltip. + // Otherwise, it has two parts: tooltip text, and position (true for left, false for default). + val stageHeadersAndCssClasses: Seq[(String, Option[(String, Boolean)], Boolean)] = + Seq(("Stage Id", None, true)) ++ + {if (isFairScheduler) {Seq(("Pool Name", None, true))} else Seq.empty} ++ + Seq( + ("Description", None, true), ("Submitted", None, true), ("Duration", None, true), + ("Tasks: Succeeded/Total", None, false), + ("Input", Some((ToolTips.INPUT, false)), true), + ("Output", Some((ToolTips.OUTPUT, false)), true), + ("Shuffle Read", Some((ToolTips.SHUFFLE_READ, false)), true), + ("Shuffle Write", Some((ToolTips.SHUFFLE_WRITE, true)), true) + ) ++ + {if (isFailedStage) {Seq(("Failure Reason", None, false))} else Seq.empty} + + if (!stageHeadersAndCssClasses.filter(_._3).map(_._1).contains(sortColumn)) { + throw new IllegalArgumentException(s"Unknown column: $sortColumn") } + + val headerRow: Seq[Node] = { + stageHeadersAndCssClasses.map { case (header, tooltip, sortable) => + val headerSpan = tooltip.map { case (title, left) => + if (left) { + /* Place the shuffle write tooltip on the left (rather than the default position + of on top) because the shuffle write column is the last column on the right side and + the tooltip is wider than the column, so it doesn't fit on top. */ + <span data-toggle="tooltip" data-placement="left" title={title}> + {header} + </span> + } else { + <span data-toggle="tooltip" title={title}> + {header} + </span> + } + }.getOrElse( + {header} + ) + + if (header == sortColumn) { + val headerLink = Unparsed( + parameterPath + + s"&$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" + + s"&$stageTag.desc=${!desc}" + + s"&$stageTag.pageSize=$pageSize") + val arrow = if (desc) "▾" else "▴" // UP or DOWN + + <th> + <a href={headerLink}> + {headerSpan}<span> + {Unparsed(arrow)} + </span> + </a> + </th> + } else { + if (sortable) { + val headerLink = Unparsed( + parameterPath + + s"&$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" + + s"&$stageTag.pageSize=$pageSize") + + <th> + <a href={headerLink}> + {headerSpan} + </a> + </th> + } else { + <th> + {headerSpan} + </th> + } + } + } + } + <thead>{headerRow}</thead> + } + + override def row(data: StageTableRowData): Seq[Node] = { + <tr id={"stage-" + data.stageId + "-" + data.attemptId}> + {rowContent(data)} + </tr> } - /** Special table that merges two header cells. */ - protected def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = { - <table class="table table-bordered table-striped table-condensed sortable"> - <thead>{columns}</thead> - <tbody> - {rows.map(r => makeRow(r))} - </tbody> - </table> + private def rowContent(data: StageTableRowData): Seq[Node] = { + data.stageData match { + case None => missingStageRow(data.stageId) + case Some(stageData) => + val info = data.stageInfo + + {if (data.attemptId > 0) { + <td>{data.stageId} (retry {data.attemptId})</td> + } else { + <td>{data.stageId}</td> + }} ++ + {if (isFairScheduler) { + <td> + <a href={"%s/stages/pool?poolname=%s" + .format(UIUtils.prependBaseUri(basePath), data.schedulingPool)}> + {data.schedulingPool} + </a> + </td> + } else { + Seq.empty + }} ++ + <td>{makeDescription(info, data.descriptionOption)}</td> + <td valign="middle"> + {data.formattedSubmissionTime} + </td> + <td>{data.formattedDuration}</td> + <td class="progress-cell"> + {UIUtils.makeProgressBar(started = stageData.numActiveTasks, + completed = stageData.completedIndices.size, failed = stageData.numFailedTasks, + skipped = 0, killed = stageData.numKilledTasks, total = info.numTasks)} + </td> + <td>{data.inputReadWithUnit}</td> + <td>{data.outputWriteWithUnit}</td> + <td>{data.shuffleReadWithUnit}</td> + <td>{data.shuffleWriteWithUnit}</td> ++ + { + if (isFailedStage) { + failureReasonHtml(info) + } else { + Seq.empty + } + } + } } - private def makeDescription(s: StageInfo): Seq[Node] = { + private def failureReasonHtml(s: StageInfo): Seq[Node] = { + val failureReason = s.failureReason.getOrElse("") + val isMultiline = failureReason.indexOf('\n') >= 0 + // Display the first line by default + val failureReasonSummary = StringEscapeUtils.escapeHtml4( + if (isMultiline) { + failureReason.substring(0, failureReason.indexOf('\n')) + } else { + failureReason + }) + val details = if (isMultiline) { + // scalastyle:off + <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')" + class="expand-details"> + +details + </span> ++ + <div class="stacktrace-details collapsed"> + <pre>{failureReason}</pre> + </div> + // scalastyle:on + } else { + "" + } + <td valign="middle">{failureReasonSummary}{details}</td> + } + + private def makeDescription(s: StageInfo, descriptionOption: Option[String]): Seq[Node] = { val basePathUri = UIUtils.prependBaseUri(basePath) val killLink = if (killEnabled) { @@ -111,12 +376,7 @@ private[ui] class StageTableBase( </div> } - val stageDesc = for { - stageData <- listener.stageIdToData.get((s.stageId, s.attemptId)) - desc <- stageData.description - } yield { - UIUtils.makeDescription(desc, basePathUri) - } + val stageDesc = descriptionOption.map(UIUtils.makeDescription(_, basePathUri)) <div>{stageDesc.getOrElse("")} {killLink} {nameLink} {details}</div> } @@ -132,19 +392,44 @@ private[ui] class StageTableBase( <td></td> ++ // Shuffle Read <td></td> // Shuffle Write } +} + +private[ui] class StageDataSource( + stages: Seq[StageInfo], + listener: JobProgressListener, + currentTime: Long, + pageSize: Int, + sortColumn: String, + desc: Boolean) extends PagedDataSource[StageTableRowData](pageSize) { + // Convert StageInfo to StageTableRowData which contains the final contents to show in the table + // so that we can avoid creating duplicate contents during sorting the data + private val data = stages.map(stageRow).sorted(ordering(sortColumn, desc)) + + private var _slicedStageIds: Set[Int] = null - protected def stageRow(s: StageInfo): Seq[Node] = { + override def dataSize: Int = data.size + + override def sliceData(from: Int, to: Int): Seq[StageTableRowData] = { + val r = data.slice(from, to) + _slicedStageIds = r.map(_.stageId).toSet + r + } + + private def stageRow(s: StageInfo): StageTableRowData = { val stageDataOption = listener.stageIdToData.get((s.stageId, s.attemptId)) + if (stageDataOption.isEmpty) { - return missingStageRow(s.stageId) + return new MissingStageTableRowData(s, s.stageId, s.attemptId) } - val stageData = stageDataOption.get - val submissionTime = s.submissionTime match { + + val description = stageData.description + + val formattedSubmissionTime = s.submissionTime match { case Some(t) => UIUtils.formatDate(new Date(t)) case None => "Unknown" } - val finishTime = s.completionTime.getOrElse(System.currentTimeMillis) + val finishTime = s.completionTime.getOrElse(currentTime) // The submission time for a stage is misleading because it counts the time // the stage waits to be launched. (SPARK-10930) @@ -156,7 +441,7 @@ private[ui] class StageTableBase( if (finishTime > startTime) { Some(finishTime - startTime) } else { - Some(System.currentTimeMillis() - startTime) + Some(currentTime - startTime) } } else { None @@ -172,76 +457,80 @@ private[ui] class StageTableBase( val shuffleWrite = stageData.shuffleWriteBytes val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else "" - {if (s.attemptId > 0) { - <td>{s.stageId} (retry {s.attemptId})</td> - } else { - <td>{s.stageId}</td> - }} ++ - {if (isFairScheduler) { - <td> - <a href={"%s/stages/pool?poolname=%s" - .format(UIUtils.prependBaseUri(basePath), stageData.schedulingPool)}> - {stageData.schedulingPool} - </a> - </td> - } else { - Seq.empty - }} ++ - <td>{makeDescription(s)}</td> - <td sorttable_customkey={s.submissionTime.getOrElse(0).toString} valign="middle"> - {submissionTime} - </td> - <td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td> - <td class="progress-cell"> - {UIUtils.makeProgressBar(started = stageData.numActiveTasks, - completed = stageData.completedIndices.size, failed = stageData.numFailedTasks, - skipped = 0, killed = stageData.numKilledTasks, total = s.numTasks)} - </td> - <td sorttable_customkey={inputRead.toString}>{inputReadWithUnit}</td> - <td sorttable_customkey={outputWrite.toString}>{outputWriteWithUnit}</td> - <td sorttable_customkey={shuffleRead.toString}>{shuffleReadWithUnit}</td> - <td sorttable_customkey={shuffleWrite.toString}>{shuffleWriteWithUnit}</td> - } - /** Render an HTML row that represents a stage */ - private def renderStageRow(s: StageInfo): Seq[Node] = - <tr id={"stage-" + s.stageId + "-" + s.attemptId}>{stageRow(s)}</tr> -} - -private[ui] class FailedStageTable( - stages: Seq[StageInfo], - basePath: String, - listener: JobProgressListener, - isFairScheduler: Boolean) - extends StageTableBase(stages, basePath, listener, isFairScheduler, killEnabled = false) { - - override protected def columns: Seq[Node] = super.columns ++ <th>Failure Reason</th> + new StageTableRowData( + s, + stageDataOption, + s.stageId, + s.attemptId, + stageData.schedulingPool, + description.getOrElse(""), + description, + s.submissionTime.getOrElse(0), + formattedSubmissionTime, + duration.getOrElse(-1), + formattedDuration, + inputRead, + inputReadWithUnit, + outputWrite, + outputWriteWithUnit, + shuffleRead, + shuffleReadWithUnit, + shuffleWrite, + shuffleWriteWithUnit + ) + } - override protected def stageRow(s: StageInfo): Seq[Node] = { - val basicColumns = super.stageRow(s) - val failureReason = s.failureReason.getOrElse("") - val isMultiline = failureReason.indexOf('\n') >= 0 - // Display the first line by default - val failureReasonSummary = StringEscapeUtils.escapeHtml4( - if (isMultiline) { - failureReason.substring(0, failureReason.indexOf('\n')) - } else { - failureReason - }) - val details = if (isMultiline) { - // scalastyle:off - <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')" - class="expand-details"> - +details - </span> ++ - <div class="stacktrace-details collapsed"> - <pre>{failureReason}</pre> - </div> - // scalastyle:on + /** + * Return Ordering according to sortColumn and desc + */ + private def ordering(sortColumn: String, desc: Boolean): Ordering[StageTableRowData] = { + val ordering = sortColumn match { + case "Stage Id" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Int.compare(x.stageId, y.stageId) + } + case "Pool Name" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.String.compare(x.schedulingPool, y.schedulingPool) + } + case "Description" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.String.compare(x.description, y.description) + } + case "Submitted" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Long.compare(x.submissionTime, y.submissionTime) + } + case "Duration" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Long.compare(x.duration, y.duration) + } + case "Input" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Long.compare(x.inputRead, y.inputRead) + } + case "Output" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Long.compare(x.outputWrite, y.outputWrite) + } + case "Shuffle Read" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Long.compare(x.shuffleRead, y.shuffleRead) + } + case "Shuffle Write" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Long.compare(x.shuffleWrite, y.shuffleWrite) + } + case "Tasks: Succeeded/Total" => + throw new IllegalArgumentException(s"Unsortable column: $sortColumn") + case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") + } + if (desc) { + ordering.reverse } else { - "" + ordering } - val failureReasonHtml = <td valign="middle">{failureReasonSummary}{details}</td> - basicColumns ++ failureReasonHtml } } + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
