[ https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531618#comment-16531618 ]
ASF GitHub Bot commented on FLINK-9280: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199867852 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -315,36 +315,61 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) // we have to enable queued scheduling because slot will be allocated lazily jobGraph.setAllowQueuedScheduling(true); - log.info("Requesting blob server port."); - CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(BlobServerPortHeaders.getInstance()); + CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> { + try { + final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin"); + try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) { + objectOut.writeObject(jobGraph); + } + return jobGraphFile; + } catch (IOException e) { + throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e)); + } + }, executorService); - CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine( - getDispatcherAddress(), - (BlobServerPortResponseBody response, String dispatcherAddress) -> { - final int blobServerPort = response.port; - final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); + CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> { + List<String> jarFileNames = new ArrayList<>(8); + List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8); + Collection<FileUpload> filesToUpload = new ArrayList<>(8); - try { - ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig)); - } catch (Exception e) { - throw new CompletionException(e); - } + filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY)); - return jobGraph; - }); + for (Path jar : jobGraph.getUserJars()) { + jarFileNames.add(jar.getName()); + filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR)); + } - CompletableFuture<JobSubmitResponseBody> submissionFuture = jobUploadFuture.thenCompose( - (JobGraph jobGraphToSubmit) -> { - log.info("Submitting job graph."); + for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) { + artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName())); + filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY)); + } - try { - return sendRequest( - JobSubmitHeaders.getInstance(), - new JobSubmitRequestBody(jobGraph)); - } catch (IOException ioe) { - throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe)); - } - }); + final JobSubmitRequestBody requestBody = new JobSubmitRequestBody( + jobGraphFile.getFileName().toString(), + jarFileNames, + artifactFileNames); + + return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload)); + }); + + final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose( + requestAndFileUploads -> sendRetriableRequest( + JobSubmitHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + requestAndFileUploads.f0, + requestAndFileUploads.f1, + isConnectionProblemOrServiceUnavailable()) + ); + + submissionFuture + .thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile) + .thenAccept(jobGraphFile -> { --- End diff -- ah right, i did it like this so I don't have to return anything in thenCombine(). I could of course return `null`, but that doesn't seem right, any returning anything else isn't necessary. @tillrohrmann What do you think? > Extend JobSubmitHandler to accept jar files > ------------------------------------------- > > Key: FLINK-9280 > URL: https://issues.apache.org/jira/browse/FLINK-9280 > Project: Flink > Issue Type: New Feature > Components: Job-Submission, REST > Affects Versions: 1.5.0 > Reporter: Chesnay Schepler > Assignee: Chesnay Schepler > Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > The job submission through the CLI first uploads all require jars to the blob > server, sets the blob keys in the jobgraph, and then uploads this graph to > The {{JobSubmitHandler}} which submits it to the Dispatcher. > This process has the downside that it requires jars to be uploaded to the > blobserver before submitting the job graph, which does not happen via REST. > I propose an extension to the the {{JobSubmitHandler}} to also accept an > optional list of jar files, that were previously uploaded through the > {{JarUploadHandler}}. If present, the handler would upload these jars to the > blobserver and set the blob keys. -- This message was sent by Atlassian JIRA (v7.6.3#76005)