Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r162581631 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -376,4 +430,99 @@ public GetClusterStatusResponse getClusterStatus() { public int getMaxSlots() { return 0; } + + //------------------------------------------------------------------------- + // RestClient Helper + //------------------------------------------------------------------------- + + private <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends MessageParameters, P extends ResponseBody> CompletableFuture<P> + sendRequest(M messageHeaders, U messageParameters) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, messageParameters, EmptyRequestBody.getInstance()); + } + + private <M extends MessageHeaders<R, P, EmptyMessageParameters>, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> + sendRequest(M messageHeaders, R request) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), request); + } + + private <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P> + sendRequest(M messageHeaders) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance()); + } + + private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> + sendRequest(M messageHeaders, U messageParameters, R request) throws IOException, LeaderNotAvailableException { + final URL restServerBaseUrl = restServerLeaderHolder.getLeaderAddress(); + return restClient.sendRequest(restServerBaseUrl.getHost(), restServerBaseUrl.getPort(), messageHeaders, messageParameters, request); + } + + private <M extends 
MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> + sendRetryableRequest(M messageHeaders, U messageParameters, R request, Predicate<Throwable> retryPredicate) { + return retry(() -> { + final URL restServerBaseUrl = restServerLeaderHolder.getLeaderAddress(); + return restClient.sendRequest(restServerBaseUrl.getHost(), restServerBaseUrl.getPort(), messageHeaders, messageParameters, request); + }, retryPredicate); + } + + private <C> CompletableFuture<C> retry( + CheckedSupplier<CompletableFuture<C>> operation, + Predicate<Throwable> retryPredicate) { + return FutureUtils.retryWithDelay( + CheckedSupplier.unchecked(operation), + MAX_RETRIES, + RETRY_DELAY, + retryPredicate, + new ScheduledExecutorServiceAdapter(retryExecutorService)); + } + + private static Predicate<Throwable> isTimeoutException() { + return (throwable) -> + ExceptionUtils.findThrowable(throwable, java.net.ConnectException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, java.net.SocketTimeoutException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, ConnectTimeoutException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, IOException.class).isPresent(); + } + + private static Predicate<Throwable> isHttpStatusUnsuccessfulException() { + return (throwable) -> ExceptionUtils.findThrowable(throwable, RestClientException.class) + .map(restClientException -> { + final int code = restClientException.getHttpResponseStatus().code(); + return code < 200 || code > 299; + }) + .orElse(false); + } + + private abstract class RestClusterClientLeaderRetrievalListener implements LeaderRetrievalListener { + @Override + public final void handleError(final Exception exception) { + log.error("Exception in LeaderRetrievalListener", exception); + shutdown(); --- End diff -- It's not good to call `shutdown()` here because some code in `CliFrontend` may wait forever due to some threadpools being shutdown. 
One could instead just set the leader address to null and let the REST client fail.
---