Repository: flink Updated Branches: refs/heads/master 6d4981a43 -> a6c8953eb
[FLINK-7328] [futures] Replace Flink's futures with Java 8's CompletableFuture in SlotManager This closes #4443. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6c8953e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6c8953e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6c8953e Branch: refs/heads/master Commit: a6c8953eb3ee9d5e4fa760e474b040b19ed8b97f Parents: 6d4981a Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Jul 31 21:38:28 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Aug 1 22:56:31 2017 +0200 ---------------------------------------------------------------------- .../slotmanager/PendingSlotRequest.java | 3 +- .../slotmanager/SlotManager.java | 46 ++++++++------------ 2 files changed, 21 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a6c8953e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java index ffe1bfc..17cf8c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java @@ -21,13 +21,14 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; +import java.util.concurrent.CompletableFuture; + public class PendingSlotRequest { private final SlotRequest slotRequest; http://git-wip-us.apache.org/repos/asf/flink/blob/a6c8953e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index 829a06d..8354525 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -24,11 +24,8 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot; import org.apache.flink.runtime.clusterframework.types.SlotID; -import org.apache.flink.runtime.concurrent.BiFunction; -import org.apache.flink.runtime.concurrent.CompletableFuture; -import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.SlotRequest; @@ -51,6 +48,7 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -623,7 +621,7 @@ public class SlotManager implements AutoCloseable { TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection(); TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway(); - final CompletableFuture<Acknowledge> completableFuture = new FlinkCompletableFuture<>(); + final CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>(); final AllocationID allocationId = pendingSlotRequest.getAllocationId(); final SlotID slotId = taskManagerSlot.getSlotId(); @@ -641,30 +639,26 @@ public class SlotManager implements AutoCloseable { } // RPC call to the task manager - Future<Acknowledge> requestFuture = gateway.requestSlot( - slotId, - pendingSlotRequest.getJobId(), - allocationId, - pendingSlotRequest.getTargetAddress(), - leaderId, - taskManagerRequestTimeout); - - requestFuture.handle(new BiFunction<Acknowledge, Throwable, Void>() { - @Override - public Void apply(Acknowledge acknowledge, Throwable throwable) { + CompletableFuture<Acknowledge> requestFuture = FutureUtils.toJava( + gateway.requestSlot( + slotId, + pendingSlotRequest.getJobId(), + allocationId, + pendingSlotRequest.getTargetAddress(), + leaderId, + taskManagerRequestTimeout)); + + requestFuture.whenComplete( + (Acknowledge acknowledge, Throwable throwable) -> { if (acknowledge != null) { completableFuture.complete(acknowledge); } else { completableFuture.completeExceptionally(throwable); } + }); - return null; - } - }); - - completableFuture.handleAsync(new BiFunction<Acknowledge, Throwable, Void>() { - @Override - public Void apply(Acknowledge acknowledge, Throwable throwable) { + completableFuture.whenCompleteAsync( + (Acknowledge acknowledge, Throwable throwable) -> { if (acknowledge != null) { updateSlot(slotId, allocationId); } else { @@ -681,10 +675,8 @@ public class SlotManager implements AutoCloseable { LOG.debug("Slot allocation request {} has been cancelled.", allocationId, throwable); } } - - return null; - } - }, mainThreadExecutor); + }, + mainThreadExecutor); } /**