[ https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335441#comment-16335441 ]
ASF GitHub Bot commented on FLINK-8344: --------------------------------------- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r163160791 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -376,4 +430,99 @@ public GetClusterStatusResponse getClusterStatus() { public int getMaxSlots() { return 0; } + + //------------------------------------------------------------------------- + // RestClient Helper + //------------------------------------------------------------------------- + + private <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends MessageParameters, P extends ResponseBody> CompletableFuture<P> + sendRequest(M messageHeaders, U messageParameters) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, messageParameters, EmptyRequestBody.getInstance()); + } + + private <M extends MessageHeaders<R, P, EmptyMessageParameters>, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> + sendRequest(M messageHeaders, R request) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), request); + } + + private <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P> + sendRequest(M messageHeaders) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance()); + } + + private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> + sendRequest(M messageHeaders, U messageParameters, R request) throws IOException, LeaderNotAvailableException { + final URL restServerBaseUrl = restServerLeaderHolder.getLeaderAddress(); + return restClient.sendRequest(restServerBaseUrl.getHost(), restServerBaseUrl.getPort(), messageHeaders, messageParameters, request); + } + + private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> + sendRetryableRequest(M messageHeaders, U messageParameters, R request, Predicate<Throwable> retryPredicate) { + return retry(() -> { + final URL restServerBaseUrl = restServerLeaderHolder.getLeaderAddress(); + return restClient.sendRequest(restServerBaseUrl.getHost(), restServerBaseUrl.getPort(), messageHeaders, messageParameters, request); + }, retryPredicate); + } + + private <C> CompletableFuture<C> retry( + CheckedSupplier<CompletableFuture<C>> operation, + Predicate<Throwable> retryPredicate) { + return FutureUtils.retryWithDelay( + CheckedSupplier.unchecked(operation), + MAX_RETRIES, + RETRY_DELAY, + retryPredicate, + new ScheduledExecutorServiceAdapter(retryExecutorService)); + } + + private static Predicate<Throwable> isTimeoutException() { + return (throwable) -> + ExceptionUtils.findThrowable(throwable, java.net.ConnectException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, java.net.SocketTimeoutException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, ConnectTimeoutException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, IOException.class).isPresent(); --- End diff -- renamed to `isConnectionProblemException` > Add support for HA to RestClusterClient > --------------------------------------- > > Key: FLINK-8344 > URL: https://issues.apache.org/jira/browse/FLINK-8344 > Project: Flink > Issue Type: Improvement > Components: Client > Affects Versions: 1.5.0 > Reporter: Till Rohrmann > Assignee: Gary Yao > Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{RestClusterClient}} must be able to deal with changing JobMasters in > case of HA. We have to add functionality to reconnect to a newly elected > leader in case of HA. -- This message was sent by Atlassian JIRA (v7.6.3#76005)