git commit: [SPARK-1510] Spark Streaming metrics source for metrics system

2014-04-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 44da5ab2d - 80429f3e2


[SPARK-1510] Spark Streaming metrics source for metrics system

This pulls in changes made by @jerryshao in 
https://github.com/apache/spark/pull/424 and merges with the master.

Author: jerryshao saisai.s...@intel.com
Author: Tathagata Das tathagata.das1...@gmail.com

Closes #545 from tdas/streaming-metrics and squashes the following commits:

034b443 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' 
into streaming-metrics
fb3b0a5 [jerryshao] Modify according master update
21939f5 [jerryshao] Style changes according to style check error
976116b [jerryshao] Add StreamSource in StreamingContext for better monitoring 
through metrics system


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80429f3e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80429f3e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80429f3e

Branch: refs/heads/master
Commit: 80429f3e2ab786d103297652922c3d8da3cf5a01
Parents: 44da5ab
Author: jerryshao saisai.s...@intel.com
Authored: Thu Apr 24 18:56:57 2014 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Thu Apr 24 18:56:57 2014 -0700

--
 .../spark/streaming/StreamingContext.scala  |  4 ++
 .../spark/streaming/StreamingSource.scala   | 73 
 .../ui/StreamingJobProgressListener.scala   |  3 +-
 3 files changed, 79 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/80429f3e/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 1c89543..e0677b7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -154,6 +154,10 @@ class StreamingContext private[streaming] (
 
   private[streaming] val uiTab = new StreamingTab(this)
 
+  /** Register streaming source to metrics system */
+  private val streamingSource = new StreamingSource(this)
+  SparkEnv.get.metricsSystem.registerSource(streamingSource)
+
   /** Enumeration to identify current state of the StreamingContext */
   private[streaming] object StreamingContextState extends Enumeration {
 type CheckpointState = Value

http://git-wip-us.apache.org/repos/asf/spark/blob/80429f3e/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
new file mode 100644
index 000..774adc3
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+private[streaming] class StreamingSource(ssc: StreamingContext) extends Source 
{
+  val metricRegistry = new MetricRegistry
+  val sourceName = %s.StreamingMetrics.format(ssc.sparkContext.appName)
+
+  val streamingListener = ssc.uiTab.listener
+
+  private def registerGauge[T](name: String, f: StreamingJobProgressListener 
= T,
+  defaultValue: T) {
+metricRegistry.register(MetricRegistry.name(streaming, name), new 
Gauge[T] {
+  override def getValue: T = 
Option(f(streamingListener)).getOrElse(defaultValue)
+})
+  }
+
+  // Gauge for number of network receivers
+  registerGauge(receivers, _.numReceivers, 0)
+
+  // Gauge for number of total completed batches
+  registerGauge(totalCompletedBatches, 

git commit: [SPARK-1510] Spark Streaming metrics source for metrics system

2014-04-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 c8dd13221 - 0bc0f36d6


[SPARK-1510] Spark Streaming metrics source for metrics system

This pulls in changes made by @jerryshao in 
https://github.com/apache/spark/pull/424 and merges with the master.

Author: jerryshao saisai.s...@intel.com
Author: Tathagata Das tathagata.das1...@gmail.com

Closes #545 from tdas/streaming-metrics and squashes the following commits:

034b443 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' 
into streaming-metrics
fb3b0a5 [jerryshao] Modify according master update
21939f5 [jerryshao] Style changes according to style check error
976116b [jerryshao] Add StreamSource in StreamingContext for better monitoring 
through metrics system

(cherry picked from commit 80429f3e2ab786d103297652922c3d8da3cf5a01)
Signed-off-by: Tathagata Das tathagata.das1...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0bc0f36d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0bc0f36d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0bc0f36d

Branch: refs/heads/branch-1.0
Commit: 0bc0f36d6be265d8ad5b1909fc06f03a661454a5
Parents: c8dd132
Author: jerryshao saisai.s...@intel.com
Authored: Thu Apr 24 18:56:57 2014 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Thu Apr 24 18:57:11 2014 -0700

--
 .../spark/streaming/StreamingContext.scala  |  4 ++
 .../spark/streaming/StreamingSource.scala   | 73 
 .../ui/StreamingJobProgressListener.scala   |  3 +-
 3 files changed, 79 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0bc0f36d/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 1c89543..e0677b7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -154,6 +154,10 @@ class StreamingContext private[streaming] (
 
   private[streaming] val uiTab = new StreamingTab(this)
 
+  /** Register streaming source to metrics system */
+  private val streamingSource = new StreamingSource(this)
+  SparkEnv.get.metricsSystem.registerSource(streamingSource)
+
   /** Enumeration to identify current state of the StreamingContext */
   private[streaming] object StreamingContextState extends Enumeration {
 type CheckpointState = Value

http://git-wip-us.apache.org/repos/asf/spark/blob/0bc0f36d/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
new file mode 100644
index 000..774adc3
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+private[streaming] class StreamingSource(ssc: StreamingContext) extends Source 
{
+  val metricRegistry = new MetricRegistry
+  val sourceName = %s.StreamingMetrics.format(ssc.sparkContext.appName)
+
+  val streamingListener = ssc.uiTab.listener
+
+  private def registerGauge[T](name: String, f: StreamingJobProgressListener 
= T,
+  defaultValue: T) {
+metricRegistry.register(MetricRegistry.name(streaming, name), new 
Gauge[T] {
+  override def getValue: T = 
Option(f(streamingListener)).getOrElse(defaultValue)
+})
+  }
+
+  // Gauge for number of network receivers
+