Repository: spark
Updated Branches:
  refs/heads/master ac8278593 -> f205fe477


[SPARK-4537][Streaming] Expand StreamingSource to add more metrics

Add `processingDelay`, `schedulingDelay` and `totalDelay` for the last
completed batch, and add `lastReceivedBatchRecords` and
`totalReceivedBatchRecords` for counting received records.
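
For reference, a minimal sketch of how the new gauges can be read back out of a
Codahale MetricRegistry. The poller below is illustrative only and not part of
this patch; only the "streaming." gauge names come from the diff.

import scala.collection.JavaConverters._
import com.codahale.metrics.MetricRegistry

object StreamingMetricsPoller {
  // Print every streaming gauge, e.g. streaming.totalReceivedRecords,
  // streaming.lastCompletedBatch_processingDelay, etc. Gauges backed by
  // Option fields report -1 until a batch has completed.
  def printStreamingGauges(registry: MetricRegistry): Unit = {
    registry.getGauges.asScala.foreach { case (name, gauge) =>
      if (name.startsWith("streaming.")) {
        println(s"$name = ${gauge.getValue}")
      }
    }
  }
}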

Author: jerryshao <saisai.s...@intel.com>

Closes #3466 from jerryshao/SPARK-4537 and squashes the following commits:

00f5f7f [jerryshao] Change the code style and add totalProcessedRecords
44721a6 [jerryshao] Further address the comments
c097ddc [jerryshao] Address the comments
02dd44f [jerryshao] Fix the addressed comments
c7a9376 [jerryshao] Expand StreamingSource to add more metrics


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f205fe47
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f205fe47
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f205fe47

Branch: refs/heads/master
Commit: f205fe477c33a541053c198cd43a5811d6cf9fe2
Parents: ac82785
Author: jerryshao <saisai.s...@intel.com>
Authored: Thu Dec 25 19:39:49 2014 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Dec 25 19:39:49 2014 -0800

----------------------------------------------------------------------
 .../spark/streaming/StreamingSource.scala       | 53 ++++++++++++++------
 .../ui/StreamingJobProgressListener.scala       | 19 ++++++-
 2 files changed, 57 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f205fe47/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
index e35a568..9697437 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
@@ -29,9 +29,17 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
   private val streamingListener = ssc.progressListener
 
   private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
-      defaultValue: T) {
+      defaultValue: T): Unit = {
+    registerGaugeWithOption[T](name,
+      (l: StreamingJobProgressListener) => Option(f(streamingListener)), defaultValue)
+  }
+
+  private def registerGaugeWithOption[T](
+      name: String,
+      f: StreamingJobProgressListener => Option[T],
+      defaultValue: T): Unit = {
     metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] {
-      override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue)
+      override def getValue: T = f(streamingListener).getOrElse(defaultValue)
     })
   }
 
@@ -41,6 +49,12 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
   // Gauge for number of total completed batches
   registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L)
 
+  // Gauge for number of total received records
+  registerGauge("totalReceivedRecords", _.numTotalReceivedRecords, 0L)
+
+  // Gauge for number of total processed records
+  registerGauge("totalProcessedRecords", _.numTotalProcessedRecords, 0L)
+
   // Gauge for number of unprocessed batches
   registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L)
 
@@ -55,19 +69,30 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
 
   // Gauge for last completed batch, useful for monitoring the streaming job's running status,
   // displayed data -1 for any abnormal condition.
-  registerGauge("lastCompletedBatch_submissionTime",
-    _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
-  registerGauge("lastCompletedBatch_processStartTime",
-    _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
-  registerGauge("lastCompletedBatch_processEndTime",
-    _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+  registerGaugeWithOption("lastCompletedBatch_submissionTime",
+    _.lastCompletedBatch.map(_.submissionTime), -1L)
+  registerGaugeWithOption("lastCompletedBatch_processingStartTime",
+    _.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
+  registerGaugeWithOption("lastCompletedBatch_processingEndTime",
+    _.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
+
+  // Gauge for last completed batch's delay information.
+  registerGaugeWithOption("lastCompletedBatch_processingDelay",
+    _.lastCompletedBatch.flatMap(_.processingDelay), -1L)
+  registerGaugeWithOption("lastCompletedBatch_schedulingDelay",
+    _.lastCompletedBatch.flatMap(_.schedulingDelay), -1L)
+  registerGaugeWithOption("lastCompletedBatch_totalDelay",
+    _.lastCompletedBatch.flatMap(_.totalDelay), -1L)
 
   // Gauge for last received batch, useful for monitoring the streaming job's running status,
   // displayed data -1 for any abnormal condition.
-  registerGauge("lastReceivedBatch_submissionTime",
-    _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
-  registerGauge("lastReceivedBatch_processStartTime",
-    _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
-  registerGauge("lastReceivedBatch_processEndTime",
-    _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+  registerGaugeWithOption("lastReceivedBatch_submissionTime",
+    _.lastCompletedBatch.map(_.submissionTime), -1L)
+  registerGaugeWithOption("lastReceivedBatch_processingStartTime",
+    _.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
+  registerGaugeWithOption("lastReceivedBatch_processingEndTime",
+    _.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
+
+  // Gauge for last received batch records.
+  registerGauge("lastReceivedBatch_records", 
_.lastReceivedBatchRecords.values.sum, 0L)
 }
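
The key refactoring above: `registerGauge` is now routed through a new
`registerGaugeWithOption`, so a gauge backed by an `Option` field reports a
default value (-1 here) instead of a missing metric while no batch has
completed. A self-contained sketch of the same pattern follows; `Listener` is
an invented stand-in for StreamingJobProgressListener.

import com.codahale.metrics.{Gauge, MetricRegistry}

// Invented stand-in for StreamingJobProgressListener: one Option-backed field.
class Listener {
  var lastTotalDelayMs: Option[Long] = None
}

class MetricsSketch(listener: Listener) {
  val metricRegistry = new MetricRegistry

  private def registerGaugeWithOption[T](
      name: String,
      f: Listener => Option[T],
      defaultValue: T): Unit = {
    metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] {
      // Fall back to the default (e.g. -1) while the Option is empty, so the
      // metrics sink always sees a numeric value.
      override def getValue: T = f(listener).getOrElse(defaultValue)
    })
  }

  registerGaugeWithOption("lastCompletedBatch_totalDelay", _.lastTotalDelayMs, -1L)
}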

http://git-wip-us.apache.org/repos/asf/spark/blob/f205fe47/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index f61069b..5ee53a5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -25,7 +25,6 @@ import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
 import org.apache.spark.streaming.scheduler.BatchInfo
 import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
 import org.apache.spark.util.Distribution
-import org.apache.spark.Logging
 
 
 private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
@@ -36,6 +35,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   private val completedaBatchInfos = new Queue[BatchInfo]
   private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
   private var totalCompletedBatches = 0L
+  private var totalReceivedRecords = 0L
+  private var totalProcessedRecords = 0L
   private val receiverInfos = new HashMap[Int, ReceiverInfo]
 
   val batchDuration = ssc.graph.batchDuration.milliseconds
@@ -65,6 +66,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
     runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
     waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
+
+    batchStarted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
+      totalReceivedRecords += infos.map(_.numRecords).sum
+    }
   }
 
   override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
@@ -73,6 +78,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     completedaBatchInfos.enqueue(batchCompleted.batchInfo)
     if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
     totalCompletedBatches += 1L
+
+    batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
+      totalProcessedRecords += infos.map(_.numRecords).sum
+    }
   }
 
   def numReceivers = synchronized {
@@ -83,6 +92,14 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     totalCompletedBatches
   }
 
+  def numTotalReceivedRecords: Long = synchronized {
+    totalReceivedRecords
+  }
+
+  def numTotalProcessedRecords: Long = synchronized {
+    totalProcessedRecords
+  }
+
   def numUnprocessedBatches: Long = synchronized {
     waitingBatchInfos.size + runningBatchInfos.size
   }
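
Both counters above are driven by the same summation: each batch's
`receivedBlockInfo` maps an input stream id to that stream's block infos, and
the listener adds up `numRecords` across all of them (at batch start for
totalReceivedRecords, at batch completion for totalProcessedRecords). A
standalone sketch of that summation; `BlockInfo` is a hypothetical stand-in
for Spark's ReceivedBlockInfo.

case class BlockInfo(numRecords: Long)  // stand-in for ReceivedBlockInfo

object RecordCounting {
  // Sum the record counts of every block of every input stream in one batch.
  def countRecords(receivedBlockInfo: Map[Int, Array[BlockInfo]]): Long =
    receivedBlockInfo.values.map(_.map(_.numRecords).sum).sum

  def main(args: Array[String]): Unit = {
    val perStream = Map(
      0 -> Array(BlockInfo(100L), BlockInfo(50L)), // stream 0: two blocks
      1 -> Array(BlockInfo(25L)))                  // stream 1: one block
    println(countRecords(perStream)) // 175: this batch's contribution
  }
}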

