Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199195443 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -34,38 +40,137 @@ import javax.annotation.Nonnull; -import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; import java.io.ObjectInputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * This handler can be used to submit jobs to a Flink cluster. */ public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> { + private static final String FILE_TYPE_GRAPH = "JobGraph"; + private static final String FILE_TYPE_JAR = "Jar"; + private static final String FILE_TYPE_ARTIFACT = "Artifact"; + + private final Executor executor; + public JobSubmitHandler( CompletableFuture<String> localRestAddress, GatewayRetriever<? 
extends DispatcherGateway> leaderRetriever, Time timeout, - Map<String, String> headers) { + Map<String, String> headers, + Executor executor) { super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance()); + this.executor = executor; } @Override protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { + Collection<File> uploadedFiles = request.getUploadedFiles(); + Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.toPath().getFileName().toString(), + File::toPath --- End diff -- Moreover, we would no longer have to prefix the Flink `Path` type, because we no longer have an ambiguity.
---