Repository: flink
Updated Branches:
  refs/heads/master 7c639c600 -> 4a9f19b9f


[FLINK-7319] [futures] Replace Flink's Futures with Java 8 CompletableFuture in MesosResourceManager

This closes #4432.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4a9f19b9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4a9f19b9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4a9f19b9

Branch: refs/heads/master
Commit: 4a9f19b9faa87de52ac1f078e53f26fd32efadad
Parents: 7c639c6
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Jul 31 18:06:20 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Aug 1 13:45:15 2017 +0200

----------------------------------------------------------------------
 .../clusterframework/MesosResourceManager.java  | 44 +++++++++-----------
 1 file changed, 19 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4a9f19b9/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 260d5bf..736af59 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -45,10 +45,7 @@ import 
org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -79,11 +76,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
@@ -324,24 +321,23 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
                Exception exception = null;
                FiniteDuration stopTimeout = new FiniteDuration(5L, 
TimeUnit.SECONDS);
 
-               Future<Boolean> stopTaskMonitorFuture = stopActor(taskMonitor, 
stopTimeout);
+               CompletableFuture<Boolean> stopTaskMonitorFuture = 
stopActor(taskMonitor, stopTimeout);
                taskMonitor = null;
 
-               Future<Boolean> stopConnectionMonitorFuture = 
stopActor(connectionMonitor, stopTimeout);
+               CompletableFuture<Boolean> stopConnectionMonitorFuture = 
stopActor(connectionMonitor, stopTimeout);
                connectionMonitor = null;
 
-               Future<Boolean> stopLaunchCoordinatorFuture = 
stopActor(launchCoordinator, stopTimeout);
+               CompletableFuture<Boolean> stopLaunchCoordinatorFuture = 
stopActor(launchCoordinator, stopTimeout);
                launchCoordinator = null;
 
-               Future<Boolean> stopReconciliationCoordinatorFuture = 
stopActor(reconciliationCoordinator, stopTimeout);
+               CompletableFuture<Boolean> stopReconciliationCoordinatorFuture 
= stopActor(reconciliationCoordinator, stopTimeout);
                reconciliationCoordinator = null;
 
-               Future<Void> stopFuture = FutureUtils.waitForAll(
-                       Arrays.asList(
-                               stopTaskMonitorFuture,
-                               stopConnectionMonitorFuture,
-                               stopLaunchCoordinatorFuture,
-                               stopReconciliationCoordinatorFuture));
+               CompletableFuture<Void> stopFuture = CompletableFuture.allOf(
+                       stopTaskMonitorFuture,
+                       stopConnectionMonitorFuture,
+                       stopLaunchCoordinatorFuture,
+                       stopReconciliationCoordinatorFuture);
 
                // wait for the future to complete or to time out
                try {
@@ -606,20 +602,18 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
         * @param timeout for the graceful shut down
         * @return Future containing the result of the graceful shut down
         */
-       private Future<Boolean> stopActor(final ActorRef actorRef, 
FiniteDuration timeout) {
-               return new FlinkFuture<>(Patterns.gracefulStop(actorRef, 
timeout))
+       private CompletableFuture<Boolean> stopActor(final ActorRef actorRef, 
FiniteDuration timeout) {
+               return FutureUtils.toJava(Patterns.gracefulStop(actorRef, 
timeout))
                        .exceptionally(
-                               new ApplyFunction<Throwable, Boolean>() {
-                                       @Override
-                                       public Boolean apply(Throwable 
throwable) {
-                                               // The actor did not stop 
gracefully in time, try to directly stop it
-                                               actorSystem.stop(actorRef);
+                               (Throwable throwable) -> {
+                                       // The actor did not stop gracefully in 
time, try to directly stop it
+                                       actorSystem.stop(actorRef);
 
-                                               log.warn("Could not stop actor 
{} gracefully.", actorRef.path(), throwable);
+                                       log.warn("Could not stop actor {} 
gracefully.", actorRef.path(), throwable);
 
-                                               return true;
-                                       }
-                               });
+                                       return true;
+                               }
+                       );
        }
 
        /**

Reply via email to