Repository: spark
Updated Branches:
  refs/heads/branch-1.5 1000b5d7e -> 4c48593bf


[SPARK-10692] [STREAMING] Expose failureReasons in BatchInfo for streaming UI to clear failed batches

Slightly modified version of #8818; all credit goes to zsxwing

Author: zsxwing <zsxw...@gmail.com>
Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #8892 from tdas/SPARK-10692.

(cherry picked from commit 758c9d25e92417f8c06328c3af7ea2ef0212c79f)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c48593b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c48593b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c48593b

Branch: refs/heads/branch-1.5
Commit: 4c48593bf5d44218b42bc8be9573184dd95e6ff2
Parents: 1000b5d
Author: zsxwing <zsxw...@gmail.com>
Authored: Wed Sep 23 19:52:02 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Sep 23 19:52:10 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/scheduler/BatchInfo.scala   | 10 +++
 .../streaming/scheduler/JobScheduler.scala      | 26 +++----
 .../spark/streaming/scheduler/JobSet.scala      | 19 ++++-
 .../streaming/StreamingListenerSuite.scala      | 76 ++++++++++++++++++++
 4 files changed, 115 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4c48593b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 9922b6b..3c86956 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -39,6 +39,8 @@ case class BatchInfo(
     processingEndTime: Option[Long]
   ) {
 
+  private var _failureReasons: Map[Int, String] = Map.empty
+
   @deprecated("Use streamIdToInputInfo instead", "1.5.0")
   def streamIdToNumRecords: Map[Int, Long] = 
streamIdToInputInfo.mapValues(_.numRecords)
 
@@ -67,4 +69,12 @@ case class BatchInfo(
   * The number of records received by the receivers in this batch.
    */
   def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum
+
+  /** Set the failure reasons corresponding to every output op in the batch */
+  private[streaming] def setFailureReason(reasons: Map[Int, String]): Unit = {
+    _failureReasons = reasons
+  }
+
+  /** Failure reasons corresponding to every output op in the batch */
+  private[streaming] def failureReasons = _failureReasons
 }
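
For context, a sketch of how the newly exposed map could be consumed. Because
`failureReasons` is private[streaming], the caller has to live under the
org.apache.spark.streaming package (as the streaming UI does); the helper
object and formatting below are illustrative, not part of this commit.

    package org.apache.spark.streaming

    import org.apache.spark.streaming.scheduler.BatchInfo

    // Hypothetical helper: render one status line per output op, using the
    // failureReasons map added above (output op id -> exception string).
    private[streaming] object BatchStatus {
      def describe(info: BatchInfo): String = {
        if (info.failureReasons.isEmpty) {
          s"Batch ${info.batchTime} succeeded"
        } else {
          info.failureReasons.map { case (opId, reason) =>
            s"Output op $opId of batch ${info.batchTime} failed: $reason"
          }.mkString("\n")
        }
      }
    }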

http://git-wip-us.apache.org/repos/asf/spark/blob/4c48593b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index fb51b0b..b5546db 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -166,22 +166,22 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   }
 
   private def handleJobCompletion(job: Job) {
+    val jobSet = jobSets.get(job.time)
+    jobSet.handleJobCompletion(job)
+    logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
+    if (jobSet.hasCompleted) {
+      jobSets.remove(jobSet.time)
+      jobGenerator.onBatchCompletion(jobSet.time)
+      logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
+        jobSet.totalDelay / 1000.0, jobSet.time.toString,
+        jobSet.processingDelay / 1000.0
+      ))
+      listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
+    }
     job.result match {
-      case Success(_) =>
-        val jobSet = jobSets.get(job.time)
-        jobSet.handleJobCompletion(job)
-        logInfo("Finished job " + job.id + " from job set of time " + 
jobSet.time)
-        if (jobSet.hasCompleted) {
-          jobSets.remove(jobSet.time)
-          jobGenerator.onBatchCompletion(jobSet.time)
-          logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
-            jobSet.totalDelay / 1000.0, jobSet.time.toString,
-            jobSet.processingDelay / 1000.0
-          ))
-          listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
-        }
       case Failure(e) =>
         reportError("Error running job " + job, e)
+      case _ =>
     }
   }
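
The point of the refactoring above: the batch-completion bookkeeping now runs
for failed jobs too, so a batch with a failed output op still posts
StreamingListenerBatchCompleted (carrying the failure reasons), and the match
on job.result is left only with error reporting. A minimal standalone sketch
of that control-flow shape, with illustrative names:

    import scala.util.{Failure, Success, Try}

    def handleResult(result: Try[Unit]): Unit = {
      // Bookkeeping shared by success and failure runs unconditionally,
      // mirroring handleJobCompletion above.
      result match {
        case Failure(e) => println(s"Error running job: ${e.getMessage}")
        case _ => // Success: nothing extra to do
      }
    }

    handleResult(Success(()))                           // no error reported
    handleResult(Failure(new RuntimeException("boom"))) // prints the error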
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4c48593b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 95833ef..255ccf0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -18,8 +18,10 @@
 package org.apache.spark.streaming.scheduler
 
 import scala.collection.mutable.HashSet
+import scala.util.Failure
 
 import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
 
 /** Class representing a set of Jobs
  * belonging to the same batch.
@@ -62,12 +64,23 @@ case class JobSet(
   }
 
   def toBatchInfo: BatchInfo = {
-    new BatchInfo(
+    val failureReasons: Map[Int, String] = {
+      if (hasCompleted) {
+        jobs.filter(_.result.isFailure).map { job =>
+          (job.outputOpId, Utils.exceptionString(job.result.asInstanceOf[Failure[_]].exception))
+        }.toMap
+      } else {
+        Map.empty
+      }
+    }
+    val binfo = new BatchInfo(
       time,
       streamIdToInputInfo,
       submissionTime,
-      if (processingStartTime >= 0 ) Some(processingStartTime) else None,
-      if (processingEndTime >= 0 ) Some(processingEndTime) else None
+      if (processingStartTime >= 0) Some(processingStartTime) else None,
+      if (processingEndTime >= 0) Some(processingEndTime) else None
     )
+    binfo.setFailureReason(failureReasons)
+    binfo
   }
 }
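
To see the map-building logic in toBatchInfo in isolation, here is a
self-contained sketch; the (outputOpId, result) pairs are simulated, and
getMessage stands in for the Spark-internal Utils.exceptionString:

    import scala.util.{Failure, Success, Try}

    // Simulated (outputOpId, result) pairs for one completed batch.
    val results: Seq[(Int, Try[Unit])] = Seq(
      0 -> Success(()),
      1 -> Failure(new RuntimeException("This is a failed job"))
    )

    // Keep only the failures, pairing each op id with its exception text.
    val failureReasons: Map[Int, String] = results.collect {
      case (opId, Failure(e)) => opId -> e.getMessage
    }.toMap
    // failureReasons == Map(1 -> "This is a failed job")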

http://git-wip-us.apache.org/repos/asf/spark/blob/4c48593b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index d840c34..d8fd2ce 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -140,6 +140,69 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     }
   }
 
+  test("onBatchCompleted with successful batch") {
+    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+    inputStream.foreachRDD(_.count)
+
+    val failureReasons = startStreamingContextAndCollectFailureReasons(ssc)
+    assert(failureReasons != null && failureReasons.isEmpty,
+      "A successful batch should not set errorMessage")
+  }
+
+  test("onBatchCompleted with failed batch and one failed job") {
+    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+    inputStream.foreachRDD { _ =>
+      throw new RuntimeException("This is a failed job")
+    }
+
+    // Check if failureReasons contains the correct error message
+    val failureReasons = startStreamingContextAndCollectFailureReasons(ssc, isFailed = true)
+    assert(failureReasons != null)
+    assert(failureReasons.size === 1)
+    assert(failureReasons.contains(0))
+    assert(failureReasons(0).contains("This is a failed job"))
+  }
+
+  test("onBatchCompleted with failed batch and multiple failed jobs") {
+    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+    inputStream.foreachRDD { _ =>
+      throw new RuntimeException("This is a failed job")
+    }
+    inputStream.foreachRDD { _ =>
+      throw new RuntimeException("This is another failed job")
+    }
+
+    // Check if failureReasons contains the correct error messages
+    val failureReasons =
+      startStreamingContextAndCollectFailureReasons(ssc, isFailed = true)
+    assert(failureReasons != null)
+    assert(failureReasons.size === 2)
+    assert(failureReasons.contains(0))
+    assert(failureReasons.contains(1))
+    assert(failureReasons(0).contains("This is a failed job"))
+    assert(failureReasons(1).contains("This is another failed job"))
+  }
+
+  private def startStreamingContextAndCollectFailureReasons(
+      _ssc: StreamingContext, isFailed: Boolean = false): Map[Int, String] = {
+    val failureReasonsCollector = new FailureReasonsCollector()
+    _ssc.addStreamingListener(failureReasonsCollector)
+    val batchCounter = new BatchCounter(_ssc)
+    _ssc.start()
+    // Make sure at least one batch runs
+    batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
+    if (isFailed) {
+      intercept[RuntimeException] {
+        _ssc.awaitTerminationOrTimeout(10000)
+      }
+    }
+    _ssc.stop()
+    failureReasonsCollector.failureReasons
+  }
+
   /** Check if a sequence of numbers is in increasing order */
   def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
     for (i <- 1 until seq.size) {
@@ -205,3 +268,16 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O
   }
   def onStop() { }
 }
+
+/**
+ * A StreamingListener that saves the latest `failureReasons` in `BatchInfo` to the
+ * `failureReasons` field.
+ */
+class FailureReasonsCollector extends StreamingListener {
+
+  @volatile var failureReasons: Map[Int, String] = null
+
+  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+    failureReasons = batchCompleted.batchInfo.failureReasons
+  }
+}
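
Outside the test suite, the collector would be wired up the same way; a
hypothetical sketch (conf and the stream definitions are assumptions, and the
class must stay under org.apache.spark.streaming to read the
private[streaming] field):

    val ssc = new StreamingContext(conf, Milliseconds(1000))
    val collector = new FailureReasonsCollector()
    // Register before start() so no completed batch is missed.
    ssc.addStreamingListener(collector)
    // ... define input streams and output operations here ...
    ssc.start()
    // After a batch completes, collector.failureReasons holds the latest
    // batch's failure reasons keyed by output op id; an empty map means the
    // batch succeeded.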

