This is an automated email from the ASF dual-hosted git repository. bowenliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push: new 49d224e00 [KYUUBI #6335] [REST] Support uploading extra resources in creating batch jobs via REST API 49d224e00 is described below commit 49d224e0026511dab1250d13089f8bb6ec738abd Author: Bowen Liang <liangbo...@gf.com.cn> AuthorDate: Wed Aug 7 14:24:02 2024 +0800 [KYUUBI #6335] [REST] Support uploading extra resources in creating batch jobs via REST API # :mag: Description ## Issue References 🔗 ## Describe Your Solution 🔧 - support creating batch jobs with uploading extra resource files - allow uploading extra resource when creating batch jobs via REST API - support binding the subresources to configs by custom configs, e.g. `spark.submit.pyFiles`. ## Types of changes :bookmark: - [ ] Bugfix (non-breaking change which fixes an issue) - [x] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests + new test --- # Checklist 📝 - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6335 from bowenliang123/batch-subresource. 
Closes #6335 57d43d26d [Bowen Liang] nit d866a8a17 [Bowen Liang] warn exception 20d4328a1 [Bowen Liang] log exception when exception ignored 58c402334 [Bowen Liang] rename param to ignoreException 80bc21034 [Bowen Liang] cleanup the uploaded resource folder when handling files error 3e7961124 [Bowen Liang] throw exception when file non-existed 09ac48a26 [liangbowen] pyspark extra resources Lead-authored-by: Bowen Liang <liangbo...@gf.com.cn> Co-authored-by: liangbowen <liangbo...@gf.com.cn> Signed-off-by: Bowen Liang <liangbo...@gf.com.cn> --- .../src/main/scala/org/apache/kyuubi/Utils.scala | 21 +++- .../org/apache/kyuubi/client/BatchRestApi.java | 24 +++- .../kyuubi/client/api/v1/dto/BatchRequest.java | 20 +++- .../kyuubi/operation/BatchJobSubmission.scala | 7 +- .../kyuubi/server/api/v1/BatchesResource.scala | 127 ++++++++++++++++++--- .../apache/kyuubi/session/KyuubiBatchSession.scala | 10 +- kyuubi-server/src/test/resources/python/app.py | 20 ++++ .../src/test/resources/python/module1/__init__.py | 0 .../src/test/resources/python/module1/module.py | 5 + .../src/test/resources/python/module2/__init__.py | 0 .../src/test/resources/python/module2/module.py | 6 + .../server/rest/client/BatchRestApiSuite.scala | 88 +++++++++++++- 12 files changed, 296 insertions(+), 32 deletions(-) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala index 5944e9f97..326b1601f 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala @@ -137,12 +137,23 @@ object Utils extends Logging { /** * Delete a directory recursively. 
*/ - def deleteDirectoryRecursively(f: File): Boolean = { - if (f.isDirectory) f.listFiles match { - case files: Array[File] => files.foreach(deleteDirectoryRecursively) - case _ => + def deleteDirectoryRecursively(f: File, ignoreException: Boolean = true): Unit = { + if (f.isDirectory) { + val files = f.listFiles + if (files != null && files.nonEmpty) { + files.foreach(deleteDirectoryRecursively(_, ignoreException)) + } + } + try { + f.delete() + } catch { + case e: Exception => + if (ignoreException) { + warn(s"Ignoring the exception in deleting file, path: ${f.toPath}", e) + } else { + throw e + } } - f.delete() } /** diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java index e6f9577b3..681170b87 100644 --- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java @@ -18,8 +18,9 @@ package org.apache.kyuubi.client; import java.io.File; -import java.util.HashMap; -import java.util.Map; +import java.nio.file.Paths; +import java.util.*; +import org.apache.commons.lang3.StringUtils; import org.apache.kyuubi.client.api.v1.dto.*; import org.apache.kyuubi.client.util.JsonUtils; import org.apache.kyuubi.client.util.VersionUtils; @@ -46,10 +47,29 @@ public class BatchRestApi { } public Batch createBatch(BatchRequest request, File resourceFile) { + return createBatch(request, resourceFile, Collections.emptyList()); + } + + public Batch createBatch(BatchRequest request, File resourceFile, List<String> extraResources) { setClientVersion(request); Map<String, MultiPart> multiPartMap = new HashMap<>(); multiPartMap.put("batchRequest", new MultiPart(MultiPart.MultiPartType.JSON, request)); multiPartMap.put("resourceFile", new MultiPart(MultiPart.MultiPartType.FILE, resourceFile)); + extraResources.stream() + .distinct() + .filter(StringUtils::isNotBlank) + .map( + 
path -> { + File file = Paths.get(path).toFile(); + if (!file.exists()) { + throw new RuntimeException("File not existed, path: " + path); + } + return file; + }) + .forEach( + file -> + multiPartMap.put( + file.getName(), new MultiPart(MultiPart.MultiPartType.FILE, file))); return this.getClient().post(API_BASE_PATH, multiPartMap, Batch.class, client.getAuthHeader()); } diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java index f45821fc2..ac9850498 100644 --- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java @@ -31,6 +31,7 @@ public class BatchRequest { private String name; private Map<String, String> conf = Collections.emptyMap(); private List<String> args = Collections.emptyList(); + private Map<String, String> extraResourcesMap = Collections.emptyMap(); public BatchRequest() {} @@ -110,6 +111,14 @@ public class BatchRequest { this.args = args; } + public Map<String, String> getExtraResourcesMap() { + return extraResourcesMap; + } + + public void setExtraResourcesMap(Map<String, String> extraResourcesMap) { + this.extraResourcesMap = extraResourcesMap; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -120,13 +129,20 @@ public class BatchRequest { && Objects.equals(getClassName(), that.getClassName()) && Objects.equals(getName(), that.getName()) && Objects.equals(getConf(), that.getConf()) - && Objects.equals(getArgs(), that.getArgs()); + && Objects.equals(getArgs(), that.getArgs()) + && Objects.equals(getExtraResourcesMap(), that.getExtraResourcesMap()); } @Override public int hashCode() { return Objects.hash( - getBatchType(), getResource(), getClassName(), getName(), getConf(), getArgs()); + getBatchType(), + getResource(), + getClassName(), + getName(), + 
getConf(), + getArgs(), + getExtraResourcesMap()); } @Override diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index 2c90058db..8b2cfef85 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -17,7 +17,6 @@ package org.apache.kyuubi.operation -import java.nio.file.{Files, Paths} import java.util.Locale import java.util.concurrent.TimeUnit @@ -395,11 +394,7 @@ class BatchJobSubmission( private def cleanupUploadedResourceIfNeeded(): Unit = { if (session.isResourceUploaded) { - try { - Files.deleteIfExists(Paths.get(resource)) - } catch { - case e: Throwable => error(s"Error deleting the uploaded resource: $resource", e) - } + Utils.deleteDirectoryRecursively(session.resourceUploadFolderPath.toFile) } } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index aed806714..182b28b0c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -18,6 +18,7 @@ package org.apache.kyuubi.server.api.v1 import java.io.InputStream +import java.nio.file.{Path => JPath} import java.util import java.util.{Collections, Locale, UUID} import java.util.concurrent.ConcurrentHashMap @@ -32,7 +33,7 @@ import io.swagger.v3.oas.annotations.media.{Content, Schema} import io.swagger.v3.oas.annotations.responses.ApiResponse import io.swagger.v3.oas.annotations.tags.Tag import org.apache.commons.lang3.StringUtils -import org.glassfish.jersey.media.multipart.{FormDataContentDisposition, FormDataParam} +import org.glassfish.jersey.media.multipart.{FormDataContentDisposition, 
FormDataMultiPart, FormDataParam} import org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.client.api.v1.dto._ @@ -190,7 +191,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { def openBatchSessionWithUpload( @FormDataParam("batchRequest") batchRequest: BatchRequest, @FormDataParam("resourceFile") resourceFileInputStream: InputStream, - @FormDataParam("resourceFile") resourceFileMetadata: FormDataContentDisposition): Batch = { + @FormDataParam("resourceFile") resourceFileMetadata: FormDataContentDisposition, + formDataMultiPart: FormDataMultiPart): Batch = { require( fe.getConf.get(BATCH_RESOURCE_UPLOAD_ENABLED), "Batch resource upload function is disabled.") @@ -198,12 +200,12 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { batchRequest != null, "batchRequest is required and please check the content type" + " of batchRequest is application/json") - val tempFile = Utils.writeToTempFile( - resourceFileInputStream, - KyuubiApplicationManager.uploadWorkDir, - resourceFileMetadata.getFileName) - batchRequest.setResource(tempFile.getPath) - openBatchSessionInternal(batchRequest, isResourceFromUpload = true) + openBatchSessionInternal( + batchRequest, + isResourceFromUpload = true, + resourceFileInputStream = Some(resourceFileInputStream), + resourceFileMetadata = Some(resourceFileMetadata), + formDataMultiPartOpt = Some(formDataMultiPart)) } /** @@ -215,7 +217,10 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { */ private def openBatchSessionInternal( request: BatchRequest, - isResourceFromUpload: Boolean = false): Batch = { + isResourceFromUpload: Boolean = false, + resourceFileInputStream: Option[InputStream] = None, + resourceFileMetadata: Option[FormDataContentDisposition] = None, + formDataMultiPartOpt: Option[FormDataMultiPart] = None): Batch = { require( supportedBatchType(request.getBatchType), s"${request.getBatchType} is not in the supported list: 
$SUPPORTED_BATCH_TYPES}") @@ -243,6 +248,14 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { markDuplicated(batch) case None => val batchId = userProvidedBatchId.getOrElse(UUID.randomUUID().toString) + if (isResourceFromUpload) { + handleUploadingFiles( + batchId, + request, + resourceFileInputStream.get, + resourceFileMetadata.get.getFileName, + formDataMultiPartOpt) + } request.setConf( (request.getConf.asScala ++ Map( KYUUBI_BATCH_ID_KEY -> batchId, @@ -525,22 +538,110 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { } } } + + private def handleUploadingFiles( + batchId: String, + request: BatchRequest, + resourceFileInputStream: InputStream, + resourceFileName: String, + formDataMultiPartOpt: Option[FormDataMultiPart]): Option[JPath] = { + val uploadFileFolderPath = batchResourceUploadFolderPath(batchId) + try { + handleUploadingResourceFile( + request, + resourceFileInputStream, + resourceFileName, + uploadFileFolderPath) + handleUploadingExtraResourcesFiles(request, formDataMultiPartOpt, uploadFileFolderPath) + Some(uploadFileFolderPath) + } catch { + case e: Exception => + Utils.deleteDirectoryRecursively(uploadFileFolderPath.toFile) + throw e + } + } + + private def handleUploadingResourceFile( + request: BatchRequest, + inputStream: InputStream, + fileName: String, + uploadFileFolderPath: JPath): Unit = { + try { + val tempFile = Utils.writeToTempFile(inputStream, uploadFileFolderPath, fileName) + request.setResource(tempFile.getPath) + } catch { + case e: Exception => + throw new RuntimeException( + s"Failed handling uploaded resource file $fileName: ${e.getMessage}", + e) + } + } + + private def handleUploadingExtraResourcesFiles( + request: BatchRequest, + formDataMultiPartOpt: Option[FormDataMultiPart], + uploadFileFolderPath: JPath): Unit = { + val extraResourceMap = request.getExtraResourcesMap.asScala + if (extraResourceMap.nonEmpty) { + val fileNameSeparator = "," + val formDataMultiPart = 
formDataMultiPartOpt.get + val transformedExtraResourcesMap = extraResourceMap + .mapValues(confValue => + confValue.split(fileNameSeparator).filter(StringUtils.isNotBlank(_))) + .filter { case (confKey, fileNames) => + fileNames.nonEmpty && StringUtils.isNotBlank(confKey) + }.mapValues { fileNames => + fileNames.map(fileName => + Option(formDataMultiPart.getField(fileName)) + .getOrElse(throw new RuntimeException(s"File part for file $fileName not found"))) + }.map { + case (confKey, fileParts) => + val tempFilePaths = fileParts.map { filePart => + val fileName = filePart.getContentDisposition.getFileName + try { + Utils.writeToTempFile( + filePart.getValueAs(classOf[InputStream]), + uploadFileFolderPath, + fileName).getPath + } catch { + case e: Exception => + throw new RuntimeException( + s"Failed handling uploaded extra resource file $fileName: ${e.getMessage}", + e) + } + } + (confKey, tempFilePaths.mkString(fileNameSeparator)) + } + + val conf = request.getConf + transformedExtraResourcesMap.foreach { case (confKey, tempFilePathStr) => + conf.get(confKey) match { + case confValue: String if StringUtils.isNotBlank(confValue) => + conf.put(confKey, List(confValue.trim, tempFilePathStr).mkString(fileNameSeparator)) + case _ => conf.put(confKey, tempFilePathStr) + } + } + } + } } object BatchesResource { - val SUPPORTED_BATCH_TYPES = Seq("SPARK", "PYSPARK") - val VALID_BATCH_STATES = Seq( + private lazy val SUPPORTED_BATCH_TYPES = Set("SPARK", "PYSPARK") + private lazy val VALID_BATCH_STATES = Set( OperationState.PENDING, OperationState.RUNNING, OperationState.FINISHED, OperationState.ERROR, OperationState.CANCELED).map(_.toString) - def supportedBatchType(batchType: String): Boolean = { + private def supportedBatchType(batchType: String): Boolean = { Option(batchType).exists(bt => SUPPORTED_BATCH_TYPES.contains(bt.toUpperCase(Locale.ROOT))) } - def validBatchState(batchState: String): Boolean = { + private def validBatchState(batchState: String): Boolean = { 
Option(batchState).exists(bt => VALID_BATCH_STATES.contains(bt.toUpperCase(Locale.ROOT))) } + + def batchResourceUploadFolderPath(batchId: String): JPath = + KyuubiApplicationManager.uploadWorkDir.resolve(s"batch-$batchId") } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala index f648c39cb..149c7ab01 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala @@ -17,6 +17,8 @@ package org.apache.kyuubi.session +import java.nio.file.Path + import scala.collection.JavaConverters._ import org.apache.kyuubi.client.util.BatchUtils._ @@ -26,6 +28,7 @@ import org.apache.kyuubi.engine.KyuubiApplicationManager import org.apache.kyuubi.engine.spark.SparkProcessBuilder import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent} import org.apache.kyuubi.operation.OperationState +import org.apache.kyuubi.server.api.v1.BatchesResource import org.apache.kyuubi.server.metadata.api.Metadata import org.apache.kyuubi.session.SessionType.SessionType import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion @@ -79,6 +82,9 @@ class KyuubiBatchSession( override val normalizedConf: Map[String, String] = sessionConf.getBatchConf(batchType) ++ sessionManager.validateBatchConf(conf) + private[kyuubi] def resourceUploadFolderPath: Path = + BatchesResource.batchResourceUploadFolderPath(batchJobSubmissionOp.batchId) + val optimizedConf: Map[String, String] = { val confOverlay = sessionManager.sessionConfAdvisor.map(_.getConfOverlay( user, @@ -101,8 +107,8 @@ class KyuubiBatchSession( batchName.filterNot(_.trim.isEmpty).orElse(optimizedConf.get(KyuubiConf.SESSION_NAME.key)) // whether the resource file is from uploading - private[kyuubi] val isResourceUploaded: Boolean = - 
conf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, "false").toBoolean + private[kyuubi] lazy val isResourceUploaded: Boolean = + conf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, false.toString).toBoolean private[kyuubi] lazy val batchJobSubmissionOp = sessionManager.operationManager .newBatchJobSubmissionOperation( diff --git a/kyuubi-server/src/test/resources/python/app.py b/kyuubi-server/src/test/resources/python/app.py new file mode 100644 index 000000000..482a82196 --- /dev/null +++ b/kyuubi-server/src/test/resources/python/app.py @@ -0,0 +1,20 @@ +from module1.module import func1 + +from pyspark.sql import SparkSession +from pyspark.sql.types import StructType, StructField, IntegerType + +if __name__ == "__main__": + print(f"Started running PySpark app at {func1()}") + + spark = SparkSession.builder.appName("pyspark-sample").getOrCreate() + sc = spark.sparkContext + + data = [1, 2, 3, 4, 5] + rdd = sc.parallelize(data) + transformed_rdd = rdd.map(lambda x: x * 2) + collected = transformed_rdd.collect() + + df = spark.createDataFrame(transformed_rdd, IntegerType()) + df.coalesce(1).write.format("csv").option("header", "false").save("/tmp/" + func1()) + + print(f"Result: {collected}") diff --git a/kyuubi-server/src/test/resources/python/module1/__init__.py b/kyuubi-server/src/test/resources/python/module1/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kyuubi-server/src/test/resources/python/module1/module.py b/kyuubi-server/src/test/resources/python/module1/module.py new file mode 100644 index 000000000..aac092925 --- /dev/null +++ b/kyuubi-server/src/test/resources/python/module1/module.py @@ -0,0 +1,5 @@ +from module2.module import current_time + + +def func1(): + return "result_" + current_time() diff --git a/kyuubi-server/src/test/resources/python/module2/__init__.py b/kyuubi-server/src/test/resources/python/module2/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git 
a/kyuubi-server/src/test/resources/python/module2/module.py b/kyuubi-server/src/test/resources/python/module2/module.py new file mode 100644 index 000000000..ba098a762 --- /dev/null +++ b/kyuubi-server/src/test/resources/python/module2/module.py @@ -0,0 +1,6 @@ +from datetime import datetime + + +def current_time(): + now = datetime.now() + return now.strftime("%Y%m%d%H%M%S") diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala index 20ec2fc0a..e6ea0d162 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala @@ -16,9 +16,12 @@ */ package org.apache.kyuubi.server.rest.client - -import java.nio.file.Paths +import java.io.{File, FileOutputStream} +import java.nio.file.{Files, Path, Paths} import java.util.Base64 +import java.util.zip.{ZipEntry, ZipOutputStream} + +import scala.collection.JavaConverters._ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime @@ -29,6 +32,7 @@ import org.apache.kyuubi.client.exception.KyuubiRestException import org.apache.kyuubi.config.KyuubiReservedKeys import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} import org.apache.kyuubi.session.{KyuubiSession, SessionHandle} +import org.apache.kyuubi.util.GoldenFileUtils.getCurrentModuleHome class BatchRestApiSuite extends RestClientTestHelper with BatchTestHelper { @@ -99,6 +103,86 @@ class BatchRestApiSuite extends RestClientTestHelper with BatchTestHelper { basicKyuubiRestClient.close() } + test("basic batch rest client with uploading resource and extra resources") { + def preparePyModulesZip( + srcFolderPath: Path, + targetZipFileName: String, + excludedFileNames: Set[String] = Set.empty[String]): String = { + + def addFolderToZip(zos: ZipOutputStream, folder: File, parentFolder: String = 
""): Unit = { + if (folder.isDirectory) { + folder.listFiles().foreach { file => + val fileName = file.getName + if (!(excludedFileNames.contains(fileName) || fileName.startsWith("."))) { + if (file.isDirectory) { + val folderPath = + if (parentFolder.isEmpty) fileName else parentFolder + "/" + fileName + addFolderToZip(zos, file, folderPath) + } else { + val filePath = if (parentFolder.isEmpty) fileName else parentFolder + "/" + fileName + zos.putNextEntry(new ZipEntry(filePath)) + zos.write(Files.readAllBytes(file.toPath)) + zos.closeEntry() + } + } + } + } + } + + val zipFilePath = Paths.get(System.getProperty("java.io.tmpdir"), targetZipFileName).toString + val fileOutputStream = new FileOutputStream(zipFilePath) + val zipOutputStream = new ZipOutputStream(fileOutputStream) + try { + addFolderToZip(zipOutputStream, srcFolderPath.toFile) + } finally { + zipOutputStream.close() + fileOutputStream.close() + } + zipFilePath + } + + val basicKyuubiRestClient: KyuubiRestClient = + KyuubiRestClient.builder(baseUri.toString) + .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.BASIC) + .username(ldapUser) + .password(ldapUserPasswd) + .socketTimeout(5 * 60 * 1000) + .build() + val batchRestApi: BatchRestApi = new BatchRestApi(basicKyuubiRestClient) + + val pythonScriptsPath = s"${getCurrentModuleHome(this)}/src/test/resources/python/" + val appScriptFileName = "app.py" + val appScriptFile = Paths.get(pythonScriptsPath, appScriptFileName).toFile + val modulesZipFileName = "pymodules.zip" + val modulesZipFile = preparePyModulesZip( + srcFolderPath = Paths.get(pythonScriptsPath), + targetZipFileName = modulesZipFileName, + excludedFileNames = Set(appScriptFileName)) + + val requestObj = newSparkBatchRequest(Map("spark.master" -> "local")) + requestObj.setBatchType("PYSPARK") + requestObj.setName("pyspark-test") + requestObj.setExtraResourcesMap(Map("spark.submit.pyFiles" -> modulesZipFileName).asJava) + val extraResources = List(modulesZipFile) + val batch: Batch = 
batchRestApi.createBatch(requestObj, appScriptFile, extraResources.asJava) + + try { + assert(batch.getKyuubiInstance === fe.connectionUrl) + assert(batch.getBatchType === "PYSPARK") + val batchId = batch.getId + assert(batchId !== null) + + eventually(timeout(1.minutes), interval(1.seconds)) { + val batch = batchRestApi.getBatchById(batchId) + assert(batch.getState == "FINISHED") + } + + } finally { + Files.deleteIfExists(Paths.get(modulesZipFile)) + basicKyuubiRestClient.close() + } + } + test("basic batch rest client with invalid user") { val totalConnections = MetricsSystem.counterValue(MetricsConstants.REST_CONN_TOTAL).getOrElse(0L)