[SPARK-4145] Web UI job pages

This PR adds two new pages to the Spark Web UI:
- A jobs overview page, which shows details on running / completed / failed jobs.
- A job details page, which displays information on an individual job's stages.

The jobs overview page is now the default UI homepage; the old homepage is still accessible at `/stages`.

### Screenshots

#### New UI homepage

![image](https://cloud.githubusercontent.com/assets/50748/5119035/fd0a69e6-701f-11e4-89cb-db7e9705714f.png)

#### Job details page

(This is effectively a per-job version of the stages page that can be extended later with other things, such as DAG visualizations.)

![image](https://cloud.githubusercontent.com/assets/50748/5134910/50b340d4-70c7-11e4-88e1-6b73237ea7c8.png)

### Key changes in this PR

- Rename `JobProgressPage` to `AllStagesPage`.
- Expose `StageInfo` objects in the `SparkListenerJobStart` event; add backwards-compatibility tests to JsonProtocol.
- Add additional data structures to `JobProgressListener` to map from stages to jobs.
- Add several fields to `JobUIData`.

I also added ~150 lines of Selenium tests, since I uncovered UI issues while developing this patch.

### Limitations

If a job contains stages that aren't run, then its overall progress bar may underestimate the total job progress; in other words, a completed job may appear to have a progress bar that is not at 100%. If stages or tasks fail, the progress bar will not move backwards to reflect the true amount of remaining work.
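The progress-bar caveat above stems from how a job's task total is estimated from the new `stageInfos` field on `SparkListenerJobStart`. Below is a minimal sketch (not part of this patch; the listener name and printed message are invented) of a user-defined listener that consumes the new field the same way `JobProgressListener` does, counting tasks only for stages that have not already completed in an earlier run:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

class JobTaskCountListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // Stages with a completion time may be skipped, so only "missing"
    // stages contribute to the expected task count.
    val missingStages = jobStart.stageInfos.filter(_.completionTime.isEmpty)
    val expectedTasks = missingStages.map(_.numTasks).sum
    println(s"Job ${jobStart.jobId}: ${jobStart.stageIds.size} stages, " +
      s"~$expectedTasks tasks expected")
  }
}

// Registration, assuming a live SparkContext named `sc`:
// sc.addSparkListener(new JobTaskCountListener)
```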
Author: Josh Rosen <joshro...@databricks.com>

Closes #3009 from JoshRosen/job-page and squashes the following commits:

eb05e90 [Josh Rosen] Disable kill button in completed stages tables.
f00c851 [Josh Rosen] Fix JsonProtocol compatibility
b89c258 [Josh Rosen] More JSON protocol backwards-compatibility fixes.
ff804cd [Josh Rosen] Don't write "Stage Ids" field in JobStartEvent JSON.
6f17f3f [Josh Rosen] Only store StageInfos in SparkListenerJobStart event.
2bbf41a [Josh Rosen] Update job progress bar to reflect skipped tasks/stages.
61c265a [Josh Rosen] Add "skipped stages" table; only display non-empty tables.
1f45d44 [Josh Rosen] Incorporate a bunch of minor review feedback.
0b77e3e [Josh Rosen] More bug fixes for phantom stages.
034aa8d [Josh Rosen] Use `.max()` to find result stage for job.
eebdc2c [Josh Rosen] Don't display pending stages for completed jobs.
67080ba [Josh Rosen] Ensure that "phantom stages" don't cause memory leaks.
7d10b97 [Josh Rosen] Merge remote-tracking branch 'apache/master' into job-page
d69c775 [Josh Rosen] Fix table sorting on all jobs page.
5eb39dc [Josh Rosen] Add pending stages table to job page.
f2a15da [Josh Rosen] Add status field to job details page.
171b53c [Josh Rosen] Move `startTime` to the start of SparkContext.
e2f2c43 [Josh Rosen] Fix sorting of stages in job details page.
8955f4c [Josh Rosen] Display information for pending stages on jobs page.
8ab6c28 [Josh Rosen] Compute numTasks from job start stage infos.
5884f91 [Josh Rosen] Add StageInfos to SparkListenerJobStart event.
79793cd [Josh Rosen] Track indices of completed stage to avoid overcounting when failures occur.
d62ea7b [Josh Rosen] Add failing Selenium test for stage overcounting issue.
1145c60 [Josh Rosen] Display text instead of progress bar for stages.
3d0a007 [Josh Rosen] Merge remote-tracking branch 'origin/master' into job-page
8a2351b [Josh Rosen] Add help tooltip to Spark Jobs page.
b7bf30e [Josh Rosen] Add stages progress bar; fix bug where active stages show as completed.
4846ce4 [Josh Rosen] Hide "(Job Group)" if no jobs were submitted in job groups.
4d58e55 [Josh Rosen] Change label to "Tasks (for all stages)"
85e9c85 [Josh Rosen] Extract startTime into separate variable.
1cf4987 [Josh Rosen] Fix broken kill links; add Selenium test to avoid future regressions.
56701fa [Josh Rosen] Move last stage name / description logic out of markup.
a475ea1 [Josh Rosen] Add progress bars to jobs page.
45343b8 [Josh Rosen] More comments
4b206fb [Josh Rosen] Merge remote-tracking branch 'origin/master' into job-page
bfce2b9 [Josh Rosen] Address review comments, except for progress bar.
4487dcb [Josh Rosen] [SPARK-4145] Web UI job pages
2568a6c [Josh Rosen] Rename JobProgressPage to AllStagesPage

(cherry picked from commit 4a90276ab22d6989dffb2ee2d8118d9253365646)
Signed-off-by: Patrick Wendell <pwend...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2d35cc08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2d35cc08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2d35cc08

Branch: refs/heads/branch-1.2
Commit: 2d35cc0852e5ce426b143b51d03a71f16ad06c11
Parents: 97b7eb4
Author: Josh Rosen <joshro...@databricks.com>
Authored: Mon Nov 24 13:18:14 2014 -0800
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Mon Nov 24 13:18:39 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   4 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |   7 +-
 .../apache/spark/scheduler/SparkListener.scala  |  11 +-
 .../scala/org/apache/spark/ui/SparkUI.scala     |  13 +-
 .../scala/org/apache/spark/ui/UIUtils.scala     |  27 +-
 .../org/apache/spark/ui/jobs/AllJobsPage.scala  | 151 ++++++++++
 .../apache/spark/ui/jobs/AllStagesPage.scala    | 102 +++++++
 .../apache/spark/ui/jobs/ExecutorTable.scala    |   2 +-
 .../org/apache/spark/ui/jobs/JobPage.scala      | 177 ++++++++++++
 .../spark/ui/jobs/JobProgressListener.scala     |  99 ++++++-
 .../apache/spark/ui/jobs/JobProgressPage.scala  |  99 -------
 .../apache/spark/ui/jobs/JobProgressTab.scala   |  53 ----
 .../org/apache/spark/ui/jobs/JobsTab.scala      |  32 +++
 .../org/apache/spark/ui/jobs/PoolPage.scala     |   7 +-
 .../org/apache/spark/ui/jobs/PoolTable.scala    |   2 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |   2 +-
 .../org/apache/spark/ui/jobs/StageTable.scala   |  43 +--
 .../org/apache/spark/ui/jobs/StagesTab.scala    |  51 ++++
 .../scala/org/apache/spark/ui/jobs/UIData.scala |  21 +-
 .../org/apache/spark/util/JsonProtocol.scala    |  23 +-
 .../org/apache/spark/ui/UISeleniumSuite.scala   | 201 +++++++++++++-
 .../ui/jobs/JobProgressListenerSuite.scala      |   8 +-
 .../apache/spark/util/JsonProtocolSuite.scala   | 276 ++++++++++++++++++-
 23 files changed, 1195 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ae8bbfb..e95819d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -83,6 +83,8 @@ class SparkContext(config: SparkConf) extends Logging {
   // contains a map from hostname to a list of input format splits on the host.
   private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()

+  val startTime = System.currentTimeMillis()
+
   /**
    * Create a SparkContext that loads settings from system properties (for instance, when
    * launching with ./bin/spark-submit).
@@ -269,8 +271,6 @@ class SparkContext(config: SparkConf) extends Logging {
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

-  val startTime = System.currentTimeMillis()
-
   // Add each JAR given through the constructor
   if (jars != null) {
     jars.foreach(addJar)

http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 2244951..b1222af 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -751,14 +751,15 @@ class DAGScheduler(
       localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
     if (shouldRunLocally) {
       // Compute very short actions like first() or take() with no parent stages locally.
-      listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
+      listenerBus.post(SparkListenerJobStart(job.jobId, Seq.empty, properties))
       runLocally(job)
     } else {
       jobIdToActiveJob(jobId) = job
       activeJobs += job
       finalStage.resultOfJob = Some(job)
-      listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray,
-        properties))
+      val stageIds = jobIdToStageIds(jobId).toArray
+      val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
+      listenerBus.post(SparkListenerJobStart(job.jobId, stageInfos, properties))
       submitStage(finalStage)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 86afe3b..b62b0c1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -56,8 +56,15 @@ case class SparkListenerTaskEnd(
   extends SparkListenerEvent

 @DeveloperApi
-case class SparkListenerJobStart(jobId: Int, stageIds: Seq[Int], properties: Properties = null)
-  extends SparkListenerEvent
+case class SparkListenerJobStart(
+    jobId: Int,
+    stageInfos: Seq[StageInfo],
+    properties: Properties = null)
+  extends SparkListenerEvent {
+  // Note: this is here for backwards-compatibility with older versions of this event which
+  // only stored stageIds and not StageInfos:
+  val stageIds: Seq[Int] = stageInfos.map(_.stageId)
+}

 @DeveloperApi
 case class SparkListenerJobEnd(jobId: Int, jobResult: JobResult) extends SparkListenerEvent
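Because `stageIds` is retained as a field derived from `stageInfos`, source code that only read stage IDs from the event continues to work. A small sketch of that compatibility (stage names and task counts invented for illustration):

```scala
import org.apache.spark.scheduler.{SparkListenerJobStart, StageInfo}

// Hypothetical StageInfos, as the scheduler would obtain via Stage.latestInfo:
val infos = Seq(
  new StageInfo(0, 0, "map at <console>:12", 4, Seq.empty, ""),
  new StageInfo(1, 0, "count at <console>:14", 1, Seq.empty, ""))

val event = SparkListenerJobStart(jobId = 42, stageInfos = infos)

// Consumers written against the pre-1.2 API that only read stage IDs
// keep working, since `stageIds` is now derived from `stageInfos`:
assert(event.stageIds == Seq(0, 1))
```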
http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 049938f..176907d 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -23,7 +23,7 @@ import org.apache.spark.storage.StorageStatusListener
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
 import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
-import org.apache.spark.ui.jobs.{JobProgressListener, JobProgressTab}
+import org.apache.spark.ui.jobs.{JobsTab, JobProgressListener, StagesTab}
 import org.apache.spark.ui.storage.{StorageListener, StorageTab}

 /**
@@ -43,17 +43,20 @@ private[spark] class SparkUI private (
   extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
   with Logging {

+  val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)
+
   /** Initialize all components of the server. */
   def initialize() {
-    val jobProgressTab = new JobProgressTab(this)
-    attachTab(jobProgressTab)
+    attachTab(new JobsTab(this))
+    val stagesTab = new StagesTab(this)
+    attachTab(stagesTab)
     attachTab(new StorageTab(this))
     attachTab(new EnvironmentTab(this))
     attachTab(new ExecutorsTab(this))
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
-    attachHandler(createRedirectHandler("/", "/stages", basePath = basePath))
+    attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
     attachHandler(
-      createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest))
+      createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest))
     // If the UI is live, then serve
     sc.foreach { _.env.metricsSystem.getServletHandlers.foreach(attachHandler) }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 7bc1e24..0c418be 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -169,7 +169,8 @@ private[spark] object UIUtils extends Logging {
       title: String,
       content: => Seq[Node],
       activeTab: SparkUITab,
-      refreshInterval: Option[Int] = None): Seq[Node] = {
+      refreshInterval: Option[Int] = None,
+      helpText: Option[String] = None): Seq[Node] = {

     val appName = activeTab.appName
     val shortAppName = if (appName.length < 36) appName else appName.take(32) + "..."
@@ -178,6 +179,9 @@ private[spark] object UIUtils extends Logging {
         <a href={prependBaseUri(activeTab.basePath, "/" + tab.prefix + "/")}>{tab.name}</a>
       </li>
     }
+    val helpButton: Seq[Node] = helpText.map { helpText =>
+      <a data-toggle="tooltip" data-placement="bottom" title={helpText}>(?)</a>
+    }.getOrElse(Seq.empty)

     <html>
       <head>
@@ -201,6 +205,7 @@ private[spark] object UIUtils extends Logging {
           <div class="span12">
             <h3 style="vertical-align: bottom; display: inline-block;">
               {title}
+              {helpButton}
             </h3>
           </div>
         </div>
@@ -283,4 +288,24 @@ private[spark] object UIUtils extends Logging {
       </tbody>
     </table>
   }
+
+  def makeProgressBar(
+      started: Int,
+      completed: Int,
+      failed: Int,
+      skipped:Int,
+      total: Int): Seq[Node] = {
+    val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
+    val startWidth = "width: %s%%".format((started.toDouble/total)*100)
+
+    <div class="progress">
+      <span style="text-align:center; position:absolute; width:100%; left:0;">
+        {completed}/{total}
+        { if (failed > 0) s"($failed failed)" }
+        { if (skipped > 0) s"($skipped skipped)" }
+      </span>
+      <div class="bar bar-completed" style={completeWidth}></div>
+      <div class="bar bar-running" style={startWidth}></div>
+    </div>
+  }
 }
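To make the width math in `makeProgressBar` concrete, here is a small worked example (the counts are invented): the completed and running segments are sized as percentages of the non-skipped total, while failed and skipped counts appear only in the overlay text.

```scala
// For a job with 20 non-skipped tasks: 12 completed, 3 running, 1 failed.
val total = 20
val completed = 12
val started = 3
val completeWidth = (completed.toDouble / total) * 100  // 60.0  -> "width: 60.0%"
val startWidth = (started.toDouble / total) * 100       // 15.0  -> "width: 15.0%"
// The bar shows a 60% solid "completed" segment plus a 15% "running" stripe;
// "(1 failed)" is rendered in the centered text, never as bar width.
```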
http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
new file mode 100644
index 0000000..ea2d187
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import scala.xml.{Node, NodeSeq}
+
+import javax.servlet.http.HttpServletRequest
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.ui.jobs.UIData.JobUIData
+
+/** Page showing list of all ongoing and recently finished jobs */
+private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
+  private val startTime: Option[Long] = parent.sc.map(_.startTime)
+  private val listener = parent.listener
+
+  private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = {
+    val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)
+
+    val columns: Seq[Node] = {
+      <th>{if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"}</th>
+      <th>Description</th>
+      <th>Submitted</th>
+      <th>Duration</th>
+      <th class="sorttable_nosort">Stages: Succeeded/Total</th>
+      <th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
+    }
+
+    def makeRow(job: JobUIData): Seq[Node] = {
+      val lastStageInfo = listener.stageIdToInfo.get(job.stageIds.max)
+      val lastStageData = lastStageInfo.flatMap { s =>
+        listener.stageIdToData.get((s.stageId, s.attemptId))
+      }
+      val isComplete = job.status == JobExecutionStatus.SUCCEEDED
+      val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+      val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
+      val duration: Option[Long] = {
+        job.startTime.map { start =>
+          val end = job.endTime.getOrElse(System.currentTimeMillis())
+          end - start
+        }
+      }
+      val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
+      val formattedSubmissionTime = job.startTime.map(UIUtils.formatDate).getOrElse("Unknown")
+      val detailUrl =
+        "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
+      <tr>
+        <td sorttable_customkey={job.jobId.toString}>
+          {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
+        </td>
+        <td>
+          <div><em>{lastStageDescription}</em></div>
+          <a href={detailUrl}>{lastStageName}</a>
+        </td>
+        <td sorttable_customkey={job.startTime.getOrElse(-1).toString}>
+          {formattedSubmissionTime}
+        </td>
+        <td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
+        <td class="stage-progress-cell">
+          {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
+          {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
+          {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
+        </td>
+        <td class="progress-cell">
+          {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
+           failed = job.numFailedTasks, skipped = job.numSkippedTasks,
+           total = job.numTasks - job.numSkippedTasks)}
+        </td>
+      </tr>
+    }

+    <table class="table table-bordered table-striped table-condensed sortable">
+      <thead>{columns}</thead>
+      <tbody>
+        {jobs.map(makeRow)}
+      </tbody>
+    </table>
+  }
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    listener.synchronized {
+      val activeJobs = listener.activeJobs.values.toSeq
+      val completedJobs = listener.completedJobs.reverse.toSeq
+      val failedJobs = listener.failedJobs.reverse.toSeq
+      val now = System.currentTimeMillis
+
+      val activeJobsTable =
+        jobsTable(activeJobs.sortBy(_.startTime.getOrElse(-1L)).reverse)
+      val completedJobsTable =
+        jobsTable(completedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
+      val failedJobsTable =
+        jobsTable(failedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
+
+      val summary: NodeSeq =
+        <div>
+          <ul class="unstyled">
+            {if (startTime.isDefined) {
+              // Total duration is not meaningful unless the UI is live
+              <li>
+                <strong>Total Duration: </strong>
+                {UIUtils.formatDuration(now - startTime.get)}
+              </li>
+            }}
+            <li>
+              <strong>Scheduling Mode: </strong>
+              {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
+            </li>
+            <li>
+              <a href="#active"><strong>Active Jobs:</strong></a>
+              {activeJobs.size}
+            </li>
+            <li>
+              <a href="#completed"><strong>Completed Jobs:</strong></a>
+              {completedJobs.size}
+            </li>
+            <li>
+              <a href="#failed"><strong>Failed Jobs:</strong></a>
+              {failedJobs.size}
+            </li>
+          </ul>
+        </div>
+
+      val content = summary ++
+        <h4 id="active">Active Jobs ({activeJobs.size})</h4> ++ activeJobsTable ++
+        <h4 id="completed">Completed Jobs ({completedJobs.size})</h4> ++ completedJobsTable ++
+        <h4 id ="failed">Failed Jobs ({failedJobs.size})</h4> ++ failedJobsTable
+
+      val helpText = """A job is triggered by an action, like "count()" or "saveAsTextFile()".""" +
+        " Click on a job's title to see information about the stages of tasks associated with" +
+        " the job."
+
+      UIUtils.headerSparkPage("Spark Jobs", content, parent, helpText = Some(helpText))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
new file mode 100644
index 0000000..b0f8ca2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.ui.jobs + +import javax.servlet.http.HttpServletRequest + +import scala.xml.{Node, NodeSeq} + +import org.apache.spark.scheduler.Schedulable +import org.apache.spark.ui.{WebUIPage, UIUtils} + +/** Page showing list of all ongoing and recently finished stages and pools */ +private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { + private val sc = parent.sc + private val listener = parent.listener + private def isFairScheduler = parent.isFairScheduler + + def render(request: HttpServletRequest): Seq[Node] = { + listener.synchronized { + val activeStages = listener.activeStages.values.toSeq + val completedStages = listener.completedStages.reverse.toSeq + val numCompletedStages = listener.numCompletedStages + val failedStages = listener.failedStages.reverse.toSeq + val numFailedStages = listener.numFailedStages + val now = System.currentTimeMillis + + val activeStagesTable = + new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, + parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, + killEnabled = parent.killEnabled) + val completedStagesTable = + new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath, + parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false) + val failedStagesTable = + new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath, + parent.listener, isFairScheduler = parent.isFairScheduler) + + // For now, pool information is only accessible in live UIs + val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable]) + val poolTable = new PoolTable(pools, parent) + + val summary: NodeSeq = + <div> + <ul class="unstyled"> + {if (sc.isDefined) { + // Total duration is not meaningful unless the UI is live + <li> + <strong>Total Duration: </strong> + {UIUtils.formatDuration(now - sc.get.startTime)} + </li> + }} + <li> + <strong>Scheduling Mode: </strong> + {listener.schedulingMode.map(_.toString).getOrElse("Unknown")} + </li> + <li> + <a href="#active"><strong>Active Stages:</strong></a> + {activeStages.size} + </li> + <li> + <a href="#completed"><strong>Completed Stages:</strong></a> + {numCompletedStages} + </li> + <li> + <a href="#failed"><strong>Failed Stages:</strong></a> + {numFailedStages} + </li> + </ul> + </div> + + val content = summary ++ + {if (sc.isDefined && isFairScheduler) { + <h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq + } else { + Seq[Node]() + }} ++ + <h4 id="active">Active Stages ({activeStages.size})</h4> ++ + activeStagesTable.toNodeSeq ++ + <h4 id="completed">Completed Stages ({numCompletedStages})</h4> ++ + completedStagesTable.toNodeSeq ++ + <h4 id ="failed">Failed Stages ({numFailedStages})</h4> ++ + failedStagesTable.toNodeSeq + + UIUtils.headerSparkPage("Spark Stages (for all jobs)", content, parent) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index fa0f96b..35bbe8b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -25,7 +25,7 @@ import org.apache.spark.ui.jobs.UIData.StageUIData import org.apache.spark.util.Utils /** Stage summary grouped by executors. 
*/ -private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobProgressTab) { +private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: StagesTab) { private val listener = parent.listener def toNodeSeq: Seq[Node] = { http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala new file mode 100644 index 0000000..77d3620 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.jobs + +import scala.collection.mutable +import scala.xml.{NodeSeq, Node} + +import javax.servlet.http.HttpServletRequest + +import org.apache.spark.JobExecutionStatus +import org.apache.spark.scheduler.StageInfo +import org.apache.spark.ui.{UIUtils, WebUIPage} + +/** Page showing statistics and stage list for a given job */ +private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { + private val listener = parent.listener + + def render(request: HttpServletRequest): Seq[Node] = { + listener.synchronized { + val jobId = request.getParameter("id").toInt + val jobDataOption = listener.jobIdToData.get(jobId) + if (jobDataOption.isEmpty) { + val content = + <div> + <p>No information to display for job {jobId}</p> + </div> + return UIUtils.headerSparkPage( + s"Details for Job $jobId", content, parent) + } + val jobData = jobDataOption.get + val isComplete = jobData.status != JobExecutionStatus.RUNNING + val stages = jobData.stageIds.map { stageId => + // This could be empty if the JobProgressListener hasn't received information about the + // stage or if the stage information has been garbage collected + listener.stageIdToInfo.getOrElse(stageId, + new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, "Unknown")) + } + + val activeStages = mutable.Buffer[StageInfo]() + val completedStages = mutable.Buffer[StageInfo]() + // If the job is completed, then any pending stages are displayed as "skipped": + val pendingOrSkippedStages = mutable.Buffer[StageInfo]() + val failedStages = mutable.Buffer[StageInfo]() + for (stage <- stages) { + if (stage.submissionTime.isEmpty) { + pendingOrSkippedStages += stage + } else if (stage.completionTime.isDefined) { + if (stage.failureReason.isDefined) { + failedStages += stage + } else { + completedStages += stage + } + } else { + activeStages += stage + } + } + + val activeStagesTable = + new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, + parent.basePath, parent.listener, isFairScheduler = 
parent.isFairScheduler, + killEnabled = parent.killEnabled) + val pendingOrSkippedStagesTable = + new StageTableBase(pendingOrSkippedStages.sortBy(_.stageId).reverse, + parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, + killEnabled = false) + val completedStagesTable = + new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath, + parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false) + val failedStagesTable = + new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath, + parent.listener, isFairScheduler = parent.isFairScheduler) + + val shouldShowActiveStages = activeStages.nonEmpty + val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty + val shouldShowCompletedStages = completedStages.nonEmpty + val shouldShowSkippedStages = isComplete && pendingOrSkippedStages.nonEmpty + val shouldShowFailedStages = failedStages.nonEmpty + + val summary: NodeSeq = + <div> + <ul class="unstyled"> + <li> + <Strong>Status:</Strong> + {jobData.status} + </li> + { + if (jobData.jobGroup.isDefined) { + <li> + <strong>Job Group:</strong> + {jobData.jobGroup.get} + </li> + } + } + { + if (shouldShowActiveStages) { + <li> + <a href="#active"><strong>Active Stages:</strong></a> + {activeStages.size} + </li> + } + } + { + if (shouldShowPendingStages) { + <li> + <a href="#pending"> + <strong>Pending Stages:</strong> + </a>{pendingOrSkippedStages.size} + </li> + } + } + { + if (shouldShowCompletedStages) { + <li> + <a href="#completed"><strong>Completed Stages:</strong></a> + {completedStages.size} + </li> + } + } + { + if (shouldShowSkippedStages) { + <li> + <a href="#skipped"><strong>Skipped Stages:</strong></a> + {pendingOrSkippedStages.size} + </li> + } + } + { + if (shouldShowFailedStages) { + <li> + <a href="#failed"><strong>Failed Stages:</strong></a> + {failedStages.size} + </li> + } + } + </ul> + </div> + + var content = summary + if (shouldShowActiveStages) { + content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++ + activeStagesTable.toNodeSeq + } + if (shouldShowPendingStages) { + content ++= <h4 id="pending">Pending Stages ({pendingOrSkippedStages.size})</h4> ++ + pendingOrSkippedStagesTable.toNodeSeq + } + if (shouldShowCompletedStages) { + content ++= <h4 id="completed">Completed Stages ({completedStages.size})</h4> ++ + completedStagesTable.toNodeSeq + } + if (shouldShowSkippedStages) { + content ++= <h4 id="skipped">Skipped Stages ({pendingOrSkippedStages.size})</h4> ++ + pendingOrSkippedStagesTable.toNodeSeq + } + if (shouldShowFailedStages) { + content ++= <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++ + failedStagesTable.toNodeSeq + } + UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index ccdcf0e..72935be 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -17,7 +17,7 @@ package org.apache.spark.ui.jobs -import scala.collection.mutable.{HashMap, ListBuffer} +import scala.collection.mutable.{HashMap, HashSet, ListBuffer} import org.apache.spark._ 
import org.apache.spark.annotation.DeveloperApi @@ -49,8 +49,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { type PoolName = String type ExecutorId = String - // Define all of our state: - // Jobs: val activeJobs = new HashMap[JobId, JobUIData] val completedJobs = ListBuffer[JobUIData]() @@ -60,9 +58,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { // Stages: val activeStages = new HashMap[StageId, StageInfo] val completedStages = ListBuffer[StageInfo]() + val skippedStages = ListBuffer[StageInfo]() val failedStages = ListBuffer[StageInfo]() val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData] val stageIdToInfo = new HashMap[StageId, StageInfo] + val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]] val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]() // Total of completed and failed stages that have ever been run. These may be greater than // `completedStages.size` and `failedStages.size` if we have run more stages or jobs than @@ -95,7 +95,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { Map( "activeStages" -> activeStages.size, "activeJobs" -> activeJobs.size, - "poolToActiveStages" -> poolToActiveStages.values.map(_.size).sum + "poolToActiveStages" -> poolToActiveStages.values.map(_.size).sum, + "stageIdToActiveJobIds" -> stageIdToActiveJobIds.values.map(_.size).sum ) } @@ -106,6 +107,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { "completedJobs" -> completedJobs.size, "failedJobs" -> failedJobs.size, "completedStages" -> completedStages.size, + "skippedStages" -> skippedStages.size, "failedStages" -> failedStages.size ) } @@ -144,11 +146,39 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { } override def onJobStart(jobStart: SparkListenerJobStart) = synchronized { - val jobGroup = Option(jobStart.properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) + val jobGroup = for ( + props <- Option(jobStart.properties); + group <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) + ) yield group val jobData: JobUIData = - new JobUIData(jobStart.jobId, jobStart.stageIds, jobGroup, JobExecutionStatus.RUNNING) + new JobUIData( + jobId = jobStart.jobId, + startTime = Some(System.currentTimeMillis), + endTime = None, + stageIds = jobStart.stageIds, + jobGroup = jobGroup, + status = JobExecutionStatus.RUNNING) + // Compute (a potential underestimate of) the number of tasks that will be run by this job. + // This may be an underestimate because the job start event references all of the result + // stages's transitive stage dependencies, but some of these stages might be skipped if their + // output is available from earlier runs. + // See https://github.com/apache/spark/pull/3009 for a more extensive discussion. 
+ jobData.numTasks = { + val allStages = jobStart.stageInfos + val missingStages = allStages.filter(_.completionTime.isEmpty) + missingStages.map(_.numTasks).sum + } jobIdToData(jobStart.jobId) = jobData activeJobs(jobStart.jobId) = jobData + for (stageId <- jobStart.stageIds) { + stageIdToActiveJobIds.getOrElseUpdate(stageId, new HashSet[StageId]).add(jobStart.jobId) + } + // If there's no information for a stage, store the StageInfo received from the scheduler + // so that we can display stage descriptions for pending stages: + for (stageInfo <- jobStart.stageInfos) { + stageIdToInfo.getOrElseUpdate(stageInfo.stageId, stageInfo) + stageIdToData.getOrElseUpdate((stageInfo.stageId, stageInfo.attemptId), new StageUIData) + } } override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized { @@ -156,6 +186,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { logWarning(s"Job completed for unknown job ${jobEnd.jobId}") new JobUIData(jobId = jobEnd.jobId) } + jobData.endTime = Some(System.currentTimeMillis()) jobEnd.jobResult match { case JobSucceeded => completedJobs += jobData @@ -166,6 +197,20 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { trimJobsIfNecessary(failedJobs) jobData.status = JobExecutionStatus.FAILED } + for (stageId <- jobData.stageIds) { + stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage => + jobsUsingStage.remove(jobEnd.jobId) + stageIdToInfo.get(stageId).foreach { stageInfo => + if (stageInfo.submissionTime.isEmpty) { + // if this stage is pending, it won't complete, so mark it as "skipped": + skippedStages += stageInfo + trimStagesIfNecessary(skippedStages) + jobData.numSkippedStages += 1 + jobData.numSkippedTasks += stageInfo.numTasks + } + } + } + } } override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized { @@ -193,6 +238,19 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { numFailedStages += 1 trimStagesIfNecessary(failedStages) } + + for ( + activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId); + jobId <- activeJobsDependentOnStage; + jobData <- jobIdToData.get(jobId) + ) { + jobData.numActiveStages -= 1 + if (stage.failureReason.isEmpty) { + jobData.completedStageIndices.add(stage.stageId) + } else { + jobData.numFailedStages += 1 + } + } } /** For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */ @@ -214,6 +272,14 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]) stages(stage.stageId) = stage + + for ( + activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId); + jobId <- activeJobsDependentOnStage; + jobData <- jobIdToData.get(jobId) + ) { + jobData.numActiveStages += 1 + } } override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized { @@ -226,6 +292,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageData.numActiveTasks += 1 stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo)) } + for ( + activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId); + jobId <- activeJobsDependentOnStage; + jobData <- jobIdToData.get(jobId) + ) { + jobData.numActiveTasks += 1 + } } override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { @@ -283,6 +356,20 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { 
taskData.taskInfo = info taskData.taskMetrics = metrics taskData.errorMessage = errorMessage + + for ( + activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskEnd.stageId); + jobId <- activeJobsDependentOnStage; + jobData <- jobIdToData.get(jobId) + ) { + jobData.numActiveTasks -= 1 + taskEnd.reason match { + case Success => + jobData.numCompletedTasks += 1 + case _ => + jobData.numFailedTasks += 1 + } + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala deleted file mode 100644 index 83a7898..0000000 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ui.jobs - -import javax.servlet.http.HttpServletRequest - -import scala.xml.{Node, NodeSeq} - -import org.apache.spark.scheduler.Schedulable -import org.apache.spark.ui.{WebUIPage, UIUtils} - -/** Page showing list of all ongoing and recently finished stages and pools */ -private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") { - private val sc = parent.sc - private val listener = parent.listener - private def isFairScheduler = parent.isFairScheduler - - def render(request: HttpServletRequest): Seq[Node] = { - listener.synchronized { - val activeStages = listener.activeStages.values.toSeq - val completedStages = listener.completedStages.reverse.toSeq - val numCompletedStages = listener.numCompletedStages - val failedStages = listener.failedStages.reverse.toSeq - val numFailedStages = listener.numFailedStages - val now = System.currentTimeMillis - - val activeStagesTable = - new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, - parent, parent.killEnabled) - val completedStagesTable = - new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent) - val failedStagesTable = - new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent) - - // For now, pool information is only accessible in live UIs - val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable]) - val poolTable = new PoolTable(pools, parent) - - val summary: NodeSeq = - <div> - <ul class="unstyled"> - {if (sc.isDefined) { - // Total duration is not meaningful unless the UI is live - <li> - <strong>Total Duration: </strong> - {UIUtils.formatDuration(now - sc.get.startTime)} - </li> - }} - <li> - <strong>Scheduling Mode: </strong> - {listener.schedulingMode.map(_.toString).getOrElse("Unknown")} - </li> - <li> - <a 
href="#active"><strong>Active Stages:</strong></a> - {activeStages.size} - </li> - <li> - <a href="#completed"><strong>Completed Stages:</strong></a> - {numCompletedStages} - </li> - <li> - <a href="#failed"><strong>Failed Stages:</strong></a> - {numFailedStages} - </li> - </ul> - </div> - - val content = summary ++ - {if (sc.isDefined && isFairScheduler) { - <h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq - } else { - Seq[Node]() - }} ++ - <h4 id="active">Active Stages ({activeStages.size})</h4> ++ - activeStagesTable.toNodeSeq ++ - <h4 id="completed">Completed Stages ({numCompletedStages})</h4> ++ - completedStagesTable.toNodeSeq ++ - <h4 id ="failed">Failed Stages ({numFailedStages})</h4> ++ - failedStagesTable.toNodeSeq - - UIUtils.headerSparkPage("Spark Stages", content, parent) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala deleted file mode 100644 index 03ca918..0000000 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ui.jobs - -import javax.servlet.http.HttpServletRequest - -import org.apache.spark.SparkConf -import org.apache.spark.scheduler.SchedulingMode -import org.apache.spark.ui.{SparkUI, SparkUITab} - -/** Web UI showing progress status of all jobs in the given SparkContext. */ -private[ui] class JobProgressTab(parent: SparkUI) extends SparkUITab(parent, "stages") { - val sc = parent.sc - val conf = sc.map(_.conf).getOrElse(new SparkConf) - val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false) - val listener = parent.jobProgressListener - - attachPage(new JobProgressPage(this)) - attachPage(new StagePage(this)) - attachPage(new PoolPage(this)) - - def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR) - - def handleKillRequest(request: HttpServletRequest) = { - if ((killEnabled) && (parent.securityManager.checkModifyPermissions(request.getRemoteUser))) { - val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean - val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt - if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) { - sc.get.cancelStage(stageId) - } - // Do a quick pause here to give Spark time to kill the stage so it shows up as - // killed after the refresh. 
Note that this will block the serving thread so the - // time should be limited in duration. - Thread.sleep(100) - } - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala new file mode 100644 index 0000000..b2bbfde --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.jobs + +import org.apache.spark.scheduler.SchedulingMode +import org.apache.spark.ui.{SparkUI, SparkUITab} + +/** Web UI showing progress status of all jobs in the given SparkContext. */ +private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") { + val sc = parent.sc + val killEnabled = parent.killEnabled + def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR) + val listener = parent.jobProgressListener + + attachPage(new AllJobsPage(this)) + attachPage(new JobPage(this)) +} http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 770d99e..5fc6cc7 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -25,7 +25,7 @@ import org.apache.spark.scheduler.{Schedulable, StageInfo} import org.apache.spark.ui.{WebUIPage, UIUtils} /** Page showing specific pool details */ -private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") { +private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") { private val sc = parent.sc private val listener = parent.listener @@ -37,8 +37,9 @@ private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") { case Some(s) => s.values.toSeq case None => Seq[StageInfo]() } - val activeStagesTable = - new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, parent) + val activeStagesTable = new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, + parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, + killEnabled = parent.killEnabled) // For now, pool information is only accessible in live UIs val pools = sc.map(_.getPoolForName(poolName).get).toSeq http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index 64178e1..df1899e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -24,7 +24,7 @@ import org.apache.spark.scheduler.{Schedulable, StageInfo} import org.apache.spark.ui.UIUtils /** Table showing list of pools */ -private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) { +private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) { private val listener = parent.listener def toNodeSeq: Seq[Node] = { http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 36afc49..40e05f8 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -31,7 +31,7 @@ import org.apache.spark.util.{Utils, Distribution} import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} /** Page showing statistics and task list for a given stage */ -private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { +private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { private val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 2ff561c..e7d6244 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -31,11 +31,10 @@ import org.apache.spark.util.Utils /** Page showing list of all ongoing and recently finished stages */ private[ui] class StageTableBase( stages: Seq[StageInfo], - parent: JobProgressTab, - killEnabled: Boolean = false) { - - private val listener = parent.listener - protected def isFairScheduler = parent.isFairScheduler + basePath: String, + listener: JobProgressListener, + isFairScheduler: Boolean, + killEnabled: Boolean) { protected def columns: Seq[Node] = { <th>Stage Id</th> ++ @@ -73,25 +72,11 @@ private[ui] class StageTableBase( </table> } - private def makeProgressBar(started: Int, completed: Int, failed: Int, total: Int): Seq[Node] = - { - val completeWidth = "width: %s%%".format((completed.toDouble/total)*100) - val startWidth = "width: %s%%".format((started.toDouble/total)*100) - - <div class="progress"> - <span style="text-align:center; position:absolute; width:100%; left:0;"> - {completed}/{total} { if (failed > 0) s"($failed failed)" else "" } - </span> - <div class="bar bar-completed" style={completeWidth}></div> - <div class="bar bar-running" style={startWidth}></div> - </div> - } - private def makeDescription(s: StageInfo): Seq[Node] = { // scalastyle:off val killLink = if (killEnabled) { val killLinkUri = "%s/stages/stage/kill?id=%s&terminate=true" - .format(UIUtils.prependBaseUri(parent.basePath), s.stageId) + 
.format(UIUtils.prependBaseUri(basePath), s.stageId) val confirm = "return window.confirm('Are you sure you want to kill stage %s ?');" .format(s.stageId) <span class="kill-link"> @@ -101,7 +86,7 @@ private[ui] class StageTableBase( // scalastyle:on val nameLinkUri ="%s/stages/stage?id=%s&attempt=%s" - .format(UIUtils.prependBaseUri(parent.basePath), s.stageId, s.attemptId) + .format(UIUtils.prependBaseUri(basePath), s.stageId, s.attemptId) val nameLink = <a href={nameLinkUri}>{s.name}</a> val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions > 0) @@ -115,7 +100,7 @@ private[ui] class StageTableBase( Text("RDD: ") ++ // scalastyle:off cachedRddInfos.map { i => - <a href={"%s/storage/rdd?id=%d".format(UIUtils.prependBaseUri(parent.basePath), i.id)}>{i.name}</a> + <a href={"%s/storage/rdd?id=%d".format(UIUtils.prependBaseUri(basePath), i.id)}>{i.name}</a> } // scalastyle:on }} @@ -167,7 +152,7 @@ private[ui] class StageTableBase( {if (isFairScheduler) { <td> <a href={"%s/stages/pool?poolname=%s" - .format(UIUtils.prependBaseUri(parent.basePath), stageData.schedulingPool)}> + .format(UIUtils.prependBaseUri(basePath), stageData.schedulingPool)}> {stageData.schedulingPool} </a> </td> @@ -180,8 +165,9 @@ private[ui] class StageTableBase( </td> <td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td> <td class="progress-cell"> - {makeProgressBar(stageData.numActiveTasks, stageData.completedIndices.size, - stageData.numFailedTasks, s.numTasks)} + {UIUtils.makeProgressBar(started = stageData.numActiveTasks, + completed = stageData.completedIndices.size, failed = stageData.numFailedTasks, + skipped = 0, total = s.numTasks)} </td> <td sorttable_customkey={inputRead.toString}>{inputReadWithUnit}</td> <td sorttable_customkey={outputWrite.toString}>{outputWriteWithUnit}</td> @@ -195,9 +181,10 @@ private[ui] class StageTableBase( private[ui] class FailedStageTable( stages: Seq[StageInfo], - parent: JobProgressTab, - killEnabled: Boolean = false) - extends StageTableBase(stages, parent, killEnabled) { + basePath: String, + listener: JobProgressListener, + isFairScheduler: Boolean) + extends StageTableBase(stages, basePath, listener, isFairScheduler, killEnabled = false) { override protected def columns: Seq[Node] = super.columns ++ <th>Failure Reason</th> http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala new file mode 100644 index 0000000..937261d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.jobs + +import javax.servlet.http.HttpServletRequest + +import org.apache.spark.scheduler.SchedulingMode +import org.apache.spark.ui.{SparkUI, SparkUITab} + +/** Web UI showing progress status of all stages in the given SparkContext. */ +private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages") { + val sc = parent.sc + val killEnabled = parent.killEnabled + val listener = parent.jobProgressListener + + attachPage(new AllStagesPage(this)) + attachPage(new StagePage(this)) + attachPage(new PoolPage(this)) + + def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR) + + def handleKillRequest(request: HttpServletRequest) = { + if ((killEnabled) && (parent.securityManager.checkModifyPermissions(request.getRemoteUser))) { + val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean + val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt + if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) { + sc.get.cancelStage(stageId) + } + // Do a quick pause here to give Spark time to kill the stage so it shows up as + // killed after the refresh. Note that this will block the serving thread so the + // time should be limited in duration. + Thread.sleep(100) + } + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 2f7d618..48fd7ca 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -40,9 +40,28 @@ private[jobs] object UIData { class JobUIData( var jobId: Int = -1, + var startTime: Option[Long] = None, + var endTime: Option[Long] = None, var stageIds: Seq[Int] = Seq.empty, var jobGroup: Option[String] = None, - var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN + var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN, + /* Tasks */ + // `numTasks` is a potential underestimate of the true number of tasks that this job will run. + // This may be an underestimate because the job start event references all of the result + // stages's transitive stage dependencies, but some of these stages might be skipped if their + // output is available from earlier runs. + // See https://github.com/apache/spark/pull/3009 for a more extensive discussion. 
+ var numTasks: Int = 0, + var numActiveTasks: Int = 0, + var numCompletedTasks: Int = 0, + var numSkippedTasks: Int = 0, + var numFailedTasks: Int = 0, + /* Stages */ + var numActiveStages: Int = 0, + // This needs to be a set instead of a simple count to prevent double-counting of rerun stages: + var completedStageIndices: OpenHashSet[Int] = new OpenHashSet[Int](), + var numSkippedStages: Int = 0, + var numFailedStages: Int = 0 ) class StageUIData { http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 7e536ed..7b5db1e 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -31,6 +31,21 @@ import org.apache.spark.scheduler._ import org.apache.spark.storage._ import org.apache.spark._ +/** + * Serializes SparkListener events to/from JSON. This protocol provides strong backwards- + * and forwards-compatibility guarantees: any version of Spark should be able to read JSON output + * written by any other version, including newer versions. + * + * JsonProtocolSuite contains backwards-compatibility tests which check that the current version of + * JsonProtocol is able to read output written by earlier versions. We do not currently have tests + * for reading newer JSON output with older Spark versions. + * + * To ensure that we provide these guarantees, follow these rules when modifying these methods: + * + * - Never delete any JSON fields. + * - Any new JSON fields should be optional; use `Utils.jsonOption` when reading these fields + * in `*FromJson` methods. + */ private[spark] object JsonProtocol { // TODO: Remove this file and put JSON serialization into each individual class. 
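To make the last rule above concrete, here is a minimal sketch of the optional-field read pattern, assuming json4s (which `JsonProtocol` is built on); the field name "New Optional Field", the `newFieldFromJson` method, and the inlined `jsonOption` helper are illustrative stand-ins, not part of the actual API:

```scala
// Hedged sketch of the "new JSON fields should be optional" rule.
import org.json4s._

implicit val formats: Formats = DefaultFormats

// Stand-in for Utils.jsonOption: a field that older writers never emitted
// comes back as JNothing, which we translate to None instead of letting
// extract[...] fail on old JSON output.
def jsonOption(json: JValue): Option[JValue] = json match {
  case JNothing => None
  case value => Some(value)
}

// A *FromJson-style reader that degrades gracefully on pre-upgrade events:
def newFieldFromJson(json: JValue): Option[Int] =
  jsonOption(json \ "New Optional Field").map(_.extract[Int])
```

The `jobStartFromJson` change below follows exactly this shape for the new "Stage Infos" field, falling back to dummy `StageInfo` objects when reading events written by older Spark versions.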
@@ -121,6 +136,7 @@ private[spark] object JsonProtocol { val properties = propertiesToJson(jobStart.properties) ("Event" -> Utils.getFormattedClassName(jobStart)) ~ ("Job ID" -> jobStart.jobId) ~ + ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~ // Added in Spark 1.2.0 ("Stage IDs" -> jobStart.stageIds) ~ ("Properties" -> properties) } @@ -455,7 +471,12 @@ private[spark] object JsonProtocol { val jobId = (json \ "Job ID").extract[Int] val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int]) val properties = propertiesFromJson(json \ "Properties") - SparkListenerJobStart(jobId, stageIds, properties) + // The "Stage Infos" field was added in Spark 1.2.0 + val stageInfos = Utils.jsonOption(json \ "Stage Infos") + .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse { + stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown")) + } + SparkListenerJobStart(jobId, stageInfos, properties) } def jobEndFromJson(json: JValue): SparkListenerJobEnd = { http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index bacf6a1..d2857b8 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -17,16 +17,20 @@ package org.apache.spark.ui -import org.apache.spark.api.java.StorageLevels -import org.apache.spark.{SparkException, SparkConf, SparkContext} -import org.openqa.selenium.WebDriver +import scala.collection.JavaConversions._ + +import org.openqa.selenium.{By, WebDriver} import org.openqa.selenium.htmlunit.HtmlUnitDriver import org.scalatest._ import org.scalatest.concurrent.Eventually._ import org.scalatest.selenium.WebBrowser import org.scalatest.time.SpanSugar._ +import org.apache.spark._ +import org.apache.spark.SparkContext._ import org.apache.spark.LocalSparkContext._ +import org.apache.spark.api.java.StorageLevels +import org.apache.spark.shuffle.FetchFailedException /** * Selenium tests for the Spark Web UI. 
These tests are not run by default @@ -89,7 +93,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers { sc.parallelize(1 to 10).map { x => throw new Exception()}.collect() } eventually(timeout(5 seconds), interval(50 milliseconds)) { - go to sc.ui.get.appUIAddress + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages") find(id("active")).get.text should be("Active Stages (0)") find(id("failed")).get.text should be("Failed Stages (1)") } @@ -101,7 +105,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers { sc.parallelize(1 to 10).map { x => unserializableObject}.collect() } eventually(timeout(5 seconds), interval(50 milliseconds)) { - go to sc.ui.get.appUIAddress + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages") find(id("active")).get.text should be("Active Stages (0)") // The failure occurs before the stage becomes active, hence we should still show only one // failed stage, not two: @@ -109,4 +113,191 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers { } } } + + test("spark.ui.killEnabled should properly control kill button display") { + def getSparkContext(killEnabled: Boolean): SparkContext = { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test") + .set("spark.ui.enabled", "true") + .set("spark.ui.killEnabled", killEnabled.toString) + new SparkContext(conf) + } + + def hasKillLink = find(className("kill-link")).isDefined + def runSlowJob(sc: SparkContext) { + sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync() + } + + withSpark(getSparkContext(killEnabled = true)) { sc => + runSlowJob(sc) + eventually(timeout(5 seconds), interval(50 milliseconds)) { + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages") + assert(hasKillLink) + } + } + + withSpark(getSparkContext(killEnabled = false)) { sc => + runSlowJob(sc) + eventually(timeout(5 seconds), interval(50 milliseconds)) { + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages") + assert(!hasKillLink) + } + } + } + + test("jobs page should not display job group name unless some job was submitted in a job group") { + withSpark(newSparkContext()) { sc => + // If no job has been run in a job group, then "(Job Group)" should not appear in the header + sc.parallelize(Seq(1, 2, 3)).count() + eventually(timeout(5 seconds), interval(50 milliseconds)) { + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs") + val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq + tableHeaders should not contain "Job Id (Job Group)" + } + // Once at least one job has been run in a job group, then we should display the group name: + sc.setJobGroup("my-job-group", "my-job-group-description") + sc.parallelize(Seq(1, 2, 3)).count() + eventually(timeout(5 seconds), interval(50 milliseconds)) { + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs") + val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq + tableHeaders should contain ("Job Id (Job Group)") + } + } + } + + test("job progress bars should handle stage / task failures") { + withSpark(newSparkContext()) { sc => + val data = sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity) + val shuffleHandle = + data.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle + // Simulate fetch failures: + val mappedData = data.map { x => + val taskContext = TaskContext.get + if (taskContext.attemptId() == 1) { // Cause this stage to fail on its first attempt. 
+ val env = SparkEnv.get + val bmAddress = env.blockManager.blockManagerId + val shuffleId = shuffleHandle.shuffleId + val mapId = 0 + val reduceId = taskContext.partitionId() + val message = "Simulated fetch failure" + throw new FetchFailedException(bmAddress, shuffleId, mapId, reduceId, message) + } else { + x + } + } + mappedData.count() + eventually(timeout(5 seconds), interval(50 milliseconds)) { + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs") + find(cssSelector(".stage-progress-cell")).get.text should be ("2/2 (1 failed)") + // Ideally, the following test would pass, but currently we overcount completed tasks + // if task recomputations occur: + // find(cssSelector(".progress-cell .progress")).get.text should be ("2/2 (1 failed)") + // Instead, we guarantee that the total number of tasks is always correct, while the number + // of completed tasks may be higher: + find(cssSelector(".progress-cell .progress")).get.text should be ("3/2 (1 failed)") + } + } + } + + test("job details page should display useful information for stages that haven't started") { + withSpark(newSparkContext()) { sc => + // Create a multi-stage job with a long delay in the first stage: + val rdd = sc.parallelize(Seq(1, 2, 3)).map { x => + // This long sleep call won't slow down the tests because we don't actually need to wait + // for the job to finish. + Thread.sleep(20000) + }.groupBy(identity).map(identity).groupBy(identity).map(identity) + // Start the job: + rdd.countAsync() + eventually(timeout(10 seconds), interval(50 milliseconds)) { + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/?id=0") + find(id("active")).get.text should be ("Active Stages (1)") + find(id("pending")).get.text should be ("Pending Stages (2)") + // Essentially, we want to check that none of the stage rows show + // "No data available for this stage". Checking for the absence of that string is brittle + // because someone could change the error message and cause this test to pass by accident. + // Instead, it's safer to check that each row contains a link to a stage details page. + findAll(cssSelector("tbody tr")).foreach { row => + val link = row.underlying.findElement(By.xpath(".//a")) + link.getAttribute("href") should include ("stage") + } + } + } + } + + test("job progress bars / cells reflect skipped stages / tasks") { + withSpark(newSparkContext()) { sc => + // Create an RDD that involves multiple stages: + val rdd = sc.parallelize(1 to 8, 8) + .map(x => x).groupBy((x: Int) => x, numPartitions = 8) + .flatMap(x => x._2).groupBy((x: Int) => x, numPartitions = 8) + // Run it twice; this will cause the second job to have two "phantom" stages that were + // mentioned in its job start event but which were never actually executed: + rdd.count() + rdd.count() + eventually(timeout(10 seconds), interval(50 milliseconds)) { + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs") + // The completed jobs table should have two rows. 
The first row will be the most recent job: + val firstRow = find(cssSelector("tbody tr")).get.underlying + val firstRowColumns = firstRow.findElements(By.tagName("td")) + firstRowColumns(0).getText should be ("1") + firstRowColumns(4).getText should be ("1/1 (2 skipped)") + firstRowColumns(5).getText should be ("8/8 (16 skipped)") + // The second row is the first run of the job, where nothing was skipped: + val secondRow = findAll(cssSelector("tbody tr")).toSeq(1).underlying + val secondRowColumns = secondRow.findElements(By.tagName("td")) + secondRowColumns(0).getText should be ("0") + secondRowColumns(4).getText should be ("3/3") + secondRowColumns(5).getText should be ("24/24") + } + } + } + + test("stages that aren't run appear as 'skipped stages' after a job finishes") { + withSpark(newSparkContext()) { sc => + // Create an RDD that involves multiple stages: + val rdd = + sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity) + // Run it twice; this will cause the second job to have two "phantom" stages that were + // mentioned in its job start event but which were never actually executed: + rdd.count() + rdd.count() + eventually(timeout(10 seconds), interval(50 milliseconds)) { + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/?id=1") + find(id("pending")) should be (None) + find(id("active")) should be (None) + find(id("failed")) should be (None) + find(id("completed")).get.text should be ("Completed Stages (1)") + find(id("skipped")).get.text should be ("Skipped Stages (2)") + // Essentially, we want to check that none of the stage rows show + // "No data available for this stage". Checking for the absence of that string is brittle + // because someone could change the error message and cause this test to pass by accident. + // Instead, it's safer to check that each row contains a link to a stage details page. 
+ findAll(cssSelector("tbody tr")).foreach { row => + val link = row.underlying.findElement(By.xpath(".//a")) + link.getAttribute("href") should include ("stage") + } + } + } + } + + test("jobs with stages that are skipped should show correct link descriptions on all jobs page") { + withSpark(newSparkContext()) { sc => + // Create an RDD that involves multiple stages: + val rdd = + sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity) + // Run it twice; this will cause the second job to have two "phantom" stages that were + // mentioned in its job start event but which were never actually executed: + rdd.count() + rdd.count() + eventually(timeout(10 seconds), interval(50 milliseconds)) { + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs") + findAll(cssSelector("tbody tr a")).foreach { link => + link.text.toLowerCase should include ("count") + link.text.toLowerCase should not include "unknown" + } + } + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/2d35cc08/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 15c5b4e..12af60c 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -43,7 +43,10 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc } private def createJobStartEvent(jobId: Int, stageIds: Seq[Int]) = { - SparkListenerJobStart(jobId, stageIds) + val stageInfos = stageIds.map { stageId => + new StageInfo(stageId, 0, stageId.toString, 0, null, "") + } + SparkListenerJobStart(jobId, stageInfos) } private def createJobEndEvent(jobId: Int, failed: Boolean = false) = { @@ -52,8 +55,9 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc } private def runJob(listener: SparkListener, jobId: Int, shouldFail: Boolean = false) { + val stagesThatWontBeRun = jobId * 200 to jobId * 200 + 10 val stageIds = jobId * 100 to jobId * 100 + 50 - listener.onJobStart(createJobStartEvent(jobId, stageIds)) + listener.onJobStart(createJobStartEvent(jobId, stageIds ++ stagesThatWontBeRun)) for (stageId <- stageIds) { listener.onStageSubmitted(createStageStartEvent(stageId)) listener.onStageCompleted(createStageEndEvent(stageId, failed = stageId % 2 == 0)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org