Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r171226341
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -223,6 +223,78 @@
                }
        }
     
    +   /**
    +    * Retry the given operation with the given delay in between successful completions where the
    +    * result does not match a given predicate.
    +    *
    +    * @param operation to retry
    +    * @param retries number of retries
    +    * @param retryDelay delay between retries
    +    * @param retryPredicate Predicate to test whether the result is acceptable
    +    * @param scheduledExecutor executor to be used for the retry operation
    +    * @param <T> type of the result
    +    * @return Future which retries the given operation a given amount of times and delays the retry
    +    *   in case the predicate isn't matched
    +    */
    +   public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
    +           final Supplier<CompletableFuture<T>> operation,
    +           final long retries,
    +           final Time retryDelay,
    +           final Predicate<T> retryPredicate,
    +           final ScheduledExecutor scheduledExecutor) {
    +
    +           final CompletableFuture<T> resultFuture = new CompletableFuture<>();
    +
    +           retrySuccessfulOperationWithDelay(
    +                   resultFuture,
    +                   operation,
    +                   retries,
    +                   retryDelay,
    +                   retryPredicate,
    +                   scheduledExecutor);
    +
    +           return resultFuture;
    +   }
    +
    +   private static <T> void retrySuccessfulOperationWithDelay(
    +           final CompletableFuture<T> resultFuture,
    +           final Supplier<CompletableFuture<T>> operation,
    +           final long retries,
    +           final Time retryDelay,
    +           final Predicate<T> retryPredicate,
    +           final ScheduledExecutor scheduledExecutor) {
    +
    +           if (!resultFuture.isDone()) {
    +                   final CompletableFuture<T> operationResultFuture = operation.get();
    +
    +                   operationResultFuture.whenComplete(
    +                           (t, throwable) -> {
    +                                   if (throwable != null) {
    +                                           if (throwable instanceof CancellationException) {
    +                                                   resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
    +                                           } else {
    +                                                   resultFuture.completeExceptionally(throwable);
    +                                           }
    +                                   } else {
    +                                           if (retries > 0 && !retryPredicate.test(t)) {
    +                                                   final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
    +                                                           () -> retrySuccessfulOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, retryPredicate, scheduledExecutor),
    +                                                           retryDelay.toMilliseconds(),
    +                                                           TimeUnit.MILLISECONDS);
    +
    +                                                   resultFuture.whenComplete(
    +                                                           (innerT, innerThrowable) -> scheduledFuture.cancel(false));
    +                                           } else {
    +                                                   resultFuture.complete(t);
    --- End diff --
    
    So if the retries are exhausted, we complete the result future with the last returned operation result?
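
    To make the question concrete, here is a minimal sketch of the same control flow. It is not the code from the PR: it assumes plain JDK types (a `ScheduledExecutorService` and a millisecond delay instead of Flink's `ScheduledExecutor` and `Time`), it omits the cancellation handling, and the names `RetrySketch`, `retry` and `runExhausted` are made up for illustration. The predicate never accepts, so the only way out is the `else` branch once `retries` reaches 0.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class RetrySketch {

    // Simplified stand-in for the method in the diff (hypothetical name and signature).
    static <T> void retry(
            CompletableFuture<T> resultFuture,
            Supplier<CompletableFuture<T>> operation,
            long retries,
            long delayMillis,
            Predicate<T> acceptable,
            ScheduledExecutorService executor) {

        if (resultFuture.isDone()) {
            return;
        }

        operation.get().whenComplete((t, throwable) -> {
            if (throwable != null) {
                resultFuture.completeExceptionally(throwable);
            } else if (retries > 0 && !acceptable.test(t)) {
                // Result rejected and retries left: try again after the delay.
                executor.schedule(
                    () -> retry(resultFuture, operation, retries - 1, delayMillis, acceptable, executor),
                    delayMillis,
                    TimeUnit.MILLISECONDS);
            } else {
                // Reached when the predicate accepts t, but also when retries == 0
                // and the predicate still rejects t.
                resultFuture.complete(t);
            }
        });
    }

    // Runs the retry loop with a predicate that never accepts and two retries.
    static int runExhausted() throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicInteger attempts = new AtomicInteger();
            CompletableFuture<Integer> result = new CompletableFuture<>();
            // The operation returns 1, 2, 3, ... on successive invocations.
            retry(
                result,
                () -> CompletableFuture.completedFuture(attempts.incrementAndGet()),
                2,
                10,
                value -> false,
                executor);
            return result.get(5, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runExhausted()); // prints 3
    }
}
```

    With two retries the operation runs three times, and the caller receives the third result as a normal completion although the predicate rejected it. If that is not intended, completing exceptionally in that case would be one alternative.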

