Repository: flink
Updated Branches:
  refs/heads/master cab490f89 -> 3b97784ae


[FLINK-7321] [futures] Replace Flink's futures with Java 8's CompletableFuture in HeartbeatManager

Address PR comments

This closes #4434.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b97784a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b97784a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b97784a

Branch: refs/heads/master
Commit: 3b97784aef34a6598d9947b259f73f935dc90c9f
Parents: cab490f
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Jul 31 18:47:22 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Aug 1 22:43:36 2017 +0200

----------------------------------------------------------------------
 .../runtime/heartbeat/HeartbeatListener.java    |  5 ++--
 .../runtime/heartbeat/HeartbeatManagerImpl.java | 24 +++++++-------------
 .../heartbeat/HeartbeatManagerSenderImpl.java   | 23 +++++++------------
 .../flink/runtime/jobmaster/JobMaster.java      |  8 +++----
 .../resourcemanager/ResourceManager.java        |  9 ++++----
 .../runtime/taskexecutor/TaskExecutor.java      | 21 ++++++++---------
 .../runtime/heartbeat/HeartbeatManagerTest.java | 24 +++++++++-----------
 7 files changed, 48 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3b97784a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
index c6307aa..734eb4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
@@ -19,7 +19,8 @@
 package org.apache.flink.runtime.heartbeat;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
+
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Interface for the interaction with the {@link HeartbeatManager}. The 
heartbeat listener is used
@@ -58,5 +59,5 @@ public interface HeartbeatListener<I, O> {
         *
         * @return Future containing the next payload for heartbeats
         */
-       Future<O> retrievePayload();
+       CompletableFuture<O> retrievePayload();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97784a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
index d97cfa0..99f44f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -19,9 +19,6 @@
 package org.apache.flink.runtime.heartbeat;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.util.Preconditions;
 
@@ -30,6 +27,7 @@ import org.slf4j.Logger;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
@@ -193,24 +191,18 @@ public class HeartbeatManagerImpl<I, O> implements 
HeartbeatManager<I, O> {
                                        
heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
                                }
 
-                               Future<O> futurePayload = 
heartbeatListener.retrievePayload();
+                               CompletableFuture<O> futurePayload = 
heartbeatListener.retrievePayload();
 
                                if (futurePayload != null) {
-                                       Future<Void> sendHeartbeatFuture = 
futurePayload.thenAcceptAsync(new AcceptFunction<O>() {
-                                               @Override
-                                               public void accept(O 
retrievedPayload) {
-                                                       
heartbeatTarget.receiveHeartbeat(getOwnResourceID(), retrievedPayload);
-                                               }
-                                       }, executor);
-
-                                       sendHeartbeatFuture.exceptionally(new 
ApplyFunction<Throwable, Void>() {
-                                               @Override
-                                               public Void apply(Throwable 
failure) {
+                                       CompletableFuture<Void> 
sendHeartbeatFuture = futurePayload.thenAcceptAsync(
+                                               retrievedPayload ->     
heartbeatTarget.receiveHeartbeat(getOwnResourceID(), retrievedPayload),
+                                               executor);
+
+                                       
sendHeartbeatFuture.exceptionally((Throwable failure) -> {
                                                        log.warn("Could not 
send heartbeat to target with id {}.", requestOrigin, failure);
 
                                                        return null;
-                                               }
-                                       });
+                                               });
                                } else {
                                        
heartbeatTarget.receiveHeartbeat(ownResourceID, null);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97784a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
index 5b3a957..eb82343 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
@@ -19,13 +19,11 @@
 package org.apache.flink.runtime.heartbeat;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 
 import org.slf4j.Logger;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -65,25 +63,20 @@ public class HeartbeatManagerSenderImpl<I, O> extends 
HeartbeatManagerImpl<I, O>
                if (!stopped) {
                        log.debug("Trigger heartbeat request.");
                        for (HeartbeatMonitor<O> heartbeatMonitor : 
getHeartbeatTargets()) {
-                               Future<O> futurePayload = 
getHeartbeatListener().retrievePayload();
+                               CompletableFuture<O> futurePayload = 
getHeartbeatListener().retrievePayload();
                                final HeartbeatTarget<O> heartbeatTarget = 
heartbeatMonitor.getHeartbeatTarget();
 
                                if (futurePayload != null) {
-                                       Future<Void> requestHeartbeatFuture = 
futurePayload.thenAcceptAsync(new AcceptFunction<O>() {
-                                               @Override
-                                               public void accept(O payload) {
-                                                       
heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
-                                               }
-                                       }, getExecutor());
+                                       CompletableFuture<Void> 
requestHeartbeatFuture = futurePayload.thenAcceptAsync(
+                                               payload -> 
heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload),
+                                               getExecutor());
 
-                                       
requestHeartbeatFuture.exceptionally(new ApplyFunction<Throwable, Void>() {
-                                               @Override
-                                               public Void apply(Throwable 
failure) {
+                                       requestHeartbeatFuture.exceptionally(
+                                               (Throwable failure) -> {
                                                        log.warn("Could not 
request the heartbeat from target {}.", heartbeatTarget, failure);
 
                                                        return null;
-                                               }
-                                       });
+                                               });
                                } else {
                                        
heartbeatTarget.requestHeartbeat(getOwnResourceID(), null);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97784a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 9417f90..7922baa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1126,8 +1126,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                }
 
                @Override
-               public Future<Void> retrievePayload() {
-                       return FlinkCompletableFuture.completed(null);
+               public CompletableFuture<Void> retrievePayload() {
+                       return CompletableFuture.completedFuture(null);
                }
        }
 
@@ -1153,8 +1153,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                }
 
                @Override
-               public Future<Void> retrievePayload() {
-                       return FlinkCompletableFuture.completed(null);
+               public CompletableFuture<Void> retrievePayload() {
+                       return CompletableFuture.completedFuture(null);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97784a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 0dfbbcd..438ec65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -67,6 +67,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
@@ -999,8 +1000,8 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                }
 
                @Override
-               public Future<Void> retrievePayload() {
-                       return FlinkCompletableFuture.completed(null);
+               public CompletableFuture<Void> retrievePayload() {
+                       return CompletableFuture.completedFuture(null);
                }
        }
 
@@ -1032,8 +1033,8 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                }
 
                @Override
-               public Future<Void> retrievePayload() {
-                       return FlinkCompletableFuture.completed(null);
+               public CompletableFuture<Void> retrievePayload() {
+                       return CompletableFuture.completedFuture(null);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97784a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index cdec08e..4c4b0a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -99,7 +99,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -1328,8 +1328,8 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
 
                @Override
-               public Future<Void> retrievePayload() {
-                       return FlinkCompletableFuture.completed(null);
+               public CompletableFuture<Void> retrievePayload() {
+                       return CompletableFuture.completedFuture(null);
                }
        }
 
@@ -1355,14 +1355,11 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
 
                @Override
-               public Future<SlotReport> retrievePayload() {
-                       return callAsync(
-                               new Callable<SlotReport>() {
-                                       @Override
-                                       public SlotReport call() throws 
Exception {
-                                               return 
taskSlotTable.createSlotReport(getResourceID());
-                                       }
-                               }, taskManagerConfiguration.getTimeout());
+               public CompletableFuture<SlotReport> retrievePayload() {
+                       return FutureUtils.toJava(
+                               callAsync(
+                                       () -> 
taskSlotTable.createSlotReport(getResourceID()),
+                                       taskManagerConfiguration.getTimeout()));
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97784a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
index 031f48c..593daf7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
@@ -19,11 +19,8 @@
 package org.apache.flink.runtime.heartbeat;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.util.DirectExecutorService;
 import org.apache.flink.util.TestLogger;
 
@@ -31,6 +28,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -72,7 +70,7 @@ public class HeartbeatManagerTest extends TestLogger {
 
                Object expectedObject = new Object();
 
-               
when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(expectedObject));
+               
when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(expectedObject));
 
                HeartbeatManagerImpl<Object, Object> heartbeatManager = new 
HeartbeatManagerImpl<>(
                        heartbeatTimeout,
@@ -113,7 +111,7 @@ public class HeartbeatManagerTest extends TestLogger {
 
                Object expectedObject = new Object();
 
-               
when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(expectedObject));
+               
when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(expectedObject));
 
                HeartbeatManagerImpl<Object, Object> heartbeatManager = new 
HeartbeatManagerImpl<>(
                        heartbeatTimeout,
@@ -165,7 +163,7 @@ public class HeartbeatManagerTest extends TestLogger {
 
                HeartbeatTarget<Object> heartbeatTarget = 
mock(HeartbeatTarget.class);
 
-               Future<ResourceID> timeoutFuture = 
heartbeatListener.getTimeoutFuture();
+               CompletableFuture<ResourceID> timeoutFuture = 
heartbeatListener.getTimeoutFuture();
 
                heartbeatManager.monitorTarget(targetResourceID, 
heartbeatTarget);
 
@@ -199,11 +197,11 @@ public class HeartbeatManagerTest extends TestLogger {
                ResourceID resourceID2 = new ResourceID("barfoo");
                HeartbeatListener<Object, Object> heartbeatListener = 
mock(HeartbeatListener.class);
 
-               
when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(object));
+               
when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(object));
 
                TestingHeartbeatListener heartbeatListener2 = new 
TestingHeartbeatListener(object2);
 
-               Future<ResourceID> futureTimeout = 
heartbeatListener2.getTimeoutFuture();
+               CompletableFuture<ResourceID> futureTimeout = 
heartbeatListener2.getTimeoutFuture();
 
                HeartbeatManagerImpl<Object, Object> heartbeatManager = new 
HeartbeatManagerImpl<>(
                        heartbeatTimeout,
@@ -266,7 +264,7 @@ public class HeartbeatManagerTest extends TestLogger {
 
                heartbeatManager.unmonitorTarget(targetID);
 
-               Future<ResourceID> timeout = 
heartbeatListener.getTimeoutFuture();
+               CompletableFuture<ResourceID> timeout = 
heartbeatListener.getTimeoutFuture();
 
                try {
                        timeout.get(2 * heartbeatTimeout, 
TimeUnit.MILLISECONDS);
@@ -278,7 +276,7 @@ public class HeartbeatManagerTest extends TestLogger {
 
        static class TestingHeartbeatListener implements 
HeartbeatListener<Object, Object> {
 
-               private final CompletableFuture<ResourceID> future = new 
FlinkCompletableFuture<>();
+               private final CompletableFuture<ResourceID> future = new 
CompletableFuture<>();
 
                private final Object payload;
 
@@ -288,7 +286,7 @@ public class HeartbeatManagerTest extends TestLogger {
                        this.payload = payload;
                }
 
-               public Future<ResourceID> getTimeoutFuture() {
+               CompletableFuture<ResourceID> getTimeoutFuture() {
                        return future;
                }
 
@@ -307,8 +305,8 @@ public class HeartbeatManagerTest extends TestLogger {
                }
 
                @Override
-               public Future<Object> retrievePayload() {
-                       return FlinkCompletableFuture.completed(payload);
+               public CompletableFuture<Object> retrievePayload() {
+                       return CompletableFuture.completedFuture(payload);
                }
        }
 }

Reply via email to