Repository: spark
Updated Branches:
  refs/heads/branch-1.6 64439f7d6 -> 3bd72eafc


[SPARK-11742][STREAMING] Add the failure info to the batch lists

<img width="1365" alt="screen shot 2015-11-13 at 9 57 43 pm" 
src="https://cloud.githubusercontent.com/assets/1000778/11162322/9b88e204-8a51-11e5-8c57-a44889cab713.png">

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #9711 from zsxwing/failure-info.

(cherry picked from commit bcea0bfda66a30ee86790b048de5cb47b4d0b32f)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3bd72eaf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3bd72eaf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3bd72eaf

Branch: refs/heads/branch-1.6
Commit: 3bd72eafc5b7204d49fc1a3d587337fae88d8ae8
Parents: 64439f7
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Mon Nov 16 15:06:06 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Nov 16 15:06:20 2015 -0800

----------------------------------------------------------------------
 .../spark/streaming/ui/AllBatchesTable.scala    | 61 ++++++++++++++++++--
 .../apache/spark/streaming/ui/BatchPage.scala   | 49 ++--------------
 .../org/apache/spark/streaming/ui/UIUtils.scala | 60 +++++++++++++++++++
 3 files changed, 120 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3bd72eaf/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index 125cafd..d339723 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -33,6 +33,22 @@ private[ui] abstract class BatchTableBase(tableId: String, 
batchInterval: Long)
         {SparkUIUtils.tooltip("Time taken to process all jobs of a batch", 
"top")}</th>
   }
 
+  /**
+   * Return the first failure reason found in the batches, if any.
+   */
+  protected def getFirstFailureReason(batches: Seq[BatchUIData]): 
Option[String] = {
+    batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption
+  }
+
+  protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
+    val firstFailureReason = 
batch.outputOperations.flatMap(_._2.failureReason).headOption
+    firstFailureReason.map { failureReason =>
+      val failureReasonForUI = 
UIUtils.createOutputOperationFailureForUI(failureReason)
+      UIUtils.failureReasonCell(
+        failureReasonForUI, rowspan = 1, includeFirstLineInExpandDetails = 
false)
+    }.getOrElse(<td>-</td>)
+  }
+
   protected def baseRow(batch: BatchUIData): Seq[Node] = {
     val batchTime = batch.batchTime.milliseconds
     val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval)
@@ -97,9 +113,17 @@ private[ui] class ActiveBatchTable(
     waitingBatches: Seq[BatchUIData],
     batchInterval: Long) extends BatchTableBase("active-batches-table", 
batchInterval) {
 
+  private val firstFailureReason = getFirstFailureReason(runningBatches)
+
   override protected def columns: Seq[Node] = super.columns ++ {
     <th>Output Ops: Succeeded/Total</th>
-      <th>Status</th>
+      <th>Status</th> ++ {
+      if (firstFailureReason.nonEmpty) {
+        <th>Error</th>
+      } else {
+        Nil
+      }
+    }
   }
 
   override protected def renderRows: Seq[Node] = {
@@ -110,20 +134,41 @@ private[ui] class ActiveBatchTable(
   }
 
   private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
-    baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ 
<td>processing</td>
+    baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ 
<td>processing</td> ++ {
+      if (firstFailureReason.nonEmpty) {
+        getFirstFailureTableCell(batch)
+      } else {
+        Nil
+      }
+    }
   }
 
   private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
-    baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ 
<td>queued</td>
+    baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ 
<td>queued</td>++ {
+      if (firstFailureReason.nonEmpty) {
+        // Waiting batches have not run yet, so must have no failure reasons.
+        <td>-</td>
+      } else {
+        Nil
+      }
+    }
   }
 }
 
 private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], 
batchInterval: Long)
   extends BatchTableBase("completed-batches-table", batchInterval) {
 
+  private val firstFailureReason = getFirstFailureReason(batches)
+
   override protected def columns: Seq[Node] = super.columns ++ {
     <th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a 
batch", "top")}</th>
-      <th>Output Ops: Succeeded/Total</th>
+      <th>Output Ops: Succeeded/Total</th> ++ {
+      if (firstFailureReason.nonEmpty) {
+        <th>Error</th>
+      } else {
+        Nil
+      }
+    }
   }
 
   override protected def renderRows: Seq[Node] = {
@@ -138,6 +183,12 @@ private[ui] class CompletedBatchTable(batches: 
Seq[BatchUIData], batchInterval:
       <td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
         {formattedTotalDelay}
       </td>
-    } ++ createOutputOperationProgressBar(batch)
+    } ++ createOutputOperationProgressBar(batch)++ {
+      if (firstFailureReason.nonEmpty) {
+        getFirstFailureTableCell(batch)
+      } else {
+        Nil
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3bd72eaf/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 2ed9255..bc17119 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -149,7 +149,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
             total = sparkJob.numTasks - sparkJob.numSkippedTasks)
         }
       </td>
-      {failureReasonCell(lastFailureReason, rowspan = 1)}
+      {UIUtils.failureReasonCell(lastFailureReason)}
     </tr>
   }
 
@@ -245,48 +245,6 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
     </div>
   }
 
-  private def failureReasonCell(
-      failureReason: String,
-      rowspan: Int,
-      includeFirstLineInExpandDetails: Boolean = true): Seq[Node] = {
-    val isMultiline = failureReason.indexOf('\n') >= 0
-    // Display the first line by default
-    val failureReasonSummary = StringEscapeUtils.escapeHtml4(
-      if (isMultiline) {
-        failureReason.substring(0, failureReason.indexOf('\n'))
-      } else {
-        failureReason
-      })
-    val failureDetails =
-      if (isMultiline && !includeFirstLineInExpandDetails) {
-        // Skip the first line
-        failureReason.substring(failureReason.indexOf('\n') + 1)
-      } else {
-        failureReason
-      }
-    val details = if (isMultiline) {
-      // scalastyle:off
-      <span 
onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
-            class="expand-details">
-        +details
-      </span> ++
-        <div class="stacktrace-details collapsed">
-          <pre>{failureDetails}</pre>
-        </div>
-      // scalastyle:on
-    } else {
-      ""
-    }
-
-    if (rowspan == 1) {
-      <td valign="middle" style="max-width: 
300px">{failureReasonSummary}{details}</td>
-    } else {
-      <td valign="middle" style="max-width: 300px" rowspan={rowspan.toString}>
-        {failureReasonSummary}{details}
-      </td>
-    }
-  }
-
   private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
     sparkListener.activeJobs.get(sparkJobId).orElse {
       sparkListener.completedJobs.find(_.jobId == sparkJobId).orElse {
@@ -434,8 +392,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends 
WebUIPage("batch") {
   private def outputOpStatusCell(outputOp: OutputOperationUIData, rowspan: 
Int): Seq[Node] = {
     outputOp.failureReason match {
       case Some(failureReason) =>
-        val failureReasonForUI = 
generateOutputOperationStatusForUI(failureReason)
-        failureReasonCell(failureReasonForUI, rowspan, 
includeFirstLineInExpandDetails = false)
+        val failureReasonForUI = 
UIUtils.createOutputOperationFailureForUI(failureReason)
+        UIUtils.failureReasonCell(
+          failureReasonForUI, rowspan, includeFirstLineInExpandDetails = false)
       case None =>
         if (outputOp.endTime.isEmpty) {
           <td rowspan={rowspan.toString}>-</td>

http://git-wip-us.apache.org/repos/asf/spark/blob/3bd72eaf/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
index 86cfb1f..d89f7ad 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.streaming.ui
 
+import scala.xml.Node
+
+import org.apache.commons.lang3.StringEscapeUtils
+
 import java.text.SimpleDateFormat
 import java.util.TimeZone
 import java.util.concurrent.TimeUnit
@@ -124,4 +128,60 @@ private[streaming] object UIUtils {
       }
     }
   }
+
+  def createOutputOperationFailureForUI(failure: String): String = {
+    if (failure.startsWith("org.apache.spark.Spark")) {
+      // SparkException or SparkDriverExecutionException
+      "Failed due to Spark job error\n" + failure
+    } else {
+      var nextLineIndex = failure.indexOf("\n")
+      if (nextLineIndex < 0) {
+        nextLineIndex = failure.size
+      }
+      val firstLine = failure.substring(0, nextLineIndex)
+      s"Failed due to error: $firstLine\n$failure"
+    }
+  }
+
+  def failureReasonCell(
+      failureReason: String,
+      rowspan: Int = 1,
+      includeFirstLineInExpandDetails: Boolean = true): Seq[Node] = {
+    val isMultiline = failureReason.indexOf('\n') >= 0
+    // Display the first line by default
+    val failureReasonSummary = StringEscapeUtils.escapeHtml4(
+      if (isMultiline) {
+        failureReason.substring(0, failureReason.indexOf('\n'))
+      } else {
+        failureReason
+      })
+    val failureDetails =
+      if (isMultiline && !includeFirstLineInExpandDetails) {
+        // Skip the first line
+        failureReason.substring(failureReason.indexOf('\n') + 1)
+      } else {
+        failureReason
+      }
+    val details = if (isMultiline) {
+      // scalastyle:off
+      <span 
onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +details
+      </span> ++
+        <div class="stacktrace-details collapsed">
+          <pre>{failureDetails}</pre>
+        </div>
+      // scalastyle:on
+    } else {
+      ""
+    }
+
+    if (rowspan == 1) {
+      <td valign="middle" style="max-width: 
300px">{failureReasonSummary}{details}</td>
+    } else {
+      <td valign="middle" style="max-width: 300px" rowspan={rowspan.toString}>
+        {failureReasonSummary}{details}
+      </td>
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to