Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4918#discussion_r147594412 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java --- @@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable { return result; } - // ------------------------------------------------------------------------ - // Future Completed with an exception. - // ------------------------------------------------------------------------ + /** + * Times the given future out after the timeout. + * + * @param future to time out + * @param timeout after which the given future is timed out + * @param timeUnit time unit of the timeout + * @param <T> type of the given future + * @return The timeout enriched future + */ + public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) { + final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit); + + future.whenComplete((T value, Throwable throwable) -> { + if (!timeoutFuture.isDone()) { + timeoutFuture.cancel(false); + } + }); + + return future; + } /** - * Returns a {@link CompletableFuture} that has failed with the exception - * provided as argument. - * @param throwable the exception to fail the future with. - * @return The failed future. + * Runnable to complete the given future with a {@link TimeoutException}. */ - public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) { - CompletableFuture<T> failedAttempt = new CompletableFuture<>(); - failedAttempt.completeExceptionally(throwable); - return failedAttempt; + static final class Timeout implements Runnable { + + private final CompletableFuture<?> future; + + Timeout(CompletableFuture<?> future) { + this.future = Preconditions.checkNotNull(future); + } + + @Override + public void run() { + future.completeExceptionally(new TimeoutException()); + } + } + + /** + * Delay scheduler used to timeout futures. + * + * <p>This class creates a singleton scheduler used to run the provided actions. + */ + private static final class Delayer { + static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor( + 1, + new ExecutorThreadFactory("CompletableFutureDelayScheduler")); --- End diff -- Let's add a "Flink" prefix to the thread name.
---