Repository: spark
Updated Branches:
  refs/heads/branch-2.3 d963ba031 -> 4e75b0cb4


[SPARK-23121][CORE] Fix for UI becoming inaccessible for long-running streaming 
apps

## What changes were proposed in this pull request?

The allJobs and the job pages attempt to use stage attempt data and the DAG 
visualization from the store, but for long-running jobs these are not guaranteed 
to be retained, leading to exceptions when these pages are rendered.

To fix it `store.lastStageAttempt(stageId)` and 
`store.operationGraphForJob(jobId)` are wrapped in `store.asOption` and default 
values are used if the info is missing.

## How was this patch tested?

Manual testing of the UI, also using the test command reported in SPARK-23121:

./bin/spark-submit --class org.apache.spark.examples.streaming.HdfsWordCount 
./examples/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar /spark

Closes #20287

Author: Sandor Murakozi <smurak...@gmail.com>

Closes #20330 from smurakozi/SPARK-23121.

(cherry picked from commit 446948af1d8dbc080a26a6eec6f743d338f1d12b)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e75b0cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e75b0cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e75b0cb

Branch: refs/heads/branch-2.3
Commit: 4e75b0cb4b575d4799c02455eed286fe971c6c50
Parents: d963ba0
Author: Sandor Murakozi <smurak...@gmail.com>
Authored: Mon Jan 22 10:36:28 2018 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Mon Jan 22 10:36:39 2018 -0800

----------------------------------------------------------------------
 .../org/apache/spark/ui/jobs/AllJobsPage.scala  | 24 +++++++++++---------
 .../org/apache/spark/ui/jobs/JobPage.scala      | 10 ++++++--
 .../org/apache/spark/ui/jobs/StagePage.scala    |  9 +++++---
 3 files changed, 27 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4e75b0cb/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index ff916bb..c2668a7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -36,6 +36,9 @@ import org.apache.spark.util.Utils
 
 /** Page showing list of all ongoing and recently finished jobs */
 private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends 
WebUIPage("") {
+
+  import ApiHelper._
+
   private val JOBS_LEGEND =
     <div class="legend-area"><svg width="150px" height="85px">
       <rect class="succeeded-job-legend"
@@ -65,10 +68,9 @@ private[ui] class AllJobsPage(parent: JobsTab, store: 
AppStatusStore) extends We
     }.map { job =>
       val jobId = job.jobId
       val status = job.status
-      val jobDescription = store.lastStageAttempt(job.stageIds.max).description
-      val displayJobDescription = jobDescription
-        .map(UIUtils.makeDescription(_, "", plainText = true).text)
-        .getOrElse("")
+      val (_, lastStageDescription) = lastStageNameAndDescription(store, job)
+      val jobDescription = UIUtils.makeDescription(lastStageDescription, "", 
plainText = true).text
+
       val submissionTime = job.submissionTime.get.getTime()
       val completionTime = 
job.completionTime.map(_.getTime()).getOrElse(System.currentTimeMillis())
       val classNameByStatus = status match {
@@ -80,7 +82,7 @@ private[ui] class AllJobsPage(parent: JobsTab, store: 
AppStatusStore) extends We
 
       // The timeline library treats contents as HTML, so we have to escape 
them. We need to add
       // extra layers of escaping in order to embed this in a Javascript 
string literal.
-      val escapedDesc = Utility.escape(displayJobDescription)
+      val escapedDesc = Utility.escape(jobDescription)
       val jsEscapedDesc = StringEscapeUtils.escapeEcmaScript(escapedDesc)
       val jobEventJsonAsStr =
         s"""
@@ -403,6 +405,8 @@ private[ui] class JobDataSource(
     sortColumn: String,
     desc: Boolean) extends PagedDataSource[JobTableRowData](pageSize) {
 
+  import ApiHelper._
+
   // Convert JobUIData to JobTableRowData which contains the final contents to 
show in the table
   // so that we can avoid creating duplicate contents during sorting the data
   private val data = jobs.map(jobRow).sorted(ordering(sortColumn, desc))
@@ -427,23 +431,21 @@ private[ui] class JobDataSource(
     val formattedDuration = duration.map(d => 
UIUtils.formatDuration(d)).getOrElse("Unknown")
     val submissionTime = jobData.submissionTime
     val formattedSubmissionTime = 
submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
-    val lastStageAttempt = store.lastStageAttempt(jobData.stageIds.max)
-    val lastStageDescription = lastStageAttempt.description.getOrElse("")
+    val (lastStageName, lastStageDescription) = 
lastStageNameAndDescription(store, jobData)
 
-    val formattedJobDescription =
-      UIUtils.makeDescription(lastStageDescription, basePath, plainText = 
false)
+    val jobDescription = UIUtils.makeDescription(lastStageDescription, 
basePath, plainText = false)
 
     val detailUrl = "%s/jobs/job?id=%s".format(basePath, jobData.jobId)
 
     new JobTableRowData(
       jobData,
-      lastStageAttempt.name,
+      lastStageName,
       lastStageDescription,
       duration.getOrElse(-1),
       formattedDuration,
       submissionTime.map(_.getTime()).getOrElse(-1L),
       formattedSubmissionTime,
-      formattedJobDescription,
+      jobDescription,
       detailUrl
     )
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4e75b0cb/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index bf59152..974e5c5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -336,8 +336,14 @@ private[ui] class JobPage(parent: JobsTab, store: 
AppStatusStore) extends WebUIP
     content ++= makeTimeline(activeStages ++ completedStages ++ failedStages,
       store.executorList(false), appStartTime)
 
-    content ++= UIUtils.showDagVizForJob(
-      jobId, store.operationGraphForJob(jobId))
+    val operationGraphContent = 
store.asOption(store.operationGraphForJob(jobId)) match {
+      case Some(operationGraph) => UIUtils.showDagVizForJob(jobId, 
operationGraph)
+      case None =>
+        <div id="no-info">
+          <p>No DAG visualization information to display for job {jobId}</p>
+        </div>
+    }
+    content ++= operationGraphContent
 
     if (shouldShowActiveStages) {
       content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++

http://git-wip-us.apache.org/repos/asf/spark/blob/4e75b0cb/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 38f7b35..8af2537 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -23,12 +23,10 @@ import java.util.concurrent.TimeUnit
 import javax.servlet.http.HttpServletRequest
 
 import scala.collection.mutable.{HashMap, HashSet}
-import scala.xml.{Elem, Node, Unparsed}
+import scala.xml.{Node, Unparsed}
 
 import org.apache.commons.lang3.StringEscapeUtils
 
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.TaskLocality
 import org.apache.spark.status._
 import org.apache.spark.status.api.v1._
@@ -1012,4 +1010,9 @@ private object ApiHelper {
     }
   }
 
+  def lastStageNameAndDescription(store: AppStatusStore, job: JobData): 
(String, String) = {
+    val stage = store.asOption(store.lastStageAttempt(job.stageIds.max))
+    (stage.map(_.name).getOrElse(""), 
stage.flatMap(_.description).getOrElse(job.name))
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to