[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4918 ---
[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4918#discussion_r147658822 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java --- @@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable { return result; } - // - // Future Completed with an exception. - // + /** +* Times the given future out after the timeout. +* +* @param future to time out +* @param timeout after which the given future is timed out +* @param timeUnit time unit of the timeout +* @param type of the given future +* @return The timeout enriched future +*/ + public static CompletableFuture orTimeout(CompletableFuture future, long timeout, TimeUnit timeUnit) { + final ScheduledFuture timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit); + + future.whenComplete((T value, Throwable throwable) -> { + if (!timeoutFuture.isDone()) { + timeoutFuture.cancel(false); + } + }); + + return future; + } /** -* Returns a {@link CompletableFuture} that has failed with the exception -* provided as argument. -* @param throwable the exception to fail the future with. -* @return The failed future. +* Runnable to complete the given future with a {@link TimeoutException}. */ - public static CompletableFuture getFailedFuture(Throwable throwable) { - CompletableFuture failedAttempt = new CompletableFuture<>(); - failedAttempt.completeExceptionally(throwable); - return failedAttempt; + static final class Timeout implements Runnable { + + private final CompletableFuture future; + + Timeout(CompletableFuture future) { + this.future = Preconditions.checkNotNull(future); + } + + @Override + public void run() { + future.completeExceptionally(new TimeoutException()); + } + } + + /** +* Delay scheduler used to timeout futures. +* +* This class creates a singleton scheduler used to run the provided actions. +*/ + private static final class Delayer { + static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor( + 1, + new ExecutorThreadFactory("CompletableFutureDelayScheduler")); --- End diff -- Good point. Will add it. ---
[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4918#discussion_r147658765 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java --- @@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable { return result; } - // - // Future Completed with an exception. - // + /** +* Times the given future out after the timeout. +* +* @param future to time out +* @param timeout after which the given future is timed out +* @param timeUnit time unit of the timeout +* @param type of the given future +* @return The timeout enriched future +*/ + public static CompletableFuture orTimeout(CompletableFuture future, long timeout, TimeUnit timeUnit) { + final ScheduledFuture timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit); + + future.whenComplete((T value, Throwable throwable) -> { + if (!timeoutFuture.isDone()) { + timeoutFuture.cancel(false); + } + }); + + return future; + } /** -* Returns a {@link CompletableFuture} that has failed with the exception -* provided as argument. -* @param throwable the exception to fail the future with. -* @return The failed future. +* Runnable to complete the given future with a {@link TimeoutException}. */ - public static CompletableFuture getFailedFuture(Throwable throwable) { - CompletableFuture failedAttempt = new CompletableFuture<>(); - failedAttempt.completeExceptionally(throwable); - return failedAttempt; + static final class Timeout implements Runnable { + + private final CompletableFuture future; + + Timeout(CompletableFuture future) { + this.future = Preconditions.checkNotNull(future); + } + + @Override + public void run() { + future.completeExceptionally(new TimeoutException()); --- End diff -- Not entirely sure, since people might use this to disambiguate different timeouts from each other. I rather not offer this possibility. ---
[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4918#discussion_r147658453 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java --- @@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable { return result; } - // - // Future Completed with an exception. - // + /** +* Times the given future out after the timeout. +* +* @param future to time out +* @param timeout after which the given future is timed out +* @param timeUnit time unit of the timeout +* @param type of the given future +* @return The timeout enriched future +*/ + public static CompletableFuture orTimeout(CompletableFuture future, long timeout, TimeUnit timeUnit) { + final ScheduledFuture timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit); + + future.whenComplete((T value, Throwable throwable) -> { + if (!timeoutFuture.isDone()) { + timeoutFuture.cancel(false); + } + }); + + return future; + } /** -* Returns a {@link CompletableFuture} that has failed with the exception -* provided as argument. -* @param throwable the exception to fail the future with. -* @return The failed future. +* Runnable to complete the given future with a {@link TimeoutException}. */ - public static CompletableFuture getFailedFuture(Throwable throwable) { - CompletableFuture failedAttempt = new CompletableFuture<>(); - failedAttempt.completeExceptionally(throwable); - return failedAttempt; + static final class Timeout implements Runnable { --- End diff -- Yes will do. ---
[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4918#discussion_r147594462 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java --- @@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable { return result; } - // - // Future Completed with an exception. - // + /** +* Times the given future out after the timeout. +* +* @param future to time out +* @param timeout after which the given future is timed out +* @param timeUnit time unit of the timeout +* @param type of the given future +* @return The timeout enriched future +*/ + public static CompletableFuture orTimeout(CompletableFuture future, long timeout, TimeUnit timeUnit) { + final ScheduledFuture timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit); + + future.whenComplete((T value, Throwable throwable) -> { + if (!timeoutFuture.isDone()) { + timeoutFuture.cancel(false); + } + }); + + return future; + } /** -* Returns a {@link CompletableFuture} that has failed with the exception -* provided as argument. -* @param throwable the exception to fail the future with. -* @return The failed future. +* Runnable to complete the given future with a {@link TimeoutException}. */ - public static CompletableFuture getFailedFuture(Throwable throwable) { - CompletableFuture failedAttempt = new CompletableFuture<>(); - failedAttempt.completeExceptionally(throwable); - return failedAttempt; + static final class Timeout implements Runnable { --- End diff -- make private? ---
[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4918#discussion_r147594496 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java --- @@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable { return result; } - // - // Future Completed with an exception. - // + /** +* Times the given future out after the timeout. +* +* @param future to time out +* @param timeout after which the given future is timed out +* @param timeUnit time unit of the timeout +* @param type of the given future +* @return The timeout enriched future +*/ + public static CompletableFuture orTimeout(CompletableFuture future, long timeout, TimeUnit timeUnit) { + final ScheduledFuture timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit); + + future.whenComplete((T value, Throwable throwable) -> { + if (!timeoutFuture.isDone()) { + timeoutFuture.cancel(false); + } + }); + + return future; + } /** -* Returns a {@link CompletableFuture} that has failed with the exception -* provided as argument. -* @param throwable the exception to fail the future with. -* @return The failed future. +* Runnable to complete the given future with a {@link TimeoutException}. */ - public static CompletableFuture getFailedFuture(Throwable throwable) { - CompletableFuture failedAttempt = new CompletableFuture<>(); - failedAttempt.completeExceptionally(throwable); - return failedAttempt; + static final class Timeout implements Runnable { + + private final CompletableFuture future; + + Timeout(CompletableFuture future) { + this.future = Preconditions.checkNotNull(future); + } + + @Override + public void run() { + future.completeExceptionally(new TimeoutException()); --- End diff -- Could be useful to have to used timeout in the exception message. ---
[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4918#discussion_r147594412 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java --- @@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable { return result; } - // - // Future Completed with an exception. - // + /** +* Times the given future out after the timeout. +* +* @param future to time out +* @param timeout after which the given future is timed out +* @param timeUnit time unit of the timeout +* @param type of the given future +* @return The timeout enriched future +*/ + public static CompletableFuture orTimeout(CompletableFuture future, long timeout, TimeUnit timeUnit) { + final ScheduledFuture timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit); + + future.whenComplete((T value, Throwable throwable) -> { + if (!timeoutFuture.isDone()) { + timeoutFuture.cancel(false); + } + }); + + return future; + } /** -* Returns a {@link CompletableFuture} that has failed with the exception -* provided as argument. -* @param throwable the exception to fail the future with. -* @return The failed future. +* Runnable to complete the given future with a {@link TimeoutException}. */ - public static CompletableFuture getFailedFuture(Throwable throwable) { - CompletableFuture failedAttempt = new CompletableFuture<>(); - failedAttempt.completeExceptionally(throwable); - return failedAttempt; + static final class Timeout implements Runnable { + + private final CompletableFuture future; + + Timeout(CompletableFuture future) { + this.future = Preconditions.checkNotNull(future); + } + + @Override + public void run() { + future.completeExceptionally(new TimeoutException()); + } + } + + /** +* Delay scheduler used to timeout futures. +* +* This class creates a singleton scheduler used to run the provided actions. +*/ + private static final class Delayer { + static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor( + 1, + new ExecutorThreadFactory("CompletableFutureDelayScheduler")); --- End diff -- Let's add a "Flink" prefix to the thread name. ---
[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4918 [FLINK-7940] Add FutureUtils.orTimeout ## What is the purpose of the change This commit adds a convenience function `FutureUtils#orTimeout` which allows to easily add a timeout to a CompletableFuture. ## Verifying this change - Added `FutureUtilsTest#testOrTimeout` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink orTimeout Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4918.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4918 commit 83e52f5458379855c481fe169f6be50a8afee336 Author: Till RohrmannDate: 2017-10-29T15:01:18Z [hotfix] Remove redundant FutureUtils#getFailedFuture FutureUtils#completedExceptionally does exactly the same. commit 94b3d14bc2ba64a4862bd83e670f3dbfccdf96b8 Author: Till Rohrmann Date: 2017-10-29T15:38:53Z [FLINK-7940] Add FutureUtils.orTimeout This commit adds a convenience function which allows to easily add a timeout to a CompletableFuture. ---