git commit: [SPARK-1510] Spark Streaming metrics source for metrics system
Repository: spark Updated Branches: refs/heads/master 44da5ab2d - 80429f3e2 [SPARK-1510] Spark Streaming metrics source for metrics system This pulls in changes made by @jerryshao in https://github.com/apache/spark/pull/424 and merges with the master. Author: jerryshao saisai.s...@intel.com Author: Tathagata Das tathagata.das1...@gmail.com Closes #545 from tdas/streaming-metrics and squashes the following commits: 034b443 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-metrics fb3b0a5 [jerryshao] Modify according master update 21939f5 [jerryshao] Style changes according to style check error 976116b [jerryshao] Add StreamSource in StreamingContext for better monitoring through metrics system Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80429f3e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80429f3e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80429f3e Branch: refs/heads/master Commit: 80429f3e2ab786d103297652922c3d8da3cf5a01 Parents: 44da5ab Author: jerryshao saisai.s...@intel.com Authored: Thu Apr 24 18:56:57 2014 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Thu Apr 24 18:56:57 2014 -0700 -- .../spark/streaming/StreamingContext.scala | 4 ++ .../spark/streaming/StreamingSource.scala | 73 .../ui/StreamingJobProgressListener.scala | 3 +- 3 files changed, 79 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/80429f3e/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 1c89543..e0677b7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -154,6 +154,10 @@ class StreamingContext 
private[streaming] ( private[streaming] val uiTab = new StreamingTab(this) + /** Register streaming source to metrics system */ + private val streamingSource = new StreamingSource(this) + SparkEnv.get.metricsSystem.registerSource(streamingSource) + /** Enumeration to identify current state of the StreamingContext */ private[streaming] object StreamingContextState extends Enumeration { type CheckpointState = Value http://git-wip-us.apache.org/repos/asf/spark/blob/80429f3e/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala new file mode 100644 index 000..774adc3 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.metrics.source.Source +import org.apache.spark.streaming.ui.StreamingJobProgressListener + +private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { + val metricRegistry = new MetricRegistry + val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName) + + val streamingListener = ssc.uiTab.listener + + private def registerGauge[T](name: String, f: StreamingJobProgressListener => T, + defaultValue: T) { +metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] { + override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue) +}) + } + + // Gauge for number of network receivers + registerGauge("receivers", _.numReceivers, 0) + + // Gauge for number of total completed batches + registerGauge("totalCompletedBatches",
git commit: [SPARK-1510] Spark Streaming metrics source for metrics system
Repository: spark Updated Branches: refs/heads/branch-1.0 c8dd13221 - 0bc0f36d6 [SPARK-1510] Spark Streaming metrics source for metrics system This pulls in changes made by @jerryshao in https://github.com/apache/spark/pull/424 and merges with the master. Author: jerryshao saisai.s...@intel.com Author: Tathagata Das tathagata.das1...@gmail.com Closes #545 from tdas/streaming-metrics and squashes the following commits: 034b443 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-metrics fb3b0a5 [jerryshao] Modify according master update 21939f5 [jerryshao] Style changes according to style check error 976116b [jerryshao] Add StreamSource in StreamingContext for better monitoring through metrics system (cherry picked from commit 80429f3e2ab786d103297652922c3d8da3cf5a01) Signed-off-by: Tathagata Das tathagata.das1...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0bc0f36d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0bc0f36d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0bc0f36d Branch: refs/heads/branch-1.0 Commit: 0bc0f36d6be265d8ad5b1909fc06f03a661454a5 Parents: c8dd132 Author: jerryshao saisai.s...@intel.com Authored: Thu Apr 24 18:56:57 2014 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Thu Apr 24 18:57:11 2014 -0700 -- .../spark/streaming/StreamingContext.scala | 4 ++ .../spark/streaming/StreamingSource.scala | 73 .../ui/StreamingJobProgressListener.scala | 3 +- 3 files changed, 79 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0bc0f36d/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 1c89543..e0677b7 100644 --- 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -154,6 +154,10 @@ class StreamingContext private[streaming] ( private[streaming] val uiTab = new StreamingTab(this) + /** Register streaming source to metrics system */ + private val streamingSource = new StreamingSource(this) + SparkEnv.get.metricsSystem.registerSource(streamingSource) + /** Enumeration to identify current state of the StreamingContext */ private[streaming] object StreamingContextState extends Enumeration { type CheckpointState = Value http://git-wip-us.apache.org/repos/asf/spark/blob/0bc0f36d/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala new file mode 100644 index 000..774adc3 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.metrics.source.Source +import org.apache.spark.streaming.ui.StreamingJobProgressListener + +private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { + val metricRegistry = new MetricRegistry + val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName) + + val streamingListener = ssc.uiTab.listener + + private def registerGauge[T](name: String, f: StreamingJobProgressListener => T, + defaultValue: T) { +metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] { + override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue) +}) + } + + // Gauge for number of network receivers +