This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new af8dd411aa9 [SPARK-33782][K8S][CORE] Place spark.files, spark.jars and spark.pyFiles under the current working directory on the driver in K8S cluster mode
af8dd411aa9 is described below

commit af8dd411aa9d15bc59d09cf9959d4a57debc9635
Author: pralabhkumar <pralabhku...@gmail.com>
AuthorDate: Mon Dec 12 21:16:54 2022 -0800

    [SPARK-33782][K8S][CORE] Place spark.files, spark.jars and spark.pyFiles under the current working directory on the driver in K8S cluster mode
    
    ### What changes were proposed in this pull request?
    This PR places spark.files, spark.jars and spark.pyFiles under the current working directory on the driver in K8s cluster mode.
    
    ### Why are the changes needed?
    This mimics the behaviour of Yarn and also lets users access files from the current working directory. As mentioned in the JIRA, users can, for example, leverage PEX to manage Python dependencies in Apache Spark:
    ```
    pex pyspark==3.0.1 pyarrow==0.15.1 pandas==0.25.3 -o myarchive.pex
    PYSPARK_PYTHON=./myarchive.pex spark-submit --files myarchive.pex
    ```
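    
    Once shipped this way, driver-side code can open such a resource by its bare name, just as it can on Yarn. A minimal Scala sketch (the file name is illustrative and assumes the file was submitted via --files app.conf):
    ```
    import java.io.FileInputStream
    import java.util.Properties
    
    // "app.conf" is a hypothetical file submitted with --files; with this
    // change it lands in the driver's current working directory in K8s
    // cluster mode, so a relative path is enough to read it.
    val props = new Properties()
    val in = new FileInputStream("app.conf")
    try props.load(in) finally in.close()
    ```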
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Tested via unit test cases and also by running on a local K8s cluster.
    
    Closes #37417 from pralabhkumar/rk_k8s_local_resource.
    
    Authored-by: pralabhkumar <pralabhku...@gmail.com>
    Signed-off-by: Holden Karau <hka...@netflix.com>
---
 .../org/apache/spark/deploy/SparkSubmit.scala      | 55 +++++++++++++---------
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 35 ++++++++++++++
 2 files changed, 69 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 783cf47df16..73acfedd8bc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy
 import java.io._
 import java.lang.reflect.{InvocationTargetException, UndeclaredThrowableException}
 import java.net.{URI, URL}
+import java.nio.file.Files
 import java.security.PrivilegedExceptionAction
 import java.text.ParseException
 import java.util.{ServiceLoader, UUID}
@@ -383,43 +384,55 @@ private[spark] class SparkSubmit extends Logging {
       }.orNull
 
       if (isKubernetesClusterModeDriver) {
-        // Replace with the downloaded local jar path to avoid propagating hadoop compatible uris.
-        // Executors will get the jars from the Spark file server.
-        // Explicitly download the related files here
-        args.jars = localJars
-        val filesLocalFiles = Option(args.files).map {
-          downloadFileList(_, targetDir, sparkConf, hadoopConf)
-        }.orNull
-        val archiveLocalFiles = Option(args.archives).map { uris =>
+        // SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
+        // in cluster mode, the archives should be available in the driver's current working
+        // directory too.
+        // SPARK-33782: this downloads all the files, jars, archives and pyFiles to the
+        // current working directory.
+        def downloadResourcesToCurrentDirectory(
+            uris: String, isArchive: Boolean = false): String = {
           val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
-          val localArchives = downloadFileList(
+          val localResources = downloadFileList(
             resolvedUris.map(
               UriBuilder.fromUri(_).fragment(null).build().toString).mkString(","),
             targetDir, sparkConf, hadoopConf)
-
-          // SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
-          // in cluster mode, the archives should be available in the driver's current working
-          // directory too.
-          Utils.stringToSeq(localArchives).map(Utils.resolveURI).zip(resolvedUris).map {
-            case (localArchive, resolvedUri) =>
-              val source = new File(localArchive.getPath)
+          Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(resolvedUris).map {
+            case (localResource, resolvedUri) =>
+              val source = new File(localResource.getPath)
               val dest = new File(
                 ".",
                 if (resolvedUri.getFragment != null) resolvedUri.getFragment else source.getName)
               logInfo(
-                s"Unpacking an archive $resolvedUri " +
+                s"Placing file $resolvedUri " +
                   s"from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
               Utils.deleteRecursively(dest)
-              Utils.unpack(source, dest)
-
+              if (isArchive) {
+                Utils.unpack(source, dest)
+              } else {
+                Files.copy(source.toPath, dest.toPath)
+              }
               // Keep the URIs of local files with the given fragments.
               UriBuilder.fromUri(
-                localArchive).fragment(resolvedUri.getFragment).build().toString
+                localResource).fragment(resolvedUri.getFragment).build().toString
           }.mkString(",")
+        }
+
+        val filesLocalFiles = Option(args.files).map {
+          downloadResourcesToCurrentDirectory(_)
+        }.orNull
+        val jarsLocalJars = Option(args.jars).map {
+          downloadResourcesToCurrentDirectory(_)
+        }.orNull
+        val archiveLocalFiles = Option(args.archives).map {
+          downloadResourcesToCurrentDirectory(_, true)
+        }.orNull
+        val pyLocalFiles = Option(args.pyFiles).map {
+          downloadResourcesToCurrentDirectory(_)
         }.orNull
         args.files = filesLocalFiles
         args.archives = archiveLocalFiles
-        args.pyFiles = localPyFiles
+        args.pyFiles = pyLocalFiles
+        args.jars = jarsLocalJars
       }
     }
 
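
The helper added above also preserves `--files path#alias` fragments: when a fragment is given, the downloaded file is placed in the working directory under the fragment name; otherwise it keeps the source file name. A minimal standalone Scala sketch of that naming rule (URI and paths are hypothetical; this is not the actual Spark helper):

```
import java.io.File
import java.net.URI

// A resource submitted as "hdfs://nn/data/app.conf#renamed.conf" is first
// downloaded to a temp dir; its destination in the current working directory
// takes the fragment name when present, else the source file name.
val resolvedUri = new URI("hdfs://nn/data/app.conf#renamed.conf") // hypothetical URI
val source = new File("/tmp/spark-downloads/app.conf")            // assumed download location
val dest = new File(".", Option(resolvedUri.getFragment).getOrElse(source.getName))
println(s"would place ${source.getPath} at ${dest.getPath}")      // -> ./renamed.conf
```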
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 6bd3a49576a..76311d0ab18 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -486,6 +486,41 @@ class SparkSubmitSuite
     conf.get("spark.kubernetes.driver.container.image") should be ("bar")
   }
 
+  test("SPARK-33782: handles k8s files download to current directory") {
+    val clArgs = Seq(
+      "--deploy-mode", "client",
+      "--proxy-user", "test.user",
+      "--master", "k8s://host:port",
+      "--executor-memory", "5g",
+      "--class", "org.SomeClass",
+      "--driver-memory", "4g",
+      "--conf", "spark.kubernetes.namespace=spark",
+      "--conf", "spark.kubernetes.driver.container.image=bar",
+      "--conf", "spark.kubernetes.submitInDriver=true",
+      "--files", "src/test/resources/test_metrics_config.properties",
+      "--py-files", "src/test/resources/test_metrics_system.properties",
+      "--archives", "src/test/resources/log4j2.properties",
+      "--jars", "src/test/resources/TestUDTF.jar",
+      "/home/thejar.jar",
+      "arg1")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val (childArgs, classpath, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs)
+    conf.get("spark.master") should be ("k8s://https://host:port")
+    conf.get("spark.executor.memory") should be ("5g")
+    conf.get("spark.driver.memory") should be ("4g")
+    conf.get("spark.kubernetes.namespace") should be ("spark")
+    conf.get("spark.kubernetes.driver.container.image") should be ("bar")
+
+    Files.exists(Paths.get("test_metrics_config.properties")) should be (true)
+    Files.exists(Paths.get("test_metrics_system.properties")) should be (true)
+    Files.exists(Paths.get("log4j2.properties")) should be (true)
+    Files.exists(Paths.get("TestUDTF.jar")) should be (true)
+    Files.delete(Paths.get("test_metrics_config.properties"))
+    Files.delete(Paths.get("test_metrics_system.properties"))
+    Files.delete(Paths.get("log4j2.properties"))
+    Files.delete(Paths.get("TestUDTF.jar"))
+  }
+
   /**
    * Helper function for testing main class resolution on remote JAR files.
    *

