Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r197451254 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -54,18 +69,89 @@ public JobSubmitHandler( @Override protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { - throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + Collection<Path> uploadedFiles = request.getUploadedFiles(); + Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.getFileName().toString(), + entry -> entry + )); + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile); --- End diff -- Move closer to usage
---