Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5312#discussion_r164074804
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -258,89 +312,96 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
     
                final CompletableFuture<SavepointTriggerResponseBody> 
responseFuture;
     
    -           try {
    -                   responseFuture = restClient.sendRequest(
    -                           
restClusterClientConfiguration.getRestServerAddress(),
    -                           
restClusterClientConfiguration.getRestServerPort(),
    -                           savepointTriggerHeaders,
    -                           savepointTriggerMessageParameters,
    -                           new 
SavepointTriggerRequestBody(savepointDirectory));
    -           } catch (IOException e) {
    -                   throw new FlinkException("Could not send trigger 
savepoint request to Flink cluster.", e);
    -           }
    +           responseFuture = sendRequest(
    +                   savepointTriggerHeaders,
    +                   savepointTriggerMessageParameters,
    +                   new SavepointTriggerRequestBody(savepointDirectory));
     
    -           return responseFuture.thenApply(savepointTriggerResponseBody -> 
{
    +           return responseFuture.thenCompose(savepointTriggerResponseBody 
-> {
                        final SavepointTriggerId savepointTriggerId = 
savepointTriggerResponseBody.getSavepointTriggerId();
    -                   final SavepointInfo savepointInfo;
    -                   try {
    -                           savepointInfo = 
waitForSavepointCompletion(jobId, savepointTriggerId);
    -                   } catch (Exception e) {
    -                           throw new CompletionException(e);
    -                   }
    +                   return waitForSavepointCompletion(jobId, 
savepointTriggerId);
    +           }).thenApply(savepointInfo -> {
                        if (savepointInfo.getFailureCause() != null) {
                                throw new 
CompletionException(savepointInfo.getFailureCause());
                        }
                        return savepointInfo.getLocation();
                });
        }
     
    -   private SavepointInfo waitForSavepointCompletion(
    +   private CompletableFuture<SavepointInfo> waitForSavepointCompletion(
                        final JobID jobId,
    -                   final SavepointTriggerId savepointTriggerId) throws 
Exception {
    +                   final SavepointTriggerId savepointTriggerId) {
                return waitForResource(() -> {
                        final SavepointStatusHeaders savepointStatusHeaders = 
SavepointStatusHeaders.getInstance();
                        final SavepointStatusMessageParameters 
savepointStatusMessageParameters =
                                
savepointStatusHeaders.getUnresolvedMessageParameters();
                        
savepointStatusMessageParameters.jobIdPathParameter.resolve(jobId);
                        
savepointStatusMessageParameters.savepointTriggerIdPathParameter.resolve(savepointTriggerId);
    -                   return restClient.sendRequest(
    -                           
restClusterClientConfiguration.getRestServerAddress(),
    -                           
restClusterClientConfiguration.getRestServerPort(),
    +                   return sendRetryableRequest(
                                savepointStatusHeaders,
    -                           savepointStatusMessageParameters
    -                   );
    +                           savepointStatusMessageParameters,
    +                           EmptyRequestBody.getInstance(),
    +                           isConnectionProblemException());
                });
        }
     
        @Override
        public CompletableFuture<Collection<JobStatusMessage>> listJobs() 
throws Exception {
    -           JobsOverviewHeaders headers = JobsOverviewHeaders.getInstance();
    -           CompletableFuture<MultipleJobsDetails> jobDetailsFuture = 
restClient.sendRequest(
    -                   restClusterClientConfiguration.getRestServerAddress(),
    -                   restClusterClientConfiguration.getRestServerPort(),
    -                   headers
    -           );
    -           return jobDetailsFuture
    +           return sendRequest(JobsOverviewHeaders.getInstance())
                        .thenApply(
    -                           (MultipleJobsDetails multipleJobsDetails) -> {
    -                                   final Collection<JobDetails> jobDetails 
= multipleJobsDetails.getJobs();
    -                                   Collection<JobStatusMessage> 
flattenedDetails = new ArrayList<>(jobDetails.size());
    -                                   jobDetails.forEach(detail -> 
flattenedDetails.add(new JobStatusMessage(detail.getJobId(), 
detail.getJobName(), detail.getStatus(), detail.getStartTime())));
    -
    -                                   return flattenedDetails;
    -                   });
    +                           (multipleJobsDetails) -> multipleJobsDetails
    +                                   .getJobs()
    +                                   .stream()
    +                                   .map(detail -> new JobStatusMessage(
    +                                           detail.getJobId(),
    +                                           detail.getJobName(),
    +                                           detail.getStatus(),
    +                                           detail.getStartTime()))
    +                                   .collect(Collectors.toList()));
        }
     
        @Override
        public T getClusterId() {
                return clusterId;
        }
     
    -   private <R, A extends AsynchronouslyCreatedResource<R>> R 
waitForResource(
    -                   final SupplierWithException<CompletableFuture<A>, 
IOException> resourceFutureSupplier)
    -                           throws IOException, InterruptedException, 
ExecutionException, TimeoutException {
    -           A asynchronouslyCreatedResource;
    -           long attempt = 0;
    -           while (true) {
    -                   final CompletableFuture<A> responseFuture = 
resourceFutureSupplier.get();
    -                   asynchronouslyCreatedResource = 
responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    -                   if (asynchronouslyCreatedResource.queueStatus().getId() 
== QueueStatus.Id.COMPLETED) {
    -                           break;
    +   /**
    +    * Polls a {@code AsynchronouslyCreatedResource} until its
    +    * {@link AsynchronouslyCreatedResource#queueStatus() QueueStatus} 
becomes
    +    * {@link QueueStatus.Id#COMPLETED COMPLETED}. This method returns a 
{@code CompletableFuture}
    +    * which completes with {@link 
AsynchronouslyCreatedResource#resource()}.
    +    *
    +    * @param resourceFutureSupplier The operation which polls for the
    +    *                               {@code AsynchronouslyCreatedResource}.
    +    * @param <R>                    The type of the resource.
    +    * @param <A>                    The type of the {@code 
AsynchronouslyCreatedResource}.
    +    * @return A {@code CompletableFuture} delivering the resource.
    +    */
    +   private <R, A extends AsynchronouslyCreatedResource<R>> 
CompletableFuture<R> waitForResource(
    --- End diff --
    
    This method (`waitForResource`) is now named `pollResourceAsync`, which is 
consistent with the method names in `CompletableFuture`.


---

Reply via email to