This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new b7fff03  [SPARK-31711][CORE] Register the executor source with the metrics system when running in local mode
b7fff03 is described below

commit b7fff0397319efd2987d4cceff4f738f1c06409d
Author: Luca Canali <luca.can...@cern.ch>
AuthorDate: Wed Nov 4 16:48:55 2020 -0600

    [SPARK-31711][CORE] Register the executor source with the metrics system when running in local mode

    ### What changes were proposed in this pull request?
    This PR proposes to register the executor source with the Spark metrics system when running in local mode.

    ### Why are the changes needed?
    The Apache Spark metrics system provides many useful insights on a Spark workload. In particular, the [executor source metrics](https://github.com/apache/spark/blob/master/docs/monitoring.md#component-instance--executor) provide detailed information, including the number of active tasks, I/O metrics, and several task metric details. The executor source, contrary to other sources (for example the ExecutorMetrics source), is not available when running in local mode. Having executor metrics in local mode can be useful when testing and troubleshooting Spark workloads in a development environment. The metrics can be fed to a dashboard to follow the evolution of resource usage and to troubleshoot performance, as [in this example](https://github.com/cerndb/spark-dashboard). Currently, users have to deploy on a cluster to collect executor source metrics, while having them in local mode is handy for testing.

    ### Does this PR introduce _any_ user-facing change?
    - This PR exposes executor source metrics data when running in local mode.

    ### How was this patch tested?
    - Manually tested by running in local mode and inspecting the metrics listed in http://localhost:4040/metrics/json/ (a minimal sketch of this check follows below)
    - Also added a test in `SourceConfigSuite`

    Closes #28528 from LucaCanali/metricsWithLocalMode.
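To illustrate the manual check described under "How was this patch tested?", here is a minimal sketch (not part of the commit): it runs a short job in local mode and then reads the driver's metrics servlet to confirm that the `executor` namespace is present. The application name, the sample job, and the substring check on the JSON payload are illustrative assumptions.

```scala
import scala.io.Source

import org.apache.spark.sql.SparkSession

object LocalExecutorMetricsCheck {
  def main(args: Array[String]): Unit = {
    // Local mode: with SPARK-31711 the executor source is registered with the
    // driver's metrics system, so it shows up in the metrics servlet.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("executor-metrics-local-check") // illustrative application name
      .getOrCreate()

    // Run some work so the executor source has non-trivial task metrics.
    spark.sparkContext.parallelize(1 to 1000000).map(_ * 2L).reduce(_ + _)

    // Metric names include the source namespace, e.g.
    // "<appId>.driver.executor.threadpool.activeTasks", so a simple substring
    // check on the servlet's JSON payload is enough for a smoke test.
    val json = Source.fromURL("http://localhost:4040/metrics/json/").mkString
    println(s"executor source registered: ${json.contains(".executor.")}")

    spark.stop()
  }
}
```

Before this change, the same check in local mode would find no `executor` entries, since the source was only registered on remote executors.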
Authored-by: Luca Canali <luca.can...@cern.ch>
Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 core/src/main/scala/org/apache/spark/SparkContext.scala      |  5 ++++-
 core/src/main/scala/org/apache/spark/executor/Executor.scala |  8 ++++++++
 .../org/apache/spark/metrics/source/SourceConfigSuite.scala  | 12 ++++++++++++
 docs/monitoring.md                                            |  8 ++++++--
 4 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b357682..d680154 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -42,7 +42,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.executor.{ExecutorMetrics, ExecutorMetricsSource}
+import org.apache.spark.executor.{Executor, ExecutorMetrics, ExecutorMetricsSource}
 import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -625,6 +625,9 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Post init
     _taskScheduler.postStartHook()
+    if (isLocal) {
+      _env.metricsSystem.registerSource(Executor.executorSourceLocalModeOnly)
+    }
     _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
     _env.metricsSystem.registerSource(new JVMCPUSource())
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 6653650..1a0ad56 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -135,6 +135,11 @@ private[spark] class Executor(
     env.metricsSystem.registerSource(new JVMCPUSource())
     executorMetricsSource.foreach(_.register(env.metricsSystem))
     env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
+  } else {
+    // This enable the registration of the executor source in local mode.
+    // The actual registration happens in SparkContext,
+    // it cannot be done here as the appId is not available yet
+    Executor.executorSourceLocalModeOnly = executorSource
   }
 
   // Whether to load classes in user jars before those in Spark jars
@@ -987,4 +992,7 @@ private[spark] object Executor {
   // task is fully deserialized. When possible, the TaskContext.getLocalProperty call should be
   // used instead.
   val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties]
+
+  // Used to store executorSource, for local mode only
+  var executorSourceLocalModeOnly: ExecutorSource = null
 }
diff --git a/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
index 8f5ab74..7da1403 100644
--- a/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
@@ -80,4 +80,16 @@ class SourceConfigSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("SPARK-31711: Test executor source registration in local mode") {
+    val conf = new SparkConf()
+    val sc = new SparkContext("local", "test", conf)
+    try {
+      val metricsSystem = sc.env.metricsSystem
+
+      // Executor source should be registered
+      assert (metricsSystem.getSourcesByName("executor").nonEmpty)
+    } finally {
+      sc.stop()
+    }
+  }
 }
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 3513fed..a07a113 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -1155,6 +1155,11 @@ This is the component with the largest amount of instrumented metrics
 - namespace=JVMCPU
   - jvmCpuTime
 
+- namespace=executor
+  - **note:** These metrics are available in the driver in local mode only.
+  - A full list of available metrics in this
+    namespace can be found in the corresponding entry for the Executor component instance.
+
 - namespace=ExecutorMetrics
   - **note:** these metrics are conditional to a configuration parameter:
     `spark.metrics.executorMetricsSource.enabled` (default is true)
@@ -1167,8 +1172,7 @@ This is the component with the largest amount of instrumented metrics
   custom plugins into Spark.
 
 ### Component instance = Executor
-These metrics are exposed by Spark executors. Note, currently they are not available
-when running in local mode.
+These metrics are exposed by Spark executors.
 
 - namespace=executor (metrics are of type counter or gauge)
   - bytesRead.count
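As the monitoring.md hunk above notes, the executor namespace is now attached to the driver instance in local mode, so it can be consumed through any configured sink rather than only the metrics servlet. Below is a hedged sketch, assuming the standard `spark.metrics.conf.*` pass-through and the built-in `ConsoleSink` described in monitoring.md; the application name and the sample job are illustrative.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object LocalExecutorMetricsToConsole {
  def main(args: Array[String]): Unit = {
    // The spark.metrics.conf.* properties mirror metrics.properties entries;
    // here every instance ("*") reports to the built-in ConsoleSink every 10s.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("executor-metrics-console") // illustrative application name
      .set("spark.metrics.conf.*.sink.console.class",
        "org.apache.spark.metrics.sink.ConsoleSink")
      .set("spark.metrics.conf.*.sink.console.period", "10")
      .set("spark.metrics.conf.*.sink.console.unit", "seconds")

    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Generate task activity so gauges and counters under the executor
    // namespace (threadpool.activeTasks, bytesRead.count, ...) change.
    spark.sparkContext.parallelize(1 to 1000000).count()

    Thread.sleep(15000) // allow at least one reporting period before stopping
    spark.stop()
  }
}
```

The same properties can equally be placed in `$SPARK_HOME/conf/metrics.properties`; passing them through `SparkConf` just keeps the sketch self-contained.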