Repository: spark Updated Branches: refs/heads/master 297813647 -> 5d0f81da4
[SPARK-4411][WEB UI] Add "kill" link for jobs in the UI ## What changes were proposed in this pull request? Currently, users can kill stages via the web UI but cannot kill jobs directly (a job is killed only indirectly, when one of its stages is killed). I've added the ability to kill jobs via the web UI. This code change is based on #4823 by lianhuiwang and updated to work with the latest code, matching how stages are currently killed. In general, I've copied the kill-stage code, including its warning and note comments. I also updated the applicable tests and documentation. ## How was this patch tested? Tested manually and with dev/run-tests. ![screen shot 2016-10-11 at 4 49 43 pm](https://cloud.githubusercontent.com/assets/13952758/19292857/12f1b7c0-8fd4-11e6-8982-210249f7b697.png) Author: Alex Bozarth <ajboz...@us.ibm.com> Author: Lianhui Wang <lianhuiwan...@gmail.com> Closes #15441 from ajbozarth/spark4411. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d0f81da Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d0f81da Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d0f81da Branch: refs/heads/master Commit: 5d0f81da49e86ee93ecf679a20d024ea2cb8b3d3 Parents: 2978136 Author: Alex Bozarth <ajboz...@us.ibm.com> Authored: Wed Oct 26 14:26:54 2016 +0200 Committer: Sean Owen <so...@cloudera.com> Committed: Wed Oct 26 14:26:54 2016 +0200 ---------------------------------------------------------------------- .../scala/org/apache/spark/ui/SparkUI.scala | 11 ++--- .../org/apache/spark/ui/jobs/AllJobsPage.scala | 34 +++++++++++--- .../org/apache/spark/ui/jobs/JobsTab.scala | 17 +++++++ .../org/apache/spark/ui/jobs/StageTable.scala | 5 ++- .../org/apache/spark/ui/jobs/StagesTab.scala | 17 +++---- .../org/apache/spark/ui/UISeleniumSuite.scala | 47 ++++++++++++++++---- docs/configuration.md | 2 +- 7 files changed, 104 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/5d0f81da/core/src/main/scala/org/apache/spark/ui/SparkUI.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index ef71db8..f631a04 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -58,14 +58,13 @@ private[spark] class SparkUI private ( val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false) - - val stagesTab = new StagesTab(this) - var appId: String = _ /** Initialize all components of the server. */ def initialize() { - attachTab(new JobsTab(this)) + val jobsTab = new JobsTab(this) + attachTab(jobsTab) + val stagesTab = new StagesTab(this) attachTab(stagesTab) attachTab(new StorageTab(this)) attachTab(new EnvironmentTab(this)) @@ -73,7 +72,9 @@ private[spark] class SparkUI private ( attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath)) attachHandler(ApiRootResource.getServletHandler(this)) - // This should be POST only, but, the YARN AM proxy won't proxy POSTs + // These should be POST only, but, the YARN AM proxy won't proxy POSTs + attachHandler(createRedirectHandler( + "/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST"))) attachHandler(createRedirectHandler( "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest, httpMethods = Set("GET", "POST"))) http://git-wip-us.apache.org/repos/asf/spark/blob/5d0f81da/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index f671309..173fc3c 100644 --- 
a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -218,7 +218,8 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { request: HttpServletRequest, tableHeaderId: String, jobTag: String, - jobs: Seq[JobUIData]): Seq[Node] = { + jobs: Seq[JobUIData], + killEnabled: Boolean): Seq[Node] = { val allParameters = request.getParameterMap.asScala.toMap val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag)) .map(para => para._1 + "=" + para._2(0)) @@ -264,6 +265,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { parameterOtherTable, parent.jobProgresslistener.stageIdToInfo, parent.jobProgresslistener.stageIdToData, + killEnabled, currentTime, jobIdTitle, pageSize = jobPageSize, @@ -290,9 +292,12 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { val completedJobs = listener.completedJobs.reverse.toSeq val failedJobs = listener.failedJobs.reverse.toSeq - val activeJobsTable = jobsTable(request, "active", "activeJob", activeJobs) - val completedJobsTable = jobsTable(request, "completed", "completedJob", completedJobs) - val failedJobsTable = jobsTable(request, "failed", "failedJob", failedJobs) + val activeJobsTable = + jobsTable(request, "active", "activeJob", activeJobs, killEnabled = parent.killEnabled) + val completedJobsTable = + jobsTable(request, "completed", "completedJob", completedJobs, killEnabled = false) + val failedJobsTable = + jobsTable(request, "failed", "failedJob", failedJobs, killEnabled = false) val shouldShowActiveJobs = activeJobs.nonEmpty val shouldShowCompletedJobs = completedJobs.nonEmpty @@ -483,6 +488,7 @@ private[ui] class JobPagedTable( parameterOtherTable: Iterable[String], stageIdToInfo: HashMap[Int, StageInfo], stageIdToData: HashMap[(Int, Int), StageUIData], + killEnabled: Boolean, currentTime: Long, jobIdTitle: String, pageSize: Int, @@ -586,12 +592,30 @@ private[ui] 
class JobPagedTable( override def row(jobTableRow: JobTableRowData): Seq[Node] = { val job = jobTableRow.jobData + val killLink = if (killEnabled) { + val confirm = + s"if (window.confirm('Are you sure you want to kill job ${job.jobId} ?')) " + + "{ this.parentNode.submit(); return true; } else { return false; }" + // SPARK-6846 this should be POST-only but YARN AM won't proxy POST + /* + val killLinkUri = s"$basePathUri/jobs/job/kill/" + <form action={killLinkUri} method="POST" style="display:inline"> + <input type="hidden" name="id" value={job.jobId.toString}/> + <a href="#" onclick={confirm} class="kill-link">(kill)</a> + </form> + */ + val killLinkUri = s"$basePath/jobs/job/kill/?id=${job.jobId}" + <a href={killLinkUri} onclick={confirm} class="kill-link">(kill)</a> + } else { + Seq.empty + } + <tr id={"job-" + job.jobId}> <td> {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")} </td> <td> - {jobTableRow.jobDescription} + {jobTableRow.jobDescription} {killLink} <a href={jobTableRow.detailUrl} class="name-link">{jobTableRow.lastStageName}</a> </td> <td> http://git-wip-us.apache.org/repos/asf/spark/blob/5d0f81da/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala index 7b00b55..620c54c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala @@ -17,6 +17,8 @@ package org.apache.spark.ui.jobs +import javax.servlet.http.HttpServletRequest + import org.apache.spark.scheduler.SchedulingMode import org.apache.spark.ui.{SparkUI, SparkUITab} @@ -35,4 +37,19 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") { attachPage(new AllJobsPage(this)) attachPage(new JobPage(this)) + + def handleKillRequest(request: HttpServletRequest): Unit = { + if 
(killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) { + val jobId = Option(request.getParameter("id")).map(_.toInt) + jobId.foreach { id => + if (jobProgresslistener.activeJobs.contains(id)) { + sc.foreach(_.cancelJob(id)) + // Do a quick pause here to give Spark time to kill the job so it shows up as + // killed after the refresh. Note that this will block the serving thread so the + // time should be limited in duration. + Thread.sleep(100) + } + } + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/5d0f81da/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 9b9b468..c9d0431 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -353,12 +353,13 @@ private[ui] class StagePagedTable( val killLinkUri = s"$basePathUri/stages/stage/kill/" <form action={killLinkUri} method="POST" style="display:inline"> <input type="hidden" name="id" value={s.stageId.toString}/> - <input type="hidden" name="terminate" value="true"/> <a href="#" onclick={confirm} class="kill-link">(kill)</a> </form> */ - val killLinkUri = s"$basePathUri/stages/stage/kill/?id=${s.stageId}&terminate=true" + val killLinkUri = s"$basePathUri/stages/stage/kill/?id=${s.stageId}" <a href={killLinkUri} onclick={confirm} class="kill-link">(kill)</a> + } else { + Seq.empty } val nameLinkUri = s"$basePathUri/stages/stage?id=${s.stageId}&attempt=${s.attemptId}" http://git-wip-us.apache.org/repos/asf/spark/blob/5d0f81da/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index 573192a..c1f2511 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -39,15 +39,16 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages" def handleKillRequest(request: HttpServletRequest): Unit = { if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) { - val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean - val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt - if (stageId >= 0 && killFlag && progressListener.activeStages.contains(stageId)) { - sc.get.cancelStage(stageId) + val stageId = Option(request.getParameter("id")).map(_.toInt) + stageId.foreach { id => + if (progressListener.activeStages.contains(id)) { + sc.foreach(_.cancelStage(id)) + // Do a quick pause here to give Spark time to kill the stage so it shows up as + // killed after the refresh. Note that this will block the serving thread so the + // time should be limited in duration. + Thread.sleep(100) + } } - // Do a quick pause here to give Spark time to kill the stage so it shows up as - // killed after the refresh. Note that this will block the serving thread so the - // time should be limited in duration. 
- Thread.sleep(100) } } http://git-wip-us.apache.org/repos/asf/spark/blob/5d0f81da/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index fd12a21..e5d408a 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -197,6 +197,22 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B withSpark(newSparkContext(killEnabled = true)) { sc => runSlowJob(sc) eventually(timeout(5 seconds), interval(50 milliseconds)) { + goToUi(sc, "/jobs") + assert(hasKillLink) + } + } + + withSpark(newSparkContext(killEnabled = false)) { sc => + runSlowJob(sc) + eventually(timeout(5 seconds), interval(50 milliseconds)) { + goToUi(sc, "/jobs") + assert(!hasKillLink) + } + } + + withSpark(newSparkContext(killEnabled = true)) { sc => + runSlowJob(sc) + eventually(timeout(5 seconds), interval(50 milliseconds)) { goToUi(sc, "/stages") assert(hasKillLink) } @@ -453,20 +469,24 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B } test("kill stage POST/GET response is correct") { - def getResponseCode(url: URL, method: String): Int = { - val connection = url.openConnection().asInstanceOf[HttpURLConnection] - connection.setRequestMethod(method) - connection.connect() - val code = connection.getResponseCode() - connection.disconnect() - code + withSpark(newSparkContext(killEnabled = true)) { sc => + sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync() + eventually(timeout(5 seconds), interval(50 milliseconds)) { + val url = new URL( + sc.ui.get.appUIAddress.stripSuffix("/") + "/stages/stage/kill/?id=0") + // SPARK-6846: should be POST only but YARN AM doesn't proxy POST + getResponseCode(url, "GET") should be (200) + 
getResponseCode(url, "POST") should be (200) + } } + } + test("kill job POST/GET response is correct") { withSpark(newSparkContext(killEnabled = true)) { sc => sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync() eventually(timeout(5 seconds), interval(50 milliseconds)) { val url = new URL( - sc.ui.get.appUIAddress.stripSuffix("/") + "/stages/stage/kill/?id=0&terminate=true") + sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/kill/?id=0") // SPARK-6846: should be POST only but YARN AM doesn't proxy POST getResponseCode(url, "GET") should be (200) getResponseCode(url, "POST") should be (200) @@ -651,6 +671,17 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B } } + def getResponseCode(url: URL, method: String): Int = { + val connection = url.openConnection().asInstanceOf[HttpURLConnection] + connection.setRequestMethod(method) + try { + connection.connect() + connection.getResponseCode() + } finally { + connection.disconnect() + } + } + def goToUi(sc: SparkContext, path: String): Unit = { goToUi(sc.ui.get, path) } http://git-wip-us.apache.org/repos/asf/spark/blob/5d0f81da/docs/configuration.md ---------------------------------------------------------------------- diff --git a/docs/configuration.md b/docs/configuration.md index b07867d..6600cb6 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -632,7 +632,7 @@ Apart from these, the following properties are also available, and may be useful <td><code>spark.ui.killEnabled</code></td> <td>true</td> <td> - Allows stages and corresponding jobs to be killed from the web ui. + Allows jobs and stages to be killed from the web UI. </td> </tr> <tr> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org