Repository: spark
Updated Branches:
  refs/heads/branch-1.5 6847be6d1 -> 84f510c4f


[SPARK-10885] [STREAMING] Display the failed output op in Streaming UI

This PR implements the following features for both `master` and `branch-1.5`.
1. Display the failed output op count in the batch list
2. Display the failure reason of output op in the batch detail page

Screenshots:
<img width="1356" alt="1" 
src="https://cloud.githubusercontent.com/assets/1000778/10198387/5b2b97ec-67ce-11e5-81c2-f818b9d2f3ad.png">
<img width="1356" alt="2" 
src="https://cloud.githubusercontent.com/assets/1000778/10198388/5b76ac14-67ce-11e5-8c8b-de2683c5b485.png">

There are still two remaining problems in the UI.
1. If an output operation doesn't run any Spark job, we cannot get its 
duration, since the duration is currently the sum of all its jobs' durations.
2. If an output operation doesn't run any Spark job, we cannot get its 
description, since the description is taken from the latest job's call site.

We need to add a new `StreamingListenerEvent` for output operations to fix 
them, so I'd like to fix them only for `master` in another PR.

Author: zsxwing <zsxw...@gmail.com>

Closes #8950 from zsxwing/batch-failure.

(cherry picked from commit ffe6831e49e28eb855f857fdfa5dd99341e80c9d)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84f510c4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84f510c4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84f510c4

Branch: refs/heads/branch-1.5
Commit: 84f510c4fa06e43bd35e2dc8e1008d0590cbe266
Parents: 6847be6
Author: zsxwing <zsxw...@gmail.com>
Authored: Tue Oct 6 16:51:03 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Oct 6 16:51:31 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/scheduler/BatchInfo.scala   |  10 ++
 .../spark/streaming/scheduler/JobSet.scala      |   1 +
 .../spark/streaming/ui/AllBatchesTable.scala    |  15 ++-
 .../apache/spark/streaming/ui/BatchPage.scala   | 134 ++++++++++++++++---
 .../apache/spark/streaming/ui/BatchUIData.scala |   6 +-
 .../spark/streaming/UISeleniumSuite.scala       |   4 +-
 6 files changed, 143 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/84f510c4/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 3c86956..463f899 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -41,6 +41,8 @@ case class BatchInfo(
 
   private var _failureReasons: Map[Int, String] = Map.empty
 
+  private var _numOutputOp: Int = 0
+
   @deprecated("Use streamIdToInputInfo instead", "1.5.0")
   def streamIdToNumRecords: Map[Int, Long] = 
streamIdToInputInfo.mapValues(_.numRecords)
 
@@ -77,4 +79,12 @@ case class BatchInfo(
 
   /** Failure reasons corresponding to every output ops in the batch */
   private[streaming] def failureReasons = _failureReasons
+
+  /** Set the number of output operations in this batch */
+  private[streaming] def setNumOutputOp(numOutputOp: Int): Unit = {
+    _numOutputOp = numOutputOp
+  }
+
+  /** Return the number of output operations in this batch */
+  private[streaming] def numOutputOp: Int = _numOutputOp
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/84f510c4/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 255ccf0..08f63cc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -81,6 +81,7 @@ case class JobSet(
       if (processingEndTime >= 0) Some(processingEndTime) else None
     )
     binfo.setFailureReason(failureReasons)
+    binfo.setNumOutputOp(jobs.size)
     binfo
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/84f510c4/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index f702bd5..3e6590d 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -107,9 +107,10 @@ private[ui] class ActiveBatchTable(
 private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], 
batchInterval: Long)
   extends BatchTableBase("completed-batches-table", batchInterval) {
 
-  override protected def columns: Seq[Node] = super.columns ++
-    <th>Total Delay
-      {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
+  override protected def columns: Seq[Node] = super.columns ++ {
+    <th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a 
batch", "top")}</th>
+      <th>Output Ops: Succeeded/Total</th>
+  }
 
   override protected def renderRows: Seq[Node] = {
     batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
@@ -118,9 +119,17 @@ private[ui] class CompletedBatchTable(batches: 
Seq[BatchUIData], batchInterval:
   private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
     val totalDelay = batch.totalDelay
     val formattedTotalDelay = 
totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
+    val numFailedOutputOp = batch.failureReason.size
+    val outputOpColumn = if (numFailedOutputOp > 0) {
+        s"${batch.numOutputOp - numFailedOutputOp}/${batch.numOutputOp}" +
+          s" (${numFailedOutputOp} failed)"
+      } else {
+        s"${batch.numOutputOp}/${batch.numOutputOp}"
+      }
     baseRow(batch) ++
       <td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
         {formattedTotalDelay}
       </td>
+      <td>{outputOpColumn}</td>
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/84f510c4/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 9129c1f..1b717b6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -38,6 +38,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
     <th>Output Op Id</th>
       <th>Description</th>
       <th>Duration</th>
+      <th>Status</th>
       <th>Job Id</th>
       <th>Duration</th>
       <th class="sorttable_nosort">Stages: Succeeded/Total</th>
@@ -49,18 +50,42 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
       outputOpId: OutputOpId,
       outputOpDescription: Seq[Node],
       formattedOutputOpDuration: String,
+      outputOpStatus: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
       sparkJob: SparkJobIdWithUIData): Seq[Node] = {
     if (sparkJob.jobUIData.isDefined) {
       generateNormalJobRow(outputOpId, outputOpDescription, 
formattedOutputOpDuration,
-        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
+        outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, 
sparkJob.jobUIData.get)
     } else {
       generateDroppedJobRow(outputOpId, outputOpDescription, 
formattedOutputOpDuration,
-        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
+        outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, 
sparkJob.sparkJobId)
     }
   }
 
+  private def generateOutputOpRowWithoutSparkJobs(
+    outputOpId: OutputOpId,
+    outputOpDescription: Seq[Node],
+    formattedOutputOpDuration: String,
+    outputOpStatus: String): Seq[Node] = {
+    <tr>
+      <td class="output-op-id-cell" >{outputOpId.toString}</td>
+      <td>{outputOpDescription}</td>
+      <td>{formattedOutputOpDuration}</td>
+      {outputOpStatusCell(outputOpStatus, rowspan = 1)}
+      <!-- Job Id -->
+      <td>-</td>
+      <!-- Duration -->
+      <td>-</td>
+      <!-- Stages: Succeeded/Total -->
+      <td>-</td>
+      <!-- Tasks (for all stages): Succeeded/Total -->
+      <td>-</td>
+      <!-- Error -->
+      <td>-</td>
+    </tr>
+  }
+
   /**
    * Generate a row for a Spark Job. Because duplicated output op infos needs 
to be collapsed into
    * one cell, we use "rowspan" for the first row of a output op.
@@ -69,6 +94,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
       outputOpId: OutputOpId,
       outputOpDescription: Seq[Node],
       formattedOutputOpDuration: String,
+      outputOpStatus: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
       sparkJob: JobUIData): Seq[Node] = {
@@ -94,7 +120,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
         <td rowspan={numSparkJobRowsInOutputOp.toString}>
           {outputOpDescription}
         </td>
-        <td 
rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+        <td 
rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
+        {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
       } else {
         Nil
       }
@@ -125,7 +152,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
             total = sparkJob.numTasks - sparkJob.numSkippedTasks)
         }
       </td>
-      {failureReasonCell(lastFailureReason)}
+      {failureReasonCell(lastFailureReason, rowspan = 1)}
     </tr>
   }
 
@@ -137,6 +164,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
       outputOpId: OutputOpId,
       outputOpDescription: Seq[Node],
       formattedOutputOpDuration: String,
+      outputOpStatus: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
       jobId: Int): Seq[Node] = {
@@ -147,7 +175,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
       if (isFirstRow) {
         <td class="output-op-id-cell" 
rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
           <td 
rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
-          <td 
rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+          <td 
rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
+          {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
       } else {
         Nil
       }
@@ -156,7 +185,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
     <tr>
       {prefixCells}
       <td sorttable_customkey={jobId.toString}>
-        {jobId.toString}
+        {if (jobId >= 0) jobId.toString else "-"}
       </td>
       <!-- Duration -->
       <td>-</td>
@@ -170,7 +199,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
   }
 
   private def generateOutputOpIdRow(
-      outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] 
= {
+      outputOpId: OutputOpId,
+      outputOpStatus: String,
+      sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
     // We don't count the durations of dropped jobs
     val sparkJobDurations = 
sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
       map(sparkJob => {
@@ -189,12 +220,32 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
 
     val description = generateOutputOpDescription(sparkJobs)
 
-    generateJobRow(
-      outputOpId, description, formattedOutputOpDuration, sparkJobs.size, 
true, sparkJobs.head) ++
-      sparkJobs.tail.map { sparkJob =>
+    if (sparkJobs.isEmpty) {
+      generateOutputOpRowWithoutSparkJobs(
+        outputOpId, description, formattedOutputOpDuration, outputOpStatus)
+    } else {
+      val firstRow =
         generateJobRow(
-          outputOpId, description, formattedOutputOpDuration, sparkJobs.size, 
false, sparkJob)
-      }.flatMap(x => x)
+          outputOpId,
+          description,
+          formattedOutputOpDuration,
+          outputOpStatus,
+          sparkJobs.size,
+          true,
+          sparkJobs.head)
+      val tailRows =
+        sparkJobs.tail.map { sparkJob =>
+          generateJobRow(
+            outputOpId,
+            description,
+            formattedOutputOpDuration,
+            outputOpStatus,
+            sparkJobs.size,
+            false,
+            sparkJob)
+        }
+      (firstRow ++ tailRows).flatten
+    }
   }
 
   private def generateOutputOpDescription(sparkJobs: 
Seq[SparkJobIdWithUIData]): Seq[Node] = {
@@ -228,7 +279,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
     }
   }
 
-  private def failureReasonCell(failureReason: String): Seq[Node] = {
+  private def failureReasonCell(
+      failureReason: String,
+      rowspan: Int,
+      includeFirstLineInExpandDetails: Boolean = true): Seq[Node] = {
     val isMultiline = failureReason.indexOf('\n') >= 0
     // Display the first line by default
     val failureReasonSummary = StringEscapeUtils.escapeHtml4(
@@ -237,6 +291,13 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
       } else {
         failureReason
       })
+    val failureDetails =
+      if (isMultiline && !includeFirstLineInExpandDetails) {
+        // Skip the first line
+        failureReason.substring(failureReason.indexOf('\n') + 1)
+      } else {
+        failureReason
+      }
     val details = if (isMultiline) {
       // scalastyle:off
       <span 
onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
@@ -244,13 +305,20 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
         +details
       </span> ++
         <div class="stacktrace-details collapsed">
-          <pre>{failureReason}</pre>
+          <pre>{failureDetails}</pre>
         </div>
       // scalastyle:on
     } else {
       ""
     }
-    <td valign="middle" style="max-width: 
300px">{failureReasonSummary}{details}</td>
+
+    if (rowspan == 1) {
+      <td valign="middle" style="max-width: 
300px">{failureReasonSummary}{details}</td>
+    } else {
+      <td valign="middle" style="max-width: 300px" rowspan={rowspan.toString}>
+        {failureReasonSummary}{details}
+      </td>
+    }
   }
 
   private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
@@ -265,16 +333,31 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
    * Generate the job table for the batch.
    */
   private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
-    val outputOpIdToSparkJobIds = 
batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).toSeq.
-      sortBy(_._1). // sorted by OutputOpId
+    val outputOpIdToSparkJobIds = 
batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).
       map { case (outputOpId, outputOpIdAndSparkJobIds) =>
         // sort SparkJobIds for each OutputOpId
         (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
       }
+    val outputOps = (0 until batchUIData.numOutputOp).map { outputOpId =>
+      val status = batchUIData.failureReason.get(outputOpId).map { failure =>
+        if (failure.startsWith("org.apache.spark.SparkException")) {
+          "Failed due to Spark job error\n" + failure
+        } else {
+          var nextLineIndex = failure.indexOf("\n")
+          if (nextLineIndex < 0) {
+            nextLineIndex = failure.size
+          }
+          val firstLine = failure.substring(0, nextLineIndex)
+          s"Failed due to error: $firstLine\n$failure"
+        }
+      }.getOrElse("Succeeded")
+      val sparkJobIds = outputOpIdToSparkJobIds.getOrElse(outputOpId, 
Seq.empty)
+      (outputOpId, status, sparkJobIds)
+    }
     sparkListener.synchronized {
-      val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
-        outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
-          (outputOpId,
+      val outputOpIdWithJobs: Seq[(OutputOpId, String, 
Seq[SparkJobIdWithUIData])] =
+        outputOps.map { case (outputOpId, status, sparkJobIds) =>
+          (outputOpId, status,
             sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, 
getJobData(sparkJobId))))
         }
 
@@ -285,7 +368,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
         <tbody>
           {
             outputOpIdWithJobs.map {
-              case (outputOpId, sparkJobIds) => 
generateOutputOpIdRow(outputOpId, sparkJobIds)
+              case (outputOpId, status, sparkJobIds) =>
+                generateOutputOpIdRow(outputOpId, status, sparkJobIds)
             }
           }
         </tbody>
@@ -386,4 +470,12 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
     Unparsed(StringEscapeUtils.escapeHtml4(metadataDescription).
       replaceAllLiterally("\t", 
"&nbsp;&nbsp;&nbsp;&nbsp;").replaceAllLiterally("\n", "<br/>"))
   }
+
+  private def outputOpStatusCell(status: String, rowspan: Int): Seq[Node] = {
+    if (status == "Succeeded") {
+      <td rowspan={rowspan.toString}>Succeeded</td>
+    } else {
+      failureReasonCell(status, rowspan, includeFirstLineInExpandDetails = 
false)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/84f510c4/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index ae508c0..e6c2e21 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -30,6 +30,8 @@ private[ui] case class BatchUIData(
     val submissionTime: Long,
     val processingStartTime: Option[Long],
     val processingEndTime: Option[Long],
+    val numOutputOp: Int,
+    val failureReason: Map[Int, String],
     var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) {
 
   /**
@@ -69,7 +71,9 @@ private[ui] object BatchUIData {
       batchInfo.streamIdToInputInfo,
       batchInfo.submissionTime,
       batchInfo.processingStartTime,
-      batchInfo.processingEndTime
+      batchInfo.processingEndTime,
+      batchInfo.numOutputOp,
+      batchInfo.failureReasons
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/84f510c4/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 068a6cb..d1df788 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -121,7 +121,7 @@ class UISeleniumSuite
         }
         findAll(cssSelector("""#completed-batches-table 
th""")).map(_.text).toSeq should be {
           List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing 
Time (?)",
-            "Total Delay (?)")
+            "Total Delay (?)", "Output Ops: Succeeded/Total")
         }
 
         val batchLinks =
@@ -138,7 +138,7 @@ class UISeleniumSuite
         summaryText should contain ("Total delay:")
 
         findAll(cssSelector("""#batch-job-table th""")).map(_.text).toSeq 
should be {
-          List("Output Op Id", "Description", "Duration", "Job Id", "Duration",
+          List("Output Op Id", "Description", "Duration", "Status", "Job Id", 
"Duration",
             "Stages: Succeeded/Total", "Tasks (for all stages): 
Succeeded/Total", "Error")
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to