Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5312#discussion_r162581631
  
    --- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
    @@ -376,4 +430,99 @@ public GetClusterStatusResponse getClusterStatus() {
        public int getMaxSlots() {
                return 0;
        }
    +
    +   
//-------------------------------------------------------------------------
    +   // RestClient Helper
    +   
//-------------------------------------------------------------------------
    +
    +   private <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends 
MessageParameters, P extends ResponseBody> CompletableFuture<P>
    +                   sendRequest(M messageHeaders, U messageParameters) 
throws IOException, LeaderNotAvailableException {
    +           return sendRequest(messageHeaders, messageParameters, 
EmptyRequestBody.getInstance());
    +   }
    +
    +   private <M extends MessageHeaders<R, P, EmptyMessageParameters>, R 
extends RequestBody, P extends ResponseBody> CompletableFuture<P>
    +                   sendRequest(M messageHeaders, R request) throws 
IOException, LeaderNotAvailableException {
    +           return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), request);
    +   }
    +
    +   private <M extends MessageHeaders<EmptyRequestBody, P, 
EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P>
    +                   sendRequest(M messageHeaders) throws IOException, 
LeaderNotAvailableException {
    +           return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
    +   }
    +
    +   private <M extends MessageHeaders<R, P, U>, U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture<P>
    +                   sendRequest(M messageHeaders, U messageParameters, R 
request) throws IOException, LeaderNotAvailableException {
    +           final URL restServerBaseUrl = 
restServerLeaderHolder.getLeaderAddress();
    +           return restClient.sendRequest(restServerBaseUrl.getHost(), 
restServerBaseUrl.getPort(), messageHeaders, messageParameters, request);
    +   }
    +
    +   private <M extends MessageHeaders<R, P, U>, U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture<P>
    +                   sendRetryableRequest(M messageHeaders, U 
messageParameters, R request, Predicate<Throwable> retryPredicate) {
    +           return retry(() -> {
    +                   final URL restServerBaseUrl = 
restServerLeaderHolder.getLeaderAddress();
    +                   return 
restClient.sendRequest(restServerBaseUrl.getHost(), 
restServerBaseUrl.getPort(), messageHeaders, messageParameters, request);
    +           }, retryPredicate);
    +   }
    +
    +   private <C> CompletableFuture<C> retry(
    +                   CheckedSupplier<CompletableFuture<C>> operation,
    +                   Predicate<Throwable> retryPredicate) {
    +           return FutureUtils.retryWithDelay(
    +                   CheckedSupplier.unchecked(operation),
    +                   MAX_RETRIES,
    +                   RETRY_DELAY,
    +                   retryPredicate,
    +                   new 
ScheduledExecutorServiceAdapter(retryExecutorService));
    +   }
    +
    +   private static Predicate<Throwable> isTimeoutException() {
    +           return (throwable) ->
    +                   ExceptionUtils.findThrowable(throwable, 
java.net.ConnectException.class).isPresent() ||
    +                           ExceptionUtils.findThrowable(throwable, 
java.net.SocketTimeoutException.class).isPresent() ||
    +                           ExceptionUtils.findThrowable(throwable, 
ConnectTimeoutException.class).isPresent() ||
    +                           ExceptionUtils.findThrowable(throwable, 
IOException.class).isPresent();
    +   }
    +
    +   private static Predicate<Throwable> isHttpStatusUnsuccessfulException() 
{
    +           return (throwable) -> ExceptionUtils.findThrowable(throwable, 
RestClientException.class)
    +                           .map(restClientException -> {
    +                                   final int code = 
restClientException.getHttpResponseStatus().code();
    +                                   return code < 200 || code > 299;
    +                           })
    +                           .orElse(false);
    +   }
    +
    +   private abstract class RestClusterClientLeaderRetrievalListener 
implements LeaderRetrievalListener {
    +           @Override
    +           public final void handleError(final Exception exception) {
    +                   log.error("Exception in LeaderRetrievalListener", 
exception);
    +                   shutdown();
    --- End diff --
    
    It's not good to call `shutdown()` here because some code in `CliFrontend` 
may wait forever due to some thread pools being shut down. One could instead 
set the leader address to null and let the REST client fail.


---

Reply via email to