Repository: flink Updated Branches: refs/heads/master cab490f89 -> 3b97784ae
[FLINK-7321] [futures] Replace Flink's futures with Java 8's CompletableFuture in HeartbeatManager Address PR comments This closes #4434. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b97784a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b97784a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b97784a Branch: refs/heads/master Commit: 3b97784aef34a6598d9947b259f73f935dc90c9f Parents: cab490f Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Jul 31 18:47:22 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Aug 1 22:43:36 2017 +0200 ---------------------------------------------------------------------- .../runtime/heartbeat/HeartbeatListener.java | 5 ++-- .../runtime/heartbeat/HeartbeatManagerImpl.java | 24 +++++++------------- .../heartbeat/HeartbeatManagerSenderImpl.java | 23 +++++++------------ .../flink/runtime/jobmaster/JobMaster.java | 8 +++---- .../resourcemanager/ResourceManager.java | 9 ++++---- .../runtime/taskexecutor/TaskExecutor.java | 21 ++++++++--------- .../runtime/heartbeat/HeartbeatManagerTest.java | 24 +++++++++----------- 7 files changed, 48 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3b97784a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java index c6307aa..734eb4c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java @@ -19,7 +19,8 @@ package org.apache.flink.runtime.heartbeat; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.Future; + +import java.util.concurrent.CompletableFuture; /** * Interface for the interaction with the {@link HeartbeatManager}. The heartbeat listener is used @@ -58,5 +59,5 @@ public interface HeartbeatListener<I, O> { * * @return Future containing the next payload for heartbeats */ - Future<O> retrievePayload(); + CompletableFuture<O> retrievePayload(); } http://git-wip-us.apache.org/repos/asf/flink/blob/3b97784a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java index d97cfa0..99f44f9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java @@ -19,9 +19,6 @@ package org.apache.flink.runtime.heartbeat; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.AcceptFunction; -import org.apache.flink.runtime.concurrent.ApplyFunction; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.util.Preconditions; @@ -30,6 +27,7 @@ import org.slf4j.Logger; import javax.annotation.concurrent.ThreadSafe; import java.util.Collection; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; @@ -193,24 +191,18 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> { heartbeatListener.reportPayload(requestOrigin, heartbeatPayload); } - Future<O> futurePayload = heartbeatListener.retrievePayload(); + CompletableFuture<O> futurePayload = heartbeatListener.retrievePayload(); if (futurePayload != null) { - Future<Void> sendHeartbeatFuture = futurePayload.thenAcceptAsync(new AcceptFunction<O>() { - @Override - public void accept(O retrievedPayload) { - heartbeatTarget.receiveHeartbeat(getOwnResourceID(), retrievedPayload); - } - }, executor); - - sendHeartbeatFuture.exceptionally(new ApplyFunction<Throwable, Void>() { - @Override - public Void apply(Throwable failure) { + CompletableFuture<Void> sendHeartbeatFuture = futurePayload.thenAcceptAsync( + retrievedPayload -> heartbeatTarget.receiveHeartbeat(getOwnResourceID(), retrievedPayload), + executor); + + sendHeartbeatFuture.exceptionally((Throwable failure) -> { log.warn("Could not send heartbeat to target with id {}.", requestOrigin, failure); return null; - } - }); + }); } else { heartbeatTarget.receiveHeartbeat(ownResourceID, null); } http://git-wip-us.apache.org/repos/asf/flink/blob/3b97784a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java index 5b3a957..eb82343 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java @@ -19,13 +19,11 @@ package org.apache.flink.runtime.heartbeat; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.AcceptFunction; -import org.apache.flink.runtime.concurrent.ApplyFunction; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.slf4j.Logger; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -65,25 +63,20 @@ public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> if (!stopped) { log.debug("Trigger heartbeat request."); for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets()) { - Future<O> futurePayload = getHeartbeatListener().retrievePayload(); + CompletableFuture<O> futurePayload = getHeartbeatListener().retrievePayload(); final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget(); if (futurePayload != null) { - Future<Void> requestHeartbeatFuture = futurePayload.thenAcceptAsync(new AcceptFunction<O>() { - @Override - public void accept(O payload) { - heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload); - } - }, getExecutor()); + CompletableFuture<Void> requestHeartbeatFuture = futurePayload.thenAcceptAsync( + payload -> heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload), + getExecutor()); - requestHeartbeatFuture.exceptionally(new ApplyFunction<Throwable, Void>() { - @Override - public Void apply(Throwable failure) { + requestHeartbeatFuture.exceptionally( + (Throwable failure) -> { log.warn("Could not request the heartbeat from target {}.", heartbeatTarget, failure); return null; - } - }); + }); } else { heartbeatTarget.requestHeartbeat(getOwnResourceID(), null); } http://git-wip-us.apache.org/repos/asf/flink/blob/3b97784a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 9417f90..7922baa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -1126,8 +1126,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } @Override - public Future<Void> retrievePayload() { - return FlinkCompletableFuture.completed(null); + public CompletableFuture<Void> retrievePayload() { + return CompletableFuture.completedFuture(null); } } @@ -1153,8 +1153,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } @Override - public Future<Void> retrievePayload() { - return FlinkCompletableFuture.completed(null); + public CompletableFuture<Void> retrievePayload() { + return CompletableFuture.completedFuture(null); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/3b97784a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 0dfbbcd..438ec65 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -67,6 +67,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; @@ -999,8 +1000,8 @@ public abstract class ResourceManager<WorkerType extends Serializable> } @Override - public Future<Void> retrievePayload() { - return FlinkCompletableFuture.completed(null); + public CompletableFuture<Void> retrievePayload() { + return CompletableFuture.completedFuture(null); } } @@ -1032,8 +1033,8 @@ public abstract class ResourceManager<WorkerType extends Serializable> } @Override - public Future<Void> retrievePayload() { - return FlinkCompletableFuture.completed(null); + public CompletableFuture<Void> retrievePayload() { + return CompletableFuture.completedFuture(null); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/3b97784a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index cdec08e..4c4b0a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; @@ -99,7 +99,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Objects; import java.util.UUID; -import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; import static org.apache.flink.util.Preconditions.checkArgument; @@ -1328,8 +1328,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } @Override - public Future<Void> retrievePayload() { - return FlinkCompletableFuture.completed(null); + public CompletableFuture<Void> retrievePayload() { + return CompletableFuture.completedFuture(null); } } @@ -1355,14 +1355,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } @Override - public Future<SlotReport> retrievePayload() { - return callAsync( - new Callable<SlotReport>() { - @Override - public SlotReport call() throws Exception { - return taskSlotTable.createSlotReport(getResourceID()); - } - }, taskManagerConfiguration.getTimeout()); + public CompletableFuture<SlotReport> retrievePayload() { + return FutureUtils.toJava( + callAsync( + () -> taskSlotTable.createSlotReport(getResourceID()), + taskManagerConfiguration.getTimeout())); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/3b97784a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java index 031f48c..593daf7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java @@ -19,11 +19,8 @@ package org.apache.flink.runtime.heartbeat; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.CompletableFuture; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.util.DirectExecutorService; import org.apache.flink.util.TestLogger; @@ -31,6 +28,7 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -72,7 +70,7 @@ public class HeartbeatManagerTest extends TestLogger { Object expectedObject = new Object(); - when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(expectedObject)); + when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(expectedObject)); HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>( heartbeatTimeout, @@ -113,7 +111,7 @@ public class HeartbeatManagerTest extends TestLogger { Object expectedObject = new Object(); - when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(expectedObject)); + when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(expectedObject)); HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>( heartbeatTimeout, @@ -165,7 +163,7 @@ public class HeartbeatManagerTest extends TestLogger { HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class); - Future<ResourceID> timeoutFuture = heartbeatListener.getTimeoutFuture(); + CompletableFuture<ResourceID> timeoutFuture = heartbeatListener.getTimeoutFuture(); heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget); @@ -199,11 +197,11 @@ public class HeartbeatManagerTest extends TestLogger { ResourceID resourceID2 = new ResourceID("barfoo"); HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class); - when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(object)); + when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(object)); TestingHeartbeatListener heartbeatListener2 = new TestingHeartbeatListener(object2); - Future<ResourceID> futureTimeout = heartbeatListener2.getTimeoutFuture(); + CompletableFuture<ResourceID> futureTimeout = heartbeatListener2.getTimeoutFuture(); HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>( heartbeatTimeout, @@ -266,7 +264,7 @@ public class HeartbeatManagerTest extends TestLogger { heartbeatManager.unmonitorTarget(targetID); - Future<ResourceID> timeout = heartbeatListener.getTimeoutFuture(); + CompletableFuture<ResourceID> timeout = heartbeatListener.getTimeoutFuture(); try { timeout.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS); @@ -278,7 +276,7 @@ public class HeartbeatManagerTest extends TestLogger { static class TestingHeartbeatListener implements HeartbeatListener<Object, Object> { - private final CompletableFuture<ResourceID> future = new FlinkCompletableFuture<>(); + private final CompletableFuture<ResourceID> future = new CompletableFuture<>(); private final Object payload; @@ -288,7 +286,7 @@ public class HeartbeatManagerTest extends TestLogger { this.payload = payload; } - public Future<ResourceID> getTimeoutFuture() { + CompletableFuture<ResourceID> getTimeoutFuture() { return future; } @@ -307,8 +305,8 @@ public class HeartbeatManagerTest extends TestLogger { } @Override - public Future<Object> retrievePayload() { - return FlinkCompletableFuture.completed(payload); + public CompletableFuture<Object> retrievePayload() { + return CompletableFuture.completedFuture(payload); } } }