[ 
https://issues.apache.org/jira/browse/FLINK-4751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551359#comment-15551359
 ] 

ASF GitHub Bot commented on FLINK-4751:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2600#discussion_r82142626
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
 ---
    @@ -232,6 +244,40 @@ public R recover(Throwable failure) throws Throwable {
                return new FlinkFuture<>(recoveredFuture);
        }
     
    +   @Override
    +   public <U, R> Future<R> thenCombineAsync(final Future<U> other, final 
BiFunction<? super T, ? super U, ? extends R> biFunction, final Executor 
executor) {
    +           final scala.concurrent.Future<U> thatScalaFuture;
    +
    +           if (other instanceof FlinkFuture) {
    +                   thatScalaFuture = ((FlinkFuture<U>) other).scalaFuture;
    +           } else {
    +                   thatScalaFuture = Futures.future(new Callable<U>() {
    +                           @Override
    +                           public U call() throws Exception {
    +                                   try {
    +                                           return other.get();
    +                                   } catch (ExecutionException e) {
    +                                           // unwrap the execution 
exception if it's not a throwable
    +                                           if (e.getCause() instanceof 
Exception) {
    +                                                   throw (Exception) 
e.getCause();
    +                                           } else {
    +                                                   throw new 
FlinkFuture.ThrowableWrapperException(e.getCause());
    +                                           }
    +                                   }
    +                           }
    +                   }, createExecutionContext(executor));
    --- End diff --
    
    Does it make sense to create the wrapper only once?


> Extend Flink's futures to support combining two futures
> -------------------------------------------------------
>
>                 Key: FLINK-4751
>                 URL: https://issues.apache.org/jira/browse/FLINK-4751
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.2.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> A useful feature for Flink's futures would be to allow combining two futures:
> {code}
> Future<A> a = ...
> Future<B> b = ...
> Future<C> c = a.thenCombineAsync(b, new BiFunction<A, B, C>() {...}, 
> executor);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to