[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

2017-10-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4918


---


[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

2017-10-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4918#discussion_r147658822
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) 
throws Throwable {
return result;
}
 
-   // 

-   //  Future Completed with an exception.
-   // 

+   /**
+* Times the given future out after the timeout.
+*
+* @param future to time out
+* @param timeout after which the given future is timed out
+* @param timeUnit time unit of the timeout
+* @param <T> type of the given future
+* @return The timeout enriched future
+*/
+   public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+   final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+   future.whenComplete((T value, Throwable throwable) -> {
+   if (!timeoutFuture.isDone()) {
+   timeoutFuture.cancel(false);
+   }
+   });
+
+   return future;
+   }
 
/**
-* Returns a {@link CompletableFuture} that has failed with the 
exception
-* provided as argument.
-* @param throwable the exception to fail the future with.
-* @return The failed future.
+* Runnable to complete the given future with a {@link 
TimeoutException}.
 */
-   public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
-   CompletableFuture<T> failedAttempt = new CompletableFuture<>();
-   failedAttempt.completeExceptionally(throwable);
-   return failedAttempt;
+   static final class Timeout implements Runnable {
+
+   private final CompletableFuture<?> future;
+
+   Timeout(CompletableFuture<?> future) {
+   this.future = Preconditions.checkNotNull(future);
+   }
+
+   @Override
+   public void run() {
+   future.completeExceptionally(new TimeoutException());
+   }
+   }
+
+   /**
+* Delay scheduler used to timeout futures.
+*
+* This class creates a singleton scheduler used to run the provided 
actions.
+*/
+   private static final class Delayer {
+   static final ScheduledThreadPoolExecutor delayer = new 
ScheduledThreadPoolExecutor(
+   1,
+   new 
ExecutorThreadFactory("CompletableFutureDelayScheduler"));
--- End diff --

Good point. Will add it.


---


[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

2017-10-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4918#discussion_r147658765
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) 
throws Throwable {
return result;
}
 
-   // 

-   //  Future Completed with an exception.
-   // 

+   /**
+* Times the given future out after the timeout.
+*
+* @param future to time out
+* @param timeout after which the given future is timed out
+* @param timeUnit time unit of the timeout
+* @param <T> type of the given future
+* @return The timeout enriched future
+*/
+   public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+   final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+   future.whenComplete((T value, Throwable throwable) -> {
+   if (!timeoutFuture.isDone()) {
+   timeoutFuture.cancel(false);
+   }
+   });
+
+   return future;
+   }
 
/**
-* Returns a {@link CompletableFuture} that has failed with the 
exception
-* provided as argument.
-* @param throwable the exception to fail the future with.
-* @return The failed future.
+* Runnable to complete the given future with a {@link 
TimeoutException}.
 */
-   public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
-   CompletableFuture<T> failedAttempt = new CompletableFuture<>();
-   failedAttempt.completeExceptionally(throwable);
-   return failedAttempt;
+   static final class Timeout implements Runnable {
+
+   private final CompletableFuture<?> future;
+
+   Timeout(CompletableFuture<?> future) {
+   this.future = Preconditions.checkNotNull(future);
+   }
+
+   @Override
+   public void run() {
+   future.completeExceptionally(new TimeoutException());
--- End diff --

Not entirely sure, since people might use the message to disambiguate different 
timeouts from each other. I'd rather not offer this possibility.


---


[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

2017-10-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4918#discussion_r147658453
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) 
throws Throwable {
return result;
}
 
-   // 

-   //  Future Completed with an exception.
-   // 

+   /**
+* Times the given future out after the timeout.
+*
+* @param future to time out
+* @param timeout after which the given future is timed out
+* @param timeUnit time unit of the timeout
+* @param <T> type of the given future
+* @return The timeout enriched future
+*/
+   public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+   final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+   future.whenComplete((T value, Throwable throwable) -> {
+   if (!timeoutFuture.isDone()) {
+   timeoutFuture.cancel(false);
+   }
+   });
+
+   return future;
+   }
 
/**
-* Returns a {@link CompletableFuture} that has failed with the 
exception
-* provided as argument.
-* @param throwable the exception to fail the future with.
-* @return The failed future.
+* Runnable to complete the given future with a {@link 
TimeoutException}.
 */
-   public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
-   CompletableFuture<T> failedAttempt = new CompletableFuture<>();
-   failedAttempt.completeExceptionally(throwable);
-   return failedAttempt;
+   static final class Timeout implements Runnable {
--- End diff --

Yes will do.


---


[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

2017-10-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4918#discussion_r147594462
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) 
throws Throwable {
return result;
}
 
-   // 

-   //  Future Completed with an exception.
-   // 

+   /**
+* Times the given future out after the timeout.
+*
+* @param future to time out
+* @param timeout after which the given future is timed out
+* @param timeUnit time unit of the timeout
+* @param <T> type of the given future
+* @return The timeout enriched future
+*/
+   public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+   final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+   future.whenComplete((T value, Throwable throwable) -> {
+   if (!timeoutFuture.isDone()) {
+   timeoutFuture.cancel(false);
+   }
+   });
+
+   return future;
+   }
 
/**
-* Returns a {@link CompletableFuture} that has failed with the 
exception
-* provided as argument.
-* @param throwable the exception to fail the future with.
-* @return The failed future.
+* Runnable to complete the given future with a {@link 
TimeoutException}.
 */
-   public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
-   CompletableFuture<T> failedAttempt = new CompletableFuture<>();
-   failedAttempt.completeExceptionally(throwable);
-   return failedAttempt;
+   static final class Timeout implements Runnable {
--- End diff --

make private?


---


[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

2017-10-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4918#discussion_r147594496
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) 
throws Throwable {
return result;
}
 
-   // 

-   //  Future Completed with an exception.
-   // 

+   /**
+* Times the given future out after the timeout.
+*
+* @param future to time out
+* @param timeout after which the given future is timed out
+* @param timeUnit time unit of the timeout
+* @param <T> type of the given future
+* @return The timeout enriched future
+*/
+   public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+   final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+   future.whenComplete((T value, Throwable throwable) -> {
+   if (!timeoutFuture.isDone()) {
+   timeoutFuture.cancel(false);
+   }
+   });
+
+   return future;
+   }
 
/**
-* Returns a {@link CompletableFuture} that has failed with the 
exception
-* provided as argument.
-* @param throwable the exception to fail the future with.
-* @return The failed future.
+* Runnable to complete the given future with a {@link 
TimeoutException}.
 */
-   public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
-   CompletableFuture<T> failedAttempt = new CompletableFuture<>();
-   failedAttempt.completeExceptionally(throwable);
-   return failedAttempt;
+   static final class Timeout implements Runnable {
+
+   private final CompletableFuture<?> future;
+
+   Timeout(CompletableFuture<?> future) {
+   this.future = Preconditions.checkNotNull(future);
+   }
+
+   @Override
+   public void run() {
+   future.completeExceptionally(new TimeoutException());
--- End diff --

It could be useful to include the timeout that was used in the exception message.
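
To make the suggestion concrete, here is a minimal sketch of a timeout runnable that records the configured timeout in the exception message. It is not the code from the pull request: the class name `DescriptiveTimeout`, the extra constructor parameters, and the message wording are assumptions made for illustration.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical variant of the Timeout runnable from the diff; not part of the PR.
final class DescriptiveTimeout implements Runnable {

    private final CompletableFuture<?> future;
    private final long timeout;
    private final TimeUnit timeUnit;

    DescriptiveTimeout(CompletableFuture<?> future, long timeout, TimeUnit timeUnit) {
        this.future = Objects.requireNonNull(future);
        this.timeout = timeout;
        this.timeUnit = Objects.requireNonNull(timeUnit);
    }

    @Override
    public void run() {
        // completeExceptionally has no effect if the future is already done.
        future.completeExceptionally(
            new TimeoutException("Future timed out after " + timeout + " " + timeUnit));
    }
}
```

The trade-off is that callers may start to parse or compare the message text, which couples them to its exact wording.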


---


[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

2017-10-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4918#discussion_r147594412
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) 
throws Throwable {
return result;
}
 
-   // 

-   //  Future Completed with an exception.
-   // 

+   /**
+* Times the given future out after the timeout.
+*
+* @param future to time out
+* @param timeout after which the given future is timed out
+* @param timeUnit time unit of the timeout
+* @param <T> type of the given future
+* @return The timeout enriched future
+*/
+   public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+   final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+   future.whenComplete((T value, Throwable throwable) -> {
+   if (!timeoutFuture.isDone()) {
+   timeoutFuture.cancel(false);
+   }
+   });
+
+   return future;
+   }
 
/**
-* Returns a {@link CompletableFuture} that has failed with the 
exception
-* provided as argument.
-* @param throwable the exception to fail the future with.
-* @return The failed future.
+* Runnable to complete the given future with a {@link 
TimeoutException}.
 */
-   public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
-   CompletableFuture<T> failedAttempt = new CompletableFuture<>();
-   failedAttempt.completeExceptionally(throwable);
-   return failedAttempt;
+   static final class Timeout implements Runnable {
+
+   private final CompletableFuture<?> future;
+
+   Timeout(CompletableFuture<?> future) {
+   this.future = Preconditions.checkNotNull(future);
+   }
+
+   @Override
+   public void run() {
+   future.completeExceptionally(new TimeoutException());
+   }
+   }
+
+   /**
+* Delay scheduler used to timeout futures.
+*
+* This class creates a singleton scheduler used to run the provided 
actions.
+*/
+   private static final class Delayer {
+   static final ScheduledThreadPoolExecutor delayer = new 
ScheduledThreadPoolExecutor(
+   1,
+   new 
ExecutorThreadFactory("CompletableFutureDelayScheduler"));
--- End diff --

Let's add a "Flink" prefix to the thread name.
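
As a rough illustration of the naming suggestion, the sketch below builds a single-threaded scheduler whose thread carries a "Flink" prefix. It uses only JDK classes instead of Flink's `ExecutorThreadFactory`; the class `PrefixedThreadFactory`, the `-thread-N` suffix, and the exact pool name are assumptions for the example, not what the pull request ended up with.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a named thread factory; JDK-only so it runs without Flink.
final class PrefixedThreadFactory implements ThreadFactory {

    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    PrefixedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        // A recognizable name makes the thread easy to spot in thread dumps.
        Thread thread = new Thread(runnable, prefix + "-thread-" + counter.getAndIncrement());
        // Daemon threads do not keep the JVM alive on shutdown.
        thread.setDaemon(true);
        return thread;
    }

    // Single-threaded scheduler intended only for firing timeouts.
    static ScheduledThreadPoolExecutor newDelayer() {
        return new ScheduledThreadPoolExecutor(
            1, new PrefixedThreadFactory("FlinkCompletableFutureDelayScheduler"));
    }
}
```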


---


[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

2017-10-29 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4918

[FLINK-7940] Add FutureUtils.orTimeout

## What is the purpose of the change

This commit adds a convenience function `FutureUtils#orTimeout` which 
makes it easy to add a timeout to a `CompletableFuture`.
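
For illustration, the pattern behind `FutureUtils#orTimeout` can be sketched with plain JDK classes. This is a minimal sketch rather than the code from the pull request: the class name `OrTimeoutSketch`, the `DELAYER` field, and the thread name are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class OrTimeoutSketch {

    // One daemon thread that only fires timeouts (hypothetical name).
    private static final ScheduledThreadPoolExecutor DELAYER =
        new ScheduledThreadPoolExecutor(1, runnable -> {
            Thread thread = new Thread(runnable, "OrTimeoutSketch-Delayer");
            thread.setDaemon(true);
            return thread;
        });

    static <T> CompletableFuture<T> orTimeout(
            CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
        // Schedule the failure; it is a no-op if the future completes first.
        final ScheduledFuture<?> timeoutFuture = DELAYER.schedule(
            () -> { future.completeExceptionally(new TimeoutException()); },
            timeout,
            timeUnit);

        // Once the future is done for any reason, the pending timeout is obsolete.
        future.whenComplete((T value, Throwable throwable) -> {
            if (!timeoutFuture.isDone()) {
                timeoutFuture.cancel(false);
            }
        });

        // The same future instance is returned, now guarded by the timeout.
        return future;
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> pending = new CompletableFuture<>();
        orTimeout(pending, 50, TimeUnit.MILLISECONDS);
        try {
            pending.get();
        } catch (ExecutionException e) {
            System.out.println("failed with: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

Note that the method completes the caller's own future exceptionally instead of returning a new one, so every holder of that future observes the timeout.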

## Verifying this change

- Added `FutureUtilsTest#testOrTimeout`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink orTimeout

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4918.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4918


commit 83e52f5458379855c481fe169f6be50a8afee336
Author: Till Rohrmann 
Date:   2017-10-29T15:01:18Z

[hotfix] Remove redundant FutureUtils#getFailedFuture

FutureUtils#completedExceptionally does exactly the same.
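
For context, both helpers boil down to creating a future and failing it immediately. The following is a minimal sketch of that behavior under the assumption that `completedExceptionally` takes a `Throwable` and returns a failed future; the class name `FailedFutureSketch` is hypothetical and the body is not copied from Flink.

```java
import java.util.concurrent.CompletableFuture;

final class FailedFutureSketch {

    // Returns a future that is already completed with the given failure.
    static <T> CompletableFuture<T> completedExceptionally(Throwable cause) {
        CompletableFuture<T> result = new CompletableFuture<>();
        result.completeExceptionally(cause);
        return result;
    }
}
```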

commit 94b3d14bc2ba64a4862bd83e670f3dbfccdf96b8
Author: Till Rohrmann 
Date:   2017-10-29T15:38:53Z

[FLINK-7940] Add FutureUtils.orTimeout

This commit adds a convenience function which makes it easy to add a 
timeout to a CompletableFuture.




---