Repository: spark
Updated Branches:
  refs/heads/master 0aed38e44 -> b7bcbe25f


[SPARK-8743] [STREAMING] Deregister Codahale metrics for streaming when 
StreamingContext is closed

The issue link: https://issues.apache.org/jira/browse/SPARK-8743
Deregister Codahale metrics for streaming when StreamingContext is closed

Design:
Adding the method calls in the appropriate start() and stop () methods for the 
StreamingContext

Actions in the PullRequest:
1) Added the registerSource method call to the start method for the Streaming 
Context.
2) Added the removeSource method to the stop method.
3) Added comments for both 1 and 2 and comment to show initialization of the 
StreamingSource
4) Added a test case to check for both registration and de-registration of 
metrics

Previous closed PR for reference: https://github.com/apache/spark/pull/7250

Author: Neelesh Srinivas Salian <nsal...@cloudera.com>

Closes #7362 from nssalian/branch-SPARK-8743 and squashes the following commits:

7d998a3 [Neelesh Srinivas Salian] Removed the Thread.sleep() call
8b26397 [Neelesh Srinivas Salian] Moved the scalatest.{} import
0e8007a [Neelesh Srinivas Salian] moved import org.apache.spark{} to correct 
place
daedaa5 [Neelesh Srinivas Salian] Corrected Ordering of imports
8873180 [Neelesh Srinivas Salian] Removed redundancy in imports
59227a4 [Neelesh Srinivas Salian] Changed the ordering of the imports to 
classify  scala and spark imports
d8cb577 [Neelesh Srinivas Salian] Added registerSource to start() and 
removeSource to stop(). Wrote a test to check the registration and 
de-registration


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7bcbe25
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7bcbe25
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7bcbe25

Branch: refs/heads/master
Commit: b7bcbe25f90ba4e78b548465bc80d4de1d2c4a4a
Parents: 0aed38e
Author: Neelesh Srinivas Salian <nsal...@cloudera.com>
Authored: Mon Jul 13 15:46:51 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Jul 13 15:46:51 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/StreamingContext.scala      | 10 +++--
 .../spark/streaming/StreamingContextSuite.scala | 41 +++++++++++++++++++-
 2 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b7bcbe25/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ec49d0f..6b78a82 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -192,11 +192,8 @@ class StreamingContext private[streaming] (
       None
     }
 
-  /** Register streaming source to metrics system */
+  /* Initializing a streamingSource to register metrics */
   private val streamingSource = new StreamingSource(this)
-  assert(env != null)
-  assert(env.metricsSystem != null)
-  env.metricsSystem.registerSource(streamingSource)
 
   private var state: StreamingContextState = INITIALIZED
 
@@ -606,6 +603,9 @@ class StreamingContext private[streaming] (
         }
         shutdownHookRef = Utils.addShutdownHook(
           StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
+        // Registering Streaming Metrics at the start of the StreamingContext
+        assert(env.metricsSystem != null)
+        env.metricsSystem.registerSource(streamingSource)
         uiTab.foreach(_.attach())
         logInfo("StreamingContext started")
       case ACTIVE =>
@@ -682,6 +682,8 @@ class StreamingContext private[streaming] (
           logWarning("StreamingContext has already been stopped")
         case ACTIVE =>
           scheduler.stop(stopGracefully)
+          // Removing the streamingSource to de-register the metrics on stop()
+          env.metricsSystem.removeSource(streamingSource)
           uiTab.foreach(_.detach())
           StreamingContext.setActiveContext(null)
           waiter.notifyStop()

http://git-wip-us.apache.org/repos/asf/spark/blob/b7bcbe25/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 56b4ce5..289a159 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -20,20 +20,23 @@ package org.apache.spark.streaming
 import java.io.{File, NotSerializableException}
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.Queue
 
 import org.apache.commons.io.FileUtils
+import org.scalatest.{Assertions, BeforeAndAfter, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
 import org.scalatest.time.SpanSugar._
-import org.scalatest.{Assertions, BeforeAndAfter}
 
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.source.Source
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.util.Utils
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, 
SparkFunSuite}
 
 
 class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with 
Timeouts with Logging {
@@ -299,6 +302,25 @@ class StreamingContextSuite extends SparkFunSuite with 
BeforeAndAfter with Timeo
     Thread.sleep(100)
   }
 
+  test ("registering and de-registering of streamingSource") {
+    val conf = new SparkConf().setMaster(master).setAppName(appName)
+    ssc = new StreamingContext(conf, batchDuration)
+    assert(ssc.getState() === StreamingContextState.INITIALIZED)
+    addInputStream(ssc).register()
+    ssc.start()
+
+    val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem)
+    val streamingSource = StreamingContextSuite.getStreamingSource(ssc)
+    assert(sources.contains(streamingSource))
+    assert(ssc.getState() === StreamingContextState.ACTIVE)
+
+    ssc.stop()
+    val sourcesAfterStop = 
StreamingContextSuite.getSources(ssc.env.metricsSystem)
+    val streamingSourceAfterStop = 
StreamingContextSuite.getStreamingSource(ssc)
+    assert(ssc.getState() === StreamingContextState.STOPPED)
+    assert(!sourcesAfterStop.contains(streamingSourceAfterStop))
+  }
+
   test("awaitTermination") {
     ssc = new StreamingContext(master, appName, batchDuration)
     val inputStream = addInputStream(ssc)
@@ -811,3 +833,18 @@ package object testPackage extends Assertions {
     }
   }
 }
+
+/**
+ * Helper methods for testing StreamingContextSuite
+ * This includes methods to access private methods and fields in 
StreamingContext and MetricsSystem
+ */
+private object StreamingContextSuite extends PrivateMethodTester {
+  private val _sources = PrivateMethod[ArrayBuffer[Source]]('sources)
+  private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = {
+    metricsSystem.invokePrivate(_sources())
+  }
+  private val _streamingSource = 
PrivateMethod[StreamingSource]('streamingSource)
+  private def getStreamingSource(streamingContext: StreamingContext): 
StreamingSource = {
+    streamingContext.invokePrivate(_streamingSource())
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to