Repository: flink Updated Branches: refs/heads/master 3b97784ae -> 6d4981a43
[FLINK-7324] [futures] Replace Flink's future with Java 8's CompletableFuture in SlotPool Address PR comments This closes #4438. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d4981a4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d4981a4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d4981a4 Branch: refs/heads/master Commit: 6d4981a431e1ad28dee3a2143477fa7d2696d5fd Parents: 3b97784 Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Jul 31 19:35:14 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Aug 1 22:52:11 2017 +0200 ---------------------------------------------------------------------- .../flink/runtime/concurrent/FutureUtils.java | 25 +++++++ .../apache/flink/runtime/instance/SlotPool.java | 70 ++++++++++---------- 2 files changed, 59 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6d4981a4/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index 9cdbe1f..8721e52 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -344,4 +344,29 @@ public class FutureUtils { return result; } + + /** + * Converts a Java 8 {@link java.util.concurrent.CompletableFuture} into a Flink {@link Future}. + * + * @param javaFuture to convert to a Flink future + * @param <T> type of the future value + * @return Flink future + * + * @deprecated Will be removed once we completely remove Flink's futures + */ + @Deprecated + public static <T> Future<T> toFlinkFuture(java.util.concurrent.CompletableFuture<T> javaFuture) { + FlinkCompletableFuture<T> result = new FlinkCompletableFuture<>(); + + javaFuture.whenComplete( + (value, throwable) -> { + if (throwable == null) { + result.complete(value); + } else { + result.completeExceptionally(throwable); + } + }); + + return result; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/6d4981a4/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index 8cf6a9b..c74d9a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -25,10 +25,8 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.concurrent.AcceptFunction; -import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; @@ -57,6 +55,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; import static org.apache.flink.util.Preconditions.checkArgument; @@ -246,7 +245,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { // work on all slots waiting for this connection for (PendingRequest pending : waitingForResourceManager.values()) { - requestSlotFromResourceManager(pending.allocationID(), pending.future(), pending.resourceProfile()); + requestSlotFromResourceManager(pending.allocationID(), pending.getFuture(), pending.resourceProfile()); } // all sent off @@ -269,7 +268,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { ResourceProfile resources, Iterable<TaskManagerLocation> locationPreferences) { - return internalAllocateSlot(task, resources, locationPreferences); + return FutureUtils.toFlinkFuture(internalAllocateSlot(task, resources, locationPreferences)); } @RpcMethod @@ -278,7 +277,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { } - Future<SimpleSlot> internalAllocateSlot( + CompletableFuture<SimpleSlot> internalAllocateSlot( ScheduledUnit task, ResourceProfile resources, Iterable<TaskManagerLocation> locationPreferences) { @@ -288,12 +287,12 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { if (slotFromPool != null) { SimpleSlot slot = createSimpleSlot(slotFromPool.slot(), slotFromPool.locality()); allocatedSlots.add(slot); - return FlinkCompletableFuture.completed(slot); + return CompletableFuture.completedFuture(slot); } // the request will be completed by a future final AllocationID allocationID = new AllocationID(); - final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>(); + final CompletableFuture<SimpleSlot> future = new CompletableFuture<>(); // (2) need to request a slot @@ -310,34 +309,33 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { private void requestSlotFromResourceManager( final AllocationID allocationID, - final FlinkCompletableFuture<SimpleSlot> future, + final CompletableFuture<SimpleSlot> future, final ResourceProfile resources) { LOG.info("Requesting slot with profile {} from resource manager (request = {}).", resources, allocationID); pendingRequests.put(allocationID, new PendingRequest(allocationID, future, resources)); - Future<Acknowledge> rmResponse = resourceManagerGateway.requestSlot( + CompletableFuture<Acknowledge> rmResponse = FutureUtils.toJava( + resourceManagerGateway.requestSlot( jobManagerLeaderId, resourceManagerLeaderId, new SlotRequest(jobId, allocationID, resources, jobManagerAddress), - resourceManagerRequestsTimeout); + resourceManagerRequestsTimeout)); - Future<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync(new AcceptFunction<Acknowledge>() { - @Override - public void accept(Acknowledge value) { + CompletableFuture<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync( + (Acknowledge value) -> { slotRequestToResourceManagerSuccess(allocationID); - } - }, getMainThreadExecutor()); + }, + getMainThreadExecutor()); // on failure, fail the request future - slotRequestProcessingFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { - - @Override - public Void apply(Throwable failure) { - slotRequestToResourceManagerFailed(allocationID, failure); - return null; - } - }, getMainThreadExecutor()); + slotRequestProcessingFuture.whenCompleteAsync( + (Void v, Throwable failure) -> { + if (failure != null) { + slotRequestToResourceManagerFailed(allocationID, failure); + } + }, + getMainThreadExecutor()); } private void slotRequestToResourceManagerSuccess(final AllocationID allocationID) { @@ -354,7 +352,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throwable failure) { PendingRequest request = pendingRequests.remove(allocationID); if (request != null) { - request.future().completeExceptionally(new NoResourceAvailableException( + request.getFuture().completeExceptionally(new NoResourceAvailableException( "No pooled slot available and request to ResourceManager for new slot failed", failure)); } else { if (LOG.isDebugEnabled()) { @@ -365,15 +363,15 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { private void checkTimeoutSlotAllocation(AllocationID allocationID) { PendingRequest request = pendingRequests.remove(allocationID); - if (request != null && !request.future().isDone()) { - request.future().completeExceptionally(new TimeoutException("Slot allocation request timed out")); + if (request != null && !request.getFuture().isDone()) { + request.getFuture().completeExceptionally(new TimeoutException("Slot allocation request timed out")); } } private void stashRequestWaitingForResourceManager( final AllocationID allocationID, final ResourceProfile resources, - final FlinkCompletableFuture<SimpleSlot> future) { + final CompletableFuture<SimpleSlot> future) { LOG.info("Cannot serve slot request, no ResourceManager connected. " + "Adding as pending request {}", allocationID); @@ -390,8 +388,8 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { private void checkTimeoutRequestWaitingForResourceManager(AllocationID allocationID) { PendingRequest request = waitingForResourceManager.remove(allocationID); - if (request != null && !request.future().isDone()) { - request.future().completeExceptionally(new NoResourceAvailableException( + if (request != null && !request.getFuture().isDone()) { + request.getFuture().completeExceptionally(new NoResourceAvailableException( "No slot available and no connection to Resource Manager established.")); } } @@ -426,7 +424,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { SimpleSlot newSlot = createSimpleSlot(taskManagerSlot, Locality.UNKNOWN); allocatedSlots.add(newSlot); - pendingRequest.future().complete(newSlot); + pendingRequest.getFuture().complete(newSlot); } else { LOG.debug("Adding returned slot [{}] to available slots", taskManagerSlot.getSlotAllocationId()); @@ -513,7 +511,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { if (pendingRequest != null) { // we were waiting for this! SimpleSlot resultSlot = createSimpleSlot(slot, Locality.UNKNOWN); - pendingRequest.future().complete(resultSlot); + pendingRequest.getFuture().complete(resultSlot); allocatedSlots.add(resultSlot); } else { @@ -552,7 +550,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { if (pendingRequest != null) { // request was still pending LOG.debug("Failed pending request [{}] with ", allocationID, cause); - pendingRequest.future().completeExceptionally(cause); + pendingRequest.getFuture().completeExceptionally(cause); } else if (availableSlots.tryRemove(allocationID)) { LOG.debug("Failed available slot [{}] with ", allocationID, cause); @@ -999,13 +997,13 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { private final AllocationID allocationID; - private final FlinkCompletableFuture<SimpleSlot> future; + private final CompletableFuture<SimpleSlot> future; private final ResourceProfile resourceProfile; PendingRequest( AllocationID allocationID, - FlinkCompletableFuture<SimpleSlot> future, + CompletableFuture<SimpleSlot> future, ResourceProfile resourceProfile) { this.allocationID = allocationID; this.future = future; @@ -1016,7 +1014,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { return allocationID; } - public FlinkCompletableFuture<SimpleSlot> future() { + public CompletableFuture<SimpleSlot> getFuture() { return future; }