This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f065ff  [SPARK-34828][YARN] Make shuffle service name configurable on client side and allow for classpath-based config override on server side
9f065ff is described below

commit 9f065ff375bb2146f54dc1583b52beaf24064284
Author: Erik Krogen <xkro...@apache.org>
AuthorDate: Tue Mar 30 10:09:00 2021 -0500

    [SPARK-34828][YARN] Make shuffle service name configurable on client side and allow for classpath-based config override on server side
    
    ### What changes were proposed in this pull request?
    Add a new config, `spark.shuffle.service.name`, which allows Spark applications to look for a YARN shuffle service which is registered under a name other than the default `spark_shuffle`.
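
    To illustrate why the client-side name has to match an entry in the NodeManager's `yarn.nodemanager.aux-services`, here is a minimal plain-Java sketch. Only the config key, its default value, and the use of the name as the service data key come from this change; the class and method names are hypothetical, and the server-side check is a simplified assumption rather than YARN's actual code.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative only: plain-Java stand-ins, not Spark or YARN classes.
final class ShuffleServiceNameSketch {
  static final String NAME_KEY = "spark.shuffle.service.name";
  static final String DEFAULT_NAME = "spark_shuffle";

  // Client side: key the service data by the configured name, or the default if unset.
  static Map<String, ByteBuffer> serviceData(Map<String, String> sparkConf, ByteBuffer secret) {
    String name = sparkConf.getOrDefault(NAME_KEY, DEFAULT_NAME);
    return Collections.singletonMap(name, secret);
  }

  // Server side (hypothetical check): every requested name must be a configured aux service.
  static boolean allNamesConfigured(Set<String> auxServices, Map<String, ByteBuffer> data) {
    return auxServices.containsAll(data.keySet());
  }

  public static void main(String[] args) {
    Set<String> auxServices =
        new HashSet<>(Arrays.asList("spark_shuffle", "spark_shuffle_3.2.0"));
    ByteBuffer secret = ByteBuffer.allocate(0);

    Map<String, String> conf = new HashMap<>();
    conf.put(NAME_KEY, "spark_shuffle_3.2.0");
    // Name matches a configured aux service.
    System.out.println(allNamesConfigured(auxServices, serviceData(conf, secret)));

    conf.put(NAME_KEY, "spark_shuffle_typo");
    // Name matches nothing on the NodeManager side.
    System.out.println(allNamesConfigured(auxServices, serviceData(conf, secret)));
  }
}
```

    In this sketch the first lookup succeeds and the second does not, which mirrors the requirement that the application's `spark.shuffle.service.name` and the NodeManager's aux-service name agree.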
    
    Add a new config, `spark.yarn.shuffle.service.metrics.namespace`, which allows for configuring the namespace used when emitting metrics from the shuffle service into the NodeManager's `metrics2` system.
    
    Add a new mechanism by which to override shuffle service configurations independently of the configurations in the NodeManager. When a resource `spark-shuffle-site.xml` is present on the classpath of the shuffle service, the configs present within it will be used to override the configs coming from `yarn-site.xml` (via the NodeManager).
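
    The override precedence can be sketched in plain Java as below. In the patch itself this is done by copying the NodeManager's Hadoop `Configuration` and calling `addResource` with the overlay URL; the sketch swaps in a `Map` as a stand-in, and the class and method names are hypothetical. It also does not model the Hadoop rule, noted in the new integration test, that an overlay resource overrides earlier resources but not values set directly on the config object.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a Map stands in for Hadoop's Configuration.
final class ConfOverlaySketch {
  // Copy the base config, then apply the overlay so overlay values win on conflicts.
  // The base map is left untouched (the patch likewise copies the NodeManager's conf).
  static Map<String, String> withOverlay(Map<String, String> base, Map<String, String> overlay) {
    Map<String, String> merged = new HashMap<>(base);
    if (overlay != null) { // no overlay resource found: use the base config as-is
      merged.putAll(overlay);
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> nodeManagerConf = new HashMap<>();
    nodeManagerConf.put("spark.shuffle.service.port", "7337");
    nodeManagerConf.put("spark.authenticate", "true");

    Map<String, String> overlay = new HashMap<>();
    overlay.put("spark.shuffle.service.port", "7001");

    Map<String, String> effective = withOverlay(nodeManagerConf, overlay);
    System.out.println(effective.get("spark.shuffle.service.port"));       // 7001, from the overlay
    System.out.println(effective.get("spark.authenticate"));               // true, from the base
    System.out.println(nodeManagerConf.get("spark.shuffle.service.port")); // 7337, base unchanged
  }
}
```

    Keys absent from the overlay keep their NodeManager values, so an overlay file only needs to list the settings that differ.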
    
    ### Why are the changes needed?
    There are two use cases which can benefit from these changes.
    
    One use case is to run multiple instances of the shuffle service side-by-side in the same NodeManager. This can be helpful, for example, when running a YARN cluster with a mixed workload of applications running multiple Spark versions, since a given version of the shuffle service is not always compatible with other versions of Spark (e.g. see SPARK-27780). With this PR, it is possible to run two shuffle services like `spark_shuffle` and `spark_shuffle_3.2.0`, one of which is "legacy"  [...]
    
    Besides this, the separation of shuffle service configs into `spark-shuffle-site.xml` can be useful for administrators who want to change and/or deploy Spark shuffle service configurations independently of the configurations for the NodeManager (e.g., perhaps they are owned by two different teams).
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. There are two new configurations related to the external shuffle service, and a new mechanism which can optionally be used to configure the shuffle service. `docs/running-on-yarn.md` has been updated to provide user instructions; please see this guide for more details.
    
    ### How was this patch tested?
    In addition to the new unit tests added, I have deployed this to a live YARN cluster and successfully deployed two Spark shuffle services simultaneously, one running a modified version of Spark 2.3.0 (which supports some of the newer shuffle protocols) and one running Spark 3.1.1. Spark applications of both versions are able to communicate with their respective shuffle services without issue.
    
    Closes #31936 from xkrogen/xkrogen-SPARK-34828-shufflecompat-config-from-classpath.
    
    Authored-by: Erik Krogen <xkro...@apache.org>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../spark/network/yarn/YarnShuffleService.java     | 66 +++++++++++++++---
 .../network/yarn/YarnShuffleServiceMetrics.java    |  6 +-
 .../org/apache/spark/internal/config/package.scala | 10 +++
 docs/running-on-yarn.md                            | 71 +++++++++++++++++++
 .../spark/deploy/yarn/ExecutorRunnable.scala       |  4 +-
 .../yarn/YarnShuffleAlternateNameConfigSuite.scala | 79 ++++++++++++++++++++++
 .../network/yarn/YarnShuffleServiceSuite.scala     | 13 ++++
 .../spark/network/yarn/YarnTestAccessor.scala      |  3 +
 8 files changed, 240 insertions(+), 12 deletions(-)

diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 548a5cc..cb6d5d0 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -19,6 +19,7 @@ package org.apache.spark.network.yarn;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -75,6 +76,20 @@ import org.apache.spark.network.yarn.util.HadoopConfigProvider;
  * is because an application running on the same Yarn cluster may choose to not use the external
  * shuffle service, in which case its setting of `spark.authenticate` should be independent of
  * the service's.
+ *
+ * The shuffle service will produce metrics via the YARN NodeManager's {@code metrics2} system
+ * under a namespace specified by the {@value SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY} config.
+ *
+ * By default, all configurations for the shuffle service will be taken directly from the
+ * Hadoop {@link Configuration} passed by the YARN NodeManager. It is also possible to configure
+ * the shuffle service by placing a resource named
+ * {@value SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME} into the classpath, which should be an
+ * XML file in the standard Hadoop Configuration resource format. Note that when the shuffle
+ * service is loaded in the default manner, without configuring
+ * {@code yarn.nodemanager.aux-services.<service>.classpath}, this file must be on the classpath
+ * of the NodeManager itself. When using the {@code classpath} configuration, it can be present
+ * either on the NodeManager's classpath, or specified in the classpath configuration.
+ * This {@code classpath} configuration is only supported on YARN versions >= 2.9.0.
  */
 public class YarnShuffleService extends AuxiliaryService {
   private static final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);
@@ -83,6 +98,14 @@ public class YarnShuffleService extends AuxiliaryService {
   private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
   private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;
 
+  /**
+   * The namespace to use for the metrics record which will contain all metrics produced by the
+   * shuffle service.
+   */
+  static final String SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY =
+      "spark.yarn.shuffle.service.metrics.namespace";
+  private static final String DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME = "sparkShuffleService";
+
   // Whether the shuffle server should authenticate fetch requests
   private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
   private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
@@ -103,6 +126,13 @@ public class YarnShuffleService extends AuxiliaryService {
   private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider
       .StoreVersion(1, 0);
 
+  /**
+   * The name of the resource to search for on the classpath to find a shuffle service-specific
+   * configuration overlay. If found, this will be parsed as a standard Hadoop
+   * {@link Configuration config} file and will override the configs passed from the NodeManager.
+   */
+  static final String SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME = "spark-shuffle-site.xml";
+
   // just for integration tests that want to look at this file -- in general not sensible as
   // a static
   @VisibleForTesting
@@ -139,6 +169,13 @@ public class YarnShuffleService extends AuxiliaryService {
   private DB db;
 
   public YarnShuffleService() {
+    // The name of the auxiliary service configured within the NodeManager
+    // (`yarn.nodemanager.aux-services`) is treated as the source-of-truth, so this one can be
+    // arbitrary. The NodeManager will log a warning if the configured name doesn't match this name,
+    // to inform operators of a potential misconfiguration, but this name is otherwise not used.
+    // It is hard-coded instead of using the value of the `spark.shuffle.service.name` configuration
+    // because at this point in instantiation there is no Configuration object; it is not passed
+    // until `serviceInit` is called, at which point it's too late to adjust the name.
     super("spark_shuffle");
     logger.info("Initializing YARN shuffle service for Spark");
     instance = this;
@@ -157,10 +194,18 @@ public class YarnShuffleService extends AuxiliaryService {
    * Start the shuffle server with the given configuration.
    */
   @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    _conf = conf;
+  protected void serviceInit(Configuration externalConf) throws Exception {
+    _conf = new Configuration(externalConf);
+    URL confOverlayUrl = Thread.currentThread().getContextClassLoader()
+        .getResource(SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME);
+    if (confOverlayUrl != null) {
+      logger.info("Initializing Spark YARN shuffle service with configuration overlay from {}",
+          confOverlayUrl);
+      _conf.addResource(confOverlayUrl);
+    }
+    super.serviceInit(_conf);
 
-    boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);
+    boolean stopOnFailure = _conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);
 
     try {
       // In case this NM was killed while there were running spark applications, we need to restore
@@ -172,7 +217,7 @@ public class YarnShuffleService extends AuxiliaryService {
         registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
       }
 
-      TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
+      TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(_conf));
       MergedShuffleFileManager shuffleMergeManager = newMergedShuffleFileManagerInstance(
         transportConf);
       blockHandler = new ExternalBlockHandler(
@@ -181,7 +226,7 @@ public class YarnShuffleService extends AuxiliaryService {
       // If authentication is enabled, set up the shuffle server to use a
       // special RPC handler that filters out unauthenticated fetch requests
       List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
-      boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
+      boolean authEnabled = _conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
       if (authEnabled) {
         secretManager = new ShuffleSecretManager();
         if (_recoveryPath != null) {
@@ -190,7 +235,7 @@ public class YarnShuffleService extends AuxiliaryService {
         bootstraps.add(new AuthServerBootstrap(transportConf, secretManager));
       }
 
-      int port = conf.getInt(
+      int port = _conf.getInt(
         SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
       transportContext = new TransportContext(transportConf, blockHandler, true);
       shuffleServer = transportContext.createServer(port, bootstraps);
@@ -203,13 +248,16 @@ public class YarnShuffleService extends AuxiliaryService {
       blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections",
           shuffleServer.getRegisteredConnections());
       blockHandler.getAllMetrics().getMetrics().putAll(shuffleServer.getAllMetrics().getMetrics());
+      String metricsNamespace = _conf.get(SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY,
+          DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME);
       YarnShuffleServiceMetrics serviceMetrics =
-          new YarnShuffleServiceMetrics(blockHandler.getAllMetrics());
+          new YarnShuffleServiceMetrics(metricsNamespace, blockHandler.getAllMetrics());
 
       MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();
       metricsSystem.register(
-          "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics);
-      logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");
+          metricsNamespace, "Metrics on the Spark Shuffle Service", serviceMetrics);
+      logger.info("Registered metrics with Hadoop's DefaultMetricsSystem using namespace '{}'",
+          metricsNamespace);
 
       logger.info("Started YARN shuffle service for Spark on port {}. " +
         "Authentication is {}.  Registered executor file is {}", port, authEnabledString,
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
index 81be6e8..f30abbd 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
@@ -32,9 +32,11 @@ import org.apache.hadoop.metrics2.MetricsSource;
  */
 class YarnShuffleServiceMetrics implements MetricsSource {
 
+  private final String metricsNamespace;
   private final MetricSet metricSet;
 
-  YarnShuffleServiceMetrics(MetricSet metricSet) {
+  YarnShuffleServiceMetrics(String metricsNamespace, MetricSet metricSet) {
+    this.metricsNamespace = metricsNamespace;
     this.metricSet = metricSet;
   }
 
@@ -46,7 +48,7 @@ class YarnShuffleServiceMetrics implements MetricsSource {
    */
   @Override
   public void getMetrics(MetricsCollector collector, boolean all) {
-    MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("sparkShuffleService");
+    MetricsRecordBuilder metricsRecordBuilder = collector.addRecord(metricsNamespace);
 
     for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
       collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue());
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index d988e52..1a18856 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -680,6 +680,16 @@ package object config {
   private[spark] val SHUFFLE_SERVICE_PORT =
     ConfigBuilder("spark.shuffle.service.port").version("1.2.0").intConf.createWithDefault(7337)
 
+  private[spark] val SHUFFLE_SERVICE_NAME =
+    ConfigBuilder("spark.shuffle.service.name")
+      .doc("The configured name of the Spark shuffle service the client should communicate with. " +
+        "This must match the name used to configure the Shuffle within the YARN NodeManager " +
+        "configuration (`yarn.nodemanager.aux-services`). Only takes effect when " +
+        s"$SHUFFLE_SERVICE_ENABLED is set to true.")
+      .version("3.2.0")
+      .stringConf
+      .createWithDefault("spark_shuffle")
+
   private[spark] val KEYTAB = ConfigBuilder("spark.kerberos.keytab")
     .doc("Location of user's keytab.")
     .version("3.0.0")
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 72df64b..73bb76a 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -773,8 +773,28 @@ The following extra configuration options are available when the shuffle service
     NodeManagers where the Spark Shuffle Service is not running.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.shuffle.service.metrics.namespace</code></td>
+  <td><code>sparkShuffleService</code></td>
+  <td>
+    The namespace to use when emitting shuffle service metrics into Hadoop metrics2 system of the
+    NodeManager.
+  </td>
+</tr>
 </table>
 
+Please note that the instructions above assume that the default shuffle service name,
+`spark_shuffle`, has been used. It is possible to use any name here, but the values used in the
+YARN NodeManager configurations must match the value of `spark.shuffle.service.name` in the
+Spark application.
+
+The shuffle service will, by default, take all of its configurations from the Hadoop Configuration
+used by the NodeManager (e.g. `yarn-site.xml`). However, it is also possible to configure the
+shuffle service independently using a file named `spark-shuffle-site.xml` which should be placed
+onto the classpath of the shuffle service (which is, by default, shared with the classpath of the
+NodeManager). The shuffle service will treat this as a standard Hadoop Configuration resource and
+overlay it on top of the NodeManager's configuration.
+
 # Launching your application with Apache Oozie
 
 Apache Oozie can launch Spark applications as part of a workflow.
@@ -823,3 +843,54 @@ do the following:
   to the list of filters in the <code>spark.ui.filters</code> configuration.
 
 Be aware that the history server information may not be up-to-date with the application's state.
+
+# Running multiple versions of the Spark Shuffle Service
+
+Please note that this section only applies when running on YARN versions >= 2.9.0.
+
+In some cases it may be desirable to run multiple instances of the Spark Shuffle Service which are
+using different versions of Spark. This can be helpful, for example, when running a YARN cluster
+with a mixed workload of applications running multiple Spark versions, since a given version of
+the shuffle service is not always compatible with other versions of Spark. YARN versions since 2.9.0
+support the ability to run shuffle services within an isolated classloader
+(see [YARN-4577](https://issues.apache.org/jira/browse/YARN-4577)), meaning multiple Spark versions
+can coexist within a single NodeManager. The
+`yarn.nodemanager.aux-services.<service-name>.classpath` and, starting from YARN 2.10.2/3.1.1/3.2.0,
+`yarn.nodemanager.aux-services.<service-name>.remote-classpath` options can be used to configure
+this. In addition to setting up separate classpaths, it's necessary to ensure the two versions
+advertise to different ports. This can be achieved using the `spark-shuffle-site.xml` file described
+above. For example, you may have configuration like:
+
+```properties
+  yarn.nodemanager.aux-services = spark_shuffle_x,spark_shuffle_y
+  yarn.nodemanager.aux-services.spark_shuffle_x.classpath = /path/to/spark-x-yarn-shuffle.jar,/path/to/spark-x-config
+  yarn.nodemanager.aux-services.spark_shuffle_y.classpath = /path/to/spark-y-yarn-shuffle.jar,/path/to/spark-y-config
+```
+
+The two `spark-*-config` directories each contain one file, `spark-shuffle-site.xml`. These are XML
+files in the [Hadoop Configuration format](https://hadoop.apache.org/docs/r3.2.2/api/org/apache/hadoop/conf/Configuration.html)
+which each contain a few configurations to adjust the port number and metrics name prefix used:
+```xml
+<configuration>
+  <property>
+    <name>spark.shuffle.service.port</name>
+    <value>7001</value>
+  </property>
+  <property>
+    <name>spark.yarn.shuffle.service.metrics.namespace</name>
+    <value>sparkShuffleServiceX</value>
+  </property>
+</configuration>
+```
+The values should both be different for the two different services.
+
+Then, in the configuration of the Spark applications, one should be configured with:
+```properties
+  spark.shuffle.service.name = spark_shuffle_x
+  spark.shuffle.service.port = 7001
+```
+and one should be configured with:
+```properties
+  spark.shuffle.service.name = spark_shuffle_y
+  spark.shuffle.service.port = <other value>
+```
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index ede3906..717ce57 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -115,7 +115,9 @@ private[yarn] class ExecutorRunnable(
           // Authentication is not enabled, so just provide dummy metadata
           ByteBuffer.allocate(0)
         }
-      ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
+      val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME)
+      logInfo(s"Initializing service data for shuffle service using name '$serviceName'")
+      ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes))
     }
 
     // Send the start request to the ContainerManager
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala
new file mode 100644
index 0000000..db001a9
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.URLClassLoader
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark._
+import org.apache.spark.internal.config._
+import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor}
+import org.apache.spark.tags.ExtendedYarnTest
+
+/**
+ * SPARK-34828: Integration test for the external shuffle service with an alternate name and
+ * configs (by using a configuration overlay)
+ */
+@ExtendedYarnTest
+class YarnShuffleAlternateNameConfigSuite extends YarnShuffleIntegrationSuite {
+
+  private[this] val shuffleServiceName = "custom_shuffle_service_name"
+
+  override def newYarnConfig(): YarnConfiguration = {
+    val yarnConfig = super.newYarnConfig()
+    yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, shuffleServiceName)
+    yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format(shuffleServiceName),
+      classOf[YarnShuffleService].getCanonicalName)
+    val overlayConf = new YarnConfiguration()
+    // Enable authentication in the base NodeManager conf but not in the client. This would break
+    // shuffle, unless the shuffle service conf overlay overrides to turn off authentication.
+    overlayConf.setBoolean(NETWORK_AUTH_ENABLED.key, true)
+    // Add the authentication conf to a separate config object used as an overlay rather than
+    // setting it directly. This is necessary because a config overlay will override previous
+    // config overlays, but not configs which were set directly on the config object.
+    yarnConfig.addResource(overlayConf)
+    yarnConfig
+  }
+
+  override protected def extraSparkConf(): Map[String, String] =
+    super.extraSparkConf() ++ Map(SHUFFLE_SERVICE_NAME.key -> shuffleServiceName)
+
+  override def beforeAll(): Unit = {
+    val configFileContent =
+      s"""<?xml version="1.0" encoding="UTF-8"?>
+         |<configuration>
+         |  <property>
+         |    <name>${NETWORK_AUTH_ENABLED.key}</name>
+         |    <value>false</value>
+         |  </property>
+         |</configuration>
+         |""".stripMargin
+    val jarFile = TestUtils.createJarWithFiles(Map(
+      YarnTestAccessor.getShuffleServiceConfOverlayResourceName -> configFileContent
+    ))
+    // Configure a custom classloader which includes the conf overlay as a resource
+    val oldClassLoader = Thread.currentThread().getContextClassLoader
+    Thread.currentThread().setContextClassLoader(new URLClassLoader(Array(jarFile)))
+    try {
+      super.beforeAll()
+    } finally {
+      Thread.currentThread().setContextClassLoader(oldClassLoader)
+    }
+  }
+}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 188a485..d6d1715 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -56,6 +56,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
 
   override def beforeEach(): Unit = {
     super.beforeEach()
+    // Ensure that each test uses a fresh metrics system
+    DefaultMetricsSystem.shutdown()
+    DefaultMetricsSystem.setInstance(new MetricsSystemImpl())
     yarnConfig = new YarnConfiguration()
     yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
     yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
@@ -413,6 +416,16 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     ))
   }
 
+  test("SPARK-34828: metrics should be registered with configured name") {
+    s1 = new YarnShuffleService
+    yarnConfig.set(YarnShuffleService.SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY, "fooMetrics")
+    s1.init(yarnConfig)
+
+    assert(DefaultMetricsSystem.instance.getSource("sparkShuffleService") === null)
+    assert(DefaultMetricsSystem.instance.getSource("fooMetrics")
+        .isInstanceOf[YarnShuffleServiceMetrics])
+  }
+
   test("create default merged shuffle file manager instance") {
     val mockConf = mock(classOf[TransportConf])
     when(mockConf.mergedShuffleFileManagerImpl).thenReturn(
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
index db322cd..d87cc26 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
@@ -34,4 +34,7 @@ object YarnTestAccessor {
     service.registeredExecutorFile
   }
 
+  def getShuffleServiceConfOverlayResourceName: String = {
+    YarnShuffleService.SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME
+  }
 }

