[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6203 ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199867852 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -315,36 +315,61 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) // we have to enable queued scheduling because slot will be allocated lazily jobGraph.setAllowQueuedScheduling(true); - log.info("Requesting blob server port."); - CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance()); + CompletableFuture jobGraphFileFuture = CompletableFuture.supplyAsync(() -> { + try { + final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin"); + try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) { + objectOut.writeObject(jobGraph); + } + return jobGraphFile; + } catch (IOException e) { + throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e)); + } + }, executorService); - CompletableFuture jobUploadFuture = portFuture.thenCombine( - getDispatcherAddress(), - (BlobServerPortResponseBody response, String dispatcherAddress) -> { - final int blobServerPort = response.port; - final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); + CompletableFuture>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> { + List jarFileNames = new ArrayList<>(8); + List artifactFileNames = new ArrayList<>(8); + Collection filesToUpload = new ArrayList<>(8); - try { - ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig)); - } catch (Exception e) { - throw new CompletionException(e); - } + filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY)); - return jobGraph; - }); + for (Path jar : jobGraph.getUserJars()) { + jarFileNames.add(jar.getName()); + filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), 
RestConstants.CONTENT_TYPE_JAR)); + } - CompletableFuture submissionFuture = jobUploadFuture.thenCompose( - (JobGraph jobGraphToSubmit) -> { - log.info("Submitting job graph."); + for (Map.Entry artifacts : jobGraph.getUserArtifacts().entrySet()) { + artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName())); + filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY)); + } - try { - return sendRequest( - JobSubmitHeaders.getInstance(), - new JobSubmitRequestBody(jobGraph)); - } catch (IOException ioe) { - throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe)); - } - }); + final JobSubmitRequestBody requestBody = new JobSubmitRequestBody( + jobGraphFile.getFileName().toString(), + jarFileNames, + artifactFileNames); + + return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload)); + }); + + final CompletableFuture submissionFuture = requestFuture.thenCompose( + requestAndFileUploads -> sendRetriableRequest( + JobSubmitHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + requestAndFileUploads.f0, + requestAndFileUploads.f1, +
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199866326 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -315,36 +315,61 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) // we have to enable queued scheduling because slot will be allocated lazily jobGraph.setAllowQueuedScheduling(true); - log.info("Requesting blob server port."); - CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance()); + CompletableFuture jobGraphFileFuture = CompletableFuture.supplyAsync(() -> { + try { + final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin"); + try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) { + objectOut.writeObject(jobGraph); + } + return jobGraphFile; + } catch (IOException e) { + throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e)); + } + }, executorService); - CompletableFuture jobUploadFuture = portFuture.thenCombine( - getDispatcherAddress(), - (BlobServerPortResponseBody response, String dispatcherAddress) -> { - final int blobServerPort = response.port; - final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); + CompletableFuture>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> { + List jarFileNames = new ArrayList<>(8); + List artifactFileNames = new ArrayList<>(8); + Collection filesToUpload = new ArrayList<>(8); - try { - ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig)); - } catch (Exception e) { - throw new CompletionException(e); - } + filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY)); - return jobGraph; - }); + for (Path jar : jobGraph.getUserJars()) { + jarFileNames.add(jar.getName()); + filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), 
RestConstants.CONTENT_TYPE_JAR)); + } - CompletableFuture submissionFuture = jobUploadFuture.thenCompose( - (JobGraph jobGraphToSubmit) -> { - log.info("Submitting job graph."); + for (Map.Entry artifacts : jobGraph.getUserArtifacts().entrySet()) { + artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName())); + filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY)); + } - try { - return sendRequest( - JobSubmitHeaders.getInstance(), - new JobSubmitRequestBody(jobGraph)); - } catch (IOException ioe) { - throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe)); - } - }); + final JobSubmitRequestBody requestBody = new JobSubmitRequestBody( + jobGraphFile.getFileName().toString(), + jarFileNames, + artifactFileNames); + + return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload)); + }); + + final CompletableFuture submissionFuture = requestFuture.thenCompose( + requestAndFileUploads -> sendRetriableRequest( + JobSubmitHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + requestAndFileUploads.f0, + requestAndFileUploads.f1, +
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199794599 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -54,18 +69,89 @@ public JobSubmitHandler( @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { - throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.getFileName().toString(), + entry -> entry + )); + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile); + + Collection jarFiles = new ArrayList<>(requestBody.jarFileNames.size()); + for (String jarFileName : requestBody.jarFileNames) { + Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile); + jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString())); + } + + Collection> artifacts = new ArrayList<>(requestBody.artifactFileNames.size()); + for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : requestBody.artifactFileNames) { + Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile); + artifacts.add(Tuple2.of(artifactFileName.entryName, new org.apache.flink.core.fs.Path(artifactFile.toString(; } - return gateway.submitJob(jobGraph, timeout) - .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())); + Map temporaryHack = artifacts.stream() + .collect(Collectors.toMap( + 
tuple -> tuple.f0, + // the actual entry definition is mostly irrelevant as only the blobkey is accessed + // blame whoever wrote the ClientUtils API + tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false) + )); + + // TODO: use executor + CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> { + JobGraph jobGraph; + try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) { + jobGraph = (JobGraph) objectIn.readObject(); + } catch (Exception e) { + throw new CompletionException(new RestHandlerException( + "Failed to deserialize JobGraph.", + HttpResponseStatus.BAD_REQUEST, + e)); + } + return jobGraph; + }); + + CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout); + + CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> { + final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort); + try (BlobClient blobClient = new BlobClient(address, new Configuration())) { + Collection jarBlobKeys = ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient); + ClientUtils.setUserJarBlobKeys(jarBlobKeys, jobGraph); + + Collection> artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), temporaryHack, blobClient); + ClientUtils.setUserArtifactBlobKeys(jobGraph, artifactBlobKeys); + } catch (IOException e) { + throw new CompletionException(new RestHandlerException( + "Could not upload job files.", +
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199793741 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -315,36 +315,61 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) // we have to enable queued scheduling because slot will be allocated lazily jobGraph.setAllowQueuedScheduling(true); - log.info("Requesting blob server port."); - CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance()); + CompletableFuture jobGraphFileFuture = CompletableFuture.supplyAsync(() -> { + try { + final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin"); + try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) { + objectOut.writeObject(jobGraph); + } + return jobGraphFile; + } catch (IOException e) { + throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e)); + } + }, executorService); - CompletableFuture jobUploadFuture = portFuture.thenCombine( - getDispatcherAddress(), - (BlobServerPortResponseBody response, String dispatcherAddress) -> { - final int blobServerPort = response.port; - final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); + CompletableFuture>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> { + List jarFileNames = new ArrayList<>(8); + List artifactFileNames = new ArrayList<>(8); + Collection filesToUpload = new ArrayList<>(8); - try { - ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig)); - } catch (Exception e) { - throw new CompletionException(e); - } + filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY)); - return jobGraph; - }); + for (Path jar : jobGraph.getUserJars()) { + jarFileNames.add(jar.getName()); + filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), 
RestConstants.CONTENT_TYPE_JAR)); + } - CompletableFuture submissionFuture = jobUploadFuture.thenCompose( - (JobGraph jobGraphToSubmit) -> { - log.info("Submitting job graph."); + for (Map.Entry artifacts : jobGraph.getUserArtifacts().entrySet()) { + artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName())); + filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY)); + } - try { - return sendRequest( - JobSubmitHeaders.getInstance(), - new JobSubmitRequestBody(jobGraph)); - } catch (IOException ioe) { - throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe)); - } - }); + final JobSubmitRequestBody requestBody = new JobSubmitRequestBody( + jobGraphFile.getFileName().toString(), + jarFileNames, + artifactFileNames); + + return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload)); + }); + + final CompletableFuture submissionFuture = requestFuture.thenCompose( + requestAndFileUploads -> sendRetriableRequest( + JobSubmitHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + requestAndFileUploads.f0, + requestAndFileUploads.f1, +
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199780706 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java --- @@ -105,7 +106,8 @@ protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpReque messageHeaders.getResponseStatusCode(), responseHeaders); } - }); + }).whenComplete((P resp, Throwable throwable) -> processingFinishedFuture.complete(null)); --- End diff -- I think we are swallowing potential exceptions here. I think it would be better to do something like ``` return response.whenComplete(...).thenApply(ignored -> null) ``` That way we would also get rid of the `processingFinishedFuture`. ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199780490 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java --- @@ -79,6 +79,7 @@ protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpReque response = FutureUtils.completedExceptionally(e); } + CompletableFuture processingFinishedFuture = new CompletableFuture<>(); response.whenComplete((P resp, Throwable throwable) -> { --- End diff -- This is a good point. ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199741252 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java --- @@ -18,64 +18,118 @@ package org.apache.flink.runtime.rest.messages.job; -import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.rest.messages.RequestBody; -import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; /** * Request for submitting a job. * - * We currently require the job-jars to be uploaded through the blob-server. + * This request only contains the names of files that must be present on the server, and defines how these files are + * interpreted. */ public final class JobSubmitRequestBody implements RequestBody { - private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph"; + private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName"; + private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames"; + private static final String FIELD_NAME_JOB_ARTIFACTS = "jobArtifactFileNames"; - /** -* The serialized job graph. -*/ - @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) - public final byte[] serializedJobGraph; + @JsonProperty(FIELD_NAME_JOB_GRAPH) + public final String jobGraphFileName; - public JobSubmitRequestBody(JobGraph jobGraph) throws IOException { - this(serializeJobGraph(jobGraph)); - } + @JsonProperty(FIELD_NAME_JOB_JARS) + public final Collection jarFileNames; + + @JsonProperty(FIELD_NAME_JOB_ARTIFACTS) + public final Collection artifactFileNames; - @JsonCreator --- End diff -- revert ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199456197 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java --- @@ -119,15 +113,17 @@ public void testSuccessfulJobSubmission() throws Exception { } } - DispatcherGateway mockGateway = mock(DispatcherGateway.class); - when(mockGateway.getHostname()).thenReturn("localhost"); - when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort())); - when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); - GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + TestingDispatcherGateway.Builder builder = new TestingDispatcherGateway.Builder(); --- End diff -- This block is written the way it is since methods that the `TestingDispatcherGateway.Builder` inherits return a `TestingRestfulGateway.Builder`, which also applies to `build`. it's a bit cumbersome, but I couldn't find a solution that doesn't include copying the entire `TestingRestfulGateway.Builder`. ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199450747 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) // we have to enable queued scheduling because slot will be allocated lazily jobGraph.setAllowQueuedScheduling(true); - log.info("Requesting blob server port."); - CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance()); + CompletableFuture submissionFuture = CompletableFuture.supplyAsync( + () -> { + log.info("Submitting job graph."); - CompletableFuture jobUploadFuture = portFuture.thenCombine( - getDispatcherAddress(), - (BlobServerPortResponseBody response, String dispatcherAddress) -> { - final int blobServerPort = response.port; - final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); + List jarFileNames = new ArrayList<>(8); + List artifactFileNames = new ArrayList<>(8); + Collection filesToUpload = new ArrayList<>(8); + // TODO: need configurable location --- End diff -- Will do once the PR is merged. ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199439130 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) // we have to enable queued scheduling because slot will be allocated lazily jobGraph.setAllowQueuedScheduling(true); - log.info("Requesting blob server port."); - CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance()); + CompletableFuture submissionFuture = CompletableFuture.supplyAsync( + () -> { + log.info("Submitting job graph."); - CompletableFuture jobUploadFuture = portFuture.thenCombine( - getDispatcherAddress(), - (BlobServerPortResponseBody response, String dispatcherAddress) -> { - final int blobServerPort = response.port; - final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); + List jarFileNames = new ArrayList<>(8); + List artifactFileNames = new ArrayList<>(8); + Collection filesToUpload = new ArrayList<>(8); + // TODO: need configurable location + final java.nio.file.Path jobGraphFile; try { - ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig)); - } catch (Exception e) { - throw new CompletionException(e); + jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin"); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(jobGraph); + } + } + filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY)); + } catch (IOException e) { + throw new CompletionException("Failed to serialize JobGraph.", e); } - return jobGraph; - }); - - CompletableFuture submissionFuture = jobUploadFuture.thenCompose( - (JobGraph jobGraphToSubmit) -> { - log.info("Submitting job graph."); + for (Path jar : 
jobGraph.getUserJars()) { + jarFileNames.add(jar.getName()); + filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR)); + } - try { - return sendRequest( - JobSubmitHeaders.getInstance(), - new JobSubmitRequestBody(jobGraph)); - } catch (IOException ioe) { - throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe)); + for (Map.Entry artifacts : jobGraph.getUserArtifacts().entrySet()) { + artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName())); + filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY)); } - }); + + final CompletableFuture submitFuture = sendRetriableRequest( --- End diff -- This is a slightly more extensive change since the cleanup needs access to the `jobGraphFile`. I'll see what i can do. ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199427903 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java --- @@ -79,6 +79,7 @@ protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpReque response = FutureUtils.completedExceptionally(e); } + CompletableFuture processingFinishedFuture = new CompletableFuture<>(); response.whenComplete((P resp, Throwable throwable) -> { --- End diff -- seems odd to modify the signature to make the code prettier. There's no use-case for the handlers to return anything but null, so why even allow it? Doesn't this go against the principle of using the most restrictive interface? ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199333574 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -34,38 +40,137 @@ import javax.annotation.Nonnull; -import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; import java.io.ObjectInputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * This handler can be used to submit jobs to a Flink cluster. */ public final class JobSubmitHandler extends AbstractRestHandler { + private static final String FILE_TYPE_GRAPH = "JobGraph"; + private static final String FILE_TYPE_JAR = "Jar"; + private static final String FILE_TYPE_ARTIFACT = "Artifact"; + + private final Executor executor; + public JobSubmitHandler( CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, - Map headers) { + Map headers, + Executor executor) { super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance()); + this.executor = executor; } @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.toPath().getFileName().toString(), + File::toPath + )); + + if (uploadedFiles.size() 
!= nameToFile.size()) { throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s", + uploadedFiles.size() < nameToFile.size() ? "lower" : "higher", + nameToFile.size(), + uploadedFiles.size()), + HttpResponseStatus.BAD_REQUEST + ); + } + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, nameToFile); + + CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> { + JobGraph jobGraph; + try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) { + jobGraph = (JobGraph) objectIn.readObject(); + } catch (Exception e) { + throw new CompletionException(new RestHandlerException( + "Failed to deserialize JobGraph.", + HttpResponseStatus.BAD_REQUEST, + e)); + } + return jobGraph; + }, executor); + + Collection jarFiles = getJarFilesToUpload(nameToFile, requestBody.jarFileNames); + + Collection> artifacts = getArtifactFilesToUpload(nameToFile, requestBody.artifactFileNames); + + CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout); + + CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> { + final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort); + try (BlobClient blobClient = new BlobClient(address, new Configuration())){ + Collection jarBlobKeys =
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199236398 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -34,38 +40,137 @@ import javax.annotation.Nonnull; -import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; import java.io.ObjectInputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * This handler can be used to submit jobs to a Flink cluster. */ public final class JobSubmitHandler extends AbstractRestHandler { + private static final String FILE_TYPE_GRAPH = "JobGraph"; + private static final String FILE_TYPE_JAR = "Jar"; + private static final String FILE_TYPE_ARTIFACT = "Artifact"; + + private final Executor executor; + public JobSubmitHandler( CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, - Map headers) { + Map headers, + Executor executor) { super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance()); + this.executor = executor; } @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.toPath().getFileName().toString(), + File::toPath + )); + + if (uploadedFiles.size() != 
nameToFile.size()) { throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s", + uploadedFiles.size() < nameToFile.size() ? "lower" : "higher", + nameToFile.size(), + uploadedFiles.size()), + HttpResponseStatus.BAD_REQUEST + ); + } + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, nameToFile); + + CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> { + JobGraph jobGraph; + try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) { + jobGraph = (JobGraph) objectIn.readObject(); + } catch (Exception e) { + throw new CompletionException(new RestHandlerException( + "Failed to deserialize JobGraph.", + HttpResponseStatus.BAD_REQUEST, + e)); + } + return jobGraph; + }, executor); + + Collection jarFiles = getJarFilesToUpload(nameToFile, requestBody.jarFileNames); + + Collection> artifacts = getArtifactFilesToUpload(nameToFile, requestBody.artifactFileNames); + + CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout); + + CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> { + final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort); + try (BlobClient blobClient = new BlobClient(address, new Configuration())){ + Collection jarBlobKeys =
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199197307 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java --- @@ -47,8 +65,30 @@ */ public class JobSubmitHandlerTest extends TestLogger { + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + private static BlobServer blobServer; + + @BeforeClass + public static void setup() throws IOException { + Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + TEMPORARY_FOLDER.newFolder().getAbsolutePath()); + + blobServer = new BlobServer(config, new VoidBlobStore()); + blobServer.start(); + } + + @AfterClass + public static void teardown() throws IOException { + if (blobServer != null) { + blobServer.close(); + } + } + @Test public void testSerializationFailureHandling() throws Exception { + final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath(); DispatcherGateway mockGateway = mock(DispatcherGateway.class); when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); --- End diff -- Here we could use instead `() -> new CompletableFuture()` ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199181211 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) // we have to enable queued scheduling because slot will be allocated lazily jobGraph.setAllowQueuedScheduling(true); - log.info("Requesting blob server port."); - CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance()); + CompletableFuture submissionFuture = CompletableFuture.supplyAsync( + () -> { + log.info("Submitting job graph."); - CompletableFuture jobUploadFuture = portFuture.thenCombine( - getDispatcherAddress(), - (BlobServerPortResponseBody response, String dispatcherAddress) -> { - final int blobServerPort = response.port; - final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); + List jarFileNames = new ArrayList<>(8); + List artifactFileNames = new ArrayList<>(8); + Collection filesToUpload = new ArrayList<>(8); + // TODO: need configurable location + final java.nio.file.Path jobGraphFile; try { - ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig)); - } catch (Exception e) { - throw new CompletionException(e); + jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin"); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(jobGraph); + } + } + filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY)); + } catch (IOException e) { + throw new CompletionException("Failed to serialize JobGraph.", e); } - return jobGraph; - }); - - CompletableFuture submissionFuture = jobUploadFuture.thenCompose( - (JobGraph jobGraphToSubmit) -> { - log.info("Submitting job graph."); + for (Path jar : 
jobGraph.getUserJars()) { + jarFileNames.add(jar.getName()); + filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR)); + } - try { - return sendRequest( - JobSubmitHeaders.getInstance(), - new JobSubmitRequestBody(jobGraph)); - } catch (IOException ioe) { - throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe)); + for (Map.Entry artifacts : jobGraph.getUserArtifacts().entrySet()) { + artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName())); + filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY)); } - }); + + final CompletableFuture submitFuture = sendRetriableRequest( + JobSubmitHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + new JobSubmitRequestBody( + jobGraphFile.getFileName().toString(), + jarFileNames, + artifactFileNames), + filesToUpload, +
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199178551 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) // we have to enable queued scheduling because slot will be allocated lazily jobGraph.setAllowQueuedScheduling(true); - log.info("Requesting blob server port."); - CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance()); + CompletableFuture submissionFuture = CompletableFuture.supplyAsync( + () -> { + log.info("Submitting job graph."); - CompletableFuture jobUploadFuture = portFuture.thenCombine( - getDispatcherAddress(), - (BlobServerPortResponseBody response, String dispatcherAddress) -> { - final int blobServerPort = response.port; - final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); + List jarFileNames = new ArrayList<>(8); + List artifactFileNames = new ArrayList<>(8); + Collection filesToUpload = new ArrayList<>(8); + // TODO: need configurable location + final java.nio.file.Path jobGraphFile; try { - ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig)); - } catch (Exception e) { - throw new CompletionException(e); + jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin"); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { --- End diff -- I think we could combine these to `try` statement to `new ObjectOutputStream(Files.newOutputStream(jobGraphFile))` ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199187606 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -34,38 +40,137 @@ import javax.annotation.Nonnull; -import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; import java.io.ObjectInputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * This handler can be used to submit jobs to a Flink cluster. */ public final class JobSubmitHandler extends AbstractRestHandler { + private static final String FILE_TYPE_GRAPH = "JobGraph"; + private static final String FILE_TYPE_JAR = "Jar"; + private static final String FILE_TYPE_ARTIFACT = "Artifact"; + + private final Executor executor; + public JobSubmitHandler( CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, - Map headers) { + Map headers, + Executor executor) { super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance()); + this.executor = executor; } @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.toPath().getFileName().toString(), + File::toPath + )); + + if (uploadedFiles.size() 
!= nameToFile.size()) { throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s", + uploadedFiles.size() < nameToFile.size() ? "lower" : "higher", + nameToFile.size(), + uploadedFiles.size()), + HttpResponseStatus.BAD_REQUEST + ); + } + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, nameToFile); --- End diff -- maybe make `final`. ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199187175 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -34,38 +40,137 @@ import javax.annotation.Nonnull; -import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; import java.io.ObjectInputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * This handler can be used to submit jobs to a Flink cluster. */ public final class JobSubmitHandler extends AbstractRestHandler { + private static final String FILE_TYPE_GRAPH = "JobGraph"; + private static final String FILE_TYPE_JAR = "Jar"; + private static final String FILE_TYPE_ARTIFACT = "Artifact"; + + private final Executor executor; + public JobSubmitHandler( CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, - Map headers) { + Map headers, + Executor executor) { super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance()); + this.executor = executor; } @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.toPath().getFileName().toString(), --- End diff -- Let's add a type `(File file) -> 
...` ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199177926 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) // we have to enable queued scheduling because slot will be allocated lazily jobGraph.setAllowQueuedScheduling(true); - log.info("Requesting blob server port."); - CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance()); + CompletableFuture submissionFuture = CompletableFuture.supplyAsync( + () -> { + log.info("Submitting job graph."); - CompletableFuture jobUploadFuture = portFuture.thenCombine( - getDispatcherAddress(), - (BlobServerPortResponseBody response, String dispatcherAddress) -> { - final int blobServerPort = response.port; - final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); + List jarFileNames = new ArrayList<>(8); + List artifactFileNames = new ArrayList<>(8); + Collection filesToUpload = new ArrayList<>(8); + // TODO: need configurable location --- End diff -- Let's create a follow up story for it and remove the TODO ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199195443 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -34,38 +40,137 @@ import javax.annotation.Nonnull; -import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; import java.io.ObjectInputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * This handler can be used to submit jobs to a Flink cluster. */ public final class JobSubmitHandler extends AbstractRestHandler { + private static final String FILE_TYPE_GRAPH = "JobGraph"; + private static final String FILE_TYPE_JAR = "Jar"; + private static final String FILE_TYPE_ARTIFACT = "Artifact"; + + private final Executor executor; + public JobSubmitHandler( CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, - Map headers) { + Map headers, + Executor executor) { super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance()); + this.executor = executor; } @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.toPath().getFileName().toString(), + File::toPath --- End diff -- Moreover we would 
no longer have to prefix the Flink `Path` type because we no longer have an ambiguity. ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199199366 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java --- @@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws Exception { @Test public void testSuccessfulJobSubmission() throws Exception { + final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath(); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(new JobGraph("testjob")); + } + } + DispatcherGateway mockGateway = mock(DispatcherGateway.class); + when(mockGateway.getHostname()).thenReturn("localhost"); + when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort())); when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); JobSubmitHandler handler = new JobSubmitHandler( CompletableFuture.completedFuture("http://localhost:1234"), mockGatewayRetriever, RpcUtils.INF_TIMEOUT, - Collections.emptyMap()); + Collections.emptyMap(), + TestingUtils.defaultExecutor()); - JobGraph job = new JobGraph("testjob"); - JobSubmitRequestBody request = new JobSubmitRequestBody(job); + JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList()); - handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway) + handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())), mockGateway) .get(); } + + @Test + public void testRejectionOnCountMismatch() throws Exception { + 
final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath(); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(new JobGraph("testjob")); + } + } + final Path countExceedingFile = TEMPORARY_FOLDER.newFile().toPath(); + + DispatcherGateway mockGateway = mock(DispatcherGateway.class); + when(mockGateway.getHostname()).thenReturn("localhost"); + when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort())); + when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + JobSubmitHandler handler = new JobSubmitHandler( + CompletableFuture.completedFuture("http://localhost:1234"), + mockGatewayRetriever, + RpcUtils.INF_TIMEOUT, + Collections.emptyMap(), + TestingUtils.defaultExecutor()); + + JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList()); + + try { + handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Arrays.asList(jobGraphFile.toFile(), countExceedingFile.toFile())), mockGateway) + .get(); + } catch (Exception e) { + ExceptionUtils.findThrowable(e, candidate -> candidate instanceof RestHandlerException && candidate.getMessage().contains("count")); + } + } + + @Test + public void testFileHandling() throws Exception { + final String dcEntryName = "entry"; + + CompletableFuture submittedJobGraphFuture = new CompletableFuture<>(); + DispatcherGateway dispatcherGateway = new TestingDispatcherGateway.Builder() + .setBlobServerPort(blobServer.getPort()) +
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199188278 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -34,38 +40,137 @@ import javax.annotation.Nonnull; -import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; import java.io.ObjectInputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * This handler can be used to submit jobs to a Flink cluster. */ public final class JobSubmitHandler extends AbstractRestHandler { + private static final String FILE_TYPE_GRAPH = "JobGraph"; + private static final String FILE_TYPE_JAR = "Jar"; + private static final String FILE_TYPE_ARTIFACT = "Artifact"; + + private final Executor executor; + public JobSubmitHandler( CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, - Map headers) { + Map headers, + Executor executor) { super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance()); + this.executor = executor; } @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.toPath().getFileName().toString(), + File::toPath + )); + + if (uploadedFiles.size() 
!= nameToFile.size()) { throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s", + uploadedFiles.size() < nameToFile.size() ? "lower" : "higher", + nameToFile.size(), + uploadedFiles.size()), + HttpResponseStatus.BAD_REQUEST + ); + } + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, nameToFile); + + CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> { + JobGraph jobGraph; + try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) { + jobGraph = (JobGraph) objectIn.readObject(); + } catch (Exception e) { + throw new CompletionException(new RestHandlerException( + "Failed to deserialize JobGraph.", + HttpResponseStatus.BAD_REQUEST, + e)); + } + return jobGraph; + }, executor); + + Collection jarFiles = getJarFilesToUpload(nameToFile, requestBody.jarFileNames); + + Collection> artifacts = getArtifactFilesToUpload(nameToFile, requestBody.artifactFileNames); + + CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout); + + CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> { --- End diff -- Let's add types to the lambda parameters. ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199189017 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -34,38 +40,137 @@ import javax.annotation.Nonnull; -import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; import java.io.ObjectInputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * This handler can be used to submit jobs to a Flink cluster. */ public final class JobSubmitHandler extends AbstractRestHandler { + private static final String FILE_TYPE_GRAPH = "JobGraph"; + private static final String FILE_TYPE_JAR = "Jar"; + private static final String FILE_TYPE_ARTIFACT = "Artifact"; + + private final Executor executor; + public JobSubmitHandler( CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, - Map headers) { + Map headers, + Executor executor) { super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance()); + this.executor = executor; } @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.toPath().getFileName().toString(), + File::toPath + )); + + if (uploadedFiles.size() 
!= nameToFile.size()) { throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s", + uploadedFiles.size() < nameToFile.size() ? "lower" : "higher", + nameToFile.size(), + uploadedFiles.size()), + HttpResponseStatus.BAD_REQUEST + ); + } + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, nameToFile); + + CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> { + JobGraph jobGraph; + try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) { + jobGraph = (JobGraph) objectIn.readObject(); + } catch (Exception e) { + throw new CompletionException(new RestHandlerException( + "Failed to deserialize JobGraph.", + HttpResponseStatus.BAD_REQUEST, + e)); + } + return jobGraph; + }, executor); + + Collection jarFiles = getJarFilesToUpload(nameToFile, requestBody.jarFileNames); + + Collection> artifacts = getArtifactFilesToUpload(nameToFile, requestBody.artifactFileNames); + + CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout); + + CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> { + final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort); + try (BlobClient blobClient = new BlobClient(address, new Configuration())){ + Collection jarBlobKeys =
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199198445 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java --- @@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws Exception { @Test public void testSuccessfulJobSubmission() throws Exception { + final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath(); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(new JobGraph("testjob")); + } + } + DispatcherGateway mockGateway = mock(DispatcherGateway.class); + when(mockGateway.getHostname()).thenReturn("localhost"); + when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort())); when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); JobSubmitHandler handler = new JobSubmitHandler( CompletableFuture.completedFuture("http://localhost:1234"), mockGatewayRetriever, RpcUtils.INF_TIMEOUT, - Collections.emptyMap()); + Collections.emptyMap(), + TestingUtils.defaultExecutor()); - JobGraph job = new JobGraph("testjob"); - JobSubmitRequestBody request = new JobSubmitRequestBody(job); + JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList()); - handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway) + handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())), mockGateway) .get(); } + + @Test + public void testRejectionOnCountMismatch() throws Exception { + 
final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath(); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(new JobGraph("testjob")); + } + } + final Path countExceedingFile = TEMPORARY_FOLDER.newFile().toPath(); + + DispatcherGateway mockGateway = mock(DispatcherGateway.class); --- End diff -- Maybe we could replace it by the `TestingDispatcherGateway` ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199191206 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -34,38 +40,137 @@ import javax.annotation.Nonnull; -import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; import java.io.ObjectInputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * This handler can be used to submit jobs to a Flink cluster. */ public final class JobSubmitHandler extends AbstractRestHandler { + private static final String FILE_TYPE_GRAPH = "JobGraph"; + private static final String FILE_TYPE_JAR = "Jar"; + private static final String FILE_TYPE_ARTIFACT = "Artifact"; + + private final Executor executor; + public JobSubmitHandler( CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, - Map headers) { + Map headers, + Executor executor) { super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance()); + this.executor = executor; } @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.toPath().getFileName().toString(), + File::toPath + )); + + if (uploadedFiles.size() 
!= nameToFile.size()) { throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s", + uploadedFiles.size() < nameToFile.size() ? "lower" : "higher", + nameToFile.size(), + uploadedFiles.size()), + HttpResponseStatus.BAD_REQUEST + ); + } + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, nameToFile); + + CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> { + JobGraph jobGraph; + try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) { + jobGraph = (JobGraph) objectIn.readObject(); + } catch (Exception e) { + throw new CompletionException(new RestHandlerException( + "Failed to deserialize JobGraph.", + HttpResponseStatus.BAD_REQUEST, + e)); + } + return jobGraph; + }, executor); + + Collection jarFiles = getJarFilesToUpload(nameToFile, requestBody.jarFileNames); + + Collection> artifacts = getArtifactFilesToUpload(nameToFile, requestBody.artifactFileNames); + + CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout); + + CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> { + final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort); + try (BlobClient blobClient = new BlobClient(address, new Configuration())){ + Collection jarBlobKeys =
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199195959 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java --- @@ -152,6 +152,7 @@ protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpReque }, ctx.executor()); + CompletableFuture processingFinishedFuture = new CompletableFuture<>(); --- End diff -- Let's remove this future and simply return the `whenComplete` return value. ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199197524 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java --- @@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws Exception { @Test public void testSuccessfulJobSubmission() throws Exception { + final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath(); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { --- End diff -- These tries could be collapsed. ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199198501 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java --- @@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws Exception { @Test public void testSuccessfulJobSubmission() throws Exception { + final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath(); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(new JobGraph("testjob")); + } + } + DispatcherGateway mockGateway = mock(DispatcherGateway.class); --- End diff -- Maybe replace by `TestingDispatcherGateway`. ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199196388 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java --- @@ -18,64 +18,118 @@ package org.apache.flink.runtime.rest.messages.job; -import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.rest.messages.RequestBody; -import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; /** * Request for submitting a job. * - * We currently require the job-jars to be uploaded through the blob-server. + * This request only contains the names of files that must be present on the server, and defines how these files are + * interpreted. */ public final class JobSubmitRequestBody implements RequestBody { - private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph"; + private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName"; + private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames"; + private static final String FIELD_NAME_JOB_ARTIFACTS = "jobArtifactFileNames"; - /** -* The serialized job graph. 
-*/ - @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) - public final byte[] serializedJobGraph; + @JsonProperty(FIELD_NAME_JOB_GRAPH) + public final String jobGraphFileName; - public JobSubmitRequestBody(JobGraph jobGraph) throws IOException { - this(serializeJobGraph(jobGraph)); - } + @JsonProperty(FIELD_NAME_JOB_JARS) + public final Collection jarFileNames; + + @JsonProperty(FIELD_NAME_JOB_ARTIFACTS) + public final Collection artifactFileNames; - @JsonCreator public JobSubmitRequestBody( - @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] serializedJobGraph) { - this.serializedJobGraph = Preconditions.checkNotNull(serializedJobGraph); + @JsonProperty(FIELD_NAME_JOB_GRAPH) String jobGraphFileName, + @JsonProperty(FIELD_NAME_JOB_JARS) Collection jarFileNames, + @JsonProperty(FIELD_NAME_JOB_ARTIFACTS) Collection artifactFileNames) { + this.jobGraphFileName = jobGraphFileName; + this.jarFileNames = jarFileNames; + this.artifactFileNames = artifactFileNames; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobSubmitRequestBody that = (JobSubmitRequestBody) o; + return Objects.equals(jobGraphFileName, that.jobGraphFileName) && + Objects.equals(jarFileNames, that.jarFileNames) && + Objects.equals(artifactFileNames, that.artifactFileNames); } @Override public int hashCode() { - return 71 * Arrays.hashCode(this.serializedJobGraph); + return Objects.hash(jobGraphFileName, jarFileNames, artifactFileNames); } @Override - public boolean equals(Object object) { - if (object instanceof JobSubmitRequestBody) { - JobSubmitRequestBody other = (JobSubmitRequestBody) object; - return Arrays.equals(this.serializedJobGraph, other.serializedJobGraph); - } - return false; + public String toString() { + return "JobSubmitRequestBody{" + + "jobGraphFileName='" + jobGraphFileName + '\'' + + ", jarFileNames=" + jarFileNames + + ", artifactFileNames=" + artifactFileNames + + 
'}'; } - private static byte[] serializeJobGraph(JobGraph jobGraph) throws IOException { - try (ByteArrayOutputStream baos = new ByteArrayOutputStream(64 * 1024)) { - ObjectOutputStream out = new ObjectOutputStream(baos); + /** +* Descriptor for a distributed cache file. +*/ + public static class DistributedCacheFile { + private static final String FIELD_NAME_ENTRY_NAME = "entryName"; + private static final String FIELD_NAME_FILE_NAME = "fileName";
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199195175 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -34,38 +40,137 @@ import javax.annotation.Nonnull; -import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; import java.io.ObjectInputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * This handler can be used to submit jobs to a Flink cluster. */ public final class JobSubmitHandler extends AbstractRestHandler { + private static final String FILE_TYPE_GRAPH = "JobGraph"; + private static final String FILE_TYPE_JAR = "Jar"; + private static final String FILE_TYPE_ARTIFACT = "Artifact"; + + private final Executor executor; + public JobSubmitHandler( CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, - Map headers) { + Map headers, + Executor executor) { super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance()); + this.executor = executor; } @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.toPath().getFileName().toString(), + File::toPath --- End diff -- I think we can 
completely remove the `nio.Path` dependency if we use `Path::fromLocalFile` with `Path` being Flink's path implementation. The benefit would be that we would not have to change types in the `getJarFilesToUpload` and `getArtifactFilesToUpload` methods. Instead of `Files.newInputStream(jobGraphFile)` we would have to call `FileSystem.getLocalFileSystem().open(jobGraphFile)`. ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199190292 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -54,18 +69,89 @@ public JobSubmitHandler( @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { - throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.getFileName().toString(), + entry -> entry + )); + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile); + + Collection jarFiles = new ArrayList<>(requestBody.jarFileNames.size()); + for (String jarFileName : requestBody.jarFileNames) { + Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile); + jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString())); + } + + Collection> artifacts = new ArrayList<>(requestBody.artifactFileNames.size()); + for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : requestBody.artifactFileNames) { + Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile); + artifacts.add(Tuple2.of(artifactFileName.entryName, new org.apache.flink.core.fs.Path(artifactFile.toString(; } - return gateway.submitJob(jobGraph, timeout) - .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())); + Map temporaryHack = artifacts.stream() + .collect(Collectors.toMap( + 
tuple -> tuple.f0, + // the actual entry definition is mostly irrelevant as only the blobkey is accessed + // blame whoever wrote the ClientUtils API + tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false) + )); + + // TODO: use executor + CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> { + JobGraph jobGraph; + try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) { + jobGraph = (JobGraph) objectIn.readObject(); + } catch (Exception e) { + throw new CompletionException(new RestHandlerException( + "Failed to deserialize JobGraph.", + HttpResponseStatus.BAD_REQUEST, + e)); + } + return jobGraph; + }); + + CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout); + + CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> { + final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort); + try (BlobClient blobClient = new BlobClient(address, new Configuration())) { + Collection jarBlobKeys = ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient); + ClientUtils.setUserJarBlobKeys(jarBlobKeys, jobGraph); + + Collection> artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), temporaryHack, blobClient); + ClientUtils.setUserArtifactBlobKeys(jobGraph, artifactBlobKeys); + } catch (IOException e) { + throw new CompletionException(new RestHandlerException( + "Could not upload job files.", +
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199182588 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) // we have to enable queued scheduling because slot will be allocated lazily jobGraph.setAllowQueuedScheduling(true); - log.info("Requesting blob server port."); - CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance()); + CompletableFuture submissionFuture = CompletableFuture.supplyAsync( + () -> { + log.info("Submitting job graph."); - CompletableFuture jobUploadFuture = portFuture.thenCombine( - getDispatcherAddress(), - (BlobServerPortResponseBody response, String dispatcherAddress) -> { - final int blobServerPort = response.port; - final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); + List jarFileNames = new ArrayList<>(8); + List artifactFileNames = new ArrayList<>(8); + Collection filesToUpload = new ArrayList<>(8); + // TODO: need configurable location + final java.nio.file.Path jobGraphFile; try { - ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig)); - } catch (Exception e) { - throw new CompletionException(e); + jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin"); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(jobGraph); + } + } + filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY)); + } catch (IOException e) { + throw new CompletionException("Failed to serialize JobGraph.", e); } - return jobGraph; - }); - - CompletableFuture submissionFuture = jobUploadFuture.thenCompose( - (JobGraph jobGraphToSubmit) -> { - log.info("Submitting job graph."); + for (Path jar : 
jobGraph.getUserJars()) { + jarFileNames.add(jar.getName()); + filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR)); + } - try { - return sendRequest( - JobSubmitHeaders.getInstance(), - new JobSubmitRequestBody(jobGraph)); - } catch (IOException ioe) { - throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe)); + for (Map.Entry artifacts : jobGraph.getUserArtifacts().entrySet()) { + artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName())); + filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY)); } - }); + + final CompletableFuture submitFuture = sendRetriableRequest( --- End diff -- Shall we split up the resource preparation and sending the actual request into different steps? Then this `Supplier` would be much simpler: ``` CompletableFuture> requestFuture = CompletableFuture.supplyAsync(() -> ..., executorService); CompletableFuture submissionFuture = requestFuture.thenCompose(requestBody -> sendRetriableRequest()); submissionFuture.whenComplete((ignoredA, ignoredB) -> cleanupFiles)); return submisssionFuture; ``` ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199198055 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java --- @@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws Exception { @Test public void testSuccessfulJobSubmission() throws Exception { + final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath(); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(new JobGraph("testjob")); + } + } + DispatcherGateway mockGateway = mock(DispatcherGateway.class); + when(mockGateway.getHostname()).thenReturn("localhost"); + when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort())); when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); JobSubmitHandler handler = new JobSubmitHandler( CompletableFuture.completedFuture("http://localhost:1234;), mockGatewayRetriever, RpcUtils.INF_TIMEOUT, - Collections.emptyMap()); + Collections.emptyMap(), + TestingUtils.defaultExecutor()); - JobGraph job = new JobGraph("testjob"); - JobSubmitRequestBody request = new JobSubmitRequestBody(job); + JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList()); - handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway) + handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())), mockGateway) .get(); } + + @Test + public void testRejectionOnCountMismatch() throws Exception { + 
final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath(); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(new JobGraph("testjob")); + } + } + final Path countExceedingFile = TEMPORARY_FOLDER.newFile().toPath(); + + DispatcherGateway mockGateway = mock(DispatcherGateway.class); + when(mockGateway.getHostname()).thenReturn("localhost"); + when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort())); + when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); --- End diff -- This is duplicate code, can we avoid it? ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199192714 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -34,38 +40,137 @@ import javax.annotation.Nonnull; -import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; import java.io.ObjectInputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * This handler can be used to submit jobs to a Flink cluster. */ public final class JobSubmitHandler extends AbstractRestHandler { + private static final String FILE_TYPE_GRAPH = "JobGraph"; --- End diff -- Let's call it `FILE_TYPE_JOB_GRAPH` ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199187385 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -34,38 +40,137 @@ import javax.annotation.Nonnull; -import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; import java.io.ObjectInputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * This handler can be used to submit jobs to a Flink cluster. */ public final class JobSubmitHandler extends AbstractRestHandler { + private static final String FILE_TYPE_GRAPH = "JobGraph"; + private static final String FILE_TYPE_JAR = "Jar"; + private static final String FILE_TYPE_ARTIFACT = "Artifact"; + + private final Executor executor; + public JobSubmitHandler( CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, - Map headers) { + Map headers, + Executor executor) { super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance()); + this.executor = executor; } @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.toPath().getFileName().toString(), --- End diff -- Alternatively we could also call 
`File::getName` ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199186445 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java --- @@ -79,6 +79,7 @@ protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpReque response = FutureUtils.completedExceptionally(e); } + CompletableFuture processingFinishedFuture = new CompletableFuture<>(); response.whenComplete((P resp, Throwable throwable) -> { --- End diff -- Can't we simply return the result of the `whenComplete` stage here? We would just need to change the return type of `respondToRequest` to `CompletableFuture`. ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199188036 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -34,38 +40,137 @@ import javax.annotation.Nonnull; -import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; import java.io.ObjectInputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * This handler can be used to submit jobs to a Flink cluster. */ public final class JobSubmitHandler extends AbstractRestHandler { + private static final String FILE_TYPE_GRAPH = "JobGraph"; + private static final String FILE_TYPE_JAR = "Jar"; + private static final String FILE_TYPE_ARTIFACT = "Artifact"; + + private final Executor executor; + public JobSubmitHandler( CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, - Map headers) { + Map headers, + Executor executor) { super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance()); + this.executor = executor; } @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.toPath().getFileName().toString(), + File::toPath + )); + + if (uploadedFiles.size() 
!= nameToFile.size()) { throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s", + uploadedFiles.size() < nameToFile.size() ? "lower" : "higher", + nameToFile.size(), + uploadedFiles.size()), + HttpResponseStatus.BAD_REQUEST + ); + } + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, nameToFile); + + CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> { + JobGraph jobGraph; + try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) { + jobGraph = (JobGraph) objectIn.readObject(); + } catch (Exception e) { + throw new CompletionException(new RestHandlerException( + "Failed to deserialize JobGraph.", + HttpResponseStatus.BAD_REQUEST, + e)); + } + return jobGraph; + }, executor); + + Collection jarFiles = getJarFilesToUpload(nameToFile, requestBody.jarFileNames); --- End diff -- In `getPathAndAssertUpload` the parameters are in the other order. Would be cool to make it consistent. ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199179398 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) // we have to enable queued scheduling because slot will be allocated lazily jobGraph.setAllowQueuedScheduling(true); - log.info("Requesting blob server port."); - CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance()); + CompletableFuture submissionFuture = CompletableFuture.supplyAsync( + () -> { + log.info("Submitting job graph."); - CompletableFuture jobUploadFuture = portFuture.thenCombine( - getDispatcherAddress(), - (BlobServerPortResponseBody response, String dispatcherAddress) -> { - final int blobServerPort = response.port; - final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); + List jarFileNames = new ArrayList<>(8); + List artifactFileNames = new ArrayList<>(8); + Collection filesToUpload = new ArrayList<>(8); + // TODO: need configurable location + final java.nio.file.Path jobGraphFile; try { - ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig)); - } catch (Exception e) { - throw new CompletionException(e); + jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin"); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(jobGraph); + } + } + filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY)); + } catch (IOException e) { + throw new CompletionException("Failed to serialize JobGraph.", e); --- End diff -- I think the failure message `"Failed to serialize JobGraph` should go to a dedicated exception because completion exceptions can be filtered out. 
`throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e))` ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199094230 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -54,18 +69,89 @@ public JobSubmitHandler( @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { - throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + Collection uploadedFiles = request.getUploadedFiles(); --- End diff -- What about `ClientUtils`, should they also accept `File` collections? ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199077598 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -54,18 +69,89 @@ public JobSubmitHandler( @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { - throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + Collection uploadedFiles = request.getUploadedFiles(); --- End diff -- So how far do you propose to go? Should `HandlerRequest` also receive a `Collection`, or convert the existing `Collection`? Should `FileUploads#getUploadedFiles` return a `Collection`? ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r198178787 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -315,42 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) // we have to enable queued scheduling because slot will be allocated lazily jobGraph.setAllowQueuedScheduling(true); - log.info("Requesting blob server port."); - CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance()); + CompletableFuture submissionFuture = CompletableFuture.supplyAsync( + () -> { + log.info("Submitting job graph."); + + List jarFileNames = new ArrayList<>(8); + List artifactFileNames = new ArrayList<>(8); + Collection filesToUpload = new ArrayList<>(8); - CompletableFuture jobUploadFuture = portFuture.thenCombine( - getDispatcherAddress(), - (BlobServerPortResponseBody response, String dispatcherAddress) -> { - final int blobServerPort = response.port; - final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); - final List keys; + // TODO: need configurable location + final java.nio.file.Path jobGraphFile; try { - log.info("Uploading jar files."); - keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars()); - jobGraph.uploadUserArtifacts(address, flinkConfig); - } catch (IOException ioe) { - throw new CompletionException(new FlinkException("Could not upload job files.", ioe)); + jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin"); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(jobGraph); + } + } + filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY)); + } catch (IOException e) { + throw new RuntimeException("lol", e); --- End diff -- needs a proper exception ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r198033040 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -54,18 +69,89 @@ public JobSubmitHandler( @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { - throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + Collection uploadedFiles = request.getUploadedFiles(); --- End diff -- I agree that nio `Paths` are more powerful. I'm just wondering whether we actually need this flexibility here. Usually it is a good idea to make the interface as restrictive as possible and widen it on demand. Moreover, there is also Flink's `Path` which comes with a bit of different semantics. For example, you have the safety net for closing open file streams which is also used to interrupt I/O operations which are otherwise not interruptible. Mixing the nio paths in there, might give the false impression that this also applies to them. So I'm just saying that we should make this decision consciously and not only based on which type is more convenient to be used. ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to accept jar files
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r197735309 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -54,18 +69,89 @@ public JobSubmitHandler( @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { - throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + Collection uploadedFiles = request.getUploadedFiles(); --- End diff -- I was wondering when you might bring that up :) I'm not a fan of exposing `File`. * whether it _really is a local file_ shouldn't be relevant to the handler * nio Paths are more flexible than files; for example if the DistributedCache were to return files we wouldn't have to extract zips as we could mount it with a `ZipFileSystem` instead * always having to convert using `File#toPath` is rather tedious ---
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to accept jar files
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r197493894 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -54,18 +69,89 @@ public JobSubmitHandler( @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { - throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.getFileName().toString(), + entry -> entry + )); + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile); + + Collection jarFiles = new ArrayList<>(requestBody.jarFileNames.size()); + for (String jarFileName : requestBody.jarFileNames) { + Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile); + jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString())); + } + + Collection> artifacts = new ArrayList<>(requestBody.artifactFileNames.size()); + for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : requestBody.artifactFileNames) { + Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile); + artifacts.add(Tuple2.of(artifactFileName.entryName, new org.apache.flink.core.fs.Path(artifactFile.toString(; } - return gateway.submitJob(jobGraph, timeout) - .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())); + Map temporaryHack = artifacts.stream() + .collect(Collectors.toMap( + tuple 
-> tuple.f0, + // the actual entry definition is mostly irrelevant as only the blobkey is accessed + // blame whoever wrote the ClientUtils API + tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false) + )); + + // TODO: use executor + CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> { + JobGraph jobGraph; + try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) { + jobGraph = (JobGraph) objectIn.readObject(); + } catch (Exception e) { + throw new CompletionException(new RestHandlerException( + "Failed to deserialize JobGraph.", + HttpResponseStatus.BAD_REQUEST, + e)); + } + return jobGraph; + }); + + CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout); + + CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> { + final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort); + try (BlobClient blobClient = new BlobClient(address, new Configuration())) { + Collection jarBlobKeys = ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient); + ClientUtils.setUserJarBlobKeys(jarBlobKeys, jobGraph); + + Collection> artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), temporaryHack, blobClient); + ClientUtils.setUserArtifactBlobKeys(jobGraph, artifactBlobKeys); + } catch (IOException e) { + throw new CompletionException(new RestHandlerException( + "Could not upload job files.", +
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to accept jar files
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r197492045 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -54,18 +69,89 @@ public JobSubmitHandler( @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { - throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.getFileName().toString(), + entry -> entry + )); + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile); + + Collection jarFiles = new ArrayList<>(requestBody.jarFileNames.size()); + for (String jarFileName : requestBody.jarFileNames) { + Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile); + jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString())); + } + + Collection> artifacts = new ArrayList<>(requestBody.artifactFileNames.size()); + for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : requestBody.artifactFileNames) { + Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile); + artifacts.add(Tuple2.of(artifactFileName.entryName, new org.apache.flink.core.fs.Path(artifactFile.toString(; } - return gateway.submitJob(jobGraph, timeout) - .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())); + Map temporaryHack = artifacts.stream() + .collect(Collectors.toMap( + tuple 
-> tuple.f0, + // the actual entry definition is mostly irrelevant as only the blobkey is accessed + // blame whoever wrote the ClientUtils API + tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false) + )); + + // TODO: use executor + CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> { + JobGraph jobGraph; + try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) { + jobGraph = (JobGraph) objectIn.readObject(); + } catch (Exception e) { + throw new CompletionException(new RestHandlerException( + "Failed to deserialize JobGraph.", + HttpResponseStatus.BAD_REQUEST, + e)); + } + return jobGraph; + }); + + CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout); + + CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> { + final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort); + try (BlobClient blobClient = new BlobClient(address, new Configuration())) { + Collection jarBlobKeys = ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient); + ClientUtils.setUserJarBlobKeys(jarBlobKeys, jobGraph); + + Collection> artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), temporaryHack, blobClient); + ClientUtils.setUserArtifactBlobKeys(jobGraph, artifactBlobKeys); + } catch (IOException e) { + throw new CompletionException(new RestHandlerException( + "Could not upload job files.", +
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to accept jar files
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r197487718 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -54,18 +69,89 @@ public JobSubmitHandler( @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { - throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.getFileName().toString(), + entry -> entry + )); + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile); + + Collection jarFiles = new ArrayList<>(requestBody.jarFileNames.size()); + for (String jarFileName : requestBody.jarFileNames) { + Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile); + jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString())); + } + + Collection> artifacts = new ArrayList<>(requestBody.artifactFileNames.size()); + for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : requestBody.artifactFileNames) { + Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile); + artifacts.add(Tuple2.of(artifactFileName.entryName, new org.apache.flink.core.fs.Path(artifactFile.toString(; } - return gateway.submitJob(jobGraph, timeout) - .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())); + Map temporaryHack = artifacts.stream() + .collect(Collectors.toMap( + 
tuple -> tuple.f0, + // the actual entry definition is mostly irrelevant as only the blobkey is accessed + // blame whoever wrote the ClientUtils API + tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false) + )); + + // TODO: use executor + CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> { + JobGraph jobGraph; + try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) { + jobGraph = (JobGraph) objectIn.readObject(); + } catch (Exception e) { + throw new CompletionException(new RestHandlerException( + "Failed to deserialize JobGraph.", + HttpResponseStatus.BAD_REQUEST, + e)); + } + return jobGraph; + }); + + CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout); + + CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> { + final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort); + try (BlobClient blobClient = new BlobClient(address, new Configuration())) { + Collection jarBlobKeys = ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient); + ClientUtils.setUserJarBlobKeys(jarBlobKeys, jobGraph); + + Collection> artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), temporaryHack, blobClient); + ClientUtils.setUserArtifactBlobKeys(jobGraph, artifactBlobKeys); + } catch (IOException e) { + throw new CompletionException(new RestHandlerException( + "Could not upload job files.", +
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to accept jar files
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r197477484 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -54,18 +69,89 @@ public JobSubmitHandler( @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { - throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.getFileName().toString(), + entry -> entry + )); + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile); + + Collection jarFiles = new ArrayList<>(requestBody.jarFileNames.size()); + for (String jarFileName : requestBody.jarFileNames) { + Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile); + jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString())); + } + + Collection> artifacts = new ArrayList<>(requestBody.artifactFileNames.size()); + for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : requestBody.artifactFileNames) { + Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile); + artifacts.add(Tuple2.of(artifactFileName.entryName, new org.apache.flink.core.fs.Path(artifactFile.toString(; } - return gateway.submitJob(jobGraph, timeout) - .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())); + Map temporaryHack = artifacts.stream() + .collect(Collectors.toMap( + tuple 
-> tuple.f0, + // the actual entry definition is mostly irrelevant as only the blobkey is accessed + // blame whoever wrote the ClientUtils API + tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false) + )); + + // TODO: use executor + CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> { + JobGraph jobGraph; + try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) { + jobGraph = (JobGraph) objectIn.readObject(); + } catch (Exception e) { + throw new CompletionException(new RestHandlerException( + "Failed to deserialize JobGraph.", + HttpResponseStatus.BAD_REQUEST, + e)); + } + return jobGraph; + }); + + CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout); + + CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> { + final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort); + try (BlobClient blobClient = new BlobClient(address, new Configuration())) { + Collection jarBlobKeys = ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient); + ClientUtils.setUserJarBlobKeys(jarBlobKeys, jobGraph); + + Collection> artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), temporaryHack, blobClient); + ClientUtils.setUserArtifactBlobKeys(jobGraph, artifactBlobKeys); + } catch (IOException e) { + throw new CompletionException(new RestHandlerException( + "Could not upload job files.", +
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to accept jar files
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r197461746 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -54,18 +69,89 @@ public JobSubmitHandler( @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { - throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.getFileName().toString(), + entry -> entry + )); + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile); + + Collection jarFiles = new ArrayList<>(requestBody.jarFileNames.size()); + for (String jarFileName : requestBody.jarFileNames) { + Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile); + jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString())); + } + + Collection> artifacts = new ArrayList<>(requestBody.artifactFileNames.size()); + for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : requestBody.artifactFileNames) { + Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile); + artifacts.add(Tuple2.of(artifactFileName.entryName, new org.apache.flink.core.fs.Path(artifactFile.toString(; } - return gateway.submitJob(jobGraph, timeout) - .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())); + Map temporaryHack = artifacts.stream() + .collect(Collectors.toMap( + 
tuple -> tuple.f0, + // the actual entry definition is mostly irrelevant as only the blobkey is accessed + // blame whoever wrote the ClientUtils API + tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false) + )); + + // TODO: use executor + CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> { + JobGraph jobGraph; + try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) { + jobGraph = (JobGraph) objectIn.readObject(); + } catch (Exception e) { + throw new CompletionException(new RestHandlerException( + "Failed to deserialize JobGraph.", + HttpResponseStatus.BAD_REQUEST, + e)); + } + return jobGraph; + }); + + CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout); + + CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> { + final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort); + try (BlobClient blobClient = new BlobClient(address, new Configuration())) { + Collection jarBlobKeys = ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient); + ClientUtils.setUserJarBlobKeys(jarBlobKeys, jobGraph); + + Collection> artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), temporaryHack, blobClient); + ClientUtils.setUserArtifactBlobKeys(jobGraph, artifactBlobKeys); + } catch (IOException e) { + throw new CompletionException(new RestHandlerException( + "Could not upload job files.", +
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to accept jar files
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r197456037 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -54,18 +69,89 @@ public JobSubmitHandler( @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { - throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + Collection uploadedFiles = request.getUploadedFiles(); + Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.getFileName().toString(), + entry -> entry + )); + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile); + + Collection jarFiles = new ArrayList<>(requestBody.jarFileNames.size()); + for (String jarFileName : requestBody.jarFileNames) { + Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile); + jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString())); + } + + Collection> artifacts = new ArrayList<>(requestBody.artifactFileNames.size()); + for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : requestBody.artifactFileNames) { + Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile); + artifacts.add(Tuple2.of(artifactFileName.entryName, new org.apache.flink.core.fs.Path(artifactFile.toString(; } - return gateway.submitJob(jobGraph, timeout) - .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())); + Map temporaryHack = artifacts.stream() + .collect(Collectors.toMap( + tuple 
-> tuple.f0, + // the actual entry definition is mostly irrelevant as only the blobkey is accessed + // blame whoever wrote the ClientUtils API + tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false) + )); + + // TODO: use executor + CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> { + JobGraph jobGraph; + try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) { + jobGraph = (JobGraph) objectIn.readObject(); + } catch (Exception e) { + throw new CompletionException(new RestHandlerException( + "Failed to deserialize JobGraph.", + HttpResponseStatus.BAD_REQUEST, + e)); + } + return jobGraph; + }); + + CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout); + + CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> { + final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort); + try (BlobClient blobClient = new BlobClient(address, new Configuration())) { + Collection jarBlobKeys = ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient); + ClientUtils.setUserJarBlobKeys(jarBlobKeys, jobGraph); + + Collection> artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), temporaryHack, blobClient); + ClientUtils.setUserArtifactBlobKeys(jobGraph, artifactBlobKeys); + } catch (IOException e) { + throw new CompletionException(new RestHandlerException( + "Could not upload job files.", +