Repository: spark Updated Branches: refs/heads/master cfc3e1aaa -> d2436a852
[SPARK-24594][YARN] Introducing metrics for YARN ## What changes were proposed in this pull request? In this PR, metrics are introduced for YARN. Since there were no metrics in the YARN module until now, a new metric system is created with the name "applicationMaster". To support both client and cluster mode, the metric system's lifecycle is bound to the AM. ## How was this patch tested? Both client and cluster modes were tested manually. Before the test, spark-core was removed from one of the YARN nodes to cause an allocation failure. Spark was started as follows (in client mode): ``` spark2-submit \ --class org.apache.spark.examples.SparkPi \ --conf "spark.yarn.blacklist.executor.launch.blacklisting.enabled=true" --conf "spark.blacklist.application.maxFailedExecutorsPerNode=2" --conf "spark.dynamicAllocation.enabled=true" --conf "spark.metrics.conf.*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink" \ --master yarn \ --deploy-mode client \ original-spark-examples_2.11-2.4.0-SNAPSHOT.jar \ 1000 ``` In both cases, the YARN logs contained the new metrics: ``` $ yarn logs --applicationId application_1529926424933_0015 ... -- Gauges ---------------------------------------------------------------------- application_1531751594108_0046.applicationMaster.numContainersPendingAllocate value = 0 application_1531751594108_0046.applicationMaster.numExecutorsFailed value = 3 application_1531751594108_0046.applicationMaster.numExecutorsRunning value = 9 application_1531751594108_0046.applicationMaster.numLocalityAwareTasks value = 0 application_1531751594108_0046.applicationMaster.numReleasedContainers value = 0 ... ``` Author: "attilapiros" <piros.attila.zs...@gmail.com> Author: Attila Zsolt Piros <2017933+attilapi...@users.noreply.github.com> Closes #21635 from attilapiros/SPARK-24594. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2436a85 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2436a85 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2436a85 Branch: refs/heads/master Commit: d2436a85294a178398525c37833dae79d45c1452 Parents: cfc3e1a Author: "attilapiros" <piros.attila.zs...@gmail.com> Authored: Tue Jul 24 09:33:10 2018 +0800 Committer: jerryshao <ss...@hortonworks.com> Committed: Tue Jul 24 09:33:10 2018 +0800 ---------------------------------------------------------------------- docs/monitoring.md | 1 + docs/running-on-yarn.md | 9 +++- .../spark/deploy/yarn/ApplicationMaster.scala | 18 +++++++ .../deploy/yarn/ApplicationMasterSource.scala | 50 ++++++++++++++++++++ .../spark/deploy/yarn/YarnAllocator.scala | 8 +++- .../yarn/YarnAllocatorBlacklistTracker.scala | 2 +- .../org/apache/spark/deploy/yarn/config.scala | 5 ++ 7 files changed, 90 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d2436a85/docs/monitoring.md ---------------------------------------------------------------------- diff --git a/docs/monitoring.md b/docs/monitoring.md index 6eaf331..2717dd0 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -435,6 +435,7 @@ set of sinks to which metrics are reported. The following instances are currentl * `executor`: A Spark executor. * `driver`: The Spark driver process (the process in which your SparkContext is created). * `shuffleService`: The Spark shuffle service. +* `applicationMaster`: The Spark ApplicationMaster when running on YARN. Each instance can report to zero or more _sinks_. 
Sinks are contained in the `org.apache.spark.metrics.sink` package: http://git-wip-us.apache.org/repos/asf/spark/blob/d2436a85/docs/running-on-yarn.md ---------------------------------------------------------------------- diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 0b265b0..1c1f40c 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -421,7 +421,14 @@ To use a custom metrics.properties for the application master and executors, upd <code>spark.blacklist.application.maxFailedExecutorsPerNode</code>. </td> </tr> - +<tr> + <td><code>spark.yarn.metrics.namespace</code></td> + <td>(none)</td> + <td> + The root namespace for AM metrics reporting. + If it is not set then the YARN application ID is used. + </td> +</tr> </table> # Important notes http://git-wip-us.apache.org/repos/asf/spark/blob/d2436a85/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index ecc5769..55ed114 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -43,6 +43,7 @@ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.deploy.yarn.security.AMCredentialRenewer import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rpc._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -67,6 +68,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends private val 
securityMgr = new SecurityManager(sparkConf) + private var metricsSystem: Option[MetricsSystem] = None + // Set system properties for each config entry. This covers two use cases: // - The default configuration stored by the SparkHadoopUtil class // - The user application creating a new SparkConf in cluster mode @@ -309,6 +312,16 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION, "Uncaught exception: " + StringUtils.stringifyException(e)) + } finally { + try { + metricsSystem.foreach { ms => + ms.report() + ms.stop() + } + } catch { + case e: Exception => + logWarning("Exception during stopping of the metric system: ", e) + } } } @@ -434,6 +447,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef)) allocator.allocateResources() + val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr) + val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId) + ms.registerSource(new ApplicationMasterSource(prefix, allocator)) + ms.start() + metricsSystem = Some(ms) reporterThread = launchReporterThread() } http://git-wip-us.apache.org/repos/asf/spark/blob/d2436a85/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala new file mode 100644 index 0000000..0fec916 --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.metrics.source.Source + +private[spark] class ApplicationMasterSource(prefix: String, yarnAllocator: YarnAllocator) + extends Source { + + override val sourceName: String = prefix + ".applicationMaster" + override val metricRegistry: MetricRegistry = new MetricRegistry() + + metricRegistry.register(MetricRegistry.name("numExecutorsFailed"), new Gauge[Int] { + override def getValue: Int = yarnAllocator.getNumExecutorsFailed + }) + + metricRegistry.register(MetricRegistry.name("numExecutorsRunning"), new Gauge[Int] { + override def getValue: Int = yarnAllocator.getNumExecutorsRunning + }) + + metricRegistry.register(MetricRegistry.name("numReleasedContainers"), new Gauge[Int] { + override def getValue: Int = yarnAllocator.getNumReleasedContainers + }) + + metricRegistry.register(MetricRegistry.name("numLocalityAwareTasks"), new Gauge[Int] { + override def getValue: Int = yarnAllocator.numLocalityAwareTasks + }) + + metricRegistry.register(MetricRegistry.name("numContainersPendingAllocate"), new Gauge[Int] { + override def getValue: Int = yarnAllocator.numContainersPendingAllocate + }) + +} 
http://git-wip-us.apache.org/repos/asf/spark/blob/d2436a85/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index fae054e..40f1222 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -150,7 +150,7 @@ private[yarn] class YarnAllocator( private var hostToLocalTaskCounts: Map[String, Int] = Map.empty // Number of tasks that have locality preferences in active stages - private var numLocalityAwareTasks: Int = 0 + private[yarn] var numLocalityAwareTasks: Int = 0 // A container placement strategy based on pending tasks' locality preference private[yarn] val containerPlacementStrategy = @@ -158,6 +158,8 @@ private[yarn] class YarnAllocator( def getNumExecutorsRunning: Int = runningExecutors.size() + def getNumReleasedContainers: Int = releasedContainers.size() + def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors def isAllNodeBlacklisted: Boolean = allocatorBlacklistTracker.isAllNodeBlacklisted @@ -167,6 +169,10 @@ private[yarn] class YarnAllocator( */ def getPendingAllocate: Seq[ContainerRequest] = getPendingAtLocation(ANY_HOST) + def numContainersPendingAllocate: Int = synchronized { + getPendingAllocate.size + } + /** * A sequence of pending container requests at the given location that have not yet been * fulfilled. 
http://git-wip-us.apache.org/repos/asf/spark/blob/d2436a85/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala index 1b48a0e..ceac7cd 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala @@ -28,7 +28,7 @@ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.scheduler.BlacklistTracker -import org.apache.spark.util.{Clock, SystemClock, Utils} +import org.apache.spark.util.{Clock, SystemClock} /** * YarnAllocatorBlacklistTracker is responsible for tracking the blacklisted nodes http://git-wip-us.apache.org/repos/asf/spark/blob/d2436a85/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 129084a..1013fd2 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -152,6 +152,11 @@ package object config { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("100s") + private[spark] val YARN_METRICS_NAMESPACE = ConfigBuilder("spark.yarn.metrics.namespace") + .doc("The root namespace for AM metrics reporting.") + .stringConf + .createOptional + private[spark] val 
AM_NODE_LABEL_EXPRESSION = ConfigBuilder("spark.yarn.am.nodeLabelExpression") .doc("Node label expression for the AM.") .stringConf --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org