This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new e6699570bec [SPARK-40490][YARN][TESTS] Ensure `YarnShuffleIntegrationSuite` tests registeredExecFile reload scenarios
e6699570bec is described below

commit e6699570becadb91695572bca5adc1605dc1b2a8
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Wed Sep 21 08:05:17 2022 -0500

[SPARK-40490][YARN][TESTS] Ensure `YarnShuffleIntegrationSuite` tests registeredExecFile reload scenarios

### What changes were proposed in this pull request?
After SPARK-17321, `YarnShuffleService` persists data to the local shuffle state db, and reloads data from it, only when the Yarn NodeManager starts with `YarnConfiguration#NM_RECOVERY_ENABLED = true`.

`YarnShuffleIntegrationSuite` does not set `YarnConfiguration#NM_RECOVERY_ENABLED`, and the configuration defaults to false, so the suite neither triggers data persistence to the db nor verifies that the data is reloaded.

This PR makes `YarnShuffleIntegrationSuite` verify the registeredExecFile reload scenarios again. To achieve this, it makes the following changes:

1. Add a new undocumented configuration `spark.yarn.shuffle.testing` to `YarnShuffleService`, and initialize `_recoveryPath` when `_recoveryPath == null && spark.yarn.shuffle.testing == true`.
2. Set `spark.yarn.shuffle.testing = true` only in `YarnShuffleIntegrationSuite`, and add assertions that `registeredExecFile` is not null, to ensure that the registeredExecFile reload scenarios are verified.

### Why are the changes needed?
Fix registeredExecFile reload test scenarios.

Why not test by configuring `YarnConfiguration#NM_RECOVERY_ENABLED` as true?
This configuration has been tried:

**Hadoop 3.3.4**

```
build/mvn clean install -pl resource-managers/yarn -Pyarn -Dtest=none -DwildcardSuites=org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite -Phadoop-3
```

```
2022-09-10T11:44:42.1710230Z   Cause: java.lang.ClassNotFoundException: org.apache.hadoop.shaded.org.iq80.leveldb.DBException
2022-09-10T11:44:42.1715234Z   at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
2022-09-10T11:44:42.1719347Z   at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
2022-09-10T11:44:42.1723090Z   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
2022-09-10T11:44:42.1726759Z   at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
2022-09-10T11:44:42.1731028Z   at org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartRecoveryStore(NodeManager.java:313)
2022-09-10T11:44:42.1735424Z   at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceInit(NodeManager.java:370)
2022-09-10T11:44:42.1740303Z   at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
2022-09-10T11:44:42.1745576Z   at org.apache.hadoop.yarn.server.MiniYARNCluster$NodeManagerWrapper.serviceInit(MiniYARNCluster.java:597)
2022-09-10T11:44:42.1828858Z   at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
2022-09-10T11:44:42.1829712Z   at org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:109)
2022-09-10T11:44:42.1830633Z   at org.apache.hadoop.yarn.server.MiniYARNCluster.serviceInit(MiniYARNCluster.java:327)
2022-09-10T11:44:42.1831431Z   at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
2022-09-10T11:44:42.1832279Z   at org.apache.spark.deploy.yarn.BaseYarnClusterSuite.beforeAll(BaseYarnClusterSuite.scala:112)
```

**Hadoop 2.7.4**

```
build/mvn clean install -pl resource-managers/yarn -Pyarn -Dtest=none -DwildcardSuites=org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite -Phadoop-2
```

```
YarnShuffleIntegrationWithLevelDBBackendSuite:
org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite *** ABORTED ***
  java.lang.IllegalArgumentException: Cannot support recovery with an ephemeral server port. Check the setting of yarn.nodemanager.address
  at org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.serviceStart(ContainerManagerImpl.java:395)
  at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
  at org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:120)
  at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceStart(NodeManager.java:272)
  at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
  at org.apache.hadoop.yarn.server.MiniYARNCluster$NodeManagerWrapper.serviceStart(MiniYARNCluster.java:560)
  at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
  at org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:120)
  at org.apache.hadoop.yarn.server.MiniYARNCluster.serviceStart(MiniYARNCluster.java:278)
  at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
  ...
Run completed in 3 seconds, 992 milliseconds.
Total number of tests run: 0
Suites: completed 1, aborted 1
Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
*** 1 SUITE ABORTED ***
```

As the Hadoop 2.7.4 run shows, enabling Yarn NodeManager recovery requires a fixed NodeManager port, which is hard to guarantee in unit tests, so this PR uses a workaround instead.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

Closes #37938 from LuciferYang/yarnshuffleservice-it.

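The workaround described in this message can be sketched in isolation. The following is a minimal, dependency-free illustration and not the code in the patch: the class `RecoveryPathSketch`, the method `resolveRecoveryPath`, and the `Map`-based configuration are hypothetical stand-ins, and `Files.createTempDirectory` stands in for `JavaUtils.createTempDir()`. Only the key string `spark.yarn.shuffle.testing` and the null-check condition are taken from the change itself.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

// Hypothetical stand-in for the recovery-path fallback; not a Spark class.
public class RecoveryPathSketch {
  // Key string taken from the patch (INTEGRATION_TESTING).
  public static final String INTEGRATION_TESTING = "spark.yarn.shuffle.testing";

  // Returns the path to use for the shuffle state db, or null when
  // persistence should stay disabled.
  public static Path resolveRecoveryPath(Path recoveryPath, Map<String, String> conf) {
    boolean testing =
        Boolean.parseBoolean(conf.getOrDefault(INTEGRATION_TESTING, "false"));
    // Fall back to a temp dir only when no recovery path was provided
    // and the testing flag is explicitly enabled.
    if (recoveryPath == null && testing) {
      try {
        return Files.createTempDirectory("shuffle-recovery-");
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
    return recoveryPath;
  }

  public static void main(String[] args) {
    // Flag unset: the path stays null, so nothing would be persisted.
    System.out.println(resolveRecoveryPath(null, Map.of()));
    // Flag set: a temporary directory is created and returned.
    System.out.println(resolveRecoveryPath(null, Map.of(INTEGRATION_TESTING, "true")));
  }
}
```

Under these assumptions, a recovery path that is already set is never overridden, so the flag only affects runs where NodeManager recovery is disabled.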
Lead-authored-by: yangjie01 <yangji...@baidu.com>
Co-authored-by: YangJie <yangji...@baidu.com>
Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../org/apache/spark/network/yarn/YarnShuffleService.java |  9 +++++++++
 .../spark/deploy/yarn/YarnShuffleIntegrationSuite.scala   | 12 ++++--------
 .../org/apache/spark/network/yarn/YarnTestAccessor.scala  |  4 ++++
 3 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 58f6b6500f6..bde358a638a 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -60,6 +60,7 @@ import org.apache.spark.network.sasl.ShuffleSecretManager;
 import org.apache.spark.network.server.TransportServer;
 import org.apache.spark.network.server.TransportServerBootstrap;
 import org.apache.spark.network.shuffle.ExternalBlockHandler;
+import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.TransportConf;
 import org.apache.spark.network.yarn.util.HadoopConfigProvider;

@@ -129,6 +130,10 @@ public class YarnShuffleService extends AuxiliaryService {
   // Whether failure during service initialization should stop the NM.
   @VisibleForTesting
   static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
+
+  @VisibleForTesting
+  static final String INTEGRATION_TESTING = "spark.yarn.shuffle.testing";
+
   private static final boolean DEFAULT_STOP_ON_FAILURE = false;

   // just for testing when you want to find an open port
@@ -237,6 +242,10 @@ public class YarnShuffleService extends AuxiliaryService {

     boolean stopOnFailure = _conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);

+    if (_recoveryPath == null && _conf.getBoolean(INTEGRATION_TESTING, false)) {
+      _recoveryPath = new Path(JavaUtils.createTempDir().toURI());
+    }
+
     if (_recoveryPath != null) {
       String dbBackendName = _conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND,
         DBBackend.LEVELDB.name());
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index 80e014fd062..deb95773676 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -50,6 +50,7 @@ abstract class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
       classOf[YarnShuffleService].getCanonicalName)
     yarnConfig.set(SHUFFLE_SERVICE_PORT.key, "0")
     yarnConfig.set(SHUFFLE_SERVICE_DB_BACKEND.key, dbBackend.name())
+    yarnConfig.set(YarnTestAccessor.shuffleServiceIntegrationTestingKey, "true")
     yarnConfig
   }

@@ -71,23 +72,18 @@ abstract class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
     val shuffleService = YarnTestAccessor.getShuffleServiceInstance

     val registeredExecFile = YarnTestAccessor.getRegisteredExecutorFile(shuffleService)
+    assert(registeredExecFile != null)

     val result = File.createTempFile("result", null, tempDir)
     val finalState = runSpark(
       false,
       mainClassName(YarnExternalShuffleDriver.getClass),
-      appArgs = if (registeredExecFile != null) {
-        Seq(result.getAbsolutePath, registeredExecFile.getAbsolutePath)
-      } else {
-        Seq(result.getAbsolutePath)
-      },
+      appArgs = Seq(result.getAbsolutePath, registeredExecFile.getAbsolutePath),
       extraConf = extraSparkConf()
     )
     checkResult(finalState, result)

-    if (registeredExecFile != null) {
-      assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
-    }
+    assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
   }
 }

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
index d87cc263847..df7bfd800b1 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
@@ -37,4 +37,8 @@ object YarnTestAccessor {
   def getShuffleServiceConfOverlayResourceName: String = {
     YarnShuffleService.SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME
   }
+
+  def shuffleServiceIntegrationTestingKey: String = {
+    YarnShuffleService.INTEGRATION_TESTING
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org