This is an automated email from the ASF dual-hosted git repository.

bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 49d224e00 [KYUUBI #6335] [REST] Support uploading extra resources in 
creating batch jobs via REST API
49d224e00 is described below

commit 49d224e0026511dab1250d13089f8bb6ec738abd
Author: Bowen Liang <liangbo...@gf.com.cn>
AuthorDate: Wed Aug 7 14:24:02 2024 +0800

    [KYUUBI #6335] [REST] Support uploading extra resources in creating batch 
jobs via REST API
    
    # 🔍 Description
    ## Issue References 🔗
    
    ## Describe Your Solution ๐Ÿ”ง
    - support creating batch jobs with uploading extra resource files
    - allow uploading extra resource when creating batch jobs via REST API
    - support binding the subresources to configs by custom configs, 
e.g. `spark.submit.pyFiles`.
    
    ## Types of changes 🔖
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [x] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request ⚰️
    
    #### Behavior With This Pull Request 🎉
    
    #### Related Unit Tests
    + new test
    
    ---
    
    # Checklist 📝
    
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6335 from bowenliang123/batch-subresource.
    
    Closes #6335
    
    57d43d26d [Bowen Liang] nit
    d866a8a17 [Bowen Liang] warn exception
    20d4328a1 [Bowen Liang] log exception when exception ignored
    58c402334 [Bowen Liang] rename param to ignoreException
    80bc21034 [Bowen Liang] cleanup the uploaded resource folder when handling 
files error
    3e7961124 [Bowen Liang] throw exception when file non-existed
    09ac48a26 [liangbowen] pyspark extra resources
    
    Lead-authored-by: Bowen Liang <liangbo...@gf.com.cn>
    Co-authored-by: liangbowen <liangbo...@gf.com.cn>
    Signed-off-by: Bowen Liang <liangbo...@gf.com.cn>
---
 .../src/main/scala/org/apache/kyuubi/Utils.scala   |  21 +++-
 .../org/apache/kyuubi/client/BatchRestApi.java     |  24 +++-
 .../kyuubi/client/api/v1/dto/BatchRequest.java     |  20 +++-
 .../kyuubi/operation/BatchJobSubmission.scala      |   7 +-
 .../kyuubi/server/api/v1/BatchesResource.scala     | 127 ++++++++++++++++++---
 .../apache/kyuubi/session/KyuubiBatchSession.scala |  10 +-
 kyuubi-server/src/test/resources/python/app.py     |  20 ++++
 .../src/test/resources/python/module1/__init__.py  |   0
 .../src/test/resources/python/module1/module.py    |   5 +
 .../src/test/resources/python/module2/__init__.py  |   0
 .../src/test/resources/python/module2/module.py    |   6 +
 .../server/rest/client/BatchRestApiSuite.scala     |  88 +++++++++++++-
 12 files changed, 296 insertions(+), 32 deletions(-)

diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index 5944e9f97..326b1601f 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -137,12 +137,23 @@ object Utils extends Logging {
   /**
    * Delete a directory recursively.
    */
-  def deleteDirectoryRecursively(f: File): Boolean = {
-    if (f.isDirectory) f.listFiles match {
-      case files: Array[File] => files.foreach(deleteDirectoryRecursively)
-      case _ =>
+  def deleteDirectoryRecursively(f: File, ignoreException: Boolean = true): 
Unit = {
+    if (f.isDirectory) {
+      val files = f.listFiles
+      if (files != null && files.nonEmpty) {
+        files.foreach(deleteDirectoryRecursively(_, ignoreException))
+      }
+    }
+    try {
+      f.delete()
+    } catch {
+      case e: Exception =>
+        if (ignoreException) {
+          warn(s"Ignoring the exception in deleting file, path: ${f.toPath}", 
e)
+        } else {
+          throw e
+        }
     }
-    f.delete()
   }
 
   /**
diff --git 
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java 
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java
index e6f9577b3..681170b87 100644
--- 
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java
+++ 
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java
@@ -18,8 +18,9 @@
 package org.apache.kyuubi.client;
 
 import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
+import java.nio.file.Paths;
+import java.util.*;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kyuubi.client.api.v1.dto.*;
 import org.apache.kyuubi.client.util.JsonUtils;
 import org.apache.kyuubi.client.util.VersionUtils;
@@ -46,10 +47,29 @@ public class BatchRestApi {
   }
 
   public Batch createBatch(BatchRequest request, File resourceFile) {
+    return createBatch(request, resourceFile, Collections.emptyList());
+  }
+
+  public Batch createBatch(BatchRequest request, File resourceFile, 
List<String> extraResources) {
     setClientVersion(request);
     Map<String, MultiPart> multiPartMap = new HashMap<>();
     multiPartMap.put("batchRequest", new 
MultiPart(MultiPart.MultiPartType.JSON, request));
     multiPartMap.put("resourceFile", new 
MultiPart(MultiPart.MultiPartType.FILE, resourceFile));
+    extraResources.stream()
+        .distinct()
+        .filter(StringUtils::isNotBlank)
+        .map(
+            path -> {
+              File file = Paths.get(path).toFile();
+              if (!file.exists()) {
+                throw new RuntimeException("File not existed, path: " + path);
+              }
+              return file;
+            })
+        .forEach(
+            file ->
+                multiPartMap.put(
+                    file.getName(), new 
MultiPart(MultiPart.MultiPartType.FILE, file)));
     return this.getClient().post(API_BASE_PATH, multiPartMap, Batch.class, 
client.getAuthHeader());
   }
 
diff --git 
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java
 
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java
index f45821fc2..ac9850498 100644
--- 
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java
+++ 
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java
@@ -31,6 +31,7 @@ public class BatchRequest {
   private String name;
   private Map<String, String> conf = Collections.emptyMap();
   private List<String> args = Collections.emptyList();
+  private Map<String, String> extraResourcesMap = Collections.emptyMap();
 
   public BatchRequest() {}
 
@@ -110,6 +111,14 @@ public class BatchRequest {
     this.args = args;
   }
 
+  public Map<String, String> getExtraResourcesMap() {
+    return extraResourcesMap;
+  }
+
+  public void setExtraResourcesMap(Map<String, String> extraResourcesMap) {
+    this.extraResourcesMap = extraResourcesMap;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
@@ -120,13 +129,20 @@ public class BatchRequest {
         && Objects.equals(getClassName(), that.getClassName())
         && Objects.equals(getName(), that.getName())
         && Objects.equals(getConf(), that.getConf())
-        && Objects.equals(getArgs(), that.getArgs());
+        && Objects.equals(getArgs(), that.getArgs())
+        && Objects.equals(getExtraResourcesMap(), that.getExtraResourcesMap());
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(
-        getBatchType(), getResource(), getClassName(), getName(), getConf(), 
getArgs());
+        getBatchType(),
+        getResource(),
+        getClassName(),
+        getName(),
+        getConf(),
+        getArgs(),
+        getExtraResourcesMap());
   }
 
   @Override
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 2c90058db..8b2cfef85 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -17,7 +17,6 @@
 
 package org.apache.kyuubi.operation
 
-import java.nio.file.{Files, Paths}
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
@@ -395,11 +394,7 @@ class BatchJobSubmission(
 
   private def cleanupUploadedResourceIfNeeded(): Unit = {
     if (session.isResourceUploaded) {
-      try {
-        Files.deleteIfExists(Paths.get(resource))
-      } catch {
-        case e: Throwable => error(s"Error deleting the uploaded resource: 
$resource", e)
-      }
+      Utils.deleteDirectoryRecursively(session.resourceUploadFolderPath.toFile)
     }
   }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index aed806714..182b28b0c 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.server.api.v1
 
 import java.io.InputStream
+import java.nio.file.{Path => JPath}
 import java.util
 import java.util.{Collections, Locale, UUID}
 import java.util.concurrent.ConcurrentHashMap
@@ -32,7 +33,7 @@ import io.swagger.v3.oas.annotations.media.{Content, Schema}
 import io.swagger.v3.oas.annotations.responses.ApiResponse
 import io.swagger.v3.oas.annotations.tags.Tag
 import org.apache.commons.lang3.StringUtils
-import org.glassfish.jersey.media.multipart.{FormDataContentDisposition, 
FormDataParam}
+import org.glassfish.jersey.media.multipart.{FormDataContentDisposition, 
FormDataMultiPart, FormDataParam}
 
 import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.client.api.v1.dto._
@@ -190,7 +191,8 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
   def openBatchSessionWithUpload(
       @FormDataParam("batchRequest") batchRequest: BatchRequest,
       @FormDataParam("resourceFile") resourceFileInputStream: InputStream,
-      @FormDataParam("resourceFile") resourceFileMetadata: 
FormDataContentDisposition): Batch = {
+      @FormDataParam("resourceFile") resourceFileMetadata: 
FormDataContentDisposition,
+      formDataMultiPart: FormDataMultiPart): Batch = {
     require(
       fe.getConf.get(BATCH_RESOURCE_UPLOAD_ENABLED),
       "Batch resource upload function is disabled.")
@@ -198,12 +200,12 @@ private[v1] class BatchesResource extends 
ApiRequestContext with Logging {
       batchRequest != null,
       "batchRequest is required and please check the content type" +
         " of batchRequest is application/json")
-    val tempFile = Utils.writeToTempFile(
-      resourceFileInputStream,
-      KyuubiApplicationManager.uploadWorkDir,
-      resourceFileMetadata.getFileName)
-    batchRequest.setResource(tempFile.getPath)
-    openBatchSessionInternal(batchRequest, isResourceFromUpload = true)
+    openBatchSessionInternal(
+      batchRequest,
+      isResourceFromUpload = true,
+      resourceFileInputStream = Some(resourceFileInputStream),
+      resourceFileMetadata = Some(resourceFileMetadata),
+      formDataMultiPartOpt = Some(formDataMultiPart))
   }
 
   /**
@@ -215,7 +217,10 @@ private[v1] class BatchesResource extends 
ApiRequestContext with Logging {
    */
   private def openBatchSessionInternal(
       request: BatchRequest,
-      isResourceFromUpload: Boolean = false): Batch = {
+      isResourceFromUpload: Boolean = false,
+      resourceFileInputStream: Option[InputStream] = None,
+      resourceFileMetadata: Option[FormDataContentDisposition] = None,
+      formDataMultiPartOpt: Option[FormDataMultiPart] = None): Batch = {
     require(
       supportedBatchType(request.getBatchType),
       s"${request.getBatchType} is not in the supported list: 
$SUPPORTED_BATCH_TYPES}")
@@ -243,6 +248,14 @@ private[v1] class BatchesResource extends 
ApiRequestContext with Logging {
         markDuplicated(batch)
       case None =>
         val batchId = userProvidedBatchId.getOrElse(UUID.randomUUID().toString)
+        if (isResourceFromUpload) {
+          handleUploadingFiles(
+            batchId,
+            request,
+            resourceFileInputStream.get,
+            resourceFileMetadata.get.getFileName,
+            formDataMultiPartOpt)
+        }
         request.setConf(
           (request.getConf.asScala ++ Map(
             KYUUBI_BATCH_ID_KEY -> batchId,
@@ -525,22 +538,110 @@ private[v1] class BatchesResource extends 
ApiRequestContext with Logging {
       }
     }
   }
+
+  private def handleUploadingFiles(
+      batchId: String,
+      request: BatchRequest,
+      resourceFileInputStream: InputStream,
+      resourceFileName: String,
+      formDataMultiPartOpt: Option[FormDataMultiPart]): Option[JPath] = {
+    val uploadFileFolderPath = batchResourceUploadFolderPath(batchId)
+    try {
+      handleUploadingResourceFile(
+        request,
+        resourceFileInputStream,
+        resourceFileName,
+        uploadFileFolderPath)
+      handleUploadingExtraResourcesFiles(request, formDataMultiPartOpt, 
uploadFileFolderPath)
+      Some(uploadFileFolderPath)
+    } catch {
+      case e: Exception =>
+        Utils.deleteDirectoryRecursively(uploadFileFolderPath.toFile)
+        throw e
+    }
+  }
+
+  private def handleUploadingResourceFile(
+      request: BatchRequest,
+      inputStream: InputStream,
+      fileName: String,
+      uploadFileFolderPath: JPath): Unit = {
+    try {
+      val tempFile = Utils.writeToTempFile(inputStream, uploadFileFolderPath, 
fileName)
+      request.setResource(tempFile.getPath)
+    } catch {
+      case e: Exception =>
+        throw new RuntimeException(
+          s"Failed handling uploaded resource file $fileName: ${e.getMessage}",
+          e)
+    }
+  }
+
+  private def handleUploadingExtraResourcesFiles(
+      request: BatchRequest,
+      formDataMultiPartOpt: Option[FormDataMultiPart],
+      uploadFileFolderPath: JPath): Unit = {
+    val extraResourceMap = request.getExtraResourcesMap.asScala
+    if (extraResourceMap.nonEmpty) {
+      val fileNameSeparator = ","
+      val formDataMultiPart = formDataMultiPartOpt.get
+      val transformedExtraResourcesMap = extraResourceMap
+        .mapValues(confValue =>
+          confValue.split(fileNameSeparator).filter(StringUtils.isNotBlank(_)))
+        .filter { case (confKey, fileNames) =>
+          fileNames.nonEmpty && StringUtils.isNotBlank(confKey)
+        }.mapValues { fileNames =>
+          fileNames.map(fileName =>
+            Option(formDataMultiPart.getField(fileName))
+              .getOrElse(throw new RuntimeException(s"File part for file 
$fileName not found")))
+        }.map {
+          case (confKey, fileParts) =>
+            val tempFilePaths = fileParts.map { filePart =>
+              val fileName = filePart.getContentDisposition.getFileName
+              try {
+                Utils.writeToTempFile(
+                  filePart.getValueAs(classOf[InputStream]),
+                  uploadFileFolderPath,
+                  fileName).getPath
+              } catch {
+                case e: Exception =>
+                  throw new RuntimeException(
+                    s"Failed handling uploaded extra resource file $fileName: 
${e.getMessage}",
+                    e)
+              }
+            }
+            (confKey, tempFilePaths.mkString(fileNameSeparator))
+        }
+
+      val conf = request.getConf
+      transformedExtraResourcesMap.foreach { case (confKey, tempFilePathStr) =>
+        conf.get(confKey) match {
+          case confValue: String if StringUtils.isNotBlank(confValue) =>
+            conf.put(confKey, List(confValue.trim, 
tempFilePathStr).mkString(fileNameSeparator))
+          case _ => conf.put(confKey, tempFilePathStr)
+        }
+      }
+    }
+  }
 }
 
 object BatchesResource {
-  val SUPPORTED_BATCH_TYPES = Seq("SPARK", "PYSPARK")
-  val VALID_BATCH_STATES = Seq(
+  private lazy val SUPPORTED_BATCH_TYPES = Set("SPARK", "PYSPARK")
+  private lazy val VALID_BATCH_STATES = Set(
     OperationState.PENDING,
     OperationState.RUNNING,
     OperationState.FINISHED,
     OperationState.ERROR,
     OperationState.CANCELED).map(_.toString)
 
-  def supportedBatchType(batchType: String): Boolean = {
+  private def supportedBatchType(batchType: String): Boolean = {
     Option(batchType).exists(bt => 
SUPPORTED_BATCH_TYPES.contains(bt.toUpperCase(Locale.ROOT)))
   }
 
-  def validBatchState(batchState: String): Boolean = {
+  private def validBatchState(batchState: String): Boolean = {
     Option(batchState).exists(bt => 
VALID_BATCH_STATES.contains(bt.toUpperCase(Locale.ROOT)))
   }
+
+  def batchResourceUploadFolderPath(batchId: String): JPath =
+    KyuubiApplicationManager.uploadWorkDir.resolve(s"batch-$batchId")
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
index f648c39cb..149c7ab01 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
@@ -17,6 +17,8 @@
 
 package org.apache.kyuubi.session
 
+import java.nio.file.Path
+
 import scala.collection.JavaConverters._
 
 import org.apache.kyuubi.client.util.BatchUtils._
@@ -26,6 +28,7 @@ import org.apache.kyuubi.engine.KyuubiApplicationManager
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder
 import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
 import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.server.api.v1.BatchesResource
 import org.apache.kyuubi.server.metadata.api.Metadata
 import org.apache.kyuubi.session.SessionType.SessionType
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
@@ -79,6 +82,9 @@ class KyuubiBatchSession(
   override val normalizedConf: Map[String, String] =
     sessionConf.getBatchConf(batchType) ++ 
sessionManager.validateBatchConf(conf)
 
+  private[kyuubi] def resourceUploadFolderPath: Path =
+    BatchesResource.batchResourceUploadFolderPath(batchJobSubmissionOp.batchId)
+
   val optimizedConf: Map[String, String] = {
     val confOverlay = sessionManager.sessionConfAdvisor.map(_.getConfOverlay(
       user,
@@ -101,8 +107,8 @@ class KyuubiBatchSession(
     
batchName.filterNot(_.trim.isEmpty).orElse(optimizedConf.get(KyuubiConf.SESSION_NAME.key))
 
   // whether the resource file is from uploading
-  private[kyuubi] val isResourceUploaded: Boolean =
-    conf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, 
"false").toBoolean
+  private[kyuubi] lazy val isResourceUploaded: Boolean =
+    conf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, 
false.toString).toBoolean
 
   private[kyuubi] lazy val batchJobSubmissionOp = 
sessionManager.operationManager
     .newBatchJobSubmissionOperation(
diff --git a/kyuubi-server/src/test/resources/python/app.py 
b/kyuubi-server/src/test/resources/python/app.py
new file mode 100644
index 000000000..482a82196
--- /dev/null
+++ b/kyuubi-server/src/test/resources/python/app.py
@@ -0,0 +1,20 @@
+from module1.module import func1
+
+from pyspark.sql import SparkSession
+from pyspark.sql.types import StructType, StructField, IntegerType
+
+if __name__ == "__main__":
+    print(f"Started running PySpark app at {func1()}")
+
+    spark = SparkSession.builder.appName("pyspark-sample").getOrCreate()
+    sc = spark.sparkContext
+
+    data = [1, 2, 3, 4, 5]
+    rdd = sc.parallelize(data)
+    transformed_rdd = rdd.map(lambda x: x * 2)
+    collected = transformed_rdd.collect()
+
+    df = spark.createDataFrame(transformed_rdd, IntegerType())
+    df.coalesce(1).write.format("csv").option("header", "false").save("/tmp/" 
+ func1())
+
+    print(f"Result: {collected}")
diff --git a/kyuubi-server/src/test/resources/python/module1/__init__.py 
b/kyuubi-server/src/test/resources/python/module1/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/kyuubi-server/src/test/resources/python/module1/module.py 
b/kyuubi-server/src/test/resources/python/module1/module.py
new file mode 100644
index 000000000..aac092925
--- /dev/null
+++ b/kyuubi-server/src/test/resources/python/module1/module.py
@@ -0,0 +1,5 @@
+from module2.module import current_time
+
+
+def func1():
+    return "result_" + current_time()
diff --git a/kyuubi-server/src/test/resources/python/module2/__init__.py 
b/kyuubi-server/src/test/resources/python/module2/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/kyuubi-server/src/test/resources/python/module2/module.py 
b/kyuubi-server/src/test/resources/python/module2/module.py
new file mode 100644
index 000000000..ba098a762
--- /dev/null
+++ b/kyuubi-server/src/test/resources/python/module2/module.py
@@ -0,0 +1,6 @@
+from datetime import datetime
+
+
+def current_time():
+    now = datetime.now()
+    return now.strftime("%Y%m%d%H%M%S")
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala
index 20ec2fc0a..e6ea0d162 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala
@@ -16,9 +16,12 @@
  */
 
 package org.apache.kyuubi.server.rest.client
-
-import java.nio.file.Paths
+import java.io.{File, FileOutputStream}
+import java.nio.file.{Files, Path, Paths}
 import java.util.Base64
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import scala.collection.JavaConverters._
 
 import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 
@@ -29,6 +32,7 @@ import org.apache.kyuubi.client.exception.KyuubiRestException
 import org.apache.kyuubi.config.KyuubiReservedKeys
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
 import org.apache.kyuubi.session.{KyuubiSession, SessionHandle}
+import org.apache.kyuubi.util.GoldenFileUtils.getCurrentModuleHome
 
 class BatchRestApiSuite extends RestClientTestHelper with BatchTestHelper {
 
@@ -99,6 +103,86 @@ class BatchRestApiSuite extends RestClientTestHelper with 
BatchTestHelper {
     basicKyuubiRestClient.close()
   }
 
+  test("basic batch rest client with uploading resource and extra resources") {
+    def preparePyModulesZip(
+        srcFolderPath: Path,
+        targetZipFileName: String,
+        excludedFileNames: Set[String] = Set.empty[String]): String = {
+
+      def addFolderToZip(zos: ZipOutputStream, folder: File, parentFolder: 
String = ""): Unit = {
+        if (folder.isDirectory) {
+          folder.listFiles().foreach { file =>
+            val fileName = file.getName
+            if (!(excludedFileNames.contains(fileName) || 
fileName.startsWith("."))) {
+              if (file.isDirectory) {
+                val folderPath =
+                  if (parentFolder.isEmpty) fileName else parentFolder + "/" + 
fileName
+                addFolderToZip(zos, file, folderPath)
+              } else {
+                val filePath = if (parentFolder.isEmpty) fileName else 
parentFolder + "/" + fileName
+                zos.putNextEntry(new ZipEntry(filePath))
+                zos.write(Files.readAllBytes(file.toPath))
+                zos.closeEntry()
+              }
+            }
+          }
+        }
+      }
+
+      val zipFilePath = Paths.get(System.getProperty("java.io.tmpdir"), 
targetZipFileName).toString
+      val fileOutputStream = new FileOutputStream(zipFilePath)
+      val zipOutputStream = new ZipOutputStream(fileOutputStream)
+      try {
+        addFolderToZip(zipOutputStream, srcFolderPath.toFile)
+      } finally {
+        zipOutputStream.close()
+        fileOutputStream.close()
+      }
+      zipFilePath
+    }
+
+    val basicKyuubiRestClient: KyuubiRestClient =
+      KyuubiRestClient.builder(baseUri.toString)
+        .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.BASIC)
+        .username(ldapUser)
+        .password(ldapUserPasswd)
+        .socketTimeout(5 * 60 * 1000)
+        .build()
+    val batchRestApi: BatchRestApi = new BatchRestApi(basicKyuubiRestClient)
+
+    val pythonScriptsPath = 
s"${getCurrentModuleHome(this)}/src/test/resources/python/"
+    val appScriptFileName = "app.py"
+    val appScriptFile = Paths.get(pythonScriptsPath, appScriptFileName).toFile
+    val modulesZipFileName = "pymodules.zip"
+    val modulesZipFile = preparePyModulesZip(
+      srcFolderPath = Paths.get(pythonScriptsPath),
+      targetZipFileName = modulesZipFileName,
+      excludedFileNames = Set(appScriptFileName))
+
+    val requestObj = newSparkBatchRequest(Map("spark.master" -> "local"))
+    requestObj.setBatchType("PYSPARK")
+    requestObj.setName("pyspark-test")
+    requestObj.setExtraResourcesMap(Map("spark.submit.pyFiles" -> 
modulesZipFileName).asJava)
+    val extraResources = List(modulesZipFile)
+    val batch: Batch = batchRestApi.createBatch(requestObj, appScriptFile, 
extraResources.asJava)
+
+    try {
+      assert(batch.getKyuubiInstance === fe.connectionUrl)
+      assert(batch.getBatchType === "PYSPARK")
+      val batchId = batch.getId
+      assert(batchId !== null)
+
+      eventually(timeout(1.minutes), interval(1.seconds)) {
+        val batch = batchRestApi.getBatchById(batchId)
+        assert(batch.getState == "FINISHED")
+      }
+
+    } finally {
+      Files.deleteIfExists(Paths.get(modulesZipFile))
+      basicKyuubiRestClient.close()
+    }
+  }
+
   test("basic batch rest client with invalid user") {
     val totalConnections =
       
MetricsSystem.counterValue(MetricsConstants.REST_CONN_TOTAL).getOrElse(0L)

Reply via email to