spark git commit: [SPARK-4537][Streaming] Expand StreamingSource to add more metrics

2014-12-25 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master ac8278593 -> f205fe477


[SPARK-4537][Streaming] Expand StreamingSource to add more metrics

Add `processingDelay`, `schedulingDelay` and `totalDelay` gauges for the last 
completed batch. Add `lastReceivedBatchRecords` and `totalReceivedBatchRecords` 
gauges for counting received records.

Author: jerryshao 

Closes #3466 from jerryshao/SPARK-4537 and squashes the following commits:

00f5f7f [jerryshao] Change the code style and add totalProcessedRecords
44721a6 [jerryshao] Further address the comments
c097ddc [jerryshao] Address the comments
02dd44f [jerryshao] Fix the addressed comments
c7a9376 [jerryshao] Expand StreamingSource to add more metrics


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f205fe47
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f205fe47
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f205fe47

Branch: refs/heads/master
Commit: f205fe477c33a541053c198cd43a5811d6cf9fe2
Parents: ac82785
Author: jerryshao 
Authored: Thu Dec 25 19:39:49 2014 -0800
Committer: Tathagata Das 
Committed: Thu Dec 25 19:39:49 2014 -0800

--
 .../spark/streaming/StreamingSource.scala   | 53 ++--
 .../ui/StreamingJobProgressListener.scala   | 19 ++-
 2 files changed, 57 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f205fe47/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
index e35a568..9697437 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
@@ -29,9 +29,17 @@ private[streaming] class StreamingSource(ssc: 
StreamingContext) extends Source {
   private val streamingListener = ssc.progressListener
 
   private def registerGauge[T](name: String, f: StreamingJobProgressListener 
=> T,
-  defaultValue: T) {
+  defaultValue: T): Unit = {
+registerGaugeWithOption[T](name,
+  (l: StreamingJobProgressListener) => Option(f(streamingListener)), 
defaultValue)
+  }
+
+  private def registerGaugeWithOption[T](
+  name: String,
+  f: StreamingJobProgressListener => Option[T],
+  defaultValue: T): Unit = {
 metricRegistry.register(MetricRegistry.name("streaming", name), new 
Gauge[T] {
-  override def getValue: T = 
Option(f(streamingListener)).getOrElse(defaultValue)
+  override def getValue: T = f(streamingListener).getOrElse(defaultValue)
 })
   }
 
@@ -41,6 +49,12 @@ private[streaming] class StreamingSource(ssc: 
StreamingContext) extends Source {
   // Gauge for number of total completed batches
   registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L)
 
+  // Gauge for number of total received records
+  registerGauge("totalReceivedRecords", _.numTotalReceivedRecords, 0L)
+
+  // Gauge for number of total processed records
+  registerGauge("totalProcessedRecords", _.numTotalProcessedRecords, 0L)
+
   // Gauge for number of unprocessed batches
   registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L)
 
@@ -55,19 +69,30 @@ private[streaming] class StreamingSource(ssc: 
StreamingContext) extends Source {
 
   // Gauge for last completed batch, useful for monitoring the streaming job's 
running status,
   // displayed data -1 for any abnormal condition.
-  registerGauge("lastCompletedBatch_submissionTime",
-_.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
-  registerGauge("lastCompletedBatch_processStartTime",
-_.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
-  registerGauge("lastCompletedBatch_processEndTime",
-_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+  registerGaugeWithOption("lastCompletedBatch_submissionTime",
+_.lastCompletedBatch.map(_.submissionTime), -1L)
+  registerGaugeWithOption("lastCompletedBatch_processingStartTime",
+_.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
+  registerGaugeWithOption("lastCompletedBatch_processingEndTime",
+_.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
+
+  // Gauge for last completed batch's delay information.
+  registerGaugeWithOption("lastCompletedBatch_processingDelay",
+_.lastCompletedBatch.flatMap(_.processingDelay), -1L)
+  registerGaugeWithOption("lastCompletedBatch_schedulingDelay",
+_.lastCompletedBatch.flatMap(_.schedulingDelay), -1L)
+  registerGaugeWithOption("lastCompletedBatch_totalDelay",
+_.lastCompletedBatch.flatMap(_.totalDelay), -1L)
 
   // Gauge for last received 

spark git commit: [SPARK-4537][Streaming] Expand StreamingSource to add more metrics

2014-12-25 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 475ab6ec7 -> acf5c6328


[SPARK-4537][Streaming] Expand StreamingSource to add more metrics

Add `processingDelay`, `schedulingDelay` and `totalDelay` gauges for the last 
completed batch. Add `lastReceivedBatchRecords` and `totalReceivedBatchRecords` 
gauges for counting received records.

Author: jerryshao 

Closes #3466 from jerryshao/SPARK-4537 and squashes the following commits:

00f5f7f [jerryshao] Change the code style and add totalProcessedRecords
44721a6 [jerryshao] Further address the comments
c097ddc [jerryshao] Address the comments
02dd44f [jerryshao] Fix the addressed comments
c7a9376 [jerryshao] Expand StreamingSource to add more metrics

(cherry picked from commit f205fe477c33a541053c198cd43a5811d6cf9fe2)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acf5c632
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acf5c632
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acf5c632

Branch: refs/heads/branch-1.2
Commit: acf5c63289506c66c621175bb3da1f4633005770
Parents: 475ab6e
Author: jerryshao 
Authored: Thu Dec 25 19:39:49 2014 -0800
Committer: Tathagata Das 
Committed: Thu Dec 25 19:41:59 2014 -0800

--
 .../spark/streaming/StreamingSource.scala   | 53 ++--
 .../ui/StreamingJobProgressListener.scala   | 19 ++-
 2 files changed, 57 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/acf5c632/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
index e35a568..9697437 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
@@ -29,9 +29,17 @@ private[streaming] class StreamingSource(ssc: 
StreamingContext) extends Source {
   private val streamingListener = ssc.progressListener
 
   private def registerGauge[T](name: String, f: StreamingJobProgressListener 
=> T,
-  defaultValue: T) {
+  defaultValue: T): Unit = {
+registerGaugeWithOption[T](name,
+  (l: StreamingJobProgressListener) => Option(f(streamingListener)), 
defaultValue)
+  }
+
+  private def registerGaugeWithOption[T](
+  name: String,
+  f: StreamingJobProgressListener => Option[T],
+  defaultValue: T): Unit = {
 metricRegistry.register(MetricRegistry.name("streaming", name), new 
Gauge[T] {
-  override def getValue: T = 
Option(f(streamingListener)).getOrElse(defaultValue)
+  override def getValue: T = f(streamingListener).getOrElse(defaultValue)
 })
   }
 
@@ -41,6 +49,12 @@ private[streaming] class StreamingSource(ssc: 
StreamingContext) extends Source {
   // Gauge for number of total completed batches
   registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L)
 
+  // Gauge for number of total received records
+  registerGauge("totalReceivedRecords", _.numTotalReceivedRecords, 0L)
+
+  // Gauge for number of total processed records
+  registerGauge("totalProcessedRecords", _.numTotalProcessedRecords, 0L)
+
   // Gauge for number of unprocessed batches
   registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L)
 
@@ -55,19 +69,30 @@ private[streaming] class StreamingSource(ssc: 
StreamingContext) extends Source {
 
   // Gauge for last completed batch, useful for monitoring the streaming job's 
running status,
   // displayed data -1 for any abnormal condition.
-  registerGauge("lastCompletedBatch_submissionTime",
-_.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
-  registerGauge("lastCompletedBatch_processStartTime",
-_.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
-  registerGauge("lastCompletedBatch_processEndTime",
-_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+  registerGaugeWithOption("lastCompletedBatch_submissionTime",
+_.lastCompletedBatch.map(_.submissionTime), -1L)
+  registerGaugeWithOption("lastCompletedBatch_processingStartTime",
+_.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
+  registerGaugeWithOption("lastCompletedBatch_processingEndTime",
+_.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
+
+  // Gauge for last completed batch's delay information.
+  registerGaugeWithOption("lastCompletedBatch_processingDelay",
+_.lastCompletedBatch.flatMap(_.processingDelay), -1L)
+  registerGaugeWithOption("lastCompletedBatch_schedulingDelay",
+_.lastCompletedBatch.flatMap(_.schedulingDelay), -1L)
+  registerGaugeWithOption("lastComple

spark git commit: [SPARK-4537][Streaming] Expand StreamingSource to add more metrics

2014-12-25 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 dd0287cca -> d21347dbb


[SPARK-4537][Streaming] Expand StreamingSource to add more metrics

Add `processingDelay`, `schedulingDelay` and `totalDelay` gauges for the last 
completed batch. Add `lastReceivedBatchRecords` and `totalReceivedBatchRecords` 
gauges for counting received records.

Author: jerryshao 

Closes #3466 from jerryshao/SPARK-4537 and squashes the following commits:

00f5f7f [jerryshao] Change the code style and add totalProcessedRecords
44721a6 [jerryshao] Further address the comments
c097ddc [jerryshao] Address the comments
02dd44f [jerryshao] Fix the addressed comments
c7a9376 [jerryshao] Expand StreamingSource to add more metrics

(cherry picked from commit f205fe477c33a541053c198cd43a5811d6cf9fe2)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d21347db
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d21347db
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d21347db

Branch: refs/heads/branch-1.1
Commit: d21347dbb9799287794f18933eb8abf82221552f
Parents: dd0287c
Author: jerryshao 
Authored: Thu Dec 25 19:39:49 2014 -0800
Committer: Tathagata Das 
Committed: Thu Dec 25 19:42:17 2014 -0800

--
 .../spark/streaming/StreamingSource.scala   | 53 ++--
 .../ui/StreamingJobProgressListener.scala   | 19 ++-
 2 files changed, 57 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d21347db/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
index e35a568..9697437 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
@@ -29,9 +29,17 @@ private[streaming] class StreamingSource(ssc: 
StreamingContext) extends Source {
   private val streamingListener = ssc.progressListener
 
   private def registerGauge[T](name: String, f: StreamingJobProgressListener 
=> T,
-  defaultValue: T) {
+  defaultValue: T): Unit = {
+registerGaugeWithOption[T](name,
+  (l: StreamingJobProgressListener) => Option(f(streamingListener)), 
defaultValue)
+  }
+
+  private def registerGaugeWithOption[T](
+  name: String,
+  f: StreamingJobProgressListener => Option[T],
+  defaultValue: T): Unit = {
 metricRegistry.register(MetricRegistry.name("streaming", name), new 
Gauge[T] {
-  override def getValue: T = 
Option(f(streamingListener)).getOrElse(defaultValue)
+  override def getValue: T = f(streamingListener).getOrElse(defaultValue)
 })
   }
 
@@ -41,6 +49,12 @@ private[streaming] class StreamingSource(ssc: 
StreamingContext) extends Source {
   // Gauge for number of total completed batches
   registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L)
 
+  // Gauge for number of total received records
+  registerGauge("totalReceivedRecords", _.numTotalReceivedRecords, 0L)
+
+  // Gauge for number of total processed records
+  registerGauge("totalProcessedRecords", _.numTotalProcessedRecords, 0L)
+
   // Gauge for number of unprocessed batches
   registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L)
 
@@ -55,19 +69,30 @@ private[streaming] class StreamingSource(ssc: 
StreamingContext) extends Source {
 
   // Gauge for last completed batch, useful for monitoring the streaming job's 
running status,
   // displayed data -1 for any abnormal condition.
-  registerGauge("lastCompletedBatch_submissionTime",
-_.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
-  registerGauge("lastCompletedBatch_processStartTime",
-_.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
-  registerGauge("lastCompletedBatch_processEndTime",
-_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+  registerGaugeWithOption("lastCompletedBatch_submissionTime",
+_.lastCompletedBatch.map(_.submissionTime), -1L)
+  registerGaugeWithOption("lastCompletedBatch_processingStartTime",
+_.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
+  registerGaugeWithOption("lastCompletedBatch_processingEndTime",
+_.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
+
+  // Gauge for last completed batch's delay information.
+  registerGaugeWithOption("lastCompletedBatch_processingDelay",
+_.lastCompletedBatch.flatMap(_.processingDelay), -1L)
+  registerGaugeWithOption("lastCompletedBatch_schedulingDelay",
+_.lastCompletedBatch.flatMap(_.schedulingDelay), -1L)
+  registerGaugeWithOption("lastComple