Repository: flink
Updated Branches:
  refs/heads/master 4378ac3ae -> fcac882d2


[FLINK-7326] [futures] Replace Flink's future with Java 8's CompletableFuture 
in RegisteredRpcConnection

Address PR comments

This closes #4440.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcac882d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcac882d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcac882d

Branch: refs/heads/master
Commit: fcac882d243a5d2f0a5ed3ce54cba0e7263a112a
Parents: 4378ac3
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Jul 31 20:11:30 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Aug 1 14:00:34 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      | 19 +++---
 .../registration/RegisteredRpcConnection.java   | 34 ++++-------
 .../registration/RetryingRegistration.java      | 62 ++++++++------------
 .../runtime/taskexecutor/JobLeaderService.java  |  9 +--
 ...TaskExecutorToResourceManagerConnection.java |  7 ++-
 .../registration/RetryingRegistrationTest.java  | 17 +++---
 6 files changed, 68 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 947a914..9417f90 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -106,6 +107,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
@@ -1044,18 +1046,19 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                        getTargetAddress(), getTargetLeaderId())
                        {
                                @Override
-                               protected Future<RegistrationResponse> 
invokeRegistration(
+                               protected 
CompletableFuture<RegistrationResponse> invokeRegistration(
                                                ResourceManagerGateway gateway, 
UUID leaderId, long timeoutMillis) throws Exception
                                {
                                        Time timeout = 
Time.milliseconds(timeoutMillis);
 
-                                       return gateway.registerJobManager(
-                                               leaderId,
-                                               jobManagerLeaderID,
-                                               jobManagerResourceID,
-                                               jobManagerRpcAddress,
-                                               jobID,
-                                               timeout);
+                                       return FutureUtils.toJava(
+                                               gateway.registerJobManager(
+                                                       leaderId,
+                                                       jobManagerLeaderID,
+                                                       jobManagerResourceID,
+                                                       jobManagerRpcAddress,
+                                                       jobID,
+                                                       timeout));
                                }
                        };
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
index b477546..da46e1c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
@@ -19,14 +19,12 @@
 package org.apache.flink.runtime.registration;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcGateway;
 
 import org.slf4j.Logger;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -88,24 +86,18 @@ public abstract class RegisteredRpcConnection<Gateway 
extends RpcGateway, Succes
                pendingRegistration = checkNotNull(generateRegistration());
                pendingRegistration.startRegistration();
 
-               Future<Tuple2<Gateway, Success>> future = 
pendingRegistration.getFuture();
-
-               Future<Void> registrationSuccessFuture = 
future.thenAcceptAsync(new AcceptFunction<Tuple2<Gateway, Success>>() {
-                       @Override
-                       public void accept(Tuple2<Gateway, Success> result) {
-                               targetGateway = result.f0;
-                               onRegistrationSuccess(result.f1);
-                       }
-               }, executor);
-
-               // this future should only ever fail if there is a bug, not if 
the registration is declined
-               registrationSuccessFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
-                       @Override
-                       public Void apply(Throwable failure) {
-                               onRegistrationFailure(failure);
-                               return null;
-                       }
-               }, executor);
+               CompletableFuture<Tuple2<Gateway, Success>> future = 
pendingRegistration.getFuture();
+
+               future.whenCompleteAsync(
+                       (Tuple2<Gateway, Success> result, Throwable failure) -> 
{
+                               // this future should only ever fail if there 
is a bug, not if the registration is declined
+                               if (failure != null) {
+                                       onRegistrationFailure(failure);
+                               } else {
+                                       targetGateway = result.f0;
+                                       onRegistrationSuccess(result.f1);
+                               }
+                       }, executor);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index c5c03bd..1034a89 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -19,17 +19,14 @@
 package org.apache.flink.runtime.registration;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import org.slf4j.Logger;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -138,14 +135,14 @@ public abstract class RetryingRegistration<Gateway 
extends RpcGateway, Success e
                this.delayOnError = delayOnError;
                this.delayOnRefusedRegistration = delayOnRefusedRegistration;
 
-               this.completionFuture = new FlinkCompletableFuture<>();
+               this.completionFuture = new CompletableFuture<>();
        }
 
        // 
------------------------------------------------------------------------
        //  completion and cancellation
        // 
------------------------------------------------------------------------
 
-       public Future<Tuple2<Gateway, Success>> getFuture() {
+       public CompletableFuture<Tuple2<Gateway, Success>> getFuture() {
                return completionFuture;
        }
 
@@ -168,7 +165,7 @@ public abstract class RetryingRegistration<Gateway extends 
RpcGateway, Success e
        //  registration
        // 
------------------------------------------------------------------------
 
-       protected abstract Future<RegistrationResponse> invokeRegistration(
+       protected abstract CompletableFuture<RegistrationResponse> 
invokeRegistration(
                        Gateway gateway, UUID leaderId, long timeoutMillis) 
throws Exception;
 
        /**
@@ -179,29 +176,26 @@ public abstract class RetryingRegistration<Gateway 
extends RpcGateway, Success e
        public void startRegistration() {
                try {
                        // trigger resolution of the resource manager address 
to a callable gateway
-                       Future<Gateway> resourceManagerFuture = 
rpcService.connect(targetAddress, targetType);
+                       CompletableFuture<Gateway> resourceManagerFuture = 
FutureUtils.toJava(
+                               rpcService.connect(targetAddress, targetType));
 
                        // upon success, start the registration attempts
-                       Future<Void> resourceManagerAcceptFuture = 
resourceManagerFuture.thenAcceptAsync(new AcceptFunction<Gateway>() {
-                               @Override
-                               public void accept(Gateway result) {
+                       CompletableFuture<Void> resourceManagerAcceptFuture = 
resourceManagerFuture.thenAcceptAsync(
+                               (Gateway result) -> {
                                        log.info("Resolved {} address, 
beginning registration", targetName);
                                        register(result, 1, 
initialRegistrationTimeout);
-                               }
-                       }, rpcService.getExecutor());
+                               },
+                               rpcService.getExecutor());
 
                        // upon failure, retry, unless this is cancelled
-                       resourceManagerAcceptFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
-                               @Override
-                               public Void apply(Throwable failure) {
-                                       if (!isCanceled()) {
+                       resourceManagerAcceptFuture.whenCompleteAsync(
+                               (Void v, Throwable failure) -> {
+                                       if (failure != null && !isCanceled()) {
                                                log.warn("Could not resolve {} 
address {}, retrying...", targetName, targetAddress, failure);
                                                startRegistration();
                                        }
-
-                                       return null;
-                               }
-                       }, rpcService.getExecutor());
+                               },
+                               rpcService.getExecutor());
                }
                catch (Throwable t) {
                        cancel();
@@ -222,12 +216,11 @@ public abstract class RetryingRegistration<Gateway 
extends RpcGateway, Success e
 
                try {
                        log.info("Registration at {} attempt {} 
(timeout={}ms)", targetName, attempt, timeoutMillis);
-                       Future<RegistrationResponse> registrationFuture = 
invokeRegistration(gateway, leaderId, timeoutMillis);
+                       CompletableFuture<RegistrationResponse> 
registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis);
 
                        // if the registration was successful, let the 
TaskExecutor know
-                       Future<Void> registrationAcceptFuture = 
registrationFuture.thenAcceptAsync(new AcceptFunction<RegistrationResponse>() {
-                               @Override
-                               public void accept(RegistrationResponse result) 
{
+                       CompletableFuture<Void> registrationAcceptFuture = 
registrationFuture.thenAcceptAsync(
+                               (RegistrationResponse result) -> {
                                        if (!isCanceled()) {
                                                if (result instanceof 
RegistrationResponse.Success) {
                                                        // registration 
successful!
@@ -247,14 +240,13 @@ public abstract class RetryingRegistration<Gateway 
extends RpcGateway, Success e
                                                        registerLater(gateway, 
1, initialRegistrationTimeout, delayOnRefusedRegistration);
                                                }
                                        }
-                               }
-                       }, rpcService.getExecutor());
+                               },
+                               rpcService.getExecutor());
 
                        // upon failure, retry
-                       registrationAcceptFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
-                               @Override
-                               public Void apply(Throwable failure) {
-                                       if (!isCanceled()) {
+                       registrationAcceptFuture.whenCompleteAsync(
+                               (Void v, Throwable failure) -> {
+                                       if (failure != null && !isCanceled()) {
                                                if (failure instanceof 
TimeoutException) {
                                                        // we simply have not 
received a response in time. maybe the timeout was
                                                        // very low (initial 
fast registration attempts), maybe the target endpoint is
@@ -275,10 +267,8 @@ public abstract class RetryingRegistration<Gateway extends 
RpcGateway, Success e
                                                        registerLater(gateway, 
1, initialRegistrationTimeout, delayOnError);
                                                }
                                        }
-
-                                       return null;
-                               }
-                       }, rpcService.getExecutor());
+                               },
+                               rpcService.getExecutor());
                }
                catch (Throwable t) {
                        cancel();

http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 2dd7964..71933fe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 /**
@@ -375,11 +376,11 @@ public class JobLeaderService {
                }
 
                @Override
-               protected Future<RegistrationResponse> invokeRegistration(
+               protected CompletableFuture<RegistrationResponse> 
invokeRegistration(
                                JobMasterGateway gateway, UUID leaderId, long 
timeoutMillis) throws Exception
                {
-                       return 
gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation,
-                                       leaderId, 
Time.milliseconds(timeoutMillis));
+                       return 
FutureUtils.toJava(gateway.registerTaskManager(taskManagerRpcAddress, 
taskManagerLocation,
+                                       leaderId, 
Time.milliseconds(timeoutMillis)));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 775482c..4f91166 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationConnectionListener;
@@ -27,12 +28,12 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.concurrent.Future;
 
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -151,11 +152,11 @@ public class TaskExecutorToResourceManagerConnection
                }
 
                @Override
-               protected Future<RegistrationResponse> invokeRegistration(
+               protected CompletableFuture<RegistrationResponse> 
invokeRegistration(
                                ResourceManagerGateway resourceManager, UUID 
leaderId, long timeoutMillis) throws Exception {
 
                        Time timeout = Time.milliseconds(timeoutMillis);
-                       return resourceManager.registerTaskExecutor(leaderId, 
taskExecutorAddress, resourceID, slotReport, timeout);
+                       return 
FutureUtils.toJava(resourceManager.registerTaskExecutor(leaderId, 
taskExecutorAddress, resourceID, slotReport, timeout));
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index 0c2134f..9a4917a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.registration;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
@@ -29,6 +29,7 @@ import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -68,7 +69,7 @@ public class RetryingRegistrationTest extends TestLogger {
                        TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
                        registration.startRegistration();
 
-                       Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
+                       CompletableFuture<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
                        assertNotNull(future);
 
                        // multiple accesses return the same future
@@ -98,7 +99,7 @@ public class RetryingRegistrationTest extends TestLogger {
                TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, "testaddress", UUID.randomUUID());
                registration.startRegistration();
 
-               Future<?> future = registration.getFuture();
+               CompletableFuture<?> future = registration.getFuture();
                assertTrue(future.isDone());
 
                try {
@@ -166,7 +167,7 @@ public class RetryingRegistrationTest extends TestLogger {
                        long started = System.nanoTime();
                        registration.startRegistration();
 
-                       Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
+                       CompletableFuture<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
                        Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
                                        future.get(10L, TimeUnit.SECONDS);
 
@@ -209,7 +210,7 @@ public class RetryingRegistrationTest extends TestLogger {
                        long started = System.nanoTime();
                        registration.startRegistration();
 
-                       Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
+                       CompletableFuture<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
                        Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
                                        future.get(10L, TimeUnit.SECONDS);
 
@@ -254,7 +255,7 @@ public class RetryingRegistrationTest extends TestLogger {
                        long started = System.nanoTime();
                        registration.startRegistration();
 
-                       Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
+                       CompletableFuture<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
                        Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
                                        future.get(10, TimeUnit.SECONDS);
 
@@ -337,9 +338,9 @@ public class RetryingRegistrationTest extends TestLogger {
                }
 
                @Override
-               protected Future<RegistrationResponse> invokeRegistration(
+               protected CompletableFuture<RegistrationResponse> 
invokeRegistration(
                                TestRegistrationGateway gateway, UUID leaderId, 
long timeoutMillis) {
-                       return gateway.registrationCall(leaderId, 
timeoutMillis);
+                       return 
FutureUtils.toJava(gateway.registrationCall(leaderId, timeoutMillis));
                }
        }
 }

Reply via email to