[ 
https://issues.apache.org/jira/browse/FLINK-9624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520324#comment-16520324
 ] 

ASF GitHub Bot commented on FLINK-9624:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6199#discussion_r197433655
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -323,17 +325,18 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
                        (BlobServerPortResponseBody response, String dispatcherAddress) -> {
                                final int blobServerPort = response.port;
                                final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
    -                           final List<PermanentBlobKey> keys;
    -                           try {
    -                                   log.info("Uploading jar files.");
    -                                   keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
    -                                   jobGraph.uploadUserArtifacts(address, flinkConfig);
    -                           } catch (IOException ioe) {
    -                                   throw new CompletionException(new FlinkException("Could not upload job files.", ioe));
    -                           }
     
    -                           for (PermanentBlobKey key : keys) {
    -                                   jobGraph.addUserJarBlobKey(key);
    +                           List<Path> userJars = jobGraph.getUserJars();
    +                           Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts();
    +                           if (!userJars.isEmpty() || !userArtifacts.isEmpty()) {
    +                                   try (BlobClient client = new BlobClient(address, flinkConfig)) {
    --- End diff --
    
    I would be in favour of having a `ClientUtils#uploadJobGraphFiles(jobGraph, 
flinkConfig, Supplier<BlobClient>)` which basically does what's being done here.
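
A rough sketch of the shape such a helper could take is given below. It is not the code from the pull request: `UploadSketch`, `BlobUploader` and `Job` are hypothetical stand-ins for `ClientUtils`, `BlobClient` and `JobGraph`, file paths and blob keys are plain strings, and the `flinkConfig` parameter is left out on the assumption that the supplier already captures it. The point is only that the caller passes a supplier, so no client is created when there is nothing to upload, and the client is closed once the uploads finish.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch; none of these types are Flink classes.
final class UploadSketch {

    /** Stand-in for a blob client: uploads one file and returns its key. */
    public interface BlobUploader extends AutoCloseable {
        String upload(String jobId, String path) throws IOException;

        @Override
        default void close() throws IOException {
            // nothing to release in this sketch
        }
    }

    /** Stand-in for the parts of a job graph that the upload touches. */
    public static final class Job {
        public final String id;
        public final List<String> userJars = new ArrayList<>();
        public final Map<String, String> userArtifacts = new HashMap<>();
        public final List<String> jarBlobKeys = new ArrayList<>();
        public final Map<String, String> artifactBlobKeys = new HashMap<>();

        public Job(String id) {
            this.id = id;
        }
    }

    /** Uploads jars and artifacts, recording the returned keys on the job. */
    public static void uploadJobGraphFiles(Job job, Supplier<BlobUploader> clientSupplier)
            throws IOException {
        if (job.userJars.isEmpty() && job.userArtifacts.isEmpty()) {
            return; // nothing to upload, so the supplier is never invoked
        }
        // One client serves all uploads and is closed afterwards.
        try (BlobUploader client = clientSupplier.get()) {
            for (String jar : job.userJars) {
                job.jarBlobKeys.add(client.upload(job.id, jar));
            }
            for (Map.Entry<String, String> artifact : job.userArtifacts.entrySet()) {
                String key = client.upload(job.id, artifact.getValue());
                job.artifactBlobKeys.put(artifact.getKey(), key);
            }
        }
    }
}
```

With a helper of this kind, the submission code would shrink to a single call that hands over the job and a supplier creating the client for the resolved address.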


> Move jar/artifact upload logic out of JobGraph
> ----------------------------------------------
>
>                 Key: FLINK-9624
>                 URL: https://issues.apache.org/jira/browse/FLINK-9624
>             Project: Flink
>          Issue Type: Improvement
>          Components: Job-Submission
>    Affects Versions: 1.6.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>
> The {{JobGraph}} offers utility methods for uploading jars and artifacts to 
> the BlobService.
> However, how these files are uploaded is not a concern of the {{JobGraph}} but 
> of the submission method, such as the {{RestClusterClient}}.
> These methods should be moved into a utility class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
