Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r197452051 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -317,43 +315,51 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) // we have to enable queued scheduling because slot will be allocated lazily jobGraph.setAllowQueuedScheduling(true); - log.info("Requesting blob server port."); - CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(BlobServerPortHeaders.getInstance()); - - CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine( - getDispatcherAddress(), - (BlobServerPortResponseBody response, String dispatcherAddress) -> { - final int blobServerPort = response.port; - final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); + CompletableFuture<JobSubmitResponseBody> submissionFuture = CompletableFuture.supplyAsync( + () -> { + log.info("Submitting job graph."); - List<Path> userJars = jobGraph.getUserJars(); Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts(); - if (!userJars.isEmpty() || !userArtifacts.isEmpty()) { - try (BlobClient client = new BlobClient(address, flinkConfig)) { - log.info("Uploading jar files."); - ClientUtils.uploadAndSetUserJars(jobGraph, client); - log.info("Uploading jar artifacts."); - ClientUtils.uploadAndSetUserArtifacts(jobGraph, client); - } catch (IOException ioe) { - throw new CompletionException(new FlinkException("Could not upload job files.", ioe)); + + List<String> jarFileNames = new ArrayList<>(8); + List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8); + Collection<FileUpload> filesToUpload = new ArrayList<>(8); + + // TODO: need configurable location + final String jobGraphFileName; + try { + final java.nio.file.Path tempFile = Files.createTempFile("flink-jobgraph", ".bin"); + try (OutputStream fileOut = Files.newOutputStream(tempFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(jobGraph); + } } + filesToUpload.add(new FileUpload(tempFile, RestConstants.CONTENT_TYPE_BINARY)); + jobGraphFileName = tempFile.getFileName().toString(); + } catch (IOException e) { + throw new RuntimeException("lol", e); } - return jobGraph; - }); - - CompletableFuture<JobSubmitResponseBody> submissionFuture = jobUploadFuture.thenCompose( - (JobGraph jobGraphToSubmit) -> { - log.info("Submitting job graph."); + for (Path jar : jobGraph.getUserJars()) { + jarFileNames.add(jar.getName()); + filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR)); + } - try { - return sendRequest( - JobSubmitHeaders.getInstance(), - new JobSubmitRequestBody(jobGraph)); - } catch (IOException ioe) { - throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe)); + for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) { + artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), artifacts.getValue().filePath)); + filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY)); } - }); + + return sendRetriableRequest( + JobSubmitHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + new JobSubmitRequestBody( + jobGraphFileName, + jarFileNames, + artifactFileNames), + filesToUpload, + isConnectionProblemOrServiceUnavailable()); --- End diff -- I think it would be better to clean up the generated `JobGraph` file after we've sent the request.
---