Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r197450074 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -54,18 +69,89 @@ public JobSubmitHandler( @Override protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { - throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + Collection<Path> uploadedFiles = request.getUploadedFiles(); + Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.getFileName().toString(), + entry -> entry + )); + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile); + + Collection<org.apache.flink.core.fs.Path> jarFiles = new ArrayList<>(requestBody.jarFileNames.size()); + for (String jarFileName : requestBody.jarFileNames) { + Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile); + jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString())); + } + + Collection<Tuple2<String, org.apache.flink.core.fs.Path>> artifacts = new ArrayList<>(requestBody.artifactFileNames.size()); + for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : requestBody.artifactFileNames) { + Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile); + artifacts.add(Tuple2.of(artifactFileName.entryName, new org.apache.flink.core.fs.Path(artifactFile.toString()))); } - return 
gateway.submitJob(jobGraph, timeout) - .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())); + Map<String, DistributedCache.DistributedCacheEntry> temporaryHack = artifacts.stream() + .collect(Collectors.toMap( + tuple -> tuple.f0, + // the actual entry definition is mostly irrelevant as only the blobkey is accessed + // blame whoever wrote the ClientUtils API + tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false) + )); + + // TODO: use executor --- End diff -- Let's resolve this TODO.
---