Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6199#discussion_r197279550 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -323,17 +325,18 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) (BlobServerPortResponseBody response, String dispatcherAddress) -> { final int blobServerPort = response.port; final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); - final List<PermanentBlobKey> keys; - try { - log.info("Uploading jar files."); - keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars()); - jobGraph.uploadUserArtifacts(address, flinkConfig); - } catch (IOException ioe) { - throw new CompletionException(new FlinkException("Could not upload job files.", ioe)); - } - for (PermanentBlobKey key : keys) { - jobGraph.addUserJarBlobKey(key); + List<Path> userJars = jobGraph.getUserJars(); + Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts(); --- End diff -- this entire block is effectively duplicated in several classes and could also be moved to `ClientUtils`, but I wasn't sure whether this wouldn't put too much logic into a single method,
---