This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new b4916d4  [SPARK-35672][CORE][YARN][3.1] Pass user classpath entries to executors using config instead of command line
b4916d4 is described below

commit b4916d4a410820ba00125c00b55ca724b27cc853
Author: Erik Krogen <xkro...@apache.org>
AuthorDate: Mon Jun 28 17:38:56 2021 +0900

    [SPARK-35672][CORE][YARN][3.1] Pass user classpath entries to executors using config instead of command line
    
    ### What changes were proposed in this pull request?
    Refactor the logic for constructing the user classpath from `yarn.ApplicationMaster` into `yarn.Client` so that it can be leveraged on the executor side as well, instead of having the driver construct it and pass it to the executor via command-line arguments. A new method, `getUserClassPath`, is added to `CoarseGrainedExecutorBackend`; it defaults to `Nil` (consistent with the existing behavior, where non-YARN resource managers do not configure the user classpath). `YarnCoarseGrained [...]
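    
    As a condensed, illustrative sketch of the new hook (the class names `BaseBackend`/`YarnBackend` and the package placement are placeholders rather than the real hierarchy, which takes many more constructor parameters; `yarn.Client` is `private[spark]`, which is why the sketch sits inside an `org.apache.spark` package):
    
    ```scala
    package org.apache.spark.executor
    
    import java.net.URL
    
    import org.apache.spark.SparkEnv
    import org.apache.spark.deploy.yarn.Client
    
    // Base backend: non-YARN resource managers configure no user classpath.
    class BaseBackend(val env: SparkEnv) {
      def getUserClassPath: Seq[URL] = Nil
    }
    
    // YARN backend: rebuild the user classpath from the executor's SparkConf instead of
    // reading --user-class-path command-line arguments passed by the driver.
    class YarnBackend(env: SparkEnv) extends BaseBackend(env) {
      override def getUserClassPath: Seq[URL] =
        Client.getUserClasspathUrls(env.conf, useClusterPath = true)
    }
    ```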
    
    ### Why are the changes needed?
    User-provided JARs are made available to executors using a custom classloader, so they do not appear on the standard Java classpath. Instead, they are passed as a list to the executor, which then creates a classloader out of the URLs. Currently, in the case of YARN, this list of JARs is crafted by the driver (in `ExecutorRunnable`), which then passes the information to the executors (`CoarseGrainedExecutorBackend`) by specifying each JAR on the executor command line as `--user-class-pat [...]
    
    > /bin/bash: Argument list too long
    
    A [Google search](https://www.google.com/search?q=spark%20%22%2Fbin%2Fbash%3A%20argument%20list%20too%20long%22&oq=spark%20%22%2Fbin%2Fbash%3A%20argument%20list%20too%20long%22) indicates that this is not a theoretical problem and afflicts real users, including ours. Passing this list via the configuration instead resolves this issue.
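    
    As a hedged sketch of the new flow (the config key `spark.yarn.secondary.jars`, the JAR paths, and the object name are illustrative assumptions; in the patch itself this call happens inside `YarnCoarseGrainedExecutorBackend`, and `Client` is `private[spark]`, hence the package declaration):
    
    ```scala
    package org.apache.spark.deploy.yarn
    
    import java.net.URL
    
    import org.apache.spark.SparkConf
    
    object UserClasspathFromConfExample {
      def main(args: Array[String]): Unit = {
        // Previously, ExecutorRunnable expanded each user JAR into "--user-class-path file:/..."
        // tokens on the executor launch command, so the bash command line grew with the number
        // of JARs. The executor now rebuilds the list from the SparkConf it already receives.
        val conf = new SparkConf()
          .set("spark.yarn.secondary.jars", "local:/opt/jars/a.jar,local:/opt/jars/b.jar")
    
        // Absolute file: URLs (the __app__.jar placeholder plus the JARs above), ready for the
        // executor's URL classloader; no command-line arguments are involved.
        val urls: Array[URL] = Client.getUserClasspathUrls(conf, useClusterPath = true)
        urls.foreach(println)
      }
    }
    ```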
    
    ### Does this PR introduce _any_ user-facing change?
    No, except for fixing the bug, which allows larger JAR lists to be passed successfully. Configuration of JARs is identical to before.
    
    ### How was this patch tested?
    New unit tests were added in `YarnClusterSuite`. Also, we have been running a similar fix internally for 4 months with great success.
    
    Note that this is a backport of #32810 with minor conflicts around imports.
    
    Closes #33090 from xkrogen/xkrogen-SPARK-35672-classpath-scalable-branch-3.1.
    
    Authored-by: Erik Krogen <xkro...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../executor/CoarseGrainedExecutorBackend.scala    | 17 +++---
 .../scala/org/apache/spark/executor/Executor.scala |  2 +
 .../CoarseGrainedExecutorBackendSuite.scala        | 17 +++---
 .../spark/deploy/yarn/ApplicationMaster.scala      |  9 ++--
 .../org/apache/spark/deploy/yarn/Client.scala      | 31 ++++++++++-
 .../spark/deploy/yarn/ExecutorRunnable.scala       | 12 -----
 .../YarnCoarseGrainedExecutorBackend.scala         |  8 +--
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 35 ++++++++++++
 .../spark/deploy/yarn/YarnClusterSuite.scala       | 63 ++++++++++++++++++----
 9 files changed, 142 insertions(+), 52 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 6a1fd57..d607ee8 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -51,7 +51,6 @@ private[spark] class CoarseGrainedExecutorBackend(
     bindAddress: String,
     hostname: String,
     cores: Int,
-    userClassPath: Seq[URL],
     env: SparkEnv,
     resourcesFileOpt: Option[String],
     resourceProfile: ResourceProfile)
@@ -114,7 +113,7 @@ private[spark] class CoarseGrainedExecutorBackend(
    */
   private def createClassLoader(): MutableURLClassLoader = {
     val currentLoader = Utils.getContextOrSparkClassLoader
-    val urls = userClassPath.toArray
+    val urls = getUserClassPath.toArray
     if (env.conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)) {
       new ChildFirstURLClassLoader(urls, currentLoader)
     } else {
@@ -139,6 +138,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     }
   }
 
+  def getUserClassPath: Seq[URL] = Nil
+
   def extractLogUrls: Map[String, String] = {
     val prefix = "SPARK_LOG_URL_"
     sys.env.filterKeys(_.startsWith(prefix))
@@ -155,7 +156,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       try {
-        executor = new Executor(executorId, hostname, env, userClassPath, 
isLocal = false,
+        executor = new Executor(executorId, hostname, env, getUserClassPath, 
isLocal = false,
           resources = _resources)
         driver.get.send(LaunchedExecutor(executorId))
       } catch {
@@ -368,7 +369,6 @@ private[spark] object CoarseGrainedExecutorBackend extends 
Logging {
       cores: Int,
       appId: String,
       workerUrl: Option[String],
-      userClassPath: mutable.ListBuffer[URL],
       resourcesFileOpt: Option[String],
       resourceProfileId: Int)
 
@@ -376,7 +376,7 @@ private[spark] object CoarseGrainedExecutorBackend extends 
Logging {
     val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, 
resourceProfile) =>
       new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, 
arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores, 
arguments.userClassPath.toSeq,
+        arguments.bindAddress, arguments.hostname, arguments.cores,
         env, arguments.resourcesFileOpt, resourceProfile)
     }
     run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), 
createFn)
@@ -459,7 +459,6 @@ private[spark] object CoarseGrainedExecutorBackend extends 
Logging {
     var resourcesFileOpt: Option[String] = None
     var appId: String = null
     var workerUrl: Option[String] = None
-    val userClassPath = new mutable.ListBuffer[URL]()
     var resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID
 
     var argv = args.toList
@@ -490,9 +489,6 @@ private[spark] object CoarseGrainedExecutorBackend extends 
Logging {
           // Worker url is used in spark standalone mode to enforce 
fate-sharing with worker
           workerUrl = Some(value)
           argv = tail
-        case ("--user-class-path") :: value :: tail =>
-          userClassPath += new URL(value)
-          argv = tail
         case ("--resourceProfileId") :: value :: tail =>
           resourceProfileId = value.toInt
           argv = tail
@@ -519,7 +515,7 @@ private[spark] object CoarseGrainedExecutorBackend extends 
Logging {
     }
 
     Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, 
workerUrl,
-      userClassPath, resourcesFileOpt, resourceProfileId)
+      resourcesFileOpt, resourceProfileId)
   }
 
   private def printUsageAndExit(classNameForEntry: String): Unit = {
@@ -537,7 +533,6 @@ private[spark] object CoarseGrainedExecutorBackend extends 
Logging {
       |   --resourcesFile <fileWithJSONResourceInformation>
       |   --app-id <appid>
       |   --worker-url <workerUrl>
-      |   --user-class-path <url>
       |   --resourceProfileId <id>
       |""".stripMargin)
     // scalastyle:on println
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a40a61e..e7a6f66 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -877,6 +877,8 @@ private[spark] class Executor(
     val urls = userClassPath.toArray ++ currentJars.keySet.map { uri =>
       new File(uri.split("/").last).toURI.toURL
     }
+    logInfo(s"Starting executor with user classpath (userClassPathFirst = 
$userClassPathFirst): " +
+        urls.mkString("'", ",", "'"))
     if (userClassPathFirst) {
       new ChildFirstURLClassLoader(urls, currentLoader)
     } else {
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 810dcf0..0d045dc 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.executor
 
 import java.io.File
-import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Properties
 
@@ -56,7 +55,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", 
"1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
     withTempDir { tmpDir =>
       val testResourceArgs: JObject = ("" -> "")
       val ja = JArray(List(testResourceArgs))
@@ -77,7 +76,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", 
"1", "host1", "host1",
-      4, Seq.empty[URL], env, None, 
ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
     withTempDir { tmpDir =>
       val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
       val ja = Extraction.decompose(Seq(ra))
@@ -111,7 +110,7 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", 
"1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
 
     withTempDir { tmpDir =>
       val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
@@ -138,7 +137,7 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", 
"1", "host1", "host1",
-      4, Seq.empty[URL], env, None, 
ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
 
     // not enough gpu's on the executor
     withTempDir { tmpDir =>
@@ -191,7 +190,7 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", 
"1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
 
     // executor resources < required
     withTempDir { tmpDir =>
@@ -222,7 +221,7 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
 
       // we don't really use this, just need it to get at the parser function
       val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", 
"1", "host1", "host1",
-        4, Seq.empty[URL], env, None, 
ResourceProfile.getOrCreateDefaultProfile(conf))
+        4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
 
       val parsedResources = backend.parseOrFindResources(None)
 
@@ -269,7 +268,7 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
 
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", 
"1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
     val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
     val ja = Extraction.decompose(Seq(gpuArgs))
     val f1 = createTempJsonFile(dir, "resources", ja)
@@ -294,7 +293,7 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
       val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
       val env = createMockEnv(conf, serializer, Some(rpcEnv))
         backend = new CoarseGrainedExecutorBackend(env.rpcEnv, 
rpcEnv.address.hostPort, "1",
-        "host1", "host1", 4, Seq.empty[URL], env, None,
+        "host1", "host1", 4, env, None,
           resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
       assert(backend.taskResources.isEmpty)
 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index be9a88c..c9c89d6 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.{File, IOException}
+import java.io.IOException
 import java.lang.reflect.{InvocationTargetException, Modifier}
-import java.net.{URI, URL}
+import java.net.URI
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{TimeoutException, TimeUnit}
 
@@ -80,10 +80,7 @@ private[spark] class ApplicationMaster(
   private var metricsSystem: Option[MetricsSystem] = None
 
   private val userClassLoader = {
-    val classpath = Client.getUserClasspath(sparkConf)
-    val urls = classpath.map { entry =>
-      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
-    }
+    val urls = Client.getUserClasspathUrls(sparkConf, isClusterMode)
 
     if (isClusterMode) {
       if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 618faef..5d0bf0d 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.{FileSystem => _, _}
-import java.net.{InetAddress, UnknownHostException, URI}
+import java.net.{InetAddress, UnknownHostException, URI, URL}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
+import java.nio.file.Paths
 import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
@@ -1267,7 +1268,7 @@ private[spark] class Client(
 
 }
 
-private object Client extends Logging {
+private[spark] object Client extends Logging {
 
   // Alias for the user jar
   val APP_JAR_NAME: String = "__app__.jar"
@@ -1429,6 +1430,32 @@ private object Client extends Logging {
     (mainUri ++ secondaryUris).toArray
   }
 
+  /**
+   * Returns a list of local, absolute file URLs representing the user 
classpath. Note that this
+   * must be executed on the same host which will access the URLs, as it will 
resolve relative
+   * paths based on the current working directory.
+   *
+   * @param conf Spark configuration.
+   * @param useClusterPath Whether to use the 'cluster' path when resolving 
paths with the
+   *                       `local` scheme. This should be used when running on 
the cluster, but
+   *                       not when running on the gateway (i.e. for the 
driver in `client` mode).
+   * @return Array of local URLs ready to be passed to a 
[[java.net.URLClassLoader]].
+   */
+  def getUserClasspathUrls(conf: SparkConf, useClusterPath: Boolean): 
Array[URL] = {
+    Client.getUserClasspath(conf).map { uri =>
+      val inputPath = uri.getPath
+      val replacedFilePath = if (Utils.isLocalUri(uri.toString) && 
useClusterPath) {
+        Client.getClusterPath(conf, inputPath)
+      } else {
+        // Any other URI schemes should have been resolved by this point
+        assert(uri.getScheme == null || uri.getScheme == "file" || 
Utils.isLocalUri(uri.toString),
+          "getUserClasspath should only return 'file' or 'local' URIs but 
found: " + uri)
+        inputPath
+      }
+      Paths.get(replacedFilePath).toAbsolutePath.toUri.toURL
+    }
+  }
+
   private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
     mainJar.flatMap { path =>
       val uri = Utils.resolveURI(path)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index ede3906..bcef397 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.File
 import java.nio.ByteBuffer
 import java.util.Collections
 
@@ -188,16 +187,6 @@ private[yarn] class ExecutorRunnable(
     // For log4j configuration to reference
     javaOpts += ("-Dspark.yarn.app.container.log.dir=" + 
ApplicationConstants.LOG_DIR_EXPANSION_VAR)
 
-    val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
-      val absPath =
-        if (new File(uri.getPath()).isAbsolute()) {
-          Client.getClusterPath(sparkConf, uri.getPath())
-        } else {
-          Client.buildPath(Environment.PWD.$(), uri.getPath())
-        }
-      Seq("--user-class-path", "file:" + absPath)
-    }.toSeq
-
     YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
     val commands = prefixEnv ++
       Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
@@ -209,7 +198,6 @@ private[yarn] class ExecutorRunnable(
         "--cores", executorCores.toString,
         "--app-id", appId,
         "--resourceProfileId", resourceProfileId.toString) ++
-      userClassPath ++
       Seq(
         s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
         s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
index ce46ffa..3dd51f1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
@@ -21,6 +21,7 @@ import java.net.URL
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.Client
 import org.apache.spark.internal.Logging
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.RpcEnv
@@ -38,7 +39,6 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     bindAddress: String,
     hostname: String,
     cores: Int,
-    userClassPath: Seq[URL],
     env: SparkEnv,
     resourcesFile: Option[String],
     resourceProfile: ResourceProfile)
@@ -49,13 +49,15 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     bindAddress,
     hostname,
     cores,
-    userClassPath,
     env,
     resourcesFile,
     resourceProfile) with Logging {
 
   private lazy val hadoopConfiguration = 
SparkHadoopUtil.get.newConfiguration(env.conf)
 
+  override def getUserClassPath: Seq[URL] =
+    Client.getUserClasspathUrls(env.conf, useClusterPath = true)
+
   override def extractLogUrls: Map[String, String] = {
     YarnContainerInfoHelper.getLogUrls(hadoopConfiguration, container = None)
       .getOrElse(Map())
@@ -73,7 +75,7 @@ private[spark] object YarnCoarseGrainedExecutorBackend 
extends Logging {
     val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, 
ResourceProfile) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, 
resourceProfile) =>
       new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, 
arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores, 
arguments.userClassPath.toSeq,
+        arguments.bindAddress, arguments.hostname, arguments.cores,
         env, arguments.resourcesFileOpt, resourceProfile)
     }
     val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index ea3acec..1650ea2 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn
 
 import java.io.{File, FileInputStream, FileNotFoundException, FileOutputStream}
 import java.net.URI
+import java.nio.file.Paths
 import java.util.Properties
 import java.util.concurrent.ConcurrentHashMap
 
@@ -583,6 +584,40 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }
   }
 
+  test("SPARK-35672: test Client.getUserClasspathUrls") {
+    val gatewayRootPath = "/local/matching/replace"
+    val replacementRootPath = "/replaced/path"
+    val conf = new SparkConf()
+        .set(SECONDARY_JARS, Seq(
+          s"local:$gatewayRootPath/foo.jar",
+          "local:/local/not/matching/replace/foo.jar",
+          "file:/absolute/file/path/foo.jar",
+          s"$gatewayRootPath/but-not-actually-local/foo.jar",
+          "/absolute/path/foo.jar",
+          "relative/path/foo.jar"
+        ))
+        .set(GATEWAY_ROOT_PATH, gatewayRootPath)
+        .set(REPLACEMENT_ROOT_PATH, replacementRootPath)
+
+    def assertUserClasspathUrls(cluster: Boolean, expectedReplacementPath: 
String): Unit = {
+      val expectedUrls = Seq(
+        Paths.get(APP_JAR_NAME).toAbsolutePath.toUri.toString,
+        s"file:$expectedReplacementPath/foo.jar",
+        "file:/local/not/matching/replace/foo.jar",
+        "file:/absolute/file/path/foo.jar",
+        // since this path wasn't a local URI, it should never be replaced
+        s"file:$gatewayRootPath/but-not-actually-local/foo.jar",
+        "file:/absolute/path/foo.jar",
+        Paths.get("relative/path/foo.jar").toAbsolutePath.toUri.toString
+      ).map(URI.create(_).toURL).toArray
+      assert(Client.getUserClasspathUrls(conf, cluster) === expectedUrls)
+    }
+    // assert that no replacement happens when cluster = false by expecting 
the replacement
+    // path to be the same as the original path
+    assertUserClasspathUrls(cluster = false, gatewayRootPath)
+    assertUserClasspathUrls(cluster = true, replacementRootPath)
+  }
+
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 9bc934d..d25f5de 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.File
+import java.net.URL
 import java.nio.charset.StandardCharsets
+import java.nio.file.Paths
 import java.util.{HashMap => JHashMap}
 
 import scala.collection.mutable
@@ -149,12 +151,48 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }
 
-  test("run Spark in yarn-client mode with additional jar") {
-    testWithAddJar(true)
+  test("SPARK-35672: run Spark in yarn-client mode with additional jar using 
URI scheme 'local'") {
+    testWithAddJar(clientMode = true, "local")
   }
 
-  test("run Spark in yarn-cluster mode with additional jar") {
-    testWithAddJar(false)
+  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using 
URI scheme 'local'") {
+    testWithAddJar(clientMode = false, "local")
+  }
+
+  test("SPARK-35672: run Spark in yarn-client mode with additional jar using 
URI scheme 'local' " +
+      "and gateway-replacement path") {
+    // Use the original jar URL, but set up the gateway/replacement configs 
such that if
+    // replacement occurs, things will break. This ensures the replacement 
doesn't apply to the
+    // driver in 'client' mode. Executors will fail in this case because they 
still apply the
+    // replacement in client mode.
+    testWithAddJar(clientMode = true, "local", Some(jarUrl => {
+      (jarUrl.getPath, Map(
+        GATEWAY_ROOT_PATH.key -> Paths.get(jarUrl.toURI).getParent.toString,
+        REPLACEMENT_ROOT_PATH.key -> "/nonexistent/path/"
+      ))
+    }), expectExecutorFailure = true)
+  }
+
+  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using 
URI scheme 'local' " +
+      "and gateway-replacement path") {
+    // Put a prefix in front of the original jar URL which causes it to be an 
invalid path.
+    // Set up the gateway/replacement configs such that if replacement occurs, 
it is a valid
+    // path again (by removing the prefix). This ensures the replacement is 
applied.
+    val gatewayPath = "/replaceme/nonexistent/"
+    testWithAddJar(clientMode = false, "local", Some(jarUrl => {
+      (gatewayPath + jarUrl.getPath, Map(
+        GATEWAY_ROOT_PATH.key -> gatewayPath,
+        REPLACEMENT_ROOT_PATH.key -> ""
+      ))
+    }))
+  }
+
+  test("SPARK-35672: run Spark in yarn-client mode with additional jar using 
URI scheme 'file'") {
+    testWithAddJar(clientMode = true, "file")
+  }
+
+  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using 
URI scheme 'file'") {
+    testWithAddJar(clientMode = false, "file")
   }
 
   test("run Spark in yarn-cluster mode unsuccessfully") {
@@ -285,16 +323,23 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }
 
-  private def testWithAddJar(clientMode: Boolean): Unit = {
+  private def testWithAddJar(
+      clientMode: Boolean,
+      jarUriScheme: String,
+      jarUrlToPathAndConfs: Option[URL => (String, Map[String, String])] = 
None,
+      expectExecutorFailure: Boolean = false): Unit = {
     val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> 
"ORIGINAL"), tempDir)
+    val (jarPath, extraConf) = jarUrlToPathAndConfs
+        .map(_.apply(originalJar))
+        .getOrElse((originalJar.getPath, Map[String, String]()))
     val driverResult = File.createTempFile("driver", null, tempDir)
     val executorResult = File.createTempFile("executor", null, tempDir)
     val finalState = runSpark(clientMode, 
mainClassName(YarnClasspathTest.getClass),
-      appArgs = Seq(driverResult.getAbsolutePath(), 
executorResult.getAbsolutePath()),
-      extraClassPath = Seq(originalJar.getPath()),
-      extraJars = Seq("local:" + originalJar.getPath()))
+      appArgs = Seq(driverResult.getAbsolutePath, 
executorResult.getAbsolutePath),
+      extraJars = Seq(s"$jarUriScheme:$jarPath"),
+      extraConf = extraConf)
     checkResult(finalState, driverResult, "ORIGINAL")
-    checkResult(finalState, executorResult, "ORIGINAL")
+    checkResult(finalState, executorResult, if (expectExecutorFailure) 
"failure" else "ORIGINAL")
   }
 
   private def testPySpark(
