This is an automated email from the ASF dual-hosted git repository. holden pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new af8dd411aa9 [SPARK-33782][K8S][CORE] Place spark.files, spark.jars and spark.pyFiles under the current working directory on the driver in K8S cluster mode af8dd411aa9 is described below commit af8dd411aa9d15bc59d09cf9959d4a57debc9635 Author: pralabhkumar <pralabhku...@gmail.com> AuthorDate: Mon Dec 12 21:16:54 2022 -0800 [SPARK-33782][K8S][CORE] Place spark.files, spark.jars and spark.pyFiles under the current working directory on the driver in K8S cluster mode ### What changes were proposed in this pull request? This PR will place spark.files, spark.jars and spark.pyFiles in the current working directory on the driver in K8s cluster mode ### Why are the changes needed? This mimics the behaviour of Yarn and also helps users access files from the PWD. Also, as mentioned in the JIRA: by doing this, users can, for example, leverage PEX to manage Python dependencies in Apache Spark: ``` pex pyspark==3.0.1 pyarrow==0.15.1 pandas==0.25.3 -o myarchive.pex PYSPARK_PYTHON=./myarchive.pex spark-submit --files myarchive.pex ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Tested via unit test cases and also ran on a local K8s cluster. Closes #37417 from pralabhkumar/rk_k8s_local_resource. 
Authored-by: pralabhkumar <pralabhku...@gmail.com> Signed-off-by: Holden Karau <hka...@netflix.com> --- .../org/apache/spark/deploy/SparkSubmit.scala | 55 +++++++++++++--------- .../org/apache/spark/deploy/SparkSubmitSuite.scala | 35 ++++++++++++++ 2 files changed, 69 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 783cf47df16..73acfedd8bc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -20,6 +20,7 @@ package org.apache.spark.deploy import java.io._ import java.lang.reflect.{InvocationTargetException, UndeclaredThrowableException} import java.net.{URI, URL} +import java.nio.file.Files import java.security.PrivilegedExceptionAction import java.text.ParseException import java.util.{ServiceLoader, UUID} @@ -383,43 +384,55 @@ private[spark] class SparkSubmit extends Logging { }.orNull if (isKubernetesClusterModeDriver) { - // Replace with the downloaded local jar path to avoid propagating hadoop compatible uris. - // Executors will get the jars from the Spark file server. - // Explicitly download the related files here - args.jars = localJars - val filesLocalFiles = Option(args.files).map { - downloadFileList(_, targetDir, sparkConf, hadoopConf) - }.orNull - val archiveLocalFiles = Option(args.archives).map { uris => + // SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running + // in cluster mode, the archives should be available in the driver's current working + // directory too. 
+ // SPARK-33782 : This downloads all the files , jars , archiveFiles and pyfiles to current + // working directory + def downloadResourcesToCurrentDirectory(uris: String, isArchive: Boolean = false): + String = { val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI) - val localArchives = downloadFileList( + val localResources = downloadFileList( resolvedUris.map( UriBuilder.fromUri(_).fragment(null).build().toString).mkString(","), targetDir, sparkConf, hadoopConf) - - // SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running - // in cluster mode, the archives should be available in the driver's current working - // directory too. - Utils.stringToSeq(localArchives).map(Utils.resolveURI).zip(resolvedUris).map { - case (localArchive, resolvedUri) => - val source = new File(localArchive.getPath) + Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(resolvedUris).map { + case (localResources, resolvedUri) => + val source = new File(localResources.getPath) val dest = new File( ".", if (resolvedUri.getFragment != null) resolvedUri.getFragment else source.getName) logInfo( - s"Unpacking an archive $resolvedUri " + + s"Files $resolvedUri " + s"from ${source.getAbsolutePath} to ${dest.getAbsolutePath}") Utils.deleteRecursively(dest) - Utils.unpack(source, dest) - + if (isArchive) { + Utils.unpack(source, dest) + } else { + Files.copy(source.toPath, dest.toPath) + } // Keep the URIs of local files with the given fragments. 
UriBuilder.fromUri( - localArchive).fragment(resolvedUri.getFragment).build().toString + localResources).fragment(resolvedUri.getFragment).build().toString }.mkString(",") + } + + val filesLocalFiles = Option(args.files).map { + downloadResourcesToCurrentDirectory(_) + }.orNull + val jarsLocalJars = Option(args.jars).map { + downloadResourcesToCurrentDirectory(_) + }.orNull + val archiveLocalFiles = Option(args.archives).map { + downloadResourcesToCurrentDirectory(_, true) + }.orNull + val pyLocalFiles = Option(args.pyFiles).map { + downloadResourcesToCurrentDirectory(_) }.orNull args.files = filesLocalFiles args.archives = archiveLocalFiles - args.pyFiles = localPyFiles + args.pyFiles = pyLocalFiles + args.jars = jarsLocalJars } } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 6bd3a49576a..76311d0ab18 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -486,6 +486,41 @@ class SparkSubmitSuite conf.get("spark.kubernetes.driver.container.image") should be ("bar") } + test("SPARK-33782: handles k8s files download to current directory") { + val clArgs = Seq( + "--deploy-mode", "client", + "--proxy-user", "test.user", + "--master", "k8s://host:port", + "--executor-memory", "5g", + "--class", "org.SomeClass", + "--driver-memory", "4g", + "--conf", "spark.kubernetes.namespace=spark", + "--conf", "spark.kubernetes.driver.container.image=bar", + "--conf", "spark.kubernetes.submitInDriver=true", + "--files", "src/test/resources/test_metrics_config.properties", + "--py-files", "src/test/resources/test_metrics_system.properties", + "--archives", "src/test/resources/log4j2.properties", + "--jars", "src/test/resources/TestUDTF.jar", + "/home/thejar.jar", + "arg1") + val appArgs = new SparkSubmitArguments(clArgs) + val (childArgs, classpath, conf, mainClass) = 
submit.prepareSubmitEnvironment(appArgs) + conf.get("spark.master") should be ("k8s://https://host:port") + conf.get("spark.executor.memory") should be ("5g") + conf.get("spark.driver.memory") should be ("4g") + conf.get("spark.kubernetes.namespace") should be ("spark") + conf.get("spark.kubernetes.driver.container.image") should be ("bar") + + Files.exists(Paths.get("test_metrics_config.properties")) should be (true) + Files.exists(Paths.get("test_metrics_system.properties")) should be (true) + Files.exists(Paths.get("log4j2.properties")) should be (true) + Files.exists(Paths.get("TestUDTF.jar")) should be (true) + Files.delete(Paths.get("test_metrics_config.properties")) + Files.delete(Paths.get("test_metrics_system.properties")) + Files.delete(Paths.get("log4j2.properties")) + Files.delete(Paths.get("TestUDTF.jar")) + } + /** * Helper function for testing main class resolution on remote JAR files. * --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org