Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r163160791 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -376,4 +430,99 @@ public GetClusterStatusResponse getClusterStatus() { public int getMaxSlots() { return 0; } + + //------------------------------------------------------------------------- + // RestClient Helper + //------------------------------------------------------------------------- + + private <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends MessageParameters, P extends ResponseBody> CompletableFuture<P> + sendRequest(M messageHeaders, U messageParameters) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, messageParameters, EmptyRequestBody.getInstance()); + } + + private <M extends MessageHeaders<R, P, EmptyMessageParameters>, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> + sendRequest(M messageHeaders, R request) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), request); + } + + private <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P> + sendRequest(M messageHeaders) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance()); + } + + private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> + sendRequest(M messageHeaders, U messageParameters, R request) throws IOException, LeaderNotAvailableException { + final URL restServerBaseUrl = restServerLeaderHolder.getLeaderAddress(); + return restClient.sendRequest(restServerBaseUrl.getHost(), restServerBaseUrl.getPort(), messageHeaders, messageParameters, request); + } + + private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> + sendRetryableRequest(M messageHeaders, U messageParameters, R request, Predicate<Throwable> retryPredicate) { + return retry(() -> { + final URL restServerBaseUrl = restServerLeaderHolder.getLeaderAddress(); + return restClient.sendRequest(restServerBaseUrl.getHost(), restServerBaseUrl.getPort(), messageHeaders, messageParameters, request); + }, retryPredicate); + } + + private <C> CompletableFuture<C> retry( + CheckedSupplier<CompletableFuture<C>> operation, + Predicate<Throwable> retryPredicate) { + return FutureUtils.retryWithDelay( + CheckedSupplier.unchecked(operation), + MAX_RETRIES, + RETRY_DELAY, + retryPredicate, + new ScheduledExecutorServiceAdapter(retryExecutorService)); + } + + private static Predicate<Throwable> isTimeoutException() { + return (throwable) -> + ExceptionUtils.findThrowable(throwable, java.net.ConnectException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, java.net.SocketTimeoutException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, ConnectTimeoutException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, IOException.class).isPresent(); --- End diff -- renamed to `isConnectionProblemException`
---