Repository: flink Updated Branches: refs/heads/master 3c42557f3 -> 648c1f595
[FLINK-7313] [futures] Add Flink future and Scala future to Java 8 CompletableFuture conversion. Add DirectExecutionContext. Add Scala Future to Java 8 CompletableFuture utility to FutureUtils. Add Flink future to Java 8's CompletableFuture conversion utility to FutureUtils. Add base class for Flink's unchecked future exceptions. This closes #4429. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/648c1f59 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/648c1f59 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/648c1f59 Branch: refs/heads/master Commit: 648c1f595990655f9c866d0c9e8983e0b63a927a Parents: 3c42557 Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Jul 31 15:07:18 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Aug 1 13:30:25 2017 +0200 ---------------------------------------------------------------------- .../flink/runtime/concurrent/Executors.java | 37 +++++++++++++ .../concurrent/FlinkFutureException.java | 47 ++++++++++++++++ .../flink/runtime/concurrent/FutureUtils.java | 58 ++++++++++++++++++++ 3 files changed, 142 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/648c1f59/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java index e8a9be9..04cdce7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java @@ -26,6 +26,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import
scala.concurrent.ExecutionContext; + /** * Collection of {@link Executor} implementations */ @@ -59,6 +61,41 @@ public class Executors { } /** + * Returns the shared direct execution context instance. The direct execution context executes the + * runnable directly in the calling thread. + * + * @return Direct execution context. + */ + public static ExecutionContext directExecutionContext() { + return DirectExecutionContext.INSTANCE; + } + + /** + * Singleton execution context: {@code execute} runs the runnable immediately in the calling thread, {@code reportFailure} throws an {@link IllegalStateException} wrapping the cause, and {@code prepare} returns this instance. + */ + private static class DirectExecutionContext implements ExecutionContext { + + static final DirectExecutionContext INSTANCE = new DirectExecutionContext(); + + private DirectExecutionContext() {} + + @Override + public void execute(Runnable runnable) { + runnable.run(); + } + + @Override + public void reportFailure(Throwable cause) { + throw new IllegalStateException("Error in direct execution context.", cause); + } + + @Override + public ExecutionContext prepare() { + return this; + } + } + + /** * Gracefully shutdown the given {@link ExecutorService}. The call waits the given timeout that * all ExecutorServices terminate. If the ExecutorServices do not terminate in this time, * they will be shut down hard. http://git-wip-us.apache.org/repos/asf/flink/blob/648c1f59/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java new file mode 100644 index 0000000..c728904 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.concurrent.CompletionStage; + +/** + * Base class for exceptions which are thrown in a {@link CompletionStage}. + * + * <p>The exception has to be unchecked because only unchecked exceptions can + * be thrown in a future's stage. It extends {@link FlinkRuntimeException} + * in particular because that designates the exception as coming from a Flink + * stage.
+ */ +public class FlinkFutureException extends FlinkRuntimeException { + private static final long serialVersionUID = -8878194471694178210L; + + public FlinkFutureException(String message) { + super(message); + } + + public FlinkFutureException(Throwable cause) { + super(cause); + } + + public FlinkFutureException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/648c1f59/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index a27af56..9cdbe1f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -21,6 +21,8 @@ package org.apache.flink.runtime.concurrent; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.util.Preconditions; +import akka.dispatch.OnComplete; + import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -286,4 +288,60 @@ public class FutureUtils { return numCompleted.get(); } } + + // ------------------------------------------------------------------------ + // Converting futures + // ------------------------------------------------------------------------ + + /** + * Converts a Scala {@link scala.concurrent.Future} to a {@link java.util.concurrent.CompletableFuture}. 
+ * + * @param scalaFuture to convert to a Java 8 CompletableFuture + * @param <T> type of the future value + * @return Java 8 CompletableFuture + */ + public static <T> java.util.concurrent.CompletableFuture<T> toJava(scala.concurrent.Future<T> scalaFuture) { + final java.util.concurrent.CompletableFuture<T> result = new java.util.concurrent.CompletableFuture<>(); + + scalaFuture.onComplete(new OnComplete<T>() { + @Override + public void onComplete(Throwable failure, T success) throws Throwable { + if (failure != null) { + result.completeExceptionally(failure); + } else { + result.complete(success); + } + } + }, Executors.directExecutionContext()); + + return result; + } + + /** + * Converts a Flink {@link Future} into a Java 8 {@link java.util.concurrent.CompletableFuture}. + * + * @param flinkFuture to convert to a Java 8 CompletableFuture + * @param <T> type of the future value + * @return Java 8 CompletableFuture + * + * @deprecated Will be removed once we completely remove Flink's futures + */ + @Deprecated + public static <T> java.util.concurrent.CompletableFuture<T> toJava(Future<T> flinkFuture) { + final java.util.concurrent.CompletableFuture<T> result = new java.util.concurrent.CompletableFuture<>(); + + flinkFuture.handle( + (t, throwable) -> { + if (throwable != null) { + result.completeExceptionally(throwable); + } else { + result.complete(t); + } + + return null; /* the future returned by handle is discarded, so its value is irrelevant */ + } + ); + + return result; + } }