[ 
https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16338968#comment-16338968
 ] 

ASF GitHub Bot commented on FLINK-8344:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5312#discussion_r163775424
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -258,89 +312,96 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto
     
                final CompletableFuture<SavepointTriggerResponseBody> responseFuture;
     
    -           try {
    -                   responseFuture = restClient.sendRequest(
    -                           restClusterClientConfiguration.getRestServerAddress(),
    -                           restClusterClientConfiguration.getRestServerPort(),
    -                           savepointTriggerHeaders,
    -                           savepointTriggerMessageParameters,
    -                           new SavepointTriggerRequestBody(savepointDirectory));
    -           } catch (IOException e) {
    -                   throw new FlinkException("Could not send trigger savepoint request to Flink cluster.", e);
    -           }
    +           responseFuture = sendRequest(
    +                   savepointTriggerHeaders,
    +                   savepointTriggerMessageParameters,
    +                   new SavepointTriggerRequestBody(savepointDirectory));
     
    -           return responseFuture.thenApply(savepointTriggerResponseBody -> {
    +           return responseFuture.thenCompose(savepointTriggerResponseBody -> {
                        final SavepointTriggerId savepointTriggerId = savepointTriggerResponseBody.getSavepointTriggerId();
    -                   final SavepointInfo savepointInfo;
    -                   try {
    -                           savepointInfo = waitForSavepointCompletion(jobId, savepointTriggerId);
    -                   } catch (Exception e) {
    -                           throw new CompletionException(e);
    -                   }
    +                   return waitForSavepointCompletion(jobId, savepointTriggerId);
    +           }).thenApply(savepointInfo -> {
                        if (savepointInfo.getFailureCause() != null) {
                                throw new CompletionException(savepointInfo.getFailureCause());
                        }
                        return savepointInfo.getLocation();
                });
        }
     
    -   private SavepointInfo waitForSavepointCompletion(
    +   private CompletableFuture<SavepointInfo> waitForSavepointCompletion(
    --- End diff --
    
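    Shouldn't this be named something like `getSavepointFuture` rather than
    `waitForSavepointCompletion`? To me, "waiting" suggests that the method
    blocks until a future completes, whereas it now returns a
    `CompletableFuture` right away.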
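
For readers following the diff, here is a minimal, self-contained sketch of the non-blocking chain under discussion. It is not the Flink code: `SavepointChainSketch`, `Info`, `triggerSavepoint`, `savepointLocation`, and the `fail` flag are hypothetical stand-ins invented for illustration, and `getSavepointFuture` only borrows the name suggested in the comment. It shows `thenCompose` flattening a future-returning step, followed by `thenApply` turning a reported failure cause into an exceptional completion.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-ins for illustration only; these are not the Flink classes.
final class SavepointChainSketch {

    /** Simplified savepoint result: either a location or a failure cause. */
    static final class Info {
        final String location;
        final Throwable failureCause;

        Info(String location, Throwable failureCause) {
            this.location = location;
            this.failureCause = failureCause;
        }
    }

    /** Stand-in for the trigger request; completes with a trigger id. */
    static CompletableFuture<String> triggerSavepoint() {
        return CompletableFuture.supplyAsync(() -> "trigger-1");
    }

    /** Returns a future immediately instead of blocking the caller. */
    static CompletableFuture<Info> getSavepointFuture(String triggerId, boolean fail) {
        return CompletableFuture.supplyAsync(() -> fail
            ? new Info(null, new IllegalStateException("savepoint failed"))
            : new Info("/savepoints/" + triggerId, null));
    }

    static CompletableFuture<String> savepointLocation(boolean fail) {
        return triggerSavepoint()
            // thenCompose flattens CompletableFuture<CompletableFuture<Info>>
            // into CompletableFuture<Info>, so no thread blocks in between.
            .thenCompose(triggerId -> getSavepointFuture(triggerId, fail))
            // thenApply maps the completed result; a reported failure cause
            // becomes an exceptional completion of the returned future.
            .thenApply(info -> {
                if (info.failureCause != null) {
                    throw new CompletionException(info.failureCause);
                }
                return info.location;
            });
    }

    public static void main(String[] args) {
        // Blocking with join() only here, at the very edge of the program.
        System.out.println(savepointLocation(false).join());
    }
}
```

With `thenApply` in place of `thenCompose`, the lambda would have to block on the inner future itself, which is what the removed try/catch around `waitForSavepointCompletion` did.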


> Add support for HA to RestClusterClient
> ---------------------------------------
>
>                 Key: FLINK-8344
>                 URL: https://issues.apache.org/jira/browse/FLINK-8344
>             Project: Flink
>          Issue Type: Improvement
>          Components: Client
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Gary Yao
>            Priority: Major
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> The {{RestClusterClient}} must be able to deal with changing JobMasters in 
> case of HA. We have to add functionality to reconnect to a newly elected 
> leader in case of HA.



