This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b7fff03  [SPARK-31711][CORE] Register the executor source with the metrics system when running in local mode
b7fff03 is described below

commit b7fff0397319efd2987d4cceff4f738f1c06409d
Author: Luca Canali <luca.can...@cern.ch>
AuthorDate: Wed Nov 4 16:48:55 2020 -0600

    [SPARK-31711][CORE] Register the executor source with the metrics system when running in local mode
    
    ### What changes were proposed in this pull request?
    This PR proposes to register the executor source with the Spark metrics system when running in local mode.
    
    ### Why are the changes needed?
    The Apache Spark metrics system provides many useful insights into the Spark workload.
    In particular, the [executor source metrics](https://github.com/apache/spark/blob/master/docs/monitoring.md#component-instance--executor) provide detailed information, including the number of active tasks, I/O metrics, and detailed task metrics. The executor source metrics, unlike other sources (for example, the ExecutorMetrics source), are not available when running in local mode.
    Having executor metrics in local mode can be useful when testing and troubleshooting Spark workloads in a development environment. The metrics can be fed to a dashboard to see the evolution of resource usage and to troubleshoot performance, as [in this example](https://github.com/cerndb/spark-dashboard).
    Currently, users have to deploy on a cluster to collect executor source metrics, while having them available in local mode is handy for testing.
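    For illustration, a minimal sketch (not part of this patch) of how a local-mode application could forward these metrics to a dashboard backend through Spark's `spark.metrics.conf.*` properties; the Graphite host and port below are placeholders.
    ```scala
    import org.apache.spark.{SparkConf, SparkContext}

    // Report all metrics sources, including the executor source registered by this
    // change, to a Graphite sink while running locally.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("executor-metrics-demo")
      .set("spark.metrics.conf.*.sink.graphite.class",
        "org.apache.spark.metrics.sink.GraphiteSink")
      .set("spark.metrics.conf.*.sink.graphite.host", "localhost") // placeholder
      .set("spark.metrics.conf.*.sink.graphite.port", "2003")      // placeholder
      .set("spark.metrics.conf.*.sink.graphite.period", "10")

    val sc = new SparkContext(conf)
    sc.parallelize(1 to 1000).map(_ * 2).count() // generate some executor activity
    sc.stop()
    ```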
    
    ### Does this PR introduce _any_ user-facing change?
    - This PR exposes executor source metrics data when running in local mode.
    
    ### How was this patch tested?
    - Manually tested by running in local mode and inspecting the metrics listed in http://localhost:4040/metrics/json/
    - Also added a test in `SourceConfigSuite`
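    For reference, a rough sketch of the manual check (assuming the default UI port 4040 is free): keep a local context alive, run a small job, and fetch the metrics servlet output.
    ```scala
    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext("local[*]", "metrics-inspection", new SparkConf())
    sc.parallelize(1 to 1000).count() // generate some task activity

    // The "executor" namespace should now appear in the metrics servlet JSON.
    val json = scala.io.Source.fromURL("http://localhost:4040/metrics/json/").mkString
    assert(json.contains(".executor."))

    sc.stop()
    ```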
    
    Closes #28528 from LucaCanali/metricsWithLocalMode.
    
    Authored-by: Luca Canali <luca.can...@cern.ch>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 core/src/main/scala/org/apache/spark/SparkContext.scala      |  5 ++++-
 core/src/main/scala/org/apache/spark/executor/Executor.scala |  8 ++++++++
 .../org/apache/spark/metrics/source/SourceConfigSuite.scala  | 12 ++++++++++++
 docs/monitoring.md                                           |  8 ++++++--
 4 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b357682..d680154 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -42,7 +42,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.executor.{ExecutorMetrics, ExecutorMetricsSource}
+import org.apache.spark.executor.{Executor, ExecutorMetrics, ExecutorMetricsSource}
 import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -625,6 +625,9 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Post init
     _taskScheduler.postStartHook()
+    if (isLocal) {
+      _env.metricsSystem.registerSource(Executor.executorSourceLocalModeOnly)
+    }
     _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
 _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
     _env.metricsSystem.registerSource(new JVMCPUSource())
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 6653650..1a0ad56 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -135,6 +135,11 @@ private[spark] class Executor(
     env.metricsSystem.registerSource(new JVMCPUSource())
     executorMetricsSource.foreach(_.register(env.metricsSystem))
     env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
+  } else {
+    // This enables the registration of the executor source in local mode.
+    // The actual registration happens in SparkContext; it cannot be done here
+    // because the appId is not available yet.
+    Executor.executorSourceLocalModeOnly = executorSource
   }
 
   // Whether to load classes in user jars before those in Spark jars
@@ -987,4 +992,7 @@ private[spark] object Executor {
   // task is fully deserialized. When possible, the TaskContext.getLocalProperty call should be
   // used instead.
   val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties]
+
+  // Used to store executorSource, for local mode only
+  var executorSourceLocalModeOnly: ExecutorSource = null
 }
diff --git a/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
index 8f5ab74..7da1403 100644
--- a/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
@@ -80,4 +80,16 @@ class SourceConfigSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("SPARK-31711: Test executor source registration in local mode") {
+    val conf = new SparkConf()
+    val sc = new SparkContext("local", "test", conf)
+    try {
+      val metricsSystem = sc.env.metricsSystem
+
+      // Executor source should be registered
+      assert (metricsSystem.getSourcesByName("executor").nonEmpty)
+    } finally {
+      sc.stop()
+    }
+  }
 }
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 3513fed..a07a113 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -1155,6 +1155,11 @@ This is the component with the largest amount of instrumented metrics
 - namespace=JVMCPU
   - jvmCpuTime
 
+- namespace=executor
+  - **note:** These metrics are available in the driver in local mode only.
+  - A full list of available metrics in this
+    namespace can be found in the corresponding entry for the Executor component instance.
+
 - namespace=ExecutorMetrics
   - **note:** these metrics are conditional to a configuration parameter:
     `spark.metrics.executorMetricsSource.enabled` (default is true) 
@@ -1167,8 +1172,7 @@ This is the component with the largest amount of instrumented metrics
   custom plugins into Spark.
 
 ### Component instance = Executor
-These metrics are exposed by Spark executors. Note, currently they are not available
-when running in local mode.
+These metrics are exposed by Spark executors. 
  
 - namespace=executor (metrics are of type counter or gauge)
   - bytesRead.count


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
