Repository: spark
Updated Branches:
  refs/heads/master 297813647 -> 5d0f81da4


[SPARK-4411][WEB UI] Add "kill" link for jobs in the UI

## What changes were proposed in this pull request?

Currently users can kill stages via the web ui but not jobs directly (jobs are 
killed if one of their stages is). I've added the ability to kill jobs via the 
web ui. This code change is based on #4823 by lianhuiwang and updated to work 
with the latest code, matching how stages are currently killed. In general I've 
copied the kill-stage code — warnings, note comments, and all. I also updated the 
applicable tests and documentation.

## How was this patch tested?

Manually tested, and verified with dev/run-tests

![screen shot 2016-10-11 at 4 49 43 
pm](https://cloud.githubusercontent.com/assets/13952758/19292857/12f1b7c0-8fd4-11e6-8982-210249f7b697.png)

Author: Alex Bozarth <ajboz...@us.ibm.com>
Author: Lianhui Wang <lianhuiwan...@gmail.com>

Closes #15441 from ajbozarth/spark4411.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d0f81da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d0f81da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d0f81da

Branch: refs/heads/master
Commit: 5d0f81da49e86ee93ecf679a20d024ea2cb8b3d3
Parents: 2978136
Author: Alex Bozarth <ajboz...@us.ibm.com>
Authored: Wed Oct 26 14:26:54 2016 +0200
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Oct 26 14:26:54 2016 +0200

----------------------------------------------------------------------
 .../scala/org/apache/spark/ui/SparkUI.scala     | 11 ++---
 .../org/apache/spark/ui/jobs/AllJobsPage.scala  | 34 +++++++++++---
 .../org/apache/spark/ui/jobs/JobsTab.scala      | 17 +++++++
 .../org/apache/spark/ui/jobs/StageTable.scala   |  5 ++-
 .../org/apache/spark/ui/jobs/StagesTab.scala    | 17 +++----
 .../org/apache/spark/ui/UISeleniumSuite.scala   | 47 ++++++++++++++++----
 docs/configuration.md                           |  2 +-
 7 files changed, 104 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5d0f81da/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala 
b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index ef71db8..f631a04 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -58,14 +58,13 @@ private[spark] class SparkUI private (
 
   val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", 
true)).getOrElse(false)
 
-
-  val stagesTab = new StagesTab(this)
-
   var appId: String = _
 
   /** Initialize all components of the server. */
   def initialize() {
-    attachTab(new JobsTab(this))
+    val jobsTab = new JobsTab(this)
+    attachTab(jobsTab)
+    val stagesTab = new StagesTab(this)
     attachTab(stagesTab)
     attachTab(new StorageTab(this))
     attachTab(new EnvironmentTab(this))
@@ -73,7 +72,9 @@ private[spark] class SparkUI private (
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
     attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
     attachHandler(ApiRootResource.getServletHandler(this))
-    // This should be POST only, but, the YARN AM proxy won't proxy POSTs
+    // These should be POST only, but, the YARN AM proxy won't proxy POSTs
+    attachHandler(createRedirectHandler(
+      "/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = 
Set("GET", "POST")))
     attachHandler(createRedirectHandler(
       "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
       httpMethods = Set("GET", "POST")))

http://git-wip-us.apache.org/repos/asf/spark/blob/5d0f81da/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index f671309..173fc3c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -218,7 +218,8 @@ private[ui] class AllJobsPage(parent: JobsTab) extends 
WebUIPage("") {
       request: HttpServletRequest,
       tableHeaderId: String,
       jobTag: String,
-      jobs: Seq[JobUIData]): Seq[Node] = {
+      jobs: Seq[JobUIData],
+      killEnabled: Boolean): Seq[Node] = {
     val allParameters = request.getParameterMap.asScala.toMap
     val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag))
       .map(para => para._1 + "=" + para._2(0))
@@ -264,6 +265,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends 
WebUIPage("") {
         parameterOtherTable,
         parent.jobProgresslistener.stageIdToInfo,
         parent.jobProgresslistener.stageIdToData,
+        killEnabled,
         currentTime,
         jobIdTitle,
         pageSize = jobPageSize,
@@ -290,9 +292,12 @@ private[ui] class AllJobsPage(parent: JobsTab) extends 
WebUIPage("") {
       val completedJobs = listener.completedJobs.reverse.toSeq
       val failedJobs = listener.failedJobs.reverse.toSeq
 
-      val activeJobsTable = jobsTable(request, "active", "activeJob", 
activeJobs)
-      val completedJobsTable = jobsTable(request, "completed", "completedJob", 
completedJobs)
-      val failedJobsTable = jobsTable(request, "failed", "failedJob", 
failedJobs)
+      val activeJobsTable =
+        jobsTable(request, "active", "activeJob", activeJobs, killEnabled = 
parent.killEnabled)
+      val completedJobsTable =
+        jobsTable(request, "completed", "completedJob", completedJobs, 
killEnabled = false)
+      val failedJobsTable =
+        jobsTable(request, "failed", "failedJob", failedJobs, killEnabled = 
false)
 
       val shouldShowActiveJobs = activeJobs.nonEmpty
       val shouldShowCompletedJobs = completedJobs.nonEmpty
@@ -483,6 +488,7 @@ private[ui] class JobPagedTable(
     parameterOtherTable: Iterable[String],
     stageIdToInfo: HashMap[Int, StageInfo],
     stageIdToData: HashMap[(Int, Int), StageUIData],
+    killEnabled: Boolean,
     currentTime: Long,
     jobIdTitle: String,
     pageSize: Int,
@@ -586,12 +592,30 @@ private[ui] class JobPagedTable(
   override def row(jobTableRow: JobTableRowData): Seq[Node] = {
     val job = jobTableRow.jobData
 
+    val killLink = if (killEnabled) {
+      val confirm =
+        s"if (window.confirm('Are you sure you want to kill job ${job.jobId} 
?')) " +
+          "{ this.parentNode.submit(); return true; } else { return false; }"
+      // SPARK-6846 this should be POST-only but YARN AM won't proxy POST
+      /*
+      val killLinkUri = s"$basePathUri/jobs/job/kill/"
+      <form action={killLinkUri} method="POST" style="display:inline">
+        <input type="hidden" name="id" value={job.jobId.toString}/>
+        <a href="#" onclick={confirm} class="kill-link">(kill)</a>
+      </form>
+       */
+      val killLinkUri = s"$basePath/jobs/job/kill/?id=${job.jobId}"
+      <a href={killLinkUri} onclick={confirm} class="kill-link">(kill)</a>
+    } else {
+      Seq.empty
+    }
+
     <tr id={"job-" + job.jobId}>
       <td>
         {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
       </td>
       <td>
-        {jobTableRow.jobDescription}
+        {jobTableRow.jobDescription} {killLink}
         <a href={jobTableRow.detailUrl} 
class="name-link">{jobTableRow.lastStageName}</a>
       </td>
       <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/5d0f81da/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index 7b00b55..620c54c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.ui.jobs
 
+import javax.servlet.http.HttpServletRequest
+
 import org.apache.spark.scheduler.SchedulingMode
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
@@ -35,4 +37,19 @@ private[ui] class JobsTab(parent: SparkUI) extends 
SparkUITab(parent, "jobs") {
 
   attachPage(new AllJobsPage(this))
   attachPage(new JobPage(this))
+
+  def handleKillRequest(request: HttpServletRequest): Unit = {
+    if (killEnabled && 
parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
+      val jobId = Option(request.getParameter("id")).map(_.toInt)
+      jobId.foreach { id =>
+        if (jobProgresslistener.activeJobs.contains(id)) {
+          sc.foreach(_.cancelJob(id))
+          // Do a quick pause here to give Spark time to kill the job so it 
shows up as
+          // killed after the refresh. Note that this will block the serving 
thread so the
+          // time should be limited in duration.
+          Thread.sleep(100)
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5d0f81da/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 9b9b468..c9d0431 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -353,12 +353,13 @@ private[ui] class StagePagedTable(
       val killLinkUri = s"$basePathUri/stages/stage/kill/"
       <form action={killLinkUri} method="POST" style="display:inline">
         <input type="hidden" name="id" value={s.stageId.toString}/>
-        <input type="hidden" name="terminate" value="true"/>
         <a href="#" onclick={confirm} class="kill-link">(kill)</a>
       </form>
        */
-      val killLinkUri = 
s"$basePathUri/stages/stage/kill/?id=${s.stageId}&terminate=true"
+      val killLinkUri = s"$basePathUri/stages/stage/kill/?id=${s.stageId}"
       <a href={killLinkUri} onclick={confirm} class="kill-link">(kill)</a>
+    } else {
+      Seq.empty
     }
 
     val nameLinkUri = 
s"$basePathUri/stages/stage?id=${s.stageId}&attempt=${s.attemptId}"

http://git-wip-us.apache.org/repos/asf/spark/blob/5d0f81da/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
index 573192a..c1f2511 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -39,15 +39,16 @@ private[ui] class StagesTab(parent: SparkUI) extends 
SparkUITab(parent, "stages"
 
   def handleKillRequest(request: HttpServletRequest): Unit = {
     if (killEnabled && 
parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
-      val killFlag = 
Option(request.getParameter("terminate")).getOrElse("false").toBoolean
-      val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
-      if (stageId >= 0 && killFlag && 
progressListener.activeStages.contains(stageId)) {
-        sc.get.cancelStage(stageId)
+      val stageId = Option(request.getParameter("id")).map(_.toInt)
+      stageId.foreach { id =>
+        if (progressListener.activeStages.contains(id)) {
+          sc.foreach(_.cancelStage(id))
+          // Do a quick pause here to give Spark time to kill the stage so it 
shows up as
+          // killed after the refresh. Note that this will block the serving 
thread so the
+          // time should be limited in duration.
+          Thread.sleep(100)
+        }
       }
-      // Do a quick pause here to give Spark time to kill the stage so it 
shows up as
-      // killed after the refresh. Note that this will block the serving 
thread so the
-      // time should be limited in duration.
-      Thread.sleep(100)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5d0f81da/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index fd12a21..e5d408a 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -197,6 +197,22 @@ class UISeleniumSuite extends SparkFunSuite with 
WebBrowser with Matchers with B
     withSpark(newSparkContext(killEnabled = true)) { sc =>
       runSlowJob(sc)
       eventually(timeout(5 seconds), interval(50 milliseconds)) {
+        goToUi(sc, "/jobs")
+        assert(hasKillLink)
+      }
+    }
+
+    withSpark(newSparkContext(killEnabled = false)) { sc =>
+      runSlowJob(sc)
+      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+        goToUi(sc, "/jobs")
+        assert(!hasKillLink)
+      }
+    }
+
+    withSpark(newSparkContext(killEnabled = true)) { sc =>
+      runSlowJob(sc)
+      eventually(timeout(5 seconds), interval(50 milliseconds)) {
         goToUi(sc, "/stages")
         assert(hasKillLink)
       }
@@ -453,20 +469,24 @@ class UISeleniumSuite extends SparkFunSuite with 
WebBrowser with Matchers with B
   }
 
   test("kill stage POST/GET response is correct") {
-    def getResponseCode(url: URL, method: String): Int = {
-      val connection = url.openConnection().asInstanceOf[HttpURLConnection]
-      connection.setRequestMethod(method)
-      connection.connect()
-      val code = connection.getResponseCode()
-      connection.disconnect()
-      code
+    withSpark(newSparkContext(killEnabled = true)) { sc =>
+      sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
+      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+        val url = new URL(
+          sc.ui.get.appUIAddress.stripSuffix("/") + "/stages/stage/kill/?id=0")
+        // SPARK-6846: should be POST only but YARN AM doesn't proxy POST
+        getResponseCode(url, "GET") should be (200)
+        getResponseCode(url, "POST") should be (200)
+      }
     }
+  }
 
+  test("kill job POST/GET response is correct") {
     withSpark(newSparkContext(killEnabled = true)) { sc =>
       sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
       eventually(timeout(5 seconds), interval(50 milliseconds)) {
         val url = new URL(
-          sc.ui.get.appUIAddress.stripSuffix("/") + 
"/stages/stage/kill/?id=0&terminate=true")
+          sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/kill/?id=0")
         // SPARK-6846: should be POST only but YARN AM doesn't proxy POST
         getResponseCode(url, "GET") should be (200)
         getResponseCode(url, "POST") should be (200)
@@ -651,6 +671,17 @@ class UISeleniumSuite extends SparkFunSuite with 
WebBrowser with Matchers with B
     }
   }
 
+  def getResponseCode(url: URL, method: String): Int = {
+    val connection = url.openConnection().asInstanceOf[HttpURLConnection]
+    connection.setRequestMethod(method)
+    try {
+      connection.connect()
+      connection.getResponseCode()
+    } finally {
+      connection.disconnect()
+    }
+  }
+
   def goToUi(sc: SparkContext, path: String): Unit = {
     goToUi(sc.ui.get, path)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5d0f81da/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index b07867d..6600cb6 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -632,7 +632,7 @@ Apart from these, the following properties are also 
available, and may be useful
   <td><code>spark.ui.killEnabled</code></td>
   <td>true</td>
   <td>
-    Allows stages and corresponding jobs to be killed from the web ui.
+    Allows jobs and stages to be killed from the web UI.
   </td>
 </tr>
 <tr>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to