Repository: flink
Updated Branches:
  refs/heads/master 6d4981a43 -> a6c8953eb


[FLINK-7328] [futures] Replace Flink's futures with Java 8's CompletableFuture 
in SlotManager

This closes #4443.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6c8953e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6c8953e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6c8953e

Branch: refs/heads/master
Commit: a6c8953eb3ee9d5e4fa760e474b040b19ed8b97f
Parents: 6d4981a
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Jul 31 21:38:28 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Aug 1 22:56:31 2017 +0200

----------------------------------------------------------------------
 .../slotmanager/PendingSlotRequest.java         |  3 +-
 .../slotmanager/SlotManager.java                | 46 ++++++++------------
 2 files changed, 21 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a6c8953e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
index ffe1bfc..17cf8c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
@@ -21,13 +21,14 @@ package 
org.apache.flink.runtime.resourcemanager.slotmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
+import java.util.concurrent.CompletableFuture;
+
 public class PendingSlotRequest {
 
        private final SlotRequest slotRequest;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6c8953e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 829a06d..8354525 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -24,11 +24,8 @@ import 
org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -51,6 +48,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -623,7 +621,7 @@ public class SlotManager implements AutoCloseable {
                TaskExecutorConnection taskExecutorConnection = 
taskManagerSlot.getTaskManagerConnection();
                TaskExecutorGateway gateway = 
taskExecutorConnection.getTaskExecutorGateway();
 
-               final CompletableFuture<Acknowledge> completableFuture = new 
FlinkCompletableFuture<>();
+               final CompletableFuture<Acknowledge> completableFuture = new 
CompletableFuture<>();
                final AllocationID allocationId = 
pendingSlotRequest.getAllocationId();
                final SlotID slotId = taskManagerSlot.getSlotId();
 
@@ -641,30 +639,26 @@ public class SlotManager implements AutoCloseable {
                }
 
                // RPC call to the task manager
-               Future<Acknowledge> requestFuture = gateway.requestSlot(
-                       slotId,
-                       pendingSlotRequest.getJobId(),
-                       allocationId,
-                       pendingSlotRequest.getTargetAddress(),
-                       leaderId,
-                       taskManagerRequestTimeout);
-
-               requestFuture.handle(new BiFunction<Acknowledge, Throwable, 
Void>() {
-                       @Override
-                       public Void apply(Acknowledge acknowledge, Throwable 
throwable) {
+               CompletableFuture<Acknowledge> requestFuture = 
FutureUtils.toJava(
+                       gateway.requestSlot(
+                               slotId,
+                               pendingSlotRequest.getJobId(),
+                               allocationId,
+                               pendingSlotRequest.getTargetAddress(),
+                               leaderId,
+                               taskManagerRequestTimeout));
+
+               requestFuture.whenComplete(
+                       (Acknowledge acknowledge, Throwable throwable) -> {
                                if (acknowledge != null) {
                                        completableFuture.complete(acknowledge);
                                } else {
                                        
completableFuture.completeExceptionally(throwable);
                                }
+                       });
 
-                               return null;
-                       }
-               });
-
-               completableFuture.handleAsync(new BiFunction<Acknowledge, 
Throwable, Void>() {
-                       @Override
-                       public Void apply(Acknowledge acknowledge, Throwable 
throwable) {
+               completableFuture.whenCompleteAsync(
+                       (Acknowledge acknowledge, Throwable throwable) -> {
                                if (acknowledge != null) {
                                        updateSlot(slotId, allocationId);
                                } else {
@@ -681,10 +675,8 @@ public class SlotManager implements AutoCloseable {
                                                LOG.debug("Slot allocation 
request {} has been cancelled.", allocationId, throwable);
                                        }
                                }
-
-                               return null;
-                       }
-               }, mainThreadExecutor);
+                       },
+                       mainThreadExecutor);
        }
 
        /**

Reply via email to