Repository: spark
Updated Branches:
  refs/heads/master 72e336997 -> f1957e116
SPARK-2134: Report metrics before application finishes

Author: Rahul Singhal <rahul.sing...@guavus.com>

Closes #1076 from rahulsinghaliitd/SPARK-2134 and squashes the following commits:

15f18b6 [Rahul Singhal] SPARK-2134: Report metrics before application finishes

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1957e11
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1957e11
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1957e11

Branch: refs/heads/master
Commit: f1957e11652a537efd40771f843591a4c9341014
Parents: 72e3369
Author: Rahul Singhal <rahul.sing...@guavus.com>
Authored: Fri Aug 1 00:33:15 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Fri Aug 1 00:33:15 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkContext.scala | 1 +
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 ++
 core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 1 +
 .../org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 1 +
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 4 ++++
 core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala | 4 ++++
 .../main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala | 4 ++++
 core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala | 4 ++++
 .../main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala | 4 ++++
 core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala | 2 ++
 .../scala/org/apache/spark/metrics/sink/MetricsServlet.scala | 2 ++
 core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala | 1 +
 .../main/scala/org/apache/spark/metrics/sink/GangliaSink.scala | 4 ++++
 13 files changed, 34 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/SparkContext.scala
---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b25f081..f5a0549 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -990,6 +990,7 @@ class SparkContext(config: SparkConf) extends Logging { val dagSchedulerCopy = dagScheduler dagScheduler = null if (dagSchedulerCopy != null) { + env.metricsSystem.report() metadataCleaner.cancel() cleaner.foreach(_.stop()) dagSchedulerCopy.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 21f8667..a70ecdb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -154,6 +154,8 @@ private[spark] class Master( } override def postStop() { + masterMetricsSystem.report() + applicationMetricsSystem.report() // prevent the CompleteRecovery message sending to restarted master if (recoveryCompletionTask != null) { recoveryCompletionTask.cancel() http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index ce42544..fb5252d 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -357,6 +357,7 @@ private[spark] class Worker( } override def postStop() { + 
metricsSystem.report() registrationRetryTimer.foreach(_.cancel()) executors.values.foreach(_.kill()) drivers.values.foreach(_.kill()) http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 860b47e..af736de 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -88,6 +88,7 @@ private[spark] class CoarseGrainedExecutorBackend( case StopExecutor => logInfo("Driver commanded a shutdown") + executor.stop() context.stop(self) context.system.shutdown() } http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 3b69bc4..99d650a 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -121,6 +121,10 @@ private[spark] class Executor( } } + def stop(): Unit = { + env.metricsSystem.report() + } + /** Get the Yarn approved local directories. 
*/ private def getYarnLocalDirs(): String = { // Hadoop 0.23 and 2.x have different Environment variable names for the http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 651511d..6ef817d 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -91,6 +91,10 @@ private[spark] class MetricsSystem private (val instance: String, sinks.foreach(_.stop) } + def report(): Unit = { + sinks.foreach(_.report()) + } + def registerSource(source: Source) { sources += source try { http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala index 05852f1..81b9056 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala @@ -57,5 +57,9 @@ private[spark] class ConsoleSink(val property: Properties, val registry: MetricR override def stop() { reporter.stop() } + + override def report() { + reporter.report() + } } http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala index 542dce6..9d5f2ae 100644 --- 
a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala @@ -66,5 +66,9 @@ private[spark] class CsvSink(val property: Properties, val registry: MetricRegis override def stop() { reporter.stop() } + + override def report() { + reporter.report() + } } http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala index aeb4ad4..d7b5f5c 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -81,4 +81,8 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric override def stop() { reporter.stop() } + + override def report() { + reporter.report() + } } http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala index ed27234..2588fe2 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala @@ -35,4 +35,6 @@ private[spark] class JmxSink(val property: Properties, val registry: MetricRegis reporter.stop() } + override def report() { } + } http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala 
b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala index 571539b..2f65bc8 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala @@ -57,4 +57,6 @@ private[spark] class MetricsServlet(val property: Properties, val registry: Metr override def start() { } override def stop() { } + + override def report() { } } http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala index 6f2b5a0..0d83d8c 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala @@ -20,4 +20,5 @@ package org.apache.spark.metrics.sink private[spark] trait Sink { def start: Unit def stop: Unit + def report(): Unit } http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala ---------------------------------------------------------------------- diff --git a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala index d03d777..3b1880e 100644 --- a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala +++ b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -82,5 +82,9 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry, override def stop() { reporter.stop() } + + override def report() { + reporter.report() + } }