pan3793 commented on code in PR #6335:
URL: https://github.com/apache/kyuubi/pull/6335#discussion_r1704939521
##########
kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala:
##########
@@ -190,20 +191,21 @@ private[v1] class BatchesResource extends
ApiRequestContext with Logging {
def openBatchSessionWithUpload(
@FormDataParam("batchRequest") batchRequest: BatchRequest,
@FormDataParam("resourceFile") resourceFileInputStream: InputStream,
- @FormDataParam("resourceFile") resourceFileMetadata:
FormDataContentDisposition): Batch = {
+ @FormDataParam("resourceFile") resourceFileMetadata:
FormDataContentDisposition,
+ formDataMultiPart: FormDataMultiPart): Batch = {
require(
fe.getConf.get(BATCH_RESOURCE_UPLOAD_ENABLED),
"Batch resource upload function is disabled.")
require(
batchRequest != null,
"batchRequest is required and please check the content type" +
" of batchRequest is application/json")
- val tempFile = Utils.writeToTempFile(
- resourceFileInputStream,
- KyuubiApplicationManager.uploadWorkDir,
- resourceFileMetadata.getFileName)
- batchRequest.setResource(tempFile.getPath)
- openBatchSessionInternal(batchRequest, isResourceFromUpload = true)
+ openBatchSessionInternal(
Review Comment:
can we handle upload inside function `openBatchSessionWithUpload`?
otherwise, this function looks a little bit redundant
##########
kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala:
##########
@@ -525,22 +538,106 @@ private[v1] class BatchesResource extends
ApiRequestContext with Logging {
}
}
}
+
+ private def handleUploadingFiles(
+ batchId: String,
+ request: BatchRequest,
+ resourceFileInputStream: InputStream,
+ resourceFileName: String,
+ formDataMultiPartOpt: Option[FormDataMultiPart]): Option[JPath] = {
+ val uploadFileFolderPath = batchResourceUploadFolderPath(batchId)
+ handleUploadingResourceFile(
+ request,
+ resourceFileInputStream,
+ resourceFileName,
+ uploadFileFolderPath)
+ handleUploadingExtraResourcesFiles(request, formDataMultiPartOpt,
uploadFileFolderPath)
+ Some(uploadFileFolderPath)
+ }
+
+ private def handleUploadingResourceFile(
+ request: BatchRequest,
+ inputStream: InputStream,
+ fileName: String,
+ uploadFileFolderPath: JPath): Unit = {
+ try {
+ val tempFile = Utils.writeToTempFile(inputStream, uploadFileFolderPath,
fileName)
+ request.setResource(tempFile.getPath)
+ } catch {
+ case e: Exception =>
+ throw new RuntimeException(
+ s"Failed handling uploaded resource file $fileName: ${e.getMessage}",
+ e)
+ }
+ }
+
+ private def handleUploadingExtraResourcesFiles(
+ request: BatchRequest,
+ formDataMultiPartOpt: Option[FormDataMultiPart],
+ uploadFileFolderPath: JPath): Unit = {
+ val extraResourceMap = request.getExtraResourcesMap.asScala
+ if (extraResourceMap.nonEmpty) {
+ val fileNameSeperator = ","
+ val formDataMultiPart = formDataMultiPartOpt.get
+ val transformedExtraResourcesMap = extraResourceMap
+ .map { case (confKey, confValue) =>
Review Comment:
I remember there is a function `mapValues`
##########
dev/dependencyList:
##########
@@ -30,6 +30,7 @@ arrow-vector/16.0.0//arrow-vector-16.0.0.jar
checker-qual/3.42.0//checker-qual-3.42.0.jar
classgraph/4.8.138//classgraph-4.8.138.jar
commons-codec/1.15//commons-codec-1.15.jar
+commons-io/2.11.0//commons-io-2.11.0.jar
Review Comment:
LICENSE-binary and NOTICE-binary should be updated too
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]