Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6203#discussion_r199178551
  
    --- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
    @@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
                // we have to enable queued scheduling because slot will be 
allocated lazily
                jobGraph.setAllowQueuedScheduling(true);
     
    -           log.info("Requesting blob server port.");
    -           CompletableFuture<BlobServerPortResponseBody> portFuture = 
sendRequest(BlobServerPortHeaders.getInstance());
    +           CompletableFuture<JobSubmitResponseBody> submissionFuture = 
CompletableFuture.supplyAsync(
    +                   () -> {
    +                           log.info("Submitting job graph.");
     
    -           CompletableFuture<JobGraph> jobUploadFuture = 
portFuture.thenCombine(
    -                   getDispatcherAddress(),
    -                   (BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
    -                           final int blobServerPort = response.port;
    -                           final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
    +                           List<String> jarFileNames = new ArrayList<>(8);
    +                           List<JobSubmitRequestBody.DistributedCacheFile> 
artifactFileNames = new ArrayList<>(8);
    +                           Collection<FileUpload> filesToUpload = new 
ArrayList<>(8);
     
    +                           // TODO: need configurable location
    +                           final java.nio.file.Path jobGraphFile;
                                try {
    -                                   
ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, 
flinkConfig));
    -                           } catch (Exception e) {
    -                                   throw new CompletionException(e);
    +                                   jobGraphFile = 
Files.createTempFile("flink-jobgraph", ".bin");
    +                                   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
    +                                           try (ObjectOutputStream 
objectOut = new ObjectOutputStream(fileOut)) {
    --- End diff --
    
    I think we could combine these to `try` statement to `new 
ObjectOutputStream(Files.newOutputStream(jobGraphFile))`


---

Reply via email to