Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6203#discussion_r197452051
  
    --- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
    @@ -317,43 +315,51 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
                // we have to enable queued scheduling because slot will be 
allocated lazily
                jobGraph.setAllowQueuedScheduling(true);
     
    -           log.info("Requesting blob server port.");
    -           CompletableFuture<BlobServerPortResponseBody> portFuture = 
sendRequest(BlobServerPortHeaders.getInstance());
    -
    -           CompletableFuture<JobGraph> jobUploadFuture = 
portFuture.thenCombine(
    -                   getDispatcherAddress(),
    -                   (BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
    -                           final int blobServerPort = response.port;
    -                           final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
    +           CompletableFuture<JobSubmitResponseBody> submissionFuture = 
CompletableFuture.supplyAsync(
    +                   () -> {
    +                           log.info("Submitting job graph.");
     
    -                           List<Path> userJars = jobGraph.getUserJars();
                                Map<String, 
DistributedCache.DistributedCacheEntry> userArtifacts = 
jobGraph.getUserArtifacts();
    -                           if (!userJars.isEmpty() || 
!userArtifacts.isEmpty()) {
    -                                   try (BlobClient client = new 
BlobClient(address, flinkConfig)) {
    -                                           log.info("Uploading jar 
files.");
    -                                           
ClientUtils.uploadAndSetUserJars(jobGraph, client);
    -                                           log.info("Uploading jar 
artifacts.");
    -                                           
ClientUtils.uploadAndSetUserArtifacts(jobGraph, client);
    -                                   } catch (IOException ioe) {
    -                                           throw new 
CompletionException(new FlinkException("Could not upload job files.", ioe));
    +
    +                           List<String> jarFileNames = new ArrayList<>(8);
    +                           List<JobSubmitRequestBody.DistributedCacheFile> 
artifactFileNames = new ArrayList<>(8);
    +                           Collection<FileUpload> filesToUpload = new 
ArrayList<>(8);
    +
    +                           // TODO: need configurable location
    +                           final String jobGraphFileName;
    +                           try {
    +                                   final java.nio.file.Path tempFile = 
Files.createTempFile("flink-jobgraph", ".bin");
    +                                   try (OutputStream fileOut = 
Files.newOutputStream(tempFile)) {
    +                                           try (ObjectOutputStream 
objectOut = new ObjectOutputStream(fileOut)) {
    +                                                   
objectOut.writeObject(jobGraph);
    +                                           }
                                        }
    +                                   filesToUpload.add(new 
FileUpload(tempFile, RestConstants.CONTENT_TYPE_BINARY));
    +                                   jobGraphFileName = 
tempFile.getFileName().toString();
    +                           } catch (IOException e) {
    +                                   throw new RuntimeException("lol", e);
                                }
     
    -                           return jobGraph;
    -                   });
    -
    -           CompletableFuture<JobSubmitResponseBody> submissionFuture = 
jobUploadFuture.thenCompose(
    -                   (JobGraph jobGraphToSubmit) -> {
    -                           log.info("Submitting job graph.");
    +                           for (Path jar : jobGraph.getUserJars()) {
    +                                   jarFileNames.add(jar.getName());
    +                                   filesToUpload.add(new 
FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
    +                           }
     
    -                           try {
    -                                   return sendRequest(
    -                                           JobSubmitHeaders.getInstance(),
    -                                           new 
JobSubmitRequestBody(jobGraph));
    -                           } catch (IOException ioe) {
    -                                   throw new CompletionException(new 
FlinkException("Could not create JobSubmitRequestBody.", ioe));
    +                           for (Map.Entry<String, 
DistributedCache.DistributedCacheEntry> artifacts : 
jobGraph.getUserArtifacts().entrySet()) {
    +                                   artifactFileNames.add(new 
JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), 
artifacts.getValue().filePath));
    +                                   filesToUpload.add(new 
FileUpload(Paths.get(artifacts.getValue().filePath), 
RestConstants.CONTENT_TYPE_BINARY));
                                }
    -                   });
    +
    +                           return sendRetriableRequest(
    +                                   JobSubmitHeaders.getInstance(),
    +                                   EmptyMessageParameters.getInstance(),
    +                                   new JobSubmitRequestBody(
    +                                           jobGraphFileName,
    +                                           jarFileNames,
    +                                           artifactFileNames),
    +                                   filesToUpload,
    +                                   
isConnectionProblemOrServiceUnavailable());
    --- End diff --
    
    I think it would be better to delete the temporary `JobGraph` file (the one 
created via `Files.createTempFile`) once the request has been sent, so that it 
is not left behind on disk.


---

Reply via email to