Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6203#discussion_r197450505
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
    @@ -54,18 +69,89 @@ public JobSubmitHandler(
     
        @Override
        protected CompletableFuture<JobSubmitResponseBody> 
handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, 
EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
    -           JobGraph jobGraph;
    -           try {
    -                   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
    -                   jobGraph = (JobGraph) objectIn.readObject();
    -           } catch (Exception e) {
    -                   throw new RestHandlerException(
    -                           "Failed to deserialize JobGraph.",
    -                           HttpResponseStatus.BAD_REQUEST,
    -                           e);
    +           Collection<Path> uploadedFiles = request.getUploadedFiles();
    +           Map<String, Path> nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
    +                   path -> path.getFileName().toString(),
    +                   entry -> entry
    +           ));
    +
    +           JobSubmitRequestBody requestBody = request.getRequestBody();
    +
    +           Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
    +
    +           Collection<org.apache.flink.core.fs.Path> jarFiles = new 
ArrayList<>(requestBody.jarFileNames.size());
    +           for (String jarFileName : requestBody.jarFileNames) {
    +                   Path jarFile = getPathAndAssertUpload(jarFileName, 
"Jar", nameToFile);
    +                   jarFiles.add(new 
org.apache.flink.core.fs.Path(jarFile.toString()));
    +           }
    +
    +           Collection<Tuple2<String, org.apache.flink.core.fs.Path>> 
artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
    +           for (JobSubmitRequestBody.DistributedCacheFile artifactFileName 
: requestBody.artifactFileNames) {
    +                   Path artifactFile = 
getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
    --- End diff --
    
    Same here for `"Artifact"`


---

Reply via email to