Repository: spark
Updated Branches:
  refs/heads/master c979c8bba -> db36e1e75


[SPARK-15590][WEBUI] Paginate Job Table in Jobs tab

## What changes were proposed in this pull request?

This patch adds pagination support for the job tables in the Jobs tab. 
Pagination is provided for all three job tables (active, completed, and 
failed). Interactions with the paged tables (jumping to a page, sorting, and 
setting the page size) are also supported.

The diff does not line up some of the reused lines with their original 
positions. The body of the function `makeRow` from the original 
`AllJobsPage.scala` is reused: it is split between the beginning of the 
function `jobRow` (L427-439) and the function `row` (L594-618) in the new 
`AllJobsPage.scala`.

## How was this patch tested?

Tested manually by checking the Web UI after running hundreds of jobs, some 
to completion and some to failure.
Generate completed jobs by:
```scala
val d = sc.parallelize(Array(1,2,3,4,5))
for(i <- 1 to 255){ var b = d.collect() }
```
Generate failed jobs by calling the following code multiple times:
```scala
var b = d.map(_/0).collect()
```
Interactions such as jumping, sorting, and setting the page size were all tested.

This shows the pagination for completed jobs:
![paginate success 
jobs](https://cloud.githubusercontent.com/assets/5558370/15986498/efa12ef6-303b-11e6-8b1d-c3382aeb9ad0.png)

This shows that sorting works in the job tables:
![sorting](https://cloud.githubusercontent.com/assets/5558370/15986539/98c8a81a-303c-11e6-86f2-8d2bc7924ee9.png)

This shows the pagination for failed jobs and the effect of jumping and setting 
page size:
![paginate failed 
jobs](https://cloud.githubusercontent.com/assets/5558370/15986556/d8c1323e-303c-11e6-8e4b-7bdb030ea42b.png)

Author: Tao Lin <nblin...@gmail.com>

Closes #13620 from nblintao/dev.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/db36e1e7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/db36e1e7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/db36e1e7

Branch: refs/heads/master
Commit: db36e1e75d69d63b76312e85ae3a6c95cebbe65e
Parents: c979c8b
Author: Tao Lin <nblin...@gmail.com>
Authored: Mon Jul 25 17:35:50 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Mon Jul 25 17:35:50 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/ui/jobs/AllJobsPage.scala  | 369 ++++++++++++++++---
 .../org/apache/spark/ui/UISeleniumSuite.scala   |   5 +-
 2 files changed, 312 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/db36e1e7/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index 035d706..e5363ce 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -17,17 +17,21 @@
 
 package org.apache.spark.ui.jobs
 
+import java.net.URLEncoder
 import java.util.Date
 import javax.servlet.http.HttpServletRequest
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, ListBuffer}
 import scala.xml._
 
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
-import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData}
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.ui._
+import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData, StageUIData}
+import org.apache.spark.util.Utils
 
 /** Page showing list of all ongoing and recently finished jobs */
 private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
@@ -210,64 +214,69 @@ private[ui] class AllJobsPage(parent: JobsTab) extends 
WebUIPage("") {
     </script>
   }
 
-  private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = {
+  private def jobsTable(
+      request: HttpServletRequest,
+      jobTag: String,
+      jobs: Seq[JobUIData]): Seq[Node] = {
+    val allParameters = request.getParameterMap.asScala.toMap
+    val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag))
+      .map(para => para._1 + "=" + para._2(0))
+
     val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)
+    val jobIdTitle = if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"
 
-    val columns: Seq[Node] = {
-      <th>{if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"}</th>
-      <th>Description</th>
-      <th>Submitted</th>
-      <th>Duration</th>
-      <th class="sorttable_nosort">Stages: Succeeded/Total</th>
-      <th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
-    }
+    val parameterJobPage = request.getParameter(jobTag + ".page")
+    val parameterJobSortColumn = request.getParameter(jobTag + ".sort")
+    val parameterJobSortDesc = request.getParameter(jobTag + ".desc")
+    val parameterJobPageSize = request.getParameter(jobTag + ".pageSize")
+    val parameterJobPrevPageSize = request.getParameter(jobTag + 
".prevPageSize")
 
-    def makeRow(job: JobUIData): Seq[Node] = {
-      val (lastStageName, lastStageDescription) = 
getLastStageNameAndDescription(job)
-      val duration: Option[Long] = {
-        job.submissionTime.map { start =>
-          val end = job.completionTime.getOrElse(System.currentTimeMillis())
-          end - start
-        }
+    val jobPage = Option(parameterJobPage).map(_.toInt).getOrElse(1)
+    val jobSortColumn = Option(parameterJobSortColumn).map { sortColumn =>
+      UIUtils.decodeURLParameter(sortColumn)
+    }.getOrElse(jobIdTitle)
+    val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse(
+      // New jobs should be shown above old jobs by default.
+      if (jobSortColumn == jobIdTitle) true else false
+    )
+    val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100)
+    val jobPrevPageSize = 
Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize)
+
+    val page: Int = {
+      // If the user has changed to a larger page size, then go to page 1 in 
order to avoid
+      // IndexOutOfBoundsException.
+      if (jobPageSize <= jobPrevPageSize) {
+        jobPage
+      } else {
+        1
       }
-      val formattedDuration = duration.map(d => 
UIUtils.formatDuration(d)).getOrElse("Unknown")
-      val formattedSubmissionTime = 
job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
-      val basePathUri = UIUtils.prependBaseUri(parent.basePath)
-      val jobDescription =
-        UIUtils.makeDescription(lastStageDescription, basePathUri, plainText = 
false)
-
-      val detailUrl = "%s/jobs/job?id=%s".format(basePathUri, job.jobId)
-      <tr id={"job-" + job.jobId}>
-        <td sorttable_customkey={job.jobId.toString}>
-          {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
-        </td>
-        <td>
-          {jobDescription}
-          <a href={detailUrl} class="name-link">{lastStageName}</a>
-        </td>
-        <td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>
-          {formattedSubmissionTime}
-        </td>
-        <td 
sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
-        <td class="stage-progress-cell">
-          {job.completedStageIndices.size}/{job.stageIds.size - 
job.numSkippedStages}
-          {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
-          {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
-        </td>
-        <td class="progress-cell">
-          {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = 
job.numCompletedTasks,
-           failed = job.numFailedTasks, skipped = job.numSkippedTasks, killed 
= job.numKilledTasks,
-           total = job.numTasks - job.numSkippedTasks)}
-        </td>
-      </tr>
     }
+    val currentTime = System.currentTimeMillis()
 
-    <table class="table table-bordered table-striped table-condensed sortable">
-      <thead>{columns}</thead>
-      <tbody>
-        {jobs.map(makeRow)}
-      </tbody>
-    </table>
+    try {
+      new JobPagedTable(
+        jobs,
+        jobTag,
+        UIUtils.prependBaseUri(parent.basePath),
+        "jobs", // subPath
+        parameterOtherTable,
+        parent.jobProgresslistener.stageIdToInfo,
+        parent.jobProgresslistener.stageIdToData,
+        currentTime,
+        jobIdTitle,
+        pageSize = jobPageSize,
+        sortColumn = jobSortColumn,
+        desc = jobSortDesc
+      ).table(page)
+    } catch {
+      case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) 
=>
+        <div class="alert alert-error">
+          <p>Error while rendering job table:</p>
+          <pre>
+            {Utils.exceptionString(e)}
+          </pre>
+        </div>
+    }
   }
 
   def render(request: HttpServletRequest): Seq[Node] = {
@@ -279,12 +288,9 @@ private[ui] class AllJobsPage(parent: JobsTab) extends 
WebUIPage("") {
       val completedJobs = listener.completedJobs.reverse.toSeq
       val failedJobs = listener.failedJobs.reverse.toSeq
 
-      val activeJobsTable =
-        jobsTable(activeJobs.sortBy(_.submissionTime.getOrElse(-1L)).reverse)
-      val completedJobsTable =
-        
jobsTable(completedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
-      val failedJobsTable =
-        jobsTable(failedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
+      val activeJobsTable = jobsTable(request, "activeJob", activeJobs)
+      val completedJobsTable = jobsTable(request, "completedJob", 
completedJobs)
+      val failedJobsTable = jobsTable(request, "failedJob", failedJobs)
 
       val shouldShowActiveJobs = activeJobs.nonEmpty
       val shouldShowCompletedJobs = completedJobs.nonEmpty
@@ -369,3 +375,246 @@ private[ui] class AllJobsPage(parent: JobsTab) extends 
WebUIPage("") {
     }
   }
 }
+
+private[ui] class JobTableRowData(
+    val jobData: JobUIData,
+    val lastStageName: String,
+    val lastStageDescription: String,
+    val duration: Long,
+    val formattedDuration: String,
+    val submissionTime: Long,
+    val formattedSubmissionTime: String,
+    val jobDescription: NodeSeq,
+    val detailUrl: String)
+
+private[ui] class JobDataSource(
+    jobs: Seq[JobUIData],
+    stageIdToInfo: HashMap[Int, StageInfo],
+    stageIdToData: HashMap[(Int, Int), StageUIData],
+    basePath: String,
+    currentTime: Long,
+    pageSize: Int,
+    sortColumn: String,
+    desc: Boolean) extends PagedDataSource[JobTableRowData](pageSize) {
+
+  // Convert JobUIData to JobTableRowData which contains the final contents to 
show in the table
+  // so that we can avoid creating duplicate contents during sorting the data
+  private val data = jobs.map(jobRow).sorted(ordering(sortColumn, desc))
+
+  private var _slicedJobIds: Set[Int] = null
+
+  override def dataSize: Int = data.size
+
+  override def sliceData(from: Int, to: Int): Seq[JobTableRowData] = {
+    val r = data.slice(from, to)
+    _slicedJobIds = r.map(_.jobData.jobId).toSet
+    r
+  }
+
+  private def getLastStageNameAndDescription(job: JobUIData): (String, String) 
= {
+    val lastStageInfo = Option(job.stageIds)
+      .filter(_.nonEmpty)
+      .flatMap { ids => stageIdToInfo.get(ids.max)}
+    val lastStageData = lastStageInfo.flatMap { s =>
+      stageIdToData.get((s.stageId, s.attemptId))
+    }
+    val name = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+    val description = lastStageData.flatMap(_.description).getOrElse("")
+    (name, description)
+  }
+
+  private def jobRow(jobData: JobUIData): JobTableRowData = {
+    val (lastStageName, lastStageDescription) = 
getLastStageNameAndDescription(jobData)
+    val duration: Option[Long] = {
+      jobData.submissionTime.map { start =>
+        val end = jobData.completionTime.getOrElse(System.currentTimeMillis())
+        end - start
+      }
+    }
+    val formattedDuration = duration.map(d => 
UIUtils.formatDuration(d)).getOrElse("Unknown")
+    val submissionTime = jobData.submissionTime
+    val formattedSubmissionTime = 
submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
+    val jobDescription = UIUtils.makeDescription(lastStageDescription, 
basePath, plainText = false)
+
+    val detailUrl = "%s/jobs/job?id=%s".format(basePath, jobData.jobId)
+
+    new JobTableRowData (
+      jobData,
+      lastStageName,
+      lastStageDescription,
+      duration.getOrElse(-1),
+      formattedDuration,
+      submissionTime.getOrElse(-1),
+      formattedSubmissionTime,
+      jobDescription,
+      detailUrl
+    )
+  }
+
+  /**
+   * Return Ordering according to sortColumn and desc
+   */
+  private def ordering(sortColumn: String, desc: Boolean): 
Ordering[JobTableRowData] = {
+    val ordering = sortColumn match {
+      case "Job Id" | "Job Id (Job Group)" => new Ordering[JobTableRowData] {
+        override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+          Ordering.Int.compare(x.jobData.jobId, y.jobData.jobId)
+      }
+      case "Description" => new Ordering[JobTableRowData] {
+        override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+          Ordering.String.compare(x.lastStageDescription, 
y.lastStageDescription)
+      }
+      case "Submitted" => new Ordering[JobTableRowData] {
+        override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+          Ordering.Long.compare(x.submissionTime, y.submissionTime)
+      }
+      case "Duration" => new Ordering[JobTableRowData] {
+        override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+          Ordering.Long.compare(x.duration, y.duration)
+      }
+      case "Stages: Succeeded/Total" | "Tasks (for all stages): 
Succeeded/Total" =>
+        throw new IllegalArgumentException(s"Unsortable column: $sortColumn")
+      case unknownColumn => throw new IllegalArgumentException(s"Unknown 
column: $unknownColumn")
+    }
+    if (desc) {
+      ordering.reverse
+    } else {
+      ordering
+    }
+  }
+
+}
+private[ui] class JobPagedTable(
+    data: Seq[JobUIData],
+    jobTag: String,
+    basePath: String,
+    subPath: String,
+    parameterOtherTable: Iterable[String],
+    stageIdToInfo: HashMap[Int, StageInfo],
+    stageIdToData: HashMap[(Int, Int), StageUIData],
+    currentTime: Long,
+    jobIdTitle: String,
+    pageSize: Int,
+    sortColumn: String,
+    desc: Boolean
+  ) extends PagedTable[JobTableRowData] {
+  val parameterPath = UIUtils.prependBaseUri(basePath) + s"/$subPath/?" +
+    parameterOtherTable.mkString("&")
+
+  override def tableId: String = jobTag + "-table"
+
+  override def tableCssClass: String =
+    "table table-bordered table-condensed table-striped table-head-clickable"
+
+  override def pageSizeFormField: String = jobTag + ".pageSize"
+
+  override def prevPageSizeFormField: String = jobTag + ".prevPageSize"
+
+  override def pageNumberFormField: String = jobTag + ".page"
+
+  override val dataSource = new JobDataSource(
+    data,
+    stageIdToInfo,
+    stageIdToData,
+    basePath,
+    currentTime,
+    pageSize,
+    sortColumn,
+    desc)
+
+  override def pageLink(page: Int): String = {
+    val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+    parameterPath +
+      s"&$pageNumberFormField=$page" +
+      s"&$jobTag.sort=$encodedSortColumn" +
+      s"&$jobTag.desc=$desc" +
+      s"&$pageSizeFormField=$pageSize"
+  }
+
+  override def goButtonFormPath: String = {
+    val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+    s"$parameterPath&$jobTag.sort=$encodedSortColumn&$jobTag.desc=$desc"
+  }
+
+  override def headers: Seq[Node] = {
+    // Information for each header: title, cssClass, and sortable
+    val jobHeadersAndCssClasses: Seq[(String, String, Boolean)] =
+      Seq(
+        (jobIdTitle, "", true),
+        ("Description", "", true), ("Submitted", "", true), ("Duration", "", 
true),
+        ("Stages: Succeeded/Total", "", false),
+        ("Tasks (for all stages): Succeeded/Total", "", false)
+      )
+
+    if (!jobHeadersAndCssClasses.filter(_._3).map(_._1).contains(sortColumn)) {
+      throw new IllegalArgumentException(s"Unknown column: $sortColumn")
+    }
+
+    val headerRow: Seq[Node] = {
+      jobHeadersAndCssClasses.map { case (header, cssClass, sortable) =>
+        if (header == sortColumn) {
+          val headerLink = Unparsed(
+            parameterPath +
+              s"&$jobTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+              s"&$jobTag.desc=${!desc}" +
+              s"&$jobTag.pageSize=$pageSize")
+          val arrow = if (desc) "&#x25BE;" else "&#x25B4;" // UP or DOWN
+
+          <th class={cssClass}>
+            <a href={headerLink}>
+              {header}<span>
+              &nbsp;{Unparsed(arrow)}
+            </span>
+            </a>
+          </th>
+        } else {
+          if (sortable) {
+            val headerLink = Unparsed(
+              parameterPath +
+                s"&$jobTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+                s"&$jobTag.pageSize=$pageSize")
+
+            <th class={cssClass}>
+              <a href={headerLink}>
+                {header}
+              </a>
+            </th>
+          } else {
+            <th class={cssClass}>
+              {header}
+            </th>
+          }
+        }
+      }
+    }
+    <thead>{headerRow}</thead>
+  }
+
+  override def row(jobTableRow: JobTableRowData): Seq[Node] = {
+    val job = jobTableRow.jobData
+
+    <tr id={"job-" + job.jobId}>
+      <td>
+        {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
+      </td>
+      <td>
+        {jobTableRow.jobDescription}
+        <a href={jobTableRow.detailUrl} 
class="name-link">{jobTableRow.lastStageName}</a>
+      </td>
+      <td>
+        {jobTableRow.formattedSubmissionTime}
+      </td>
+      <td>{jobTableRow.formattedDuration}</td>
+      <td class="stage-progress-cell">
+        {job.completedStageIndices.size}/{job.stageIds.size - 
job.numSkippedStages}
+        {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
+        {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
+      </td>
+      <td class="progress-cell">
+        {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = 
job.numCompletedTasks,
+        failed = job.numFailedTasks, skipped = job.numSkippedTasks, killed = 
job.numKilledTasks,
+        total = job.numTasks - job.numSkippedTasks)}
+      </td>
+    </tr>
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/db36e1e7/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index b0a35fe..fd12a21 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -218,7 +218,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser 
with Matchers with B
       eventually(timeout(5 seconds), interval(50 milliseconds)) {
         goToUi(sc, "/jobs")
         val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
-        tableHeaders should not contain "Job Id (Job Group)"
+        tableHeaders(0) should not startWith "Job Id (Job Group)"
       }
       // Once at least one job has been run in a job group, then we should 
display the group name:
       sc.setJobGroup("my-job-group", "my-job-group-description")
@@ -226,7 +226,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser 
with Matchers with B
       eventually(timeout(5 seconds), interval(50 milliseconds)) {
         goToUi(sc, "/jobs")
         val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
-        tableHeaders should contain ("Job Id (Job Group)")
+        // Can suffix up/down arrow in the header
+        tableHeaders(0) should startWith ("Job Id (Job Group)")
       }
 
       val jobJson = getJson(sc.ui.get, "jobs")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to