Repository: spark
Updated Branches:
  refs/heads/master cfc3e1aaa -> d2436a852


[SPARK-24594][YARN] Introducing metrics for YARN

## What changes were proposed in this pull request?

This PR introduces metrics for YARN. As the YARN module had no metrics so far, 
a new metrics system is created with the name "applicationMaster".
To support both client and cluster mode, the lifecycle of the metrics system is 
bound to the AM.
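
As a minimal sketch (using the standard per-instance metrics configuration 
described in docs/monitoring.md; the sink name "console" is an arbitrary 
choice), the new instance can be wired to a sink like any other:

```
# metrics.properties -- route only the AM metrics to the console sink
applicationMaster.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
applicationMaster.sink.console.period=10
applicationMaster.sink.console.unit=seconds
```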

## How was this patch tested?

Both client and cluster mode were tested manually.
Before the test, spark-core was removed from one of the YARN nodes to cause 
allocation failures.
Spark was started as follows (client mode shown):

```
spark2-submit \
  --class org.apache.spark.examples.SparkPi \
  --conf "spark.yarn.blacklist.executor.launch.blacklisting.enabled=true" 
--conf "spark.blacklist.application.maxFailedExecutorsPerNode=2" --conf 
"spark.dynamicAllocation.enabled=true" --conf 
"spark.metrics.conf.*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink"
 \
  --master yarn \
  --deploy-mode client \
  original-spark-examples_2.11-2.4.0-SNAPSHOT.jar \
  1000
```

In both cases the YARN logs contained the new metrics:

```
$ yarn logs --applicationId application_1529926424933_0015
...
-- Gauges ----------------------------------------------------------------------
application_1531751594108_0046.applicationMaster.numContainersPendingAllocate
             value = 0
application_1531751594108_0046.applicationMaster.numExecutorsFailed
             value = 3
application_1531751594108_0046.applicationMaster.numExecutorsRunning
             value = 9
application_1531751594108_0046.applicationMaster.numLocalityAwareTasks
             value = 0
application_1531751594108_0046.applicationMaster.numReleasedContainers
             value = 0
...

```
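
The gauge names follow the pattern `<namespace>.applicationMaster.<metric>`, 
where the namespace defaults to the YARN application ID. With the 
`spark.yarn.metrics.namespace` option added by this PR (see the config.scala 
diff below) a stable prefix can be configured instead; a sketch of such a run 
(the namespace value "sparkPi" is made up for illustration):

```
spark2-submit \
  --class org.apache.spark.examples.SparkPi \
  --conf "spark.yarn.metrics.namespace=sparkPi" \
  --master yarn \
  --deploy-mode client \
  original-spark-examples_2.11-2.4.0-SNAPSHOT.jar \
  1000
```

This would yield names like `sparkPi.applicationMaster.numExecutorsRunning` in 
the sink output.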

Author: “attilapiros” <piros.attila.zs...@gmail.com>
Author: Attila Zsolt Piros <2017933+attilapi...@users.noreply.github.com>

Closes #21635 from attilapiros/SPARK-24594.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2436a85
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2436a85
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2436a85

Branch: refs/heads/master
Commit: d2436a85294a178398525c37833dae79d45c1452
Parents: cfc3e1a
Author: “attilapiros” <piros.attila.zs...@gmail.com>
Authored: Tue Jul 24 09:33:10 2018 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Tue Jul 24 09:33:10 2018 +0800

----------------------------------------------------------------------
 docs/monitoring.md                              |  1 +
 docs/running-on-yarn.md                         |  9 +++-
 .../spark/deploy/yarn/ApplicationMaster.scala   | 18 +++++++
 .../deploy/yarn/ApplicationMasterSource.scala   | 50 ++++++++++++++++++++
 .../spark/deploy/yarn/YarnAllocator.scala       |  8 +++-
 .../yarn/YarnAllocatorBlacklistTracker.scala    |  2 +-
 .../org/apache/spark/deploy/yarn/config.scala   |  5 ++
 7 files changed, 90 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d2436a85/docs/monitoring.md
----------------------------------------------------------------------
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 6eaf331..2717dd0 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -435,6 +435,7 @@ set of sinks to which metrics are reported. The following instances are currentl
 * `executor`: A Spark executor.
 * `driver`: The Spark driver process (the process in which your SparkContext is created).
 * `shuffleService`: The Spark shuffle service.
+* `applicationMaster`: The Spark ApplicationMaster when running on YARN.
 
 Each instance can report to zero or more _sinks_. Sinks are contained in the
 `org.apache.spark.metrics.sink` package:

http://git-wip-us.apache.org/repos/asf/spark/blob/d2436a85/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 0b265b0..1c1f40c 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -421,7 +421,14 @@ To use a custom metrics.properties for the application master and executors, upd
   <code>spark.blacklist.application.maxFailedExecutorsPerNode</code>.
   </td>
 </tr>
-
+<tr>
+  <td><code>spark.yarn.metrics.namespace</code></td>
+  <td>(none)</td>
+  <td>
+  The root namespace for AM metrics reporting. 
+  If it is not set then the YARN application ID is used.
+  </td>
+</tr>
 </table>
 
 # Important notes

http://git-wip-us.apache.org/repos/asf/spark/blob/d2436a85/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index ecc5769..55ed114 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -43,6 +43,7 @@ import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.deploy.yarn.security.AMCredentialRenewer
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -67,6 +68,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
 
   private val securityMgr = new SecurityManager(sparkConf)
 
+  private var metricsSystem: Option[MetricsSystem] = None
+
   // Set system properties for each config entry. This covers two use cases:
   // - The default configuration stored by the SparkHadoopUtil class
   // - The user application creating a new SparkConf in cluster mode
@@ -309,6 +312,16 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
         finish(FinalApplicationStatus.FAILED,
           ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
           "Uncaught exception: " + StringUtils.stringifyException(e))
+    } finally {
+      try {
+        metricsSystem.foreach { ms =>
+          ms.report()
+          ms.stop()
+        }
+      } catch {
+        case e: Exception =>
+          logWarning("Exception during stopping of the metric system: ", e)
+      }
     }
   }
 
@@ -434,6 +447,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))
 
     allocator.allocateResources()
+    val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
+    val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
+    ms.registerSource(new ApplicationMasterSource(prefix, allocator))
+    ms.start()
+    metricsSystem = Some(ms)
     reporterThread = launchReporterThread()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d2436a85/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
new file mode 100644
index 0000000..0fec916
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+private[spark] class ApplicationMasterSource(prefix: String, yarnAllocator: YarnAllocator)
+  extends Source {
+
+  override val sourceName: String = prefix + ".applicationMaster"
+  override val metricRegistry: MetricRegistry = new MetricRegistry()
+
+  metricRegistry.register(MetricRegistry.name("numExecutorsFailed"), new 
Gauge[Int] {
+    override def getValue: Int = yarnAllocator.getNumExecutorsFailed
+  })
+
+  metricRegistry.register(MetricRegistry.name("numExecutorsRunning"), new 
Gauge[Int] {
+    override def getValue: Int = yarnAllocator.getNumExecutorsRunning
+  })
+
+  metricRegistry.register(MetricRegistry.name("numReleasedContainers"), new 
Gauge[Int] {
+    override def getValue: Int = yarnAllocator.getNumReleasedContainers
+  })
+
+  metricRegistry.register(MetricRegistry.name("numLocalityAwareTasks"), new 
Gauge[Int] {
+    override def getValue: Int = yarnAllocator.numLocalityAwareTasks
+  })
+
+  metricRegistry.register(MetricRegistry.name("numContainersPendingAllocate"), 
new Gauge[Int] {
+    override def getValue: Int = yarnAllocator.numContainersPendingAllocate
+  })
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d2436a85/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index fae054e..40f1222 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -150,7 +150,7 @@ private[yarn] class YarnAllocator(
   private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
 
   // Number of tasks that have locality preferences in active stages
-  private var numLocalityAwareTasks: Int = 0
+  private[yarn] var numLocalityAwareTasks: Int = 0
 
   // A container placement strategy based on pending tasks' locality preference
   private[yarn] val containerPlacementStrategy =
@@ -158,6 +158,8 @@ private[yarn] class YarnAllocator(
 
   def getNumExecutorsRunning: Int = runningExecutors.size()
 
+  def getNumReleasedContainers: Int = releasedContainers.size()
+
   def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors
 
   def isAllNodeBlacklisted: Boolean = allocatorBlacklistTracker.isAllNodeBlacklisted
@@ -167,6 +169,10 @@ private[yarn] class YarnAllocator(
    */
   def getPendingAllocate: Seq[ContainerRequest] = getPendingAtLocation(ANY_HOST)
 
+  def numContainersPendingAllocate: Int = synchronized {
+    getPendingAllocate.size
+  }
+
   /**
    * A sequence of pending container requests at the given location that have not yet been
    * fulfilled.

http://git-wip-us.apache.org/repos/asf/spark/blob/d2436a85/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
index 1b48a0e..ceac7cd 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
@@ -28,7 +28,7 @@ import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.BlacklistTracker
-import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.{Clock, SystemClock}
 
 /**
  * YarnAllocatorBlacklistTracker is responsible for tracking the blacklisted nodes

http://git-wip-us.apache.org/repos/asf/spark/blob/d2436a85/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 129084a..1013fd2 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -152,6 +152,11 @@ package object config {
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefaultString("100s")
 
+  private[spark] val YARN_METRICS_NAMESPACE = ConfigBuilder("spark.yarn.metrics.namespace")
+    .doc("The root namespace for AM metrics reporting.")
+    .stringConf
+    .createOptional
+
   private[spark] val AM_NODE_LABEL_EXPRESSION = ConfigBuilder("spark.yarn.am.nodeLabelExpression")
     .doc("Node label expression for the AM.")
     .stringConf

