antonipp commented on code in PR #38376:
URL: https://github.com/apache/spark/pull/38376#discussion_r1073566530


##########
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala:
##########
@@ -239,6 +239,41 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
     }
   }
 
+  test(
+    "SPARK-40817: Check that remote files do not get discarded in spark.files",

Review Comment:
   I get a Scalastyle error ("File line length exceeds 100 characters.") when I 
put everything on one line, hence the multi-line formatting.



##########
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala:
##########
@@ -259,20 +294,46 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
     }
   }
 
-  private def createS3Bucket(accessKey: String, secretKey: String, endPoint: String): Unit = {
+  private def getS3Client(
+      s3Endpoint: String,
+      accessKey: String = ACCESS_KEY,
+      secretKey: String = SECRET_KEY): AmazonS3Client = {
+    val credentials = new BasicAWSCredentials(accessKey, secretKey)
+    val s3client = new AmazonS3Client(credentials)
+    s3client.setEndpoint(s3Endpoint)
+    s3client
+  }
+
+  private def createS3Bucket(
+      s3Endpoint: String = getServiceUrl(svcName),
+      bucket: String = BUCKET): Unit = {

Review Comment:
   Makes sense. I updated this code to keep the refactoring to a minimum and 
preserve the existing function signature. I still think it's worth adding a new 
`getS3Client()` method, since both `createS3Bucket()` and `createS3Object()` 
need it.



##########
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala:
##########
@@ -239,6 +239,41 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
     }
   }
 
+  test(
+    "SPARK-40817: Check that remote files do not get discarded in spark.files",
+    k8sTestTag,
+    MinikubeTag) {
+    tryDepsTest({
+      // Create a local file
+      val localFileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
+
+      // Create a remote file on S3
+      val remoteFileKey = "some-path/some-remote-file.txt"
+      createS3Object(remoteFileKey, "Some Content")
+      val remoteFileFullPath = s"s3a://${BUCKET}/${remoteFileKey}"
+
+      // Put both file paths in spark.files
+      sparkAppConf.set("spark.files", s"$HOST_PATH/$localFileName,${remoteFileFullPath}")
+      // Allows to properly read executor logs once the job is finished
+      sparkAppConf.set("spark.kubernetes.executor.deleteOnTermination", "false")

Review Comment:
   This is not for humans. `expectedExecutorLogOnCompletion` kept failing (it 
found no logs) until I added this configuration property. I believe the cause is 
that `SparkPi` with the default value of [slices = 
2](https://github.com/antonipp/spark/blob/e1ea806b3075d279b5f08a29fe4c1ad6d3c4191a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala#L32)
 finishes so quickly that the executor pod is deleted before its logs can be 
recovered.
   
   I confirmed this by setting the `SparkPi` `slices` argument to 100 (`appArgs 
= Array("100")`), which also stopped `expectedExecutorLogOnCompletion` from 
failing.



##########
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala:
##########
@@ -168,27 +168,27 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       MEMORY_OVERHEAD_FACTOR.key -> defaultOverheadFactor.toString)
     // try upload local, resolvable files to a hadoop compatible file system
     Seq(JARS, FILES, ARCHIVES, SUBMIT_PYTHON_FILES).foreach { key =>
-      val uris = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
+      val (localUris, remoteUris) =
+        conf.get(key).partition(uri => KubernetesUtils.isLocalAndResolvable(uri))
       val value = {
         if (key == ARCHIVES) {

Review Comment:
   I don't think anything needs to be done for remote archive paths; my PR 
already works with them too. 
   
   It was important to remove the fragment for **local** archives because, as 
you said, they need to be uploaded to remote storage (under 
`spark.kubernetes.file.upload.path`). **Remote** archives are _already_ stored 
in remote storage, so their URLs can be kept as they are.
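
   To illustrate the local/remote split described above, here is a minimal, self-contained sketch. It is not the code from this PR: `isLocalSketch`, `stripFragment`, `resolve`, and the `upload` callback are hypothetical names, the locality check is a simplified stand-in for `KubernetesUtils.isLocalAndResolvable`, and the paths in `main` are made up for illustration.

```scala
import java.net.URI

object ArchiveUriSketch {
  // Hypothetical stand-in for KubernetesUtils.isLocalAndResolvable: a URI counts as
  // local when it has no scheme or the "file" scheme. The real check may differ.
  def isLocalSketch(uri: String): Boolean = {
    val scheme = new URI(uri).getScheme
    scheme == null || scheme == "file"
  }

  // Simplified fragment removal: drop everything from the first '#' onwards.
  def stripFragment(uri: String): String = uri.takeWhile(_ != '#')

  // Local URIs lose their fragment and go through `upload` (a hypothetical callback
  // returning the uploaded location); remote URIs pass through unchanged.
  def resolve(uris: Seq[String], upload: String => String): Seq[String] = {
    val (localUris, remoteUris) = uris.partition(isLocalSketch)
    localUris.map(uri => upload(stripFragment(uri))) ++ remoteUris
  }

  def main(args: Array[String]): Unit = {
    val resolved = resolve(
      Seq("/tmp/local.tar.gz#local", "s3://bucket/test-archive.tar.gz#my-archive"),
      uri => "s3a://upload-path/" + uri.split('/').last)
    resolved.foreach(println)
  }
}
```

   In this sketch the remote archive keeps its `#my-archive` fragment, while the local archive is handed to the upload callback with its fragment removed.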
   
   Quick example:
   I uploaded a test archive under `s3://$MY_BUCKET/test-archive.tar.gz`.
   The archive contains one file under `test/file.txt` with contents 
`some-content`.
   
   I then ran a Spark job using Spark 3.1.3 built from my branch with the 
following configuration value:
   ```
   [...]
   --conf spark.archives="s3://$MY_BUCKET/test-archive.tar.gz#my-archive",
   [...]
   ```
   I printed the output of the following code in the job:
   ```
   spark.read.text("my-archive/test/file.txt").collect
   ```
   The output was:
   ```
   Array([some-content])
   ```
   
   The Spark History Server also shows that the archive was properly 
downloaded on the pods: 
   
![image](https://user-images.githubusercontent.com/17275833/213245697-7c4a3379-f370-4943-b256-ce16102edc79.png)
   I also see the extracted files on both the driver and executor pods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

