This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new b4916d4  [SPARK-35672][CORE][YARN][3.1] Pass user classpath entries to executors using config instead of command line

b4916d4 is described below

commit b4916d4a410820ba00125c00b55ca724b27cc853
Author: Erik Krogen <xkro...@apache.org>
AuthorDate: Mon Jun 28 17:38:56 2021 +0900

[SPARK-35672][CORE][YARN][3.1] Pass user classpath entries to executors using config instead of command line

### What changes were proposed in this pull request?
Refactor the logic for constructing the user classpath from `yarn.ApplicationMaster` into `yarn.Client` so that it can be leveraged on the executor side as well, instead of having the driver construct it and pass it to the executor via command-line arguments. A new method, `getUserClassPath`, is added to `CoarseGrainedExecutorBackend`, which defaults to `Nil` (consistent with the existing behavior, where non-YARN resource managers do not configure the user classpath). `YarnCoarseGrained [...]

### Why are the changes needed?
User-provided JARs are made available to executors using a custom classloader, so they do not appear on the standard Java classpath. Instead, they are passed as a list to the executor, which then creates a classloader out of the URLs. Currently, in the case of YARN, this list of JARs is crafted by the driver (in `ExecutorRunnable`), which then passes the information to the executors (`CoarseGrainedExecutorBackend`) by specifying each JAR on the executor command line as `--user-class-pat [...]

> /bin/bash: Argument list too long

A [Google search](https://www.google.com/search?q=spark%20%22%2Fbin%2Fbash%3A%20argument%20list%20too%20long%22&oq=spark%20%22%2Fbin%2Fbash%3A%20argument%20list%20too%20long%22) indicates that this is not a theoretical problem and afflicts real users, including ours. Passing this list using the configurations instead resolves the issue.

### Does this PR introduce _any_ user-facing change?
No, except for fixing the bug and thereby allowing larger JAR lists to be passed successfully. Configuration of JARs is identical to before.

### How was this patch tested?
New unit tests were added in `YarnClusterSuite`. We have also been running a similar fix internally for four months with great success.

Note that this is a backport of #32810 with minor conflicts around imports.

Closes #33090 from xkrogen/xkrogen-SPARK-35672-classpath-scalable-branch-3.1.
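For scale, a rough back-of-the-envelope calculation illustrates the limit being hit above. The sketch below is illustrative only: the 2 MiB figure is a common Linux default for the combined size of argv and the environment, and the average path length is an assumption.

```scala
// Illustrative arithmetic, not part of the patch: estimate how many
// "--user-class-path file:<path>" argument pairs fit under a typical
// Linux argv+envp budget (commonly ~2 MiB; kernel- and shell-dependent).
object ArgListSketch {
  def main(args: Array[String]): Unit = {
    val argMaxBytes = 2 * 1024 * 1024   // assumed ARG_MAX-style budget
    val avgPathLength = 120             // assumed average JAR path length
    val perJarBytes = "--user-class-path ".length + "file:".length + avgPathLength
    println(s"Roughly ${argMaxBytes / perJarBytes} JARs before 'Argument list too long'")
  }
}
```

Passing the list through the Spark configuration removes the dependence of the launch command's length on the number of user JARs.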
Authored-by: Erik Krogen <xkro...@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../executor/CoarseGrainedExecutorBackend.scala    | 17 +++---
 .../scala/org/apache/spark/executor/Executor.scala |  2 +
 .../CoarseGrainedExecutorBackendSuite.scala        | 17 +++---
 .../spark/deploy/yarn/ApplicationMaster.scala      |  9 ++--
 .../org/apache/spark/deploy/yarn/Client.scala      | 31 ++++++++++-
 .../spark/deploy/yarn/ExecutorRunnable.scala       | 12 -----
 .../YarnCoarseGrainedExecutorBackend.scala         |  8 +--
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 35 ++++++++++++
 .../spark/deploy/yarn/YarnClusterSuite.scala       | 63 ++++++++++++++++++----
 9 files changed, 142 insertions(+), 52 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 6a1fd57..d607ee8 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -51,7 +51,6 @@ private[spark] class CoarseGrainedExecutorBackend(
     bindAddress: String,
     hostname: String,
     cores: Int,
-    userClassPath: Seq[URL],
     env: SparkEnv,
     resourcesFileOpt: Option[String],
     resourceProfile: ResourceProfile)
@@ -114,7 +113,7 @@ private[spark] class CoarseGrainedExecutorBackend(
    */
   private def createClassLoader(): MutableURLClassLoader = {
     val currentLoader = Utils.getContextOrSparkClassLoader
-    val urls = userClassPath.toArray
+    val urls = getUserClassPath.toArray
     if (env.conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)) {
       new ChildFirstURLClassLoader(urls, currentLoader)
     } else {
@@ -139,6 +138,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     }
   }
 
+  def getUserClassPath: Seq[URL] = Nil
+
   def extractLogUrls: Map[String, String] = {
     val prefix = "SPARK_LOG_URL_"
     sys.env.filterKeys(_.startsWith(prefix))
@@ -155,7 +156,7 @@
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       try {
-        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
+        executor = new Executor(executorId, hostname, env, getUserClassPath, isLocal = false,
           resources = _resources)
         driver.get.send(LaunchedExecutor(executorId))
       } catch {
@@ -368,7 +369,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       cores: Int,
       appId: String,
       workerUrl: Option[String],
-      userClassPath: mutable.ListBuffer[URL],
       resourcesFileOpt: Option[String],
       resourceProfileId: Int)
 
@@ -376,7 +376,7 @@
     val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
       new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
+        arguments.bindAddress, arguments.hostname, arguments.cores,
         env, arguments.resourcesFileOpt, resourceProfile)
     }
     run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
@@ -459,7 +459,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     var resourcesFileOpt: Option[String] = None
     var appId: String = null
     var workerUrl: Option[String] = None
-    val userClassPath = new mutable.ListBuffer[URL]()
    var resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID
 
     var argv = args.toList
@@ -490,9 +489,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         // Worker url is used in spark standalone mode to enforce fate-sharing with worker
         workerUrl = Some(value)
         argv = tail
-      case ("--user-class-path") :: value :: tail =>
-        userClassPath += new URL(value)
-        argv = tail
       case ("--resourceProfileId") :: value :: tail =>
         resourceProfileId = value.toInt
         argv = tail
@@ -519,7 +515,7 @@
     }
 
     Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
-      userClassPath, resourcesFileOpt, resourceProfileId)
+      resourcesFileOpt, resourceProfileId)
   }
 
   private def printUsageAndExit(classNameForEntry: String): Unit = {
@@ -537,7 +533,6 @@
       |  --resourcesFile <fileWithJSONResourceInformation>
       |  --app-id <appid>
       |  --worker-url <workerUrl>
-      |  --user-class-path <url>
       |  --resourceProfileId <id>
       |""".stripMargin)
     // scalastyle:on println
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a40a61e..e7a6f66 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -877,6 +877,8 @@ private[spark] class Executor(
     val urls = userClassPath.toArray ++ currentJars.keySet.map { uri =>
       new File(uri.split("/").last).toURI.toURL
     }
+    logInfo(s"Starting executor with user classpath (userClassPathFirst = $userClassPathFirst): " +
+      urls.mkString("'", ",", "'"))
     if (userClassPathFirst) {
       new ChildFirstURLClassLoader(urls, currentLoader)
     } else {
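The hunks above carry the core mechanism: `CoarseGrainedExecutorBackend` now exposes an overridable `getUserClassPath` hook (defaulting to `Nil`) instead of receiving the list through its constructor from `--user-class-path` flags. A minimal, self-contained sketch of that pattern, using simplified stand-in classes rather than Spark's real ones:

```scala
import java.io.File
import java.net.URL

// Minimal sketch of the hook introduced above: the base backend exposes an
// overridable getUserClassPath instead of taking the list via constructor
// (and hence via --user-class-path command-line arguments).
class BackendSketch {
  // Default matches non-YARN resource managers: no user classpath.
  def getUserClassPath: Seq[URL] = Nil

  def classLoaderUrls: Array[URL] = getUserClassPath.toArray
}

// A resource-manager-specific backend derives the list from configuration,
// the way YarnCoarseGrainedExecutorBackend now calls Client.getUserClasspathUrls.
class YarnLikeBackendSketch(jarPathsFromConf: Seq[String]) extends BackendSketch {
  override def getUserClassPath: Seq[URL] =
    jarPathsFromConf.map(p => new File(p).toURI.toURL)
}
```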
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 810dcf0..0d045dc 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.executor
 
 import java.io.File
-import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Properties
 
@@ -56,7 +55,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
     withTempDir { tmpDir =>
       val testResourceArgs: JObject = ("" -> "")
       val ja = JArray(List(testResourceArgs))
@@ -77,7 +76,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
     withTempDir { tmpDir =>
       val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
       val ja = Extraction.decompose(Seq(ra))
@@ -111,7 +110,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
     withTempDir { tmpDir =>
       val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
@@ -138,7 +137,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
 
     // not enough gpu's on the executor
     withTempDir { tmpDir =>
@@ -191,7 +190,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
 
     // executor resources < required
     withTempDir { tmpDir =>
@@ -222,7 +221,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
 
     val parsedResources = backend.parseOrFindResources(None)
 
@@ -269,7 +268,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
     val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
     val ja = Extraction.decompose(Seq(gpuArgs))
     val f1 = createTempJsonFile(dir, "resources", ja)
@@ -294,7 +293,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
       val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
       val env = createMockEnv(conf, serializer, Some(rpcEnv))
       backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
-        "host1", "host1", 4, Seq.empty[URL], env, None,
+        "host1", "host1", 4, env, None,
         resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
 
       assert(backend.taskResources.isEmpty)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index be9a88c..c9c89d6 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.{File, IOException}
+import java.io.IOException
 import java.lang.reflect.{InvocationTargetException, Modifier}
-import java.net.{URI, URL}
+import java.net.URI
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{TimeoutException, TimeUnit}
@@ -80,10 +80,7 @@ private[spark] class ApplicationMaster(
   private var metricsSystem: Option[MetricsSystem] = None
 
   private val userClassLoader = {
-    val classpath = Client.getUserClasspath(sparkConf)
-    val urls = classpath.map { entry =>
-      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
-    }
+    val urls = Client.getUserClasspathUrls(sparkConf, isClusterMode)
 
     if (isClusterMode) {
       if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 618faef..5d0bf0d 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.{FileSystem => _, _}
-import java.net.{InetAddress, UnknownHostException, URI}
+import java.net.{InetAddress, UnknownHostException, URI, URL}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
+import java.nio.file.Paths
 import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}
@@ -1267,7 +1268,7 @@ private[spark] class Client(
   }
 }
 
-private object Client extends Logging {
+private[spark] object Client extends Logging {
 
   // Alias for the user jar
   val APP_JAR_NAME: String = "__app__.jar"
@@ -1429,6 +1430,32 @@ private object Client extends Logging {
     (mainUri ++ secondaryUris).toArray
   }
 
+  /**
+   * Returns a list of local, absolute file URLs representing the user classpath. Note that this
+   * must be executed on the same host which will access the URLs, as it will resolve relative
+   * paths based on the current working directory.
+   *
+   * @param conf Spark configuration.
+   * @param useClusterPath Whether to use the 'cluster' path when resolving paths with the
+   *                       `local` scheme. This should be used when running on the cluster, but
+   *                       not when running on the gateway (i.e. for the driver in `client` mode).
+   * @return Array of local URLs ready to be passed to a [[java.net.URLClassLoader]].
+   */
+  def getUserClasspathUrls(conf: SparkConf, useClusterPath: Boolean): Array[URL] = {
+    Client.getUserClasspath(conf).map { uri =>
+      val inputPath = uri.getPath
+      val replacedFilePath = if (Utils.isLocalUri(uri.toString) && useClusterPath) {
+        Client.getClusterPath(conf, inputPath)
+      } else {
+        // Any other URI schemes should have been resolved by this point
+        assert(uri.getScheme == null || uri.getScheme == "file" || Utils.isLocalUri(uri.toString),
+          "getUserClasspath should only return 'file' or 'local' URIs but found: " + uri)
+        inputPath
+      }
+      Paths.get(replacedFilePath).toAbsolutePath.toUri.toURL
+    }
+  }
+
   private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
     mainJar.flatMap { path =>
       val uri = Utils.resolveURI(path)
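Per the new scaladoc, the URLs returned by `getUserClasspathUrls` are ready to hand to a `java.net.URLClassLoader` on the host that resolves them. A hypothetical consumer could look like the sketch below; since `Client` is `private[spark]`, real callers live inside Spark, and the consumer this patch actually adds is `YarnCoarseGrainedExecutorBackend.getUserClassPath` (shown further down):

```scala
package org.apache.spark.deploy.yarn

// Hypothetical sketch, not part of the patch: builds a classloader from the
// user classpath the same way the executor backend does. It lives in this
// package only because Client is private[spark].
import java.net.URLClassLoader

import org.apache.spark.SparkConf

object UserClasspathLoaderSketch {
  def loaderFor(conf: SparkConf, onCluster: Boolean): URLClassLoader = {
    // Must run on the host that will read the JARs: relative paths are
    // resolved against the current working directory.
    val urls = Client.getUserClasspathUrls(conf, useClusterPath = onCluster)
    new URLClassLoader(urls, Thread.currentThread().getContextClassLoader)
  }
}
```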
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index ede3906..bcef397 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.File
 import java.nio.ByteBuffer
 import java.util.Collections
 
@@ -188,16 +187,6 @@ private[yarn] class ExecutorRunnable(
     // For log4j configuration to reference
     javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
 
-    val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
-      val absPath =
-        if (new File(uri.getPath()).isAbsolute()) {
-          Client.getClusterPath(sparkConf, uri.getPath())
-        } else {
-          Client.buildPath(Environment.PWD.$(), uri.getPath())
-        }
-      Seq("--user-class-path", "file:" + absPath)
-    }.toSeq
-
     YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
     val commands = prefixEnv ++
       Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
@@ -209,7 +198,6 @@ private[yarn] class ExecutorRunnable(
         "--cores", executorCores.toString,
         "--app-id", appId,
         "--resourceProfileId", resourceProfileId.toString) ++
-      userClassPath ++
       Seq(
         s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
         s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
index ce46ffa..3dd51f1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
@@ -21,6 +21,7 @@ import java.net.URL
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.Client
 import org.apache.spark.internal.Logging
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.RpcEnv
@@ -38,7 +39,6 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     bindAddress: String,
     hostname: String,
     cores: Int,
-    userClassPath: Seq[URL],
     env: SparkEnv,
     resourcesFile: Option[String],
     resourceProfile: ResourceProfile)
@@ -49,13 +49,15 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     bindAddress,
     hostname,
     cores,
-    userClassPath,
     env,
     resourcesFile,
     resourceProfile)
   with Logging {
 
   private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf)
 
+  override def getUserClassPath: Seq[URL] =
+    Client.getUserClasspathUrls(env.conf, useClusterPath = true)
+
   override def extractLogUrls: Map[String, String] = {
     YarnContainerInfoHelper.getLogUrls(hadoopConfiguration, container = None)
       .getOrElse(Map())
@@ -73,7 +75,7 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
       new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
+        arguments.bindAddress, arguments.hostname, arguments.cores,
         env, arguments.resourcesFileOpt, resourceProfile)
     }
     val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index ea3acec..1650ea2 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn
 
 import java.io.{File, FileInputStream, FileNotFoundException, FileOutputStream}
 import java.net.URI
+import java.nio.file.Paths
 import java.util.Properties
 import java.util.concurrent.ConcurrentHashMap
 
@@ -583,6 +584,40 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }
   }
 
+  test("SPARK-35672: test Client.getUserClasspathUrls") {
+    val gatewayRootPath = "/local/matching/replace"
+    val replacementRootPath = "/replaced/path"
+    val conf = new SparkConf()
+      .set(SECONDARY_JARS, Seq(
+        s"local:$gatewayRootPath/foo.jar",
+        "local:/local/not/matching/replace/foo.jar",
+        "file:/absolute/file/path/foo.jar",
+        s"$gatewayRootPath/but-not-actually-local/foo.jar",
+        "/absolute/path/foo.jar",
+        "relative/path/foo.jar"
+      ))
+      .set(GATEWAY_ROOT_PATH, gatewayRootPath)
+      .set(REPLACEMENT_ROOT_PATH, replacementRootPath)
+
+    def assertUserClasspathUrls(cluster: Boolean, expectedReplacementPath: String): Unit = {
+      val expectedUrls = Seq(
+        Paths.get(APP_JAR_NAME).toAbsolutePath.toUri.toString,
+        s"file:$expectedReplacementPath/foo.jar",
+        "file:/local/not/matching/replace/foo.jar",
+        "file:/absolute/file/path/foo.jar",
+        // since this path wasn't a local URI, it should never be replaced
+        s"file:$gatewayRootPath/but-not-actually-local/foo.jar",
+        "file:/absolute/path/foo.jar",
+        Paths.get("relative/path/foo.jar").toAbsolutePath.toUri.toString
+      ).map(URI.create(_).toURL).toArray
+      assert(Client.getUserClasspathUrls(conf, cluster) === expectedUrls)
+    }
+
+    // assert that no replacement happens when cluster = false by expecting the replacement
+    // path to be the same as the original path
+    assertUserClasspathUrls(cluster = false, gatewayRootPath)
+    assertUserClasspathUrls(cluster = true, replacementRootPath)
+  }
+
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 9bc934d..d25f5de 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.File
+import java.net.URL
 import java.nio.charset.StandardCharsets
+import java.nio.file.Paths
 import java.util.{HashMap => JHashMap}
 
 import scala.collection.mutable
@@ -149,12 +151,48 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }
 
-  test("run Spark in yarn-client mode with additional jar") {
-    testWithAddJar(true)
+  test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local'") {
+    testWithAddJar(clientMode = true, "local")
   }
 
-  test("run Spark in yarn-cluster mode with additional jar") {
-    testWithAddJar(false)
+  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'local'") {
+    testWithAddJar(clientMode = false, "local")
+  }
+
+  test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local' " +
+    "and gateway-replacement path") {
+    // Use the original jar URL, but set up the gateway/replacement configs such that if
+    // replacement occurs, things will break. This ensures the replacement doesn't apply to the
+    // driver in 'client' mode. Executors will fail in this case because they still apply the
+    // replacement in client mode.
+    testWithAddJar(clientMode = true, "local", Some(jarUrl => {
+      (jarUrl.getPath, Map(
+        GATEWAY_ROOT_PATH.key -> Paths.get(jarUrl.toURI).getParent.toString,
+        REPLACEMENT_ROOT_PATH.key -> "/nonexistent/path/"
+      ))
+    }), expectExecutorFailure = true)
+  }
+
+  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'local' " +
+    "and gateway-replacement path") {
+    // Put a prefix in front of the original jar URL which causes it to be an invalid path.
+    // Set up the gateway/replacement configs such that if replacement occurs, it is a valid
+    // path again (by removing the prefix). This ensures the replacement is applied.
+    val gatewayPath = "/replaceme/nonexistent/"
+    testWithAddJar(clientMode = false, "local", Some(jarUrl => {
+      (gatewayPath + jarUrl.getPath, Map(
+        GATEWAY_ROOT_PATH.key -> gatewayPath,
+        REPLACEMENT_ROOT_PATH.key -> ""
+      ))
+    }))
+  }
+
+  test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'file'") {
+    testWithAddJar(clientMode = true, "file")
+  }
+
+  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'file'") {
+    testWithAddJar(clientMode = false, "file")
   }
 
   test("run Spark in yarn-cluster mode unsuccessfully") {
@@ -285,16 +323,23 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }
 
-  private def testWithAddJar(clientMode: Boolean): Unit = {
+  private def testWithAddJar(
+      clientMode: Boolean,
+      jarUriScheme: String,
+      jarUrlToPathAndConfs: Option[URL => (String, Map[String, String])] = None,
+      expectExecutorFailure: Boolean = false): Unit = {
     val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
+    val (jarPath, extraConf) = jarUrlToPathAndConfs
+      .map(_.apply(originalJar))
+      .getOrElse((originalJar.getPath, Map[String, String]()))
     val driverResult = File.createTempFile("driver", null, tempDir)
     val executorResult = File.createTempFile("executor", null, tempDir)
     val finalState = runSpark(clientMode,
       mainClassName(YarnClasspathTest.getClass),
-      appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
-      extraClassPath = Seq(originalJar.getPath()),
-      extraJars = Seq("local:" + originalJar.getPath()))
+      appArgs = Seq(driverResult.getAbsolutePath, executorResult.getAbsolutePath),
+      extraJars = Seq(s"$jarUriScheme:$jarPath"),
+      extraConf = extraConf)
     checkResult(finalState, driverResult, "ORIGINAL")
-    checkResult(finalState, executorResult, "ORIGINAL")
+    checkResult(finalState, executorResult, if (expectExecutorFailure) "failure" else "ORIGINAL")
   }
 
   private def testPySpark(

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org