Repository: flink
Updated Branches:
  refs/heads/master 3b97784ae -> 6d4981a43


[FLINK-7324] [futures] Replace Flink's future with Java 8's CompletableFuture in SlotPool

Address PR comments

This closes #4438.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d4981a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d4981a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d4981a4

Branch: refs/heads/master
Commit: 6d4981a431e1ad28dee3a2143477fa7d2696d5fd
Parents: 3b97784
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Jul 31 19:35:14 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Aug 1 22:52:11 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/concurrent/FutureUtils.java   | 25 +++++++
 .../apache/flink/runtime/instance/SlotPool.java | 70 ++++++++++----------
 2 files changed, 59 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6d4981a4/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 9cdbe1f..8721e52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -344,4 +344,29 @@ public class FutureUtils {
 
                return result;
        }
+
+       /**
+        * Converts a Java 8 {@link java.util.concurrent.CompletableFuture} into a Flink {@link Future}.
+        *
+        * @param javaFuture to convert to a Flink future
+        * @param <T> type of the future value
+        * @return Flink future
+        *
+        * @deprecated Will be removed once we completely remove Flink's futures
+        */
+       @Deprecated
+       public static <T> Future<T> 
toFlinkFuture(java.util.concurrent.CompletableFuture<T> javaFuture) {
+               FlinkCompletableFuture<T> result = new 
FlinkCompletableFuture<>();
+
+               javaFuture.whenComplete(
+                       (value, throwable) -> {
+                               if (throwable == null) {
+                                       result.complete(value);
+                               } else {
+                                       result.completeExceptionally(throwable);
+                               }
+                       });
+
+               return result;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d4981a4/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 8cf6a9b..c74d9a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -25,10 +25,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -57,6 +55,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -246,7 +245,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
                // work on all slots waiting for this connection
                for (PendingRequest pending : 
waitingForResourceManager.values()) {
-                       requestSlotFromResourceManager(pending.allocationID(), 
pending.future(), pending.resourceProfile());
+                       requestSlotFromResourceManager(pending.allocationID(), 
pending.getFuture(), pending.resourceProfile());
                }
 
                // all sent off
@@ -269,7 +268,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
                        ResourceProfile resources,
                        Iterable<TaskManagerLocation> locationPreferences) {
 
-               return internalAllocateSlot(task, resources, 
locationPreferences);
+               return FutureUtils.toFlinkFuture(internalAllocateSlot(task, 
resources, locationPreferences));
        }
 
        @RpcMethod
@@ -278,7 +277,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
        }
 
 
-       Future<SimpleSlot> internalAllocateSlot(
+       CompletableFuture<SimpleSlot> internalAllocateSlot(
                        ScheduledUnit task,
                        ResourceProfile resources,
                        Iterable<TaskManagerLocation> locationPreferences) {
@@ -288,12 +287,12 @@ public class SlotPool extends 
RpcEndpoint<SlotPoolGateway> {
                if (slotFromPool != null) {
                        SimpleSlot slot = createSimpleSlot(slotFromPool.slot(), 
slotFromPool.locality());
                        allocatedSlots.add(slot);
-                       return FlinkCompletableFuture.completed(slot);
+                       return CompletableFuture.completedFuture(slot);
                }
 
                // the request will be completed by a future
                final AllocationID allocationID = new AllocationID();
-               final FlinkCompletableFuture<SimpleSlot> future = new 
FlinkCompletableFuture<>();
+               final CompletableFuture<SimpleSlot> future = new 
CompletableFuture<>();
 
                // (2) need to request a slot
 
@@ -310,34 +309,33 @@ public class SlotPool extends 
RpcEndpoint<SlotPoolGateway> {
 
        private void requestSlotFromResourceManager(
                        final AllocationID allocationID,
-                       final FlinkCompletableFuture<SimpleSlot> future,
+                       final CompletableFuture<SimpleSlot> future,
                        final ResourceProfile resources) {
 
                LOG.info("Requesting slot with profile {} from resource manager 
(request = {}).", resources, allocationID);
 
                pendingRequests.put(allocationID, new 
PendingRequest(allocationID, future, resources));
 
-               Future<Acknowledge> rmResponse = 
resourceManagerGateway.requestSlot(
+               CompletableFuture<Acknowledge> rmResponse = FutureUtils.toJava(
+                       resourceManagerGateway.requestSlot(
                                jobManagerLeaderId, resourceManagerLeaderId,
                                new SlotRequest(jobId, allocationID, resources, 
jobManagerAddress),
-                               resourceManagerRequestsTimeout);
+                               resourceManagerRequestsTimeout));
 
-               Future<Void> slotRequestProcessingFuture = 
rmResponse.thenAcceptAsync(new AcceptFunction<Acknowledge>() {
-                       @Override
-                       public void accept(Acknowledge value) {
+               CompletableFuture<Void> slotRequestProcessingFuture = 
rmResponse.thenAcceptAsync(
+                       (Acknowledge value) -> {
                                
slotRequestToResourceManagerSuccess(allocationID);
-                       }
-               }, getMainThreadExecutor());
+                       },
+                       getMainThreadExecutor());
 
                // on failure, fail the request future
-               slotRequestProcessingFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
-
-                       @Override
-                       public Void apply(Throwable failure) {
-                               
slotRequestToResourceManagerFailed(allocationID, failure);
-                               return null;
-                       }
-               }, getMainThreadExecutor());
+               slotRequestProcessingFuture.whenCompleteAsync(
+                       (Void v, Throwable failure) -> {
+                               if (failure != null) {
+                                       
slotRequestToResourceManagerFailed(allocationID, failure);
+                               }
+                       },
+                       getMainThreadExecutor());
        }
 
        private void slotRequestToResourceManagerSuccess(final AllocationID 
allocationID) {
@@ -354,7 +352,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
        private void slotRequestToResourceManagerFailed(AllocationID 
allocationID, Throwable failure) {
                PendingRequest request = pendingRequests.remove(allocationID);
                if (request != null) {
-                       request.future().completeExceptionally(new 
NoResourceAvailableException(
+                       request.getFuture().completeExceptionally(new 
NoResourceAvailableException(
                                        "No pooled slot available and request 
to ResourceManager for new slot failed", failure));
                } else {
                        if (LOG.isDebugEnabled()) {
@@ -365,15 +363,15 @@ public class SlotPool extends 
RpcEndpoint<SlotPoolGateway> {
 
        private void checkTimeoutSlotAllocation(AllocationID allocationID) {
                PendingRequest request = pendingRequests.remove(allocationID);
-               if (request != null && !request.future().isDone()) {
-                       request.future().completeExceptionally(new 
TimeoutException("Slot allocation request timed out"));
+               if (request != null && !request.getFuture().isDone()) {
+                       request.getFuture().completeExceptionally(new 
TimeoutException("Slot allocation request timed out"));
                }
        }
 
        private void stashRequestWaitingForResourceManager(
                        final AllocationID allocationID,
                        final ResourceProfile resources,
-                       final FlinkCompletableFuture<SimpleSlot> future) {
+                       final CompletableFuture<SimpleSlot> future) {
 
                LOG.info("Cannot serve slot request, no ResourceManager 
connected. " +
                                "Adding as pending request {}",  allocationID);
@@ -390,8 +388,8 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
        private void checkTimeoutRequestWaitingForResourceManager(AllocationID 
allocationID) {
                PendingRequest request = 
waitingForResourceManager.remove(allocationID);
-               if (request != null && !request.future().isDone()) {
-                       request.future().completeExceptionally(new 
NoResourceAvailableException(
+               if (request != null && !request.getFuture().isDone()) {
+                       request.getFuture().completeExceptionally(new 
NoResourceAvailableException(
                                        "No slot available and no connection to 
Resource Manager established."));
                }
        }
@@ -426,7 +424,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
                                        SimpleSlot newSlot = 
createSimpleSlot(taskManagerSlot, Locality.UNKNOWN);
                                        allocatedSlots.add(newSlot);
-                                       
pendingRequest.future().complete(newSlot);
+                                       
pendingRequest.getFuture().complete(newSlot);
                                }
                                else {
                                        LOG.debug("Adding returned slot [{}] to 
available slots", taskManagerSlot.getSlotAllocationId());
@@ -513,7 +511,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
                if (pendingRequest != null) {
                        // we were waiting for this!
                        SimpleSlot resultSlot = createSimpleSlot(slot, 
Locality.UNKNOWN);
-                       pendingRequest.future().complete(resultSlot);
+                       pendingRequest.getFuture().complete(resultSlot);
                        allocatedSlots.add(resultSlot);
                }
                else {
@@ -552,7 +550,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
                if (pendingRequest != null) {
                        // request was still pending
                        LOG.debug("Failed pending request [{}] with ", 
allocationID, cause);
-                       pendingRequest.future().completeExceptionally(cause);
+                       pendingRequest.getFuture().completeExceptionally(cause);
                }
                else if (availableSlots.tryRemove(allocationID)) {
                        LOG.debug("Failed available slot [{}] with ", 
allocationID, cause);
@@ -999,13 +997,13 @@ public class SlotPool extends 
RpcEndpoint<SlotPoolGateway> {
 
                private final AllocationID allocationID;
 
-               private final FlinkCompletableFuture<SimpleSlot> future;
+               private final CompletableFuture<SimpleSlot> future;
 
                private final ResourceProfile resourceProfile;
 
                PendingRequest(
                                AllocationID allocationID,
-                               FlinkCompletableFuture<SimpleSlot> future,
+                               CompletableFuture<SimpleSlot> future,
                                ResourceProfile resourceProfile) {
                        this.allocationID = allocationID;
                        this.future = future;
@@ -1016,7 +1014,7 @@ public class SlotPool extends 
RpcEndpoint<SlotPoolGateway> {
                        return allocationID;
                }
 
-               public FlinkCompletableFuture<SimpleSlot> future() {
+               public CompletableFuture<SimpleSlot> getFuture() {
                        return future;
                }
 

Reply via email to