Repository: flink
Updated Branches:
  refs/heads/master 4378ac3ae -> fcac882d2
[FLINK-7326] [futures] Replace Flink's future with Java 8's CompletableFuture in RegisteredRpcConnection Address PR comments This closes #4440. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcac882d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcac882d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcac882d Branch: refs/heads/master Commit: fcac882d243a5d2f0a5ed3ce54cba0e7263a112a Parents: 4378ac3 Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Jul 31 20:11:30 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Aug 1 14:00:34 2017 +0200 ---------------------------------------------------------------------- .../flink/runtime/jobmaster/JobMaster.java | 19 +++--- .../registration/RegisteredRpcConnection.java | 34 ++++------- .../registration/RetryingRegistration.java | 62 ++++++++------------ .../runtime/taskexecutor/JobLeaderService.java | 9 +-- ...TaskExecutorToResourceManagerConnection.java | 7 ++- .../registration/RetryingRegistrationTest.java | 17 +++--- 6 files changed, 68 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 947a914..9417f90 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import 
org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; @@ -106,6 +107,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; @@ -1044,18 +1046,19 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { getTargetAddress(), getTargetLeaderId()) { @Override - protected Future<RegistrationResponse> invokeRegistration( + protected CompletableFuture<RegistrationResponse> invokeRegistration( ResourceManagerGateway gateway, UUID leaderId, long timeoutMillis) throws Exception { Time timeout = Time.milliseconds(timeoutMillis); - return gateway.registerJobManager( - leaderId, - jobManagerLeaderID, - jobManagerResourceID, - jobManagerRpcAddress, - jobID, - timeout); + return FutureUtils.toJava( + gateway.registerJobManager( + leaderId, + jobManagerLeaderID, + jobManagerResourceID, + jobManagerRpcAddress, + jobID, + timeout)); } }; } http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java index b477546..da46e1c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java @@ -19,14 +19,12 @@ package org.apache.flink.runtime.registration; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.concurrent.AcceptFunction; -import org.apache.flink.runtime.concurrent.ApplyFunction; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.RpcGateway; import org.slf4j.Logger; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -88,24 +86,18 @@ public abstract class RegisteredRpcConnection<Gateway extends RpcGateway, Succes pendingRegistration = checkNotNull(generateRegistration()); pendingRegistration.startRegistration(); - Future<Tuple2<Gateway, Success>> future = pendingRegistration.getFuture(); - - Future<Void> registrationSuccessFuture = future.thenAcceptAsync(new AcceptFunction<Tuple2<Gateway, Success>>() { - @Override - public void accept(Tuple2<Gateway, Success> result) { - targetGateway = result.f0; - onRegistrationSuccess(result.f1); - } - }, executor); - - // this future should only ever fail if there is a bug, not if the registration is declined - registrationSuccessFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { - @Override - public Void apply(Throwable failure) { - onRegistrationFailure(failure); - return null; - } - }, executor); + CompletableFuture<Tuple2<Gateway, Success>> future = pendingRegistration.getFuture(); + + future.whenCompleteAsync( + (Tuple2<Gateway, Success> result, Throwable failure) -> { + // this future should only ever fail if there is a bug, not if the registration is declined + if (failure != null) { + onRegistrationFailure(failure); + } else { + targetGateway = result.f0; + onRegistrationSuccess(result.f1); + } + }, executor); } /** 
http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java index c5c03bd..1034a89 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java @@ -19,17 +19,14 @@ package org.apache.flink.runtime.registration; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.concurrent.AcceptFunction; -import org.apache.flink.runtime.concurrent.ApplyFunction; -import org.apache.flink.runtime.concurrent.CompletableFuture; -import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; import org.slf4j.Logger; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -138,14 +135,14 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e this.delayOnError = delayOnError; this.delayOnRefusedRegistration = delayOnRefusedRegistration; - this.completionFuture = new FlinkCompletableFuture<>(); + this.completionFuture = new CompletableFuture<>(); } // ------------------------------------------------------------------------ // completion and cancellation // ------------------------------------------------------------------------ - public Future<Tuple2<Gateway, Success>> getFuture() { + public CompletableFuture<Tuple2<Gateway, 
Success>> getFuture() { return completionFuture; } @@ -168,7 +165,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e // registration // ------------------------------------------------------------------------ - protected abstract Future<RegistrationResponse> invokeRegistration( + protected abstract CompletableFuture<RegistrationResponse> invokeRegistration( Gateway gateway, UUID leaderId, long timeoutMillis) throws Exception; /** @@ -179,29 +176,26 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e public void startRegistration() { try { // trigger resolution of the resource manager address to a callable gateway - Future<Gateway> resourceManagerFuture = rpcService.connect(targetAddress, targetType); + CompletableFuture<Gateway> resourceManagerFuture = FutureUtils.toJava( + rpcService.connect(targetAddress, targetType)); // upon success, start the registration attempts - Future<Void> resourceManagerAcceptFuture = resourceManagerFuture.thenAcceptAsync(new AcceptFunction<Gateway>() { - @Override - public void accept(Gateway result) { + CompletableFuture<Void> resourceManagerAcceptFuture = resourceManagerFuture.thenAcceptAsync( + (Gateway result) -> { log.info("Resolved {} address, beginning registration", targetName); register(result, 1, initialRegistrationTimeout); - } - }, rpcService.getExecutor()); + }, + rpcService.getExecutor()); // upon failure, retry, unless this is cancelled - resourceManagerAcceptFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { - @Override - public Void apply(Throwable failure) { - if (!isCanceled()) { + resourceManagerAcceptFuture.whenCompleteAsync( + (Void v, Throwable failure) -> { + if (failure != null && !isCanceled()) { log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress, failure); startRegistration(); } - - return null; - } - }, rpcService.getExecutor()); + }, + rpcService.getExecutor()); } catch (Throwable t) { cancel(); 
@@ -222,12 +216,11 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e try { log.info("Registration at {} attempt {} (timeout={}ms)", targetName, attempt, timeoutMillis); - Future<RegistrationResponse> registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis); + CompletableFuture<RegistrationResponse> registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis); // if the registration was successful, let the TaskExecutor know - Future<Void> registrationAcceptFuture = registrationFuture.thenAcceptAsync(new AcceptFunction<RegistrationResponse>() { - @Override - public void accept(RegistrationResponse result) { + CompletableFuture<Void> registrationAcceptFuture = registrationFuture.thenAcceptAsync( + (RegistrationResponse result) -> { if (!isCanceled()) { if (result instanceof RegistrationResponse.Success) { // registration successful! @@ -247,14 +240,13 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e registerLater(gateway, 1, initialRegistrationTimeout, delayOnRefusedRegistration); } } - } - }, rpcService.getExecutor()); + }, + rpcService.getExecutor()); // upon failure, retry - registrationAcceptFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { - @Override - public Void apply(Throwable failure) { - if (!isCanceled()) { + registrationAcceptFuture.whenCompleteAsync( + (Void v, Throwable failure) -> { + if (failure != null && !isCanceled()) { if (failure instanceof TimeoutException) { // we simply have not received a response in time. 
maybe the timeout was // very low (initial fast registration attempts), maybe the target endpoint is @@ -275,10 +267,8 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e registerLater(gateway, 1, initialRegistrationTimeout, delayOnError); } } - - return null; - } - }, rpcService.getExecutor()); + }, + rpcService.getExecutor()); } catch (Throwable t) { cancel(); http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java index 2dd7964..71933fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; import org.apache.flink.runtime.jobmaster.JobMasterGateway; @@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; /** @@ -375,11 +376,11 @@ public class JobLeaderService { } @Override - protected Future<RegistrationResponse> invokeRegistration( + protected CompletableFuture<RegistrationResponse> invokeRegistration( JobMasterGateway gateway, UUID leaderId, long 
timeoutMillis) throws Exception { - return gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation, - leaderId, Time.milliseconds(timeoutMillis)); + return FutureUtils.toJava(gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation, + leaderId, Time.milliseconds(timeoutMillis))); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java index 775482c..4f91166 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.registration.RegisteredRpcConnection; import org.apache.flink.runtime.registration.RegistrationConnectionListener; @@ -27,12 +28,12 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.registration.RetryingRegistration; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; 
import static org.apache.flink.util.Preconditions.checkNotNull; @@ -151,11 +152,11 @@ public class TaskExecutorToResourceManagerConnection } @Override - protected Future<RegistrationResponse> invokeRegistration( + protected CompletableFuture<RegistrationResponse> invokeRegistration( ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception { Time timeout = Time.milliseconds(timeoutMillis); - return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, slotReport, timeout); + return FutureUtils.toJava(resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, slotReport, timeout)); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java index 0c2134f..9a4917a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.registration; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.TestingRpcService; @@ -29,6 +29,7 @@ import org.junit.Test; import org.slf4j.LoggerFactory; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.Executors; @@ -68,7 +69,7 @@ public class RetryingRegistrationTest extends TestLogger { TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); registration.startRegistration(); - Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture(); + CompletableFuture<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture(); assertNotNull(future); // multiple accesses return the same future @@ -98,7 +99,7 @@ public class RetryingRegistrationTest extends TestLogger { TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "testaddress", UUID.randomUUID()); registration.startRegistration(); - Future<?> future = registration.getFuture(); + CompletableFuture<?> future = registration.getFuture(); assertTrue(future.isDone()); try { @@ -166,7 +167,7 @@ public class RetryingRegistrationTest extends TestLogger { long started = System.nanoTime(); registration.startRegistration(); - Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture(); + CompletableFuture<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture(); Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success = future.get(10L, TimeUnit.SECONDS); @@ -209,7 +210,7 @@ public class RetryingRegistrationTest extends TestLogger { long started = System.nanoTime(); registration.startRegistration(); - Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture(); + CompletableFuture<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture(); Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success = future.get(10L, TimeUnit.SECONDS); @@ -254,7 +255,7 @@ public class RetryingRegistrationTest extends TestLogger { long started = System.nanoTime(); registration.startRegistration(); - 
Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture(); + CompletableFuture<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture(); Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success = future.get(10, TimeUnit.SECONDS); @@ -337,9 +338,9 @@ public class RetryingRegistrationTest extends TestLogger { } @Override - protected Future<RegistrationResponse> invokeRegistration( + protected CompletableFuture<RegistrationResponse> invokeRegistration( TestRegistrationGateway gateway, UUID leaderId, long timeoutMillis) { - return gateway.registrationCall(leaderId, timeoutMillis); + return FutureUtils.toJava(gateway.registrationCall(leaderId, timeoutMillis)); } } }