GitHub user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4918#discussion_r147594412
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
                return result;
        }
     
    -   // ------------------------------------------------------------------------
    -   //  Future Completed with an exception.
    -   // ------------------------------------------------------------------------
    +   /**
    +    * Times the given future out after the timeout.
    +    *
    +    * @param future to time out
    +    * @param timeout after which the given future is timed out
    +    * @param timeUnit time unit of the timeout
    +    * @param <T> type of the given future
    +    * @return The timeout enriched future
    +    */
    +   public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
    +           final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
    +
    +           future.whenComplete((T value, Throwable throwable) -> {
    +                   if (!timeoutFuture.isDone()) {
    +                           timeoutFuture.cancel(false);
    +                   }
    +           });
    +
    +           return future;
    +   }
     
        /**
    -    * Returns a {@link CompletableFuture} that has failed with the exception
    -    * provided as argument.
    -    * @param throwable the exception to fail the future with.
    -    * @return The failed future.
    +    * Runnable to complete the given future with a {@link TimeoutException}.
         */
    -   public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
    -           CompletableFuture<T> failedAttempt = new CompletableFuture<>();
    -           failedAttempt.completeExceptionally(throwable);
    -           return failedAttempt;
    +   static final class Timeout implements Runnable {
    +
    +           private final CompletableFuture<?> future;
    +
    +           Timeout(CompletableFuture<?> future) {
    +                   this.future = Preconditions.checkNotNull(future);
    +           }
    +
    +           @Override
    +           public void run() {
    +                   future.completeExceptionally(new TimeoutException());
    +           }
    +   }
    +
    +   /**
    +    * Delay scheduler used to timeout futures.
    +    *
    +    * <p>This class creates a singleton scheduler used to run the provided actions.
    +    */
    +   private static final class Delayer {
    +           static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(
    +                   1,
    +                   new ExecutorThreadFactory("CompletableFutureDelayScheduler"));
    --- End diff --
    
    Let's add a "Flink" prefix to the thread name.
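
For illustration, a minimal self-contained sketch of what the rename could look like. It runs on the plain JDK, so a hand-written `ThreadFactory` stands in for Flink's `ExecutorThreadFactory`. The class `DelayerSketch`, the helper names, the `-thread-<n>` suffix, and the exact pool name `FlinkCompletableFutureDelayScheduler` are all assumptions for this example, not the code that ends up in the PR.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class DelayerSketch {

    // Assumed pool name: the "Flink" prefix makes the thread easy to attribute in thread dumps.
    static final String POOL_NAME = "FlinkCompletableFutureDelayScheduler";

    // Plain-JDK stand-in (assumption) for Flink's ExecutorThreadFactory:
    // creates daemon threads named "<poolName>-thread-<n>".
    static ThreadFactory namedDaemonFactory(String poolName) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> {
            Thread thread = new Thread(runnable, poolName + "-thread-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    // Single-threaded scheduler, mirroring the Delayer in the diff.
    static final ScheduledThreadPoolExecutor DELAYER =
        new ScheduledThreadPoolExecutor(1, namedDaemonFactory(POOL_NAME));

    // Same idea as orTimeout in the diff: fail the future with a TimeoutException
    // after the delay, and cancel the pending timeout once the future completes.
    static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        Runnable timeoutAction = () -> future.completeExceptionally(new TimeoutException());
        ScheduledFuture<?> timeoutFuture = DELAYER.schedule(timeoutAction, timeout, unit);
        future.whenComplete((value, throwable) -> timeoutFuture.cancel(false));
        return future;
    }

    // Runs a task on the scheduler thread and returns that thread's name.
    static String schedulerThreadName() {
        Callable<String> readName = () -> Thread.currentThread().getName();
        try {
            return DELAYER.schedule(readName, 0, TimeUnit.MILLISECONDS).get();
        } catch (Exception e) {
            throw new IllegalStateException("could not query scheduler thread", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(schedulerThreadName());
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        boolean timedOut = orTimeout(neverCompleted, 50, TimeUnit.MILLISECONDS)
            .handle((value, throwable) -> throwable instanceof TimeoutException)
            .join();
        System.out.println("timed out: " + timedOut);
    }
}
```

With a prefix like this, the timeout thread is immediately recognizable as Flink's when the runtime is embedded in a larger application and shares a thread dump with other libraries.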

