spark git commit: [SPARK-4537][Streaming] Expand StreamingSource to add more metrics
Repository: spark Updated Branches: refs/heads/master ac8278593 -> f205fe477 [SPARK-4537][Streaming] Expand StreamingSource to add more metrics Add `processingDelay`, `schedulingDelay` and `totalDelay` for the last completed batch. Add `lastReceivedBatchRecords` and `totalReceivedBatchRecords` to the received records counting. Author: jerryshao Closes #3466 from jerryshao/SPARK-4537 and squashes the following commits: 00f5f7f [jerryshao] Change the code style and add totalProcessedRecords 44721a6 [jerryshao] Further address the comments c097ddc [jerryshao] Address the comments 02dd44f [jerryshao] Fix the addressed comments c7a9376 [jerryshao] Expand StreamingSource to add more metrics Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f205fe47 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f205fe47 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f205fe47 Branch: refs/heads/master Commit: f205fe477c33a541053c198cd43a5811d6cf9fe2 Parents: ac82785 Author: jerryshao Authored: Thu Dec 25 19:39:49 2014 -0800 Committer: Tathagata Das Committed: Thu Dec 25 19:39:49 2014 -0800 -- .../spark/streaming/StreamingSource.scala | 53 ++-- .../ui/StreamingJobProgressListener.scala | 19 ++- 2 files changed, 57 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f205fe47/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala index e35a568..9697437 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala @@ -29,9 +29,17 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { private val streamingListener = ssc.progressListener private def registerGauge[T](name: String, f: StreamingJobProgressListener => T, - defaultValue: T) { + defaultValue: T): Unit = { +registerGaugeWithOption[T](name, + (l: StreamingJobProgressListener) => Option(f(streamingListener)), defaultValue) + } + + private def registerGaugeWithOption[T]( + name: String, + f: StreamingJobProgressListener => Option[T], + defaultValue: T): Unit = { metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] { - override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue) + override def getValue: T = f(streamingListener).getOrElse(defaultValue) }) } @@ -41,6 +49,12 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { // Gauge for number of total completed batches registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L) + // Gauge for number of total received records + registerGauge("totalReceivedRecords", _.numTotalReceivedRecords, 0L) + + // Gauge for number of total processed records + registerGauge("totalProcessedRecords", _.numTotalProcessedRecords, 0L) + // Gauge for number of unprocessed batches registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L) @@ -55,19 +69,30 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { // Gauge for last completed batch, useful for monitoring the streaming job's running status, // displayed data -1 for any abnormal condition. - registerGauge("lastCompletedBatch_submissionTime", -_.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L) - registerGauge("lastCompletedBatch_processStartTime", -_.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L) - registerGauge("lastCompletedBatch_processEndTime", -_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L) + registerGaugeWithOption("lastCompletedBatch_submissionTime", +_.lastCompletedBatch.map(_.submissionTime), -1L) + registerGaugeWithOption("lastCompletedBatch_processingStartTime", +_.lastCompletedBatch.flatMap(_.processingStartTime), -1L) + registerGaugeWithOption("lastCompletedBatch_processingEndTime", +_.lastCompletedBatch.flatMap(_.processingEndTime), -1L) + + // Gauge for last completed batch's delay information. + registerGaugeWithOption("lastCompletedBatch_processingDelay", +_.lastCompletedBatch.flatMap(_.processingDelay), -1L) + registerGaugeWithOption("lastCompletedBatch_schedulingDelay", +_.lastCompletedBatch.flatMap(_.schedulingDelay), -1L) + registerGaugeWithOption("lastCompletedBatch_totalDelay", +_.lastCompletedBatch.flatMap(_.totalDelay), -1L) // Gauge for last received
spark git commit: [SPARK-4537][Streaming] Expand StreamingSource to add more metrics
Repository: spark Updated Branches: refs/heads/branch-1.2 475ab6ec7 -> acf5c6328 [SPARK-4537][Streaming] Expand StreamingSource to add more metrics Add `processingDelay`, `schedulingDelay` and `totalDelay` for the last completed batch. Add `lastReceivedBatchRecords` and `totalReceivedBatchRecords` to the received records counting. Author: jerryshao Closes #3466 from jerryshao/SPARK-4537 and squashes the following commits: 00f5f7f [jerryshao] Change the code style and add totalProcessedRecords 44721a6 [jerryshao] Further address the comments c097ddc [jerryshao] Address the comments 02dd44f [jerryshao] Fix the addressed comments c7a9376 [jerryshao] Expand StreamingSource to add more metrics (cherry picked from commit f205fe477c33a541053c198cd43a5811d6cf9fe2) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acf5c632 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acf5c632 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acf5c632 Branch: refs/heads/branch-1.2 Commit: acf5c63289506c66c621175bb3da1f4633005770 Parents: 475ab6e Author: jerryshao Authored: Thu Dec 25 19:39:49 2014 -0800 Committer: Tathagata Das Committed: Thu Dec 25 19:41:59 2014 -0800 -- .../spark/streaming/StreamingSource.scala | 53 ++-- .../ui/StreamingJobProgressListener.scala | 19 ++- 2 files changed, 57 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/acf5c632/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala index e35a568..9697437 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala @@ -29,9 +29,17 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { private val streamingListener = ssc.progressListener private def registerGauge[T](name: String, f: StreamingJobProgressListener => T, - defaultValue: T) { + defaultValue: T): Unit = { +registerGaugeWithOption[T](name, + (l: StreamingJobProgressListener) => Option(f(streamingListener)), defaultValue) + } + + private def registerGaugeWithOption[T]( + name: String, + f: StreamingJobProgressListener => Option[T], + defaultValue: T): Unit = { metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] { - override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue) + override def getValue: T = f(streamingListener).getOrElse(defaultValue) }) } @@ -41,6 +49,12 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { // Gauge for number of total completed batches registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L) + // Gauge for number of total received records + registerGauge("totalReceivedRecords", _.numTotalReceivedRecords, 0L) + + // Gauge for number of total processed records + registerGauge("totalProcessedRecords", _.numTotalProcessedRecords, 0L) + // Gauge for number of unprocessed batches registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L) @@ -55,19 +69,30 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { // Gauge for last completed batch, useful for monitoring the streaming job's running status, // displayed data -1 for any abnormal condition. - registerGauge("lastCompletedBatch_submissionTime", -_.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L) - registerGauge("lastCompletedBatch_processStartTime", -_.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L) - registerGauge("lastCompletedBatch_processEndTime", -_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L) + registerGaugeWithOption("lastCompletedBatch_submissionTime", +_.lastCompletedBatch.map(_.submissionTime), -1L) + registerGaugeWithOption("lastCompletedBatch_processingStartTime", +_.lastCompletedBatch.flatMap(_.processingStartTime), -1L) + registerGaugeWithOption("lastCompletedBatch_processingEndTime", +_.lastCompletedBatch.flatMap(_.processingEndTime), -1L) + + // Gauge for last completed batch's delay information. + registerGaugeWithOption("lastCompletedBatch_processingDelay", +_.lastCompletedBatch.flatMap(_.processingDelay), -1L) + registerGaugeWithOption("lastCompletedBatch_schedulingDelay", +_.lastCompletedBatch.flatMap(_.schedulingDelay), -1L) + registerGaugeWithOption("lastComple
spark git commit: [SPARK-4537][Streaming] Expand StreamingSource to add more metrics
Repository: spark Updated Branches: refs/heads/branch-1.1 dd0287cca -> d21347dbb [SPARK-4537][Streaming] Expand StreamingSource to add more metrics Add `processingDelay`, `schedulingDelay` and `totalDelay` for the last completed batch. Add `lastReceivedBatchRecords` and `totalReceivedBatchRecords` to the received records counting. Author: jerryshao Closes #3466 from jerryshao/SPARK-4537 and squashes the following commits: 00f5f7f [jerryshao] Change the code style and add totalProcessedRecords 44721a6 [jerryshao] Further address the comments c097ddc [jerryshao] Address the comments 02dd44f [jerryshao] Fix the addressed comments c7a9376 [jerryshao] Expand StreamingSource to add more metrics (cherry picked from commit f205fe477c33a541053c198cd43a5811d6cf9fe2) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d21347db Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d21347db Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d21347db Branch: refs/heads/branch-1.1 Commit: d21347dbb9799287794f18933eb8abf82221552f Parents: dd0287c Author: jerryshao Authored: Thu Dec 25 19:39:49 2014 -0800 Committer: Tathagata Das Committed: Thu Dec 25 19:42:17 2014 -0800 -- .../spark/streaming/StreamingSource.scala | 53 ++-- .../ui/StreamingJobProgressListener.scala | 19 ++- 2 files changed, 57 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d21347db/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala index e35a568..9697437 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala @@ -29,9 +29,17 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { private val streamingListener = ssc.progressListener private def registerGauge[T](name: String, f: StreamingJobProgressListener => T, - defaultValue: T) { + defaultValue: T): Unit = { +registerGaugeWithOption[T](name, + (l: StreamingJobProgressListener) => Option(f(streamingListener)), defaultValue) + } + + private def registerGaugeWithOption[T]( + name: String, + f: StreamingJobProgressListener => Option[T], + defaultValue: T): Unit = { metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] { - override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue) + override def getValue: T = f(streamingListener).getOrElse(defaultValue) }) } @@ -41,6 +49,12 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { // Gauge for number of total completed batches registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L) + // Gauge for number of total received records + registerGauge("totalReceivedRecords", _.numTotalReceivedRecords, 0L) + + // Gauge for number of total processed records + registerGauge("totalProcessedRecords", _.numTotalProcessedRecords, 0L) + // Gauge for number of unprocessed batches registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L) @@ -55,19 +69,30 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { // Gauge for last completed batch, useful for monitoring the streaming job's running status, // displayed data -1 for any abnormal condition. - registerGauge("lastCompletedBatch_submissionTime", -_.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L) - registerGauge("lastCompletedBatch_processStartTime", -_.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L) - registerGauge("lastCompletedBatch_processEndTime", -_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L) + registerGaugeWithOption("lastCompletedBatch_submissionTime", +_.lastCompletedBatch.map(_.submissionTime), -1L) + registerGaugeWithOption("lastCompletedBatch_processingStartTime", +_.lastCompletedBatch.flatMap(_.processingStartTime), -1L) + registerGaugeWithOption("lastCompletedBatch_processingEndTime", +_.lastCompletedBatch.flatMap(_.processingEndTime), -1L) + + // Gauge for last completed batch's delay information. + registerGaugeWithOption("lastCompletedBatch_processingDelay", +_.lastCompletedBatch.flatMap(_.processingDelay), -1L) + registerGaugeWithOption("lastCompletedBatch_schedulingDelay", +_.lastCompletedBatch.flatMap(_.schedulingDelay), -1L) + registerGaugeWithOption("lastComple