Repository: flink Updated Branches: refs/heads/master 7c639c600 -> 4a9f19b9f
[FLINK-7319] [futures] Replace Flink's Futures with Java 8 CompletableFuture in MesosResourceManager This closes #4432. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4a9f19b9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4a9f19b9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4a9f19b9 Branch: refs/heads/master Commit: 4a9f19b9faa87de52ac1f078e53f26fd32efadad Parents: 7c639c6 Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Jul 31 18:06:20 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Aug 1 13:45:15 2017 +0200 ---------------------------------------------------------------------- .../clusterframework/MesosResourceManager.java | 44 +++++++++----------- 1 file changed, 19 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4a9f19b9/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index 260d5bf..736af59 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -45,10 +45,7 @@ import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.concurrent.ApplyFunction; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.InstanceID; @@ -79,11 +76,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import scala.Option; @@ -324,24 +321,23 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN Exception exception = null; FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS); - Future<Boolean> stopTaskMonitorFuture = stopActor(taskMonitor, stopTimeout); + CompletableFuture<Boolean> stopTaskMonitorFuture = stopActor(taskMonitor, stopTimeout); taskMonitor = null; - Future<Boolean> stopConnectionMonitorFuture = stopActor(connectionMonitor, stopTimeout); + CompletableFuture<Boolean> stopConnectionMonitorFuture = stopActor(connectionMonitor, stopTimeout); connectionMonitor = null; - Future<Boolean> stopLaunchCoordinatorFuture = stopActor(launchCoordinator, stopTimeout); + CompletableFuture<Boolean> stopLaunchCoordinatorFuture = stopActor(launchCoordinator, stopTimeout); launchCoordinator = null; - Future<Boolean> stopReconciliationCoordinatorFuture = stopActor(reconciliationCoordinator, stopTimeout); + CompletableFuture<Boolean> stopReconciliationCoordinatorFuture = stopActor(reconciliationCoordinator, stopTimeout); reconciliationCoordinator = null; - Future<Void> stopFuture = FutureUtils.waitForAll( - Arrays.asList( - stopTaskMonitorFuture, - stopConnectionMonitorFuture, - stopLaunchCoordinatorFuture, - stopReconciliationCoordinatorFuture)); + CompletableFuture<Void> stopFuture = CompletableFuture.allOf( + stopTaskMonitorFuture, + stopConnectionMonitorFuture, + stopLaunchCoordinatorFuture, + stopReconciliationCoordinatorFuture); // wait for the future to complete or to time out try { @@ -606,20 +602,18 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN * @param timeout for the graceful shut down * @return Future containing the result of the graceful shut down */ - private Future<Boolean> stopActor(final ActorRef actorRef, FiniteDuration timeout) { - return new FlinkFuture<>(Patterns.gracefulStop(actorRef, timeout)) + private CompletableFuture<Boolean> stopActor(final ActorRef actorRef, FiniteDuration timeout) { + return FutureUtils.toJava(Patterns.gracefulStop(actorRef, timeout)) .exceptionally( - new ApplyFunction<Throwable, Boolean>() { - @Override - public Boolean apply(Throwable throwable) { - // The actor did not stop gracefully in time, try to directly stop it - actorSystem.stop(actorRef); + (Throwable throwable) -> { + // The actor did not stop gracefully in time, try to directly stop it + actorSystem.stop(actorRef); - log.warn("Could not stop actor {} gracefully.", actorRef.path(), throwable); + log.warn("Could not stop actor {} gracefully.", actorRef.path(), throwable); - return true; - } - }); + return true; + } + ); } /**