Repository: flink
Updated Branches:
  refs/heads/master 3c42557f3 -> 648c1f595


[FLINK-7313] [futures] Add Flink future and Scala future to Java 8 
CompletableFuture conversion

Add DirectExecutionContext

Add Scala Future to Java 8 CompletableFuture utility to FutureUtils

Add Flink future to Java 8's CompletableFuture conversion utility to FutureUtils

Add base class for Flink's unchecked future exceptions

This closes #4429.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/648c1f59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/648c1f59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/648c1f59

Branch: refs/heads/master
Commit: 648c1f595990655f9c866d0c9e8983e0b63a927a
Parents: 3c42557
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Jul 31 15:07:18 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Aug 1 13:30:25 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/concurrent/Executors.java     | 37 +++++++++++++
 .../concurrent/FlinkFutureException.java        | 47 ++++++++++++++++
 .../flink/runtime/concurrent/FutureUtils.java   | 58 ++++++++++++++++++++
 3 files changed, 142 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/648c1f59/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index e8a9be9..04cdce7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -26,6 +26,8 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import scala.concurrent.ExecutionContext;
+
 /**
  * Collection of {@link Executor} implementations
  */
@@ -59,6 +61,41 @@ public class Executors {
        }
 
        /**
+        * Returns the shared direct execution context instance. The direct
+        * execution context executes the runnable directly in the calling thread.
+        *
+        * @return the singleton {@code DirectExecutionContext}, typed as {@link ExecutionContext}
+        */
+       public static ExecutionContext directExecutionContext() {
+               return DirectExecutionContext.INSTANCE;
+       }
+
+       /**
+        * Direct execution context: an {@link ExecutionContext} that runs every
+        * runnable passed to {@code execute} synchronously via {@code runnable.run()}.
+        *
+        * <p>Only the single {@code INSTANCE} exists because the constructor is private.
+        * {@code reportFailure} rethrows the reported cause wrapped in an
+        * {@link IllegalStateException}, and {@code prepare} returns this same instance.
+        */
+       private static class DirectExecutionContext implements ExecutionContext 
{
+
+               static final DirectExecutionContext INSTANCE = new 
DirectExecutionContext();
+
+               private DirectExecutionContext() {}
+
+               @Override
+               public void execute(Runnable runnable) {
+                       runnable.run(); // no hand-off: the runnable runs in the caller's thread
+               }
+
+               @Override
+               public void reportFailure(Throwable cause) {
+                       throw new IllegalStateException("Error in direct 
execution context.", cause);
+               }
+
+               @Override
+               public ExecutionContext prepare() {
+                       return this; // nothing to prepare; the same instance is reused
+               }
+       }
+
+       /**
         * Gracefully shutdown the given {@link ExecutorService}. The call 
waits the given timeout that
         * all ExecutorServices terminate. If the ExecutorServices do not 
terminate in this time,
         * they will be shut down hard.

http://git-wip-us.apache.org/repos/asf/flink/blob/648c1f59/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java
new file mode 100644
index 0000000..c728904
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.concurrent.CompletionStage;
+
+/**
+ * Base class for exceptions which are thrown in {@link CompletionStage}.
+ *
+ * <p>The exception has to be unchecked because only unchecked exceptions can
+ * be thrown in a future's stage. It extends {@link FlinkRuntimeException}
+ * specifically to designate the exception as coming from a Flink stage.
+ *
+ * <p>Each constructor forwards its arguments unchanged to the matching
+ * {@link FlinkRuntimeException} constructor.
+ */
+public class FlinkFutureException extends FlinkRuntimeException {
+       private static final long serialVersionUID = -8878194471694178210L;
+
+       public FlinkFutureException(String message) {
+               super(message);
+       }
+
+       public FlinkFutureException(Throwable cause) {
+               super(cause);
+       }
+
+       public FlinkFutureException(String message, Throwable cause) {
+               super(message, cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/648c1f59/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index a27af56..9cdbe1f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.concurrent;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.util.Preconditions;
 
+import akka.dispatch.OnComplete;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -286,4 +288,60 @@ public class FutureUtils {
                        return numCompleted.get();
                }
        }
+
+       // 
------------------------------------------------------------------------
+       //  Converting futures
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Converts a Scala {@link scala.concurrent.Future} to a
+        * {@link java.util.concurrent.CompletableFuture}.
+        *
+        * <p>An {@code OnComplete} callback is registered on the Scala future using
+        * {@link Executors#directExecutionContext()}. When invoked, the callback completes
+        * the returned future exceptionally if the reported failure is non-null, and with
+        * the reported value otherwise.
+        *
+        * @param scalaFuture to convert to a Java 8 CompletableFuture
+        * @param <T> type of the future value
+        * @return Java 8 CompletableFuture
+        */
+       public static <T> java.util.concurrent.CompletableFuture<T> 
toJava(scala.concurrent.Future<T> scalaFuture) {
+               final java.util.concurrent.CompletableFuture<T> result = new 
java.util.concurrent.CompletableFuture<>();
+
+               scalaFuture.onComplete(new OnComplete<T>() {
+                       @Override
+                       public void onComplete(Throwable failure, T success) 
throws Throwable {
+                               if (failure != null) {
+                                       result.completeExceptionally(failure);
+                               } else {
+                                       result.complete(success);
+                               }
+                       }
+               }, Executors.directExecutionContext());
+
+               return result;
+       }
+
+       /**
+        * Converts a Flink {@link Future} into a
+        * {@link java.util.concurrent.CompletableFuture}.
+        *
+        * <p>A {@code handle} callback is attached to the Flink future. It completes the
+        * returned future exceptionally if the throwable is non-null, and with the
+        * future's value otherwise.
+        *
+        * @param flinkFuture to convert to a Java 8 CompletableFuture
+        * @param <T> type of the future value
+        * @return Java 8 CompletableFuture
+        *
+        * @deprecated Will be removed once we completely remove Flink's futures
+        */
+       @Deprecated
+       public static <T> java.util.concurrent.CompletableFuture<T> 
toJava(Future<T> flinkFuture) {
+               final java.util.concurrent.CompletableFuture<T> result = new 
java.util.concurrent.CompletableFuture<>();
+
+               flinkFuture.handle(
+                       (t, throwable) -> {
+                               if (throwable != null) {
+                                       result.completeExceptionally(throwable);
+                               } else {
+                                       result.complete(t);
+                               }
+
+                               return null; // the future produced by handle is discarded, so its value is irrelevant
+                       }
+               );
+
+               return result;
+       }
 }

Reply via email to