Repository: spark Updated Branches: refs/heads/master 0aed38e44 -> b7bcbe25f
[SPARK-8743] [STREAMING] Deregister Codahale metrics for streaming when StreamingContext is closed The issue link: https://issues.apache.org/jira/browse/SPARK-8743 Deregister Codahale metrics for streaming when StreamingContext is closed Design: Adding the method calls in the appropriate start() and stop() methods for the StreamingContext Actions in the PullRequest: 1) Added the registerSource method call to the start method for the Streaming Context. 2) Added the removeSource method to the stop method. 3) Added comments for both 1 and 2 and comment to show initialization of the StreamingSource 4) Added a test case to check for both registration and de-registration of metrics Previous closed PR for reference: https://github.com/apache/spark/pull/7250 Author: Neelesh Srinivas Salian <nsal...@cloudera.com> Closes #7362 from nssalian/branch-SPARK-8743 and squashes the following commits: 7d998a3 [Neelesh Srinivas Salian] Removed the Thread.sleep() call 8b26397 [Neelesh Srinivas Salian] Moved the scalatest.{} import 0e8007a [Neelesh Srinivas Salian] moved import org.apache.spark{} to correct place daedaa5 [Neelesh Srinivas Salian] Corrected Ordering of imports 8873180 [Neelesh Srinivas Salian] Removed redundancy in imports 59227a4 [Neelesh Srinivas Salian] Changed the ordering of the imports to classify scala and spark imports d8cb577 [Neelesh Srinivas Salian] Added registerSource to start() and removeSource to stop(). 
Wrote a test to check the registration and de-registration Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7bcbe25 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7bcbe25 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7bcbe25 Branch: refs/heads/master Commit: b7bcbe25f90ba4e78b548465bc80d4de1d2c4a4a Parents: 0aed38e Author: Neelesh Srinivas Salian <nsal...@cloudera.com> Authored: Mon Jul 13 15:46:51 2015 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Mon Jul 13 15:46:51 2015 -0700 ---------------------------------------------------------------------- .../spark/streaming/StreamingContext.scala | 10 +++-- .../spark/streaming/StreamingContextSuite.scala | 41 +++++++++++++++++++- 2 files changed, 45 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b7bcbe25/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ec49d0f..6b78a82 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -192,11 +192,8 @@ class StreamingContext private[streaming] ( None } - /** Register streaming source to metrics system */ + /* Initializing a streamingSource to register metrics */ private val streamingSource = new StreamingSource(this) - assert(env != null) - assert(env.metricsSystem != null) - env.metricsSystem.registerSource(streamingSource) private var state: StreamingContextState = INITIALIZED @@ -606,6 +603,9 @@ class StreamingContext private[streaming] ( } shutdownHookRef = Utils.addShutdownHook( 
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown) + // Registering Streaming Metrics at the start of the StreamingContext + assert(env.metricsSystem != null) + env.metricsSystem.registerSource(streamingSource) uiTab.foreach(_.attach()) logInfo("StreamingContext started") case ACTIVE => @@ -682,6 +682,8 @@ class StreamingContext private[streaming] ( logWarning("StreamingContext has already been stopped") case ACTIVE => scheduler.stop(stopGracefully) + // Removing the streamingSource to de-register the metrics on stop() + env.metricsSystem.removeSource(streamingSource) uiTab.foreach(_.detach()) StreamingContext.setActiveContext(null) waiter.notifyStop() http://git-wip-us.apache.org/repos/asf/spark/blob/b7bcbe25/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 56b4ce5..289a159 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -20,20 +20,23 @@ package org.apache.spark.streaming import java.io.{File, NotSerializableException} import java.util.concurrent.atomic.AtomicInteger +import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.Queue import org.apache.commons.io.FileUtils +import org.scalatest.{Assertions, BeforeAndAfter, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ -import org.scalatest.{Assertions, BeforeAndAfter} +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.metrics.MetricsSystem +import 
org.apache.spark.metrics.source.Source import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.util.Utils -import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, SparkFunSuite} class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging { @@ -299,6 +302,25 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo Thread.sleep(100) } + test ("registering and de-registering of streamingSource") { + val conf = new SparkConf().setMaster(master).setAppName(appName) + ssc = new StreamingContext(conf, batchDuration) + assert(ssc.getState() === StreamingContextState.INITIALIZED) + addInputStream(ssc).register() + ssc.start() + + val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem) + val streamingSource = StreamingContextSuite.getStreamingSource(ssc) + assert(sources.contains(streamingSource)) + assert(ssc.getState() === StreamingContextState.ACTIVE) + + ssc.stop() + val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem) + val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc) + assert(ssc.getState() === StreamingContextState.STOPPED) + assert(!sourcesAfterStop.contains(streamingSourceAfterStop)) + } + test("awaitTermination") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) @@ -811,3 +833,18 @@ package object testPackage extends Assertions { } } } + +/** + * Helper methods for testing StreamingContextSuite + * This includes methods to access private methods and fields in StreamingContext and MetricsSystem + */ +private object StreamingContextSuite extends PrivateMethodTester { + private val _sources = PrivateMethod[ArrayBuffer[Source]]('sources) + private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = { + metricsSystem.invokePrivate(_sources()) + 
} + private val _streamingSource = PrivateMethod[StreamingSource]('streamingSource) + private def getStreamingSource(streamingContext: StreamingContext): StreamingSource = { + streamingContext.invokePrivate(_streamingSource()) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org