[FLINK-4478] [flip-6] Add HeartbeatManager

Add a heartbeat manager abstraction which can monitor heartbeat targets.
Whenever no heartbeat signal has been received for a heartbeat timeout
interval, the heartbeat manager will issue a heartbeat timeout notification.

Add resourceID to HeartbeatListener.reportPayload

Replace scala future by Flink's futures

Add unmonitoring test

This closes #2435.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e4eb4f9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e4eb4f9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e4eb4f9

Branch: refs/heads/flip-6
Commit: 3e4eb4f92012265b6fff27f0544fcd6d1629431f
Parents: 214113e
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Aug 25 14:05:07 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:42 2016 +0200

----------------------------------------------------------------------
 .../runtime/heartbeat/HeartbeatListener.java    |  62 ++++
 .../runtime/heartbeat/HeartbeatManager.java     |  67 ++++
 .../runtime/heartbeat/HeartbeatManagerImpl.java | 328 +++++++++++++++++++
 .../heartbeat/HeartbeatManagerSenderImpl.java   |  81 +++++
 .../runtime/heartbeat/HeartbeatTarget.java      |  50 +++
 .../runtime/heartbeat/HeartbeatManagerTest.java | 315 ++++++++++++++++++
 .../slotmanager/SlotProtocolTest.java           |   4 -
 7 files changed, 903 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
new file mode 100644
index 0000000..8c08251
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
+
+/**
+ * Interface for the interaction with the {@link HeartbeatManager}. The 
heartbeat listener is used
+ * for the following things:
+ * <p>
+ * <ul>
+ *     <li>Notifications about heartbeat timeouts</li>
+ *     <li>Payload reports of incoming heartbeats</li>
+ *     <li>Retrieval of payloads for outgoing heartbeats</li>
+ * </ul>
+ * @param <I> Type of the incoming payload
+ * @param <O> Type of the outgoing payload
+ */
+public interface HeartbeatListener<I, O> {
+
+       /**
+        * Callback which is called if a heartbeat for the machine identified 
by the given resource
+        * ID times out.
+        *
+        * @param resourceID Resource ID of the machine whose heartbeat has 
timed out
+        */
+       void notifyHeartbeatTimeout(ResourceID resourceID);
+
+       /**
+        * Callback which is called whenever a heartbeat with an associated 
payload is received. The
+        * carried payload is given to this method.
+        *
+        * @param resourceID Resource ID identifying the sender of the payload
+        * @param payload Payload of the received heartbeat
+        */
+       void reportPayload(ResourceID resourceID, I payload);
+
+       /**
+        * Retrieves the payload value for the next heartbeat message. Since 
the operation can happen
+        * asynchronously, the result is returned wrapped in a future.
+        *
+        * @return Future containing the next payload for heartbeats
+        */
+       Future<O> retrievePayload();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
new file mode 100644
index 0000000..12918ed
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+/**
+ * A heartbeat manager has to be able to do the following things:
+ *
+ * <ul>
+ *     <li>Monitor {@link HeartbeatTarget} and report heartbeat timeouts for 
this target</li>
+ *     <li>Stop monitoring a {@link HeartbeatTarget}</li>
+ * </ul>
+ *
+ *
+ * @param <I> Type of the incoming payload
+ * @param <O> Type of the outgoing payload
+ */
+public interface HeartbeatManager<I, O> {
+
+       /**
+        * Start monitoring a {@link HeartbeatTarget}. Heartbeat timeouts for 
this target are reported
+        * to the {@link HeartbeatListener} associated with this heartbeat 
manager.
+        *
+        * @param resourceID Resource ID identifying the heartbeat target
+        * @param heartbeatTarget Interface to send heartbeat requests and 
responses to the heartbeat
+        *                        target
+        */
+       void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> 
heartbeatTarget);
+
+       /**
+        * Stops monitoring the heartbeat target with the associated resource 
ID.
+        *
+        * @param resourceID Resource ID of the heartbeat target which shall no 
longer be monitored
+        */
+       void unmonitorTarget(ResourceID resourceID);
+
+       /**
+        * Starts the heartbeat manager with the given {@link HeartbeatListener}. The heartbeat listener
+        * is notified about heartbeat timeouts, and heartbeat payloads are reported to and retrieved
+        * from it.
+        *
+        * @param heartbeatListener Heartbeat listener associated with the 
heartbeat manager
+        */
+       void start(HeartbeatListener<I, O> heartbeatListener);
+
+       /**
+        * Stops the heartbeat manager.
+        */
+       void stop();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
new file mode 100644
index 0000000..042f95b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Heartbeat manager implementation. The heartbeat manager maintains a map of 
heartbeat monitors
+ * and resource IDs. Each monitor will be updated when a new heartbeat of the 
associated machine has
+ * been received. If the monitor detects that a heartbeat has timed out, it 
will notify the
+ * {@link HeartbeatListener} about it. A heartbeat times out iff no heartbeat 
signal has been
+ * received within a given timeout interval.
+ *
+ * @param <I> Type of the incoming heartbeat payload
+ * @param <O> Type of the outgoing heartbeat payload
+ */
+@ThreadSafe
+public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O>, 
HeartbeatTarget<I> {
+
+       /** Heartbeat timeout interval in milliseconds */
+       private final long heartbeatTimeoutIntervalMs;
+
+       /** Resource ID which is used to mark one's own heartbeat signals */
+       private final ResourceID ownResourceID;
+
+       /** Executor service used to run heartbeat timeout notifications */
+       private final ScheduledExecutorService scheduledExecutorService;
+
+       protected final Logger log;
+
+       /** Map containing the heartbeat monitors associated with the 
respective resource ID */
+       private final ConcurrentHashMap<ResourceID, 
HeartbeatManagerImpl.HeartbeatMonitor<O>> heartbeatTargets;
+
+       /** Execution context used to run future callbacks */
+       private final Executor executor;
+
+       /** Heartbeat listener with which the heartbeat manager has been 
associated */
+       private HeartbeatListener<I, O> heartbeatListener;
+
+       /** Running state of the heartbeat manager */
+       protected volatile boolean stopped;
+
+       public HeartbeatManagerImpl(
+               long heartbeatTimeoutIntervalMs,
+               ResourceID ownResourceID,
+               Executor executor,
+               ScheduledExecutorService scheduledExecutorService,
+               Logger log) {
+               Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, 
"The heartbeat timeout has to be larger than 0.");
+
+               this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
+               this.ownResourceID = Preconditions.checkNotNull(ownResourceID);
+               this.scheduledExecutorService = 
Preconditions.checkNotNull(scheduledExecutorService);
+               this.log = Preconditions.checkNotNull(log);
+               this.executor = Preconditions.checkNotNull(executor);
+               this.heartbeatTargets = new ConcurrentHashMap<>(16);
+
+               stopped = true;
+       }
+
+       
//----------------------------------------------------------------------------------------------
+       // Getters
+       
//----------------------------------------------------------------------------------------------
+
+       ResourceID getOwnResourceID() {
+               return ownResourceID;
+       }
+
+       Executor getExecutor() {
+               return executor;
+       }
+
+       HeartbeatListener<I, O> getHeartbeatListener() {
+               return heartbeatListener;
+       }
+
+       Collection<HeartbeatManagerImpl.HeartbeatMonitor<O>> 
getHeartbeatTargets() {
+               return heartbeatTargets.values();
+       }
+
+       
//----------------------------------------------------------------------------------------------
+       // HeartbeatManager methods
+       
//----------------------------------------------------------------------------------------------
+
+       @Override
+       public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> 
heartbeatTarget) {
+               if (!stopped) {
+                       if (heartbeatTargets.containsKey(resourceID)) {
+                               log.info("The target with resource ID {} is 
already been monitored.", resourceID);
+                       } else {
+                               HeartbeatManagerImpl.HeartbeatMonitor<O> 
heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>(
+                                       resourceID,
+                                       heartbeatTarget,
+                                       scheduledExecutorService,
+                                       heartbeatListener,
+                                       heartbeatTimeoutIntervalMs);
+
+                               heartbeatTargets.put(
+                                       resourceID,
+                                       heartbeatMonitor);
+
+                               // check if we have stopped in the meantime 
(concurrent stop operation)
+                               if (stopped) {
+                                       heartbeatMonitor.cancel();
+
+                                       heartbeatTargets.remove(resourceID);
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void unmonitorTarget(ResourceID resourceID) {
+               if (!stopped) {
+                       HeartbeatManagerImpl.HeartbeatMonitor<O> 
heartbeatMonitor = heartbeatTargets.remove(resourceID);
+
+                       if (heartbeatMonitor != null) {
+                               heartbeatMonitor.cancel();
+                       }
+               }
+       }
+
+       @Override
+       public void start(HeartbeatListener<I, O> heartbeatListener) {
+               Preconditions.checkState(stopped, "Cannot start an already 
started heartbeat manager.");
+
+               stopped = false;
+
+               this.heartbeatListener = 
Preconditions.checkNotNull(heartbeatListener);
+       }
+
+       @Override
+       public void stop() {
+               stopped = true;
+
+               for (HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor 
: heartbeatTargets.values()) {
+                       heartbeatMonitor.cancel();
+               }
+
+               heartbeatTargets.clear();
+       }
+
+       
//----------------------------------------------------------------------------------------------
+       // HeartbeatTarget methods
+       
//----------------------------------------------------------------------------------------------
+
+       @Override
+       public void sendHeartbeat(ResourceID resourceID, I payload) {
+               if (!stopped) {
+                       log.debug("Received heartbeat from {}.", resourceID);
+                       reportHeartbeat(resourceID);
+
+                       if (payload != null) {
+                               heartbeatListener.reportPayload(resourceID, 
payload);
+                       }
+               }
+       }
+
+       @Override
+       public void requestHeartbeat(ResourceID resourceID, I payload) {
+               if (!stopped) {
+                       log.debug("Received heartbeat request from {}.", 
resourceID);
+
+                       final HeartbeatTarget<O> heartbeatTarget = 
reportHeartbeat(resourceID);
+
+                       if (heartbeatTarget != null) {
+                               if (payload != null) {
+                                       
heartbeatListener.reportPayload(resourceID, payload);
+                               }
+
+                               Future<O> futurePayload = 
heartbeatListener.retrievePayload();
+
+                               if (futurePayload != null) {
+                                       futurePayload.thenAcceptAsync(new 
AcceptFunction<O>() {
+                                               @Override
+                                               public void accept(O 
retrievedPayload) {
+                                                       
heartbeatTarget.sendHeartbeat(getOwnResourceID(), retrievedPayload);
+                                               }
+                                       }, executor);
+                               } else {
+                                       
heartbeatTarget.sendHeartbeat(ownResourceID, null);
+                               }
+                       }
+               }
+       }
+
+       HeartbeatTarget<O> reportHeartbeat(ResourceID resourceID) {
+               if (heartbeatTargets.containsKey(resourceID)) {
+                       HeartbeatManagerImpl.HeartbeatMonitor<O> 
heartbeatMonitor = heartbeatTargets.get(resourceID);
+                       heartbeatMonitor.reportHeartbeat();
+
+                       return heartbeatMonitor.getHeartbeatTarget();
+               } else {
+                       return null;
+               }
+       }
+
+       
//----------------------------------------------------------------------------------------------
+       // Utility classes
+       
//----------------------------------------------------------------------------------------------
+
+       /**
+        * Heartbeat monitor which manages the heartbeat state of the 
associated heartbeat target. The
+        * monitor notifies the {@link HeartbeatListener} whenever it has not 
seen a heartbeat signal
+        * in the specified heartbeat timeout interval. Each heartbeat signal 
resets this timer.
+        *
+        * @param <O> Type of the payload being sent to the associated 
heartbeat target
+        */
+       static class HeartbeatMonitor<O> implements Runnable {
+
+               /** Resource ID of the monitored heartbeat target */
+               private final ResourceID resourceID;
+
+               /** Associated heartbeat target */
+               private final HeartbeatTarget<O> heartbeatTarget;
+
+               private final ScheduledExecutorService scheduledExecutorService;
+
+               /** Listener which is notified about heartbeat timeouts */
+               private final HeartbeatListener<?, ?> heartbeatListener;
+
+               /** Maximum heartbeat timeout interval */
+               private final long heartbeatTimeoutIntervalMs;
+
+               private volatile ScheduledFuture<?> futureTimeout;
+
+               private final AtomicReference<State> state = new 
AtomicReference<>(State.RUNNING);
+
+               HeartbeatMonitor(
+                       ResourceID resourceID,
+                       HeartbeatTarget<O> heartbeatTarget,
+                       ScheduledExecutorService scheduledExecutorService,
+                       HeartbeatListener<?, O> heartbeatListener,
+                       long heartbeatTimeoutIntervalMs) {
+
+                       this.resourceID = 
Preconditions.checkNotNull(resourceID);
+                       this.heartbeatTarget = 
Preconditions.checkNotNull(heartbeatTarget);
+                       this.scheduledExecutorService = 
Preconditions.checkNotNull(scheduledExecutorService);
+                       this.heartbeatListener = 
Preconditions.checkNotNull(heartbeatListener);
+
+                       Preconditions.checkArgument(heartbeatTimeoutIntervalMs 
>= 0L, "The heartbeat timeout interval has to be larger than 0.");
+                       this.heartbeatTimeoutIntervalMs = 
heartbeatTimeoutIntervalMs;
+
+                       resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
+               }
+
+               HeartbeatTarget<O> getHeartbeatTarget() {
+                       return heartbeatTarget;
+               }
+
+               void reportHeartbeat() {
+                       resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
+               }
+
+               void resetHeartbeatTimeout(long heartbeatTimeout) {
+                       if (state.get() == State.RUNNING) {
+                               cancelTimeout();
+
+                               futureTimeout = 
scheduledExecutorService.schedule(this, heartbeatTimeout, 
TimeUnit.MILLISECONDS);
+
+                               // Double check for concurrent accesses (e.g. a 
firing of the scheduled future)
+                               if (state.get() != State.RUNNING) {
+                                       cancelTimeout();
+                               }
+                       }
+               }
+
+               void cancel() {
+                       // we can only cancel if we are in state running
+                       if (state.compareAndSet(State.RUNNING, State.CANCELED)) 
{
+                               cancelTimeout();
+                       }
+               }
+
+               private void cancelTimeout() {
+                       if (futureTimeout != null) {
+                               futureTimeout.cancel(true);
+                       }
+               }
+
+               public boolean isCanceled() {
+                       return state.get() == State.CANCELED;
+               }
+
+               @Override
+               public void run() {
+                       // The heartbeat has timed out if we're in state running
+                       if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
+                               
heartbeatListener.notifyHeartbeatTimeout(resourceID);
+                       }
+               }
+
+               private enum State {
+                       RUNNING,
+                       TIMEOUT,
+                       CANCELED
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
new file mode 100644
index 0000000..588ba7f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.slf4j.Logger;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link HeartbeatManager} implementation which regularly requests a 
heartbeat response from
+ * its monitored {@link HeartbeatTarget}. The heartbeat period is configurable.
+ *
+ * @param <I> Type of the incoming heartbeat payload
+ * @param <O> Type of the outgoing heartbeat payload
+ */
+public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, 
O> implements Runnable {
+
+       private final ScheduledFuture<?> triggerFuture;
+
+       public HeartbeatManagerSenderImpl(
+               long heartbeatPeriod,
+               long heartbeatTimeout,
+               ResourceID ownResourceID,
+               ExecutorService executorService,
+               ScheduledExecutorService scheduledExecutorService,
+               Logger log) {
+               super(heartbeatTimeout, ownResourceID, executorService, 
scheduledExecutorService, log);
+
+               triggerFuture = 
scheduledExecutorService.scheduleAtFixedRate(this, 0L, heartbeatPeriod, 
TimeUnit.MILLISECONDS);
+       }
+
+       @Override
+       public void run() {
+               if (!stopped) {
+                       log.debug("Trigger heartbeat request.");
+                       for (HeartbeatMonitor<O> heartbeatMonitor : 
getHeartbeatTargets()) {
+                               Future<O> futurePayload = 
getHeartbeatListener().retrievePayload();
+                               final HeartbeatTarget<O> heartbeatTarget = 
heartbeatMonitor.getHeartbeatTarget();
+
+                               if (futurePayload != null) {
+                                       futurePayload.thenAcceptAsync(new 
AcceptFunction<O>() {
+                                               @Override
+                                               public void accept(O payload) {
+                                                       
heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
+                                               }
+                                       }, getExecutor());
+                               } else {
+                                       
heartbeatTarget.requestHeartbeat(getOwnResourceID(), null);
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void stop() {
+                       triggerFuture.cancel(true);
+                       super.stop();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java
new file mode 100644
index 0000000..ef953de
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+/**
+ * Interface for components which can be sent heartbeats and from which one 
can request a
+ * heartbeat response. Both the heartbeat response as well as the heartbeat 
request can carry a
+ * payload. This payload is reported to the heartbeat target and contains 
additional information.
+ * The payload can be empty which is indicated by a null value.
+ *
+ * @param <I> Type of the payload which is sent to the heartbeat target
+ */
+public interface HeartbeatTarget<I> {
+
+       /**
+        * Sends a heartbeat response to the target. Each heartbeat response 
can carry a payload which
+        * contains additional information for the heartbeat target.
+        *
+        * @param resourceID Resource ID identifying the machine for which a 
heartbeat shall be reported.
+        * @param payload Payload of the heartbeat response. Null indicates an 
empty payload.
+        */
+       void sendHeartbeat(ResourceID resourceID, I payload);
+
+       /**
+        * Requests a heartbeat from the target. Each heartbeat request can 
carry a payload which
+        * contains additional information for the heartbeat target.
+        *
+        * @param resourceID Resource ID identifying the machine issuing the 
heartbeat request.
+        * @param payload Payload of the heartbeat request. Null indicates an empty payload.
+        */
+       void requestHeartbeat(ResourceID resourceID, I payload);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
new file mode 100644
index 0000000..1c62f17
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.util.DirectExecutorService;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class HeartbeatManagerTest extends TestLogger {
+       private static final Logger LOG = 
LoggerFactory.getLogger(HeartbeatManagerTest.class);
+
+       /**
+        * Tests that regular heartbeat signal triggers the right callback 
functions in the
+        * {@link HeartbeatListener}.
+        */
+       @Test
+       public void testRegularHeartbeat() {
+               long heartbeatTimeout = 1000L;
+               ResourceID ownResourceID = new ResourceID("foobar");
+               ResourceID targetResourceID = new ResourceID("barfoo");
+               HeartbeatListener<Object, Object> heartbeatListener = 
mock(HeartbeatListener.class);
+               ScheduledExecutorService scheduledExecutorService = 
mock(ScheduledExecutorService.class);
+
+               Object expectedObject = new Object();
+
+               
when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(expectedObject));
+
+               HeartbeatManagerImpl<Object, Object> heartbeatManager = new 
HeartbeatManagerImpl<>(
+                       heartbeatTimeout,
+                       ownResourceID,
+                       new DirectExecutorService(),
+                       scheduledExecutorService,
+                       LOG);
+
+               heartbeatManager.start(heartbeatListener);
+
+               HeartbeatTarget<Object> heartbeatTarget = 
mock(HeartbeatTarget.class);
+
+               heartbeatManager.monitorTarget(targetResourceID, 
heartbeatTarget);
+
+               heartbeatManager.requestHeartbeat(targetResourceID, 
expectedObject);
+
+               verify(heartbeatListener, 
times(1)).reportPayload(targetResourceID, expectedObject);
+               verify(heartbeatListener, times(1)).retrievePayload();
+               verify(heartbeatTarget, times(1)).sendHeartbeat(ownResourceID, 
expectedObject);
+
+               heartbeatManager.sendHeartbeat(targetResourceID, 
expectedObject);
+
+               verify(heartbeatListener, 
times(2)).reportPayload(targetResourceID, expectedObject);
+       }
+
+       /**
+        * Tests that the heartbeat monitors are updated when receiving a new 
heartbeat signal.
+        */
+       @Test
+       public void testHeartbeatMonitorUpdate() {
+               long heartbeatTimeout = 1000L;
+               ResourceID ownResourceID = new ResourceID("foobar");
+               ResourceID targetResourceID = new ResourceID("barfoo");
+               HeartbeatListener<Object, Object> heartbeatListener = 
mock(HeartbeatListener.class);
+               ScheduledExecutorService scheduledExecutorService = 
mock(ScheduledExecutorService.class);
+               ScheduledFuture<?> scheduledFuture = 
mock(ScheduledFuture.class);
+
+               
doReturn(scheduledFuture).when(scheduledExecutorService).schedule(any(Runnable.class),
 anyLong(), any(TimeUnit.class));
+
+               Object expectedObject = new Object();
+
+               
when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(expectedObject));
+
+               HeartbeatManagerImpl<Object, Object> heartbeatManager = new 
HeartbeatManagerImpl<>(
+                       heartbeatTimeout,
+                       ownResourceID,
+                       new DirectExecutorService(),
+                       scheduledExecutorService,
+                       LOG);
+
+               heartbeatManager.start(heartbeatListener);
+
+               HeartbeatTarget<Object> heartbeatTarget = 
mock(HeartbeatTarget.class);
+
+               heartbeatManager.monitorTarget(targetResourceID, 
heartbeatTarget);
+
+               heartbeatManager.sendHeartbeat(targetResourceID, 
expectedObject);
+
+               verify(scheduledFuture, times(1)).cancel(true);
+               verify(scheduledExecutorService, 
times(2)).schedule(any(Runnable.class), eq(heartbeatTimeout), 
eq(TimeUnit.MILLISECONDS));
+       }
+
+       /**
+        * Tests that a heartbeat timeout is signaled if the heartbeat is not 
reported in time.
+        *
+        * @throws Exception
+        */
+       @Test
+       public void testHeartbeatTimeout() throws Exception {
+               long heartbeatTimeout = 100L;
+               int numHeartbeats = 10;
+               long heartbeatInterval = 20L;
+               Object payload = new Object();
+
+               ResourceID ownResourceID = new ResourceID("foobar");
+               ResourceID targetResourceID = new ResourceID("barfoo");
+               TestingHeartbeatListener heartbeatListener = new 
TestingHeartbeatListener(payload);
+               ScheduledExecutorService scheduledExecutorService = 
mock(ScheduledExecutorService.class);
+               ScheduledFuture<?> scheduledFuture = 
mock(ScheduledFuture.class);
+
+               
doReturn(scheduledFuture).when(scheduledExecutorService).schedule(any(Runnable.class),
 anyLong(), any(TimeUnit.class));
+
+               Object expectedObject = new Object();
+
+               HeartbeatManagerImpl<Object, Object> heartbeatManager = new 
HeartbeatManagerImpl<>(
+                       heartbeatTimeout,
+                       ownResourceID,
+                       new DirectExecutorService(),
+                       new ScheduledThreadPoolExecutor(1),
+                       LOG);
+
+               heartbeatManager.start(heartbeatListener);
+
+               HeartbeatTarget<Object> heartbeatTarget = 
mock(HeartbeatTarget.class);
+
+               Future<ResourceID> timeoutFuture = 
heartbeatListener.getTimeoutFuture();
+
+               heartbeatManager.monitorTarget(targetResourceID, 
heartbeatTarget);
+
+               for (int i = 0; i < numHeartbeats; i++) {
+                       heartbeatManager.sendHeartbeat(targetResourceID, 
expectedObject);
+                       Thread.sleep(heartbeatInterval);
+               }
+
+               assertFalse(timeoutFuture.isDone());
+
+               ResourceID timeoutResourceID = timeoutFuture.get(2 * 
heartbeatTimeout, TimeUnit.MILLISECONDS);
+
+               assertEquals(targetResourceID, timeoutResourceID);
+       }
+
+       /**
+        * Tests the heartbeat interplay between the {@link 
HeartbeatManagerImpl} and the
+        * {@link HeartbeatManagerSenderImpl}. The sender should regularly 
trigger heartbeat requests
+        * which are fulfilled by the receiver. Upon stopping the receiver, the 
sender should notify
+        * the heartbeat listener about the heartbeat timeout.
+        *
+        * @throws Exception
+        */
+       @Test
+       public void testHeartbeatCluster() throws Exception {
+               long heartbeatTimeout = 100L;
+               long heartbeatPeriod = 20L;
+               Object object = new Object();
+               Object object2 = new Object();
+               ResourceID resourceID = new ResourceID("foobar");
+               ResourceID resourceID2 = new ResourceID("barfoo");
+               HeartbeatListener<Object, Object> heartbeatListener = 
mock(HeartbeatListener.class);
+
+               
when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(object));
+
+               TestingHeartbeatListener heartbeatListener2 = new 
TestingHeartbeatListener(object2);
+
+               Future<ResourceID> futureTimeout = 
heartbeatListener2.getTimeoutFuture();
+
+               HeartbeatManagerImpl<Object, Object> heartbeatManager = new 
HeartbeatManagerImpl<>(
+                       heartbeatTimeout,
+                       resourceID,
+                       new DirectExecutorService(),
+                       new ScheduledThreadPoolExecutor(1),
+                       LOG);
+
+               HeartbeatManagerSenderImpl<Object, Object> heartbeatManager2 = 
new HeartbeatManagerSenderImpl<>(
+                       heartbeatPeriod,
+                       heartbeatTimeout,
+                       resourceID2,
+                       new DirectExecutorService(),
+                       new ScheduledThreadPoolExecutor(1),
+                       LOG);;
+
+               heartbeatManager.start(heartbeatListener);
+               heartbeatManager2.start(heartbeatListener2);
+
+               heartbeatManager.monitorTarget(resourceID2, heartbeatManager2);
+               heartbeatManager2.monitorTarget(resourceID, heartbeatManager);
+
+               Thread.sleep(2 * heartbeatTimeout);
+
+               assertFalse(futureTimeout.isDone());
+
+               heartbeatManager.stop();
+
+               ResourceID timeoutResourceID = futureTimeout.get(2 * 
heartbeatTimeout, TimeUnit.MILLISECONDS);
+
+               assertEquals(resourceID, timeoutResourceID);
+
+               int numberHeartbeats = (int) (2 * heartbeatTimeout / 
heartbeatPeriod);
+
+               verify(heartbeatListener, atLeast(numberHeartbeats / 
2)).reportPayload(resourceID2, object2);
+               assertTrue(heartbeatListener2.getNumberHeartbeatReports() >= 
numberHeartbeats / 2);
+       }
+
+       /**
+        * Tests that no heartbeat timeout is triggered after a target has been 
unmonitored.
+        */
+       @Test
+       public void testTargetUnmonitoring() throws InterruptedException, 
ExecutionException {
+               // this might be too aggressive for Travis, let's see...
+               long heartbeatTimeout = 100L;
+               ResourceID resourceID = new ResourceID("foobar");
+               ResourceID targetID = new ResourceID("target");
+               Object object = new Object();
+
+               HeartbeatManager<Object, Object> heartbeatManager = new 
HeartbeatManagerImpl<>(
+                       heartbeatTimeout,
+                       resourceID,
+                       new DirectExecutorService(),
+                       new ScheduledThreadPoolExecutor(1),
+                       LOG);
+
+               TestingHeartbeatListener heartbeatListener = new 
TestingHeartbeatListener(object);
+
+               heartbeatManager.start(heartbeatListener);
+
+               heartbeatManager.monitorTarget(targetID, 
mock(HeartbeatTarget.class));
+
+               heartbeatManager.unmonitorTarget(targetID);
+
+               Future<ResourceID> timeout = 
heartbeatListener.getTimeoutFuture();
+
+
+               try {
+                       timeout.get(2 * heartbeatTimeout, 
TimeUnit.MILLISECONDS);
+                       fail("Timeout should time out.");
+               } catch (TimeoutException e) {
+                       // the timeout should not be completed since we 
unmonitored the target
+               }
+       }
+
+       static class TestingHeartbeatListener implements 
HeartbeatListener<Object, Object> {
+
+               private final CompletableFuture<ResourceID> future = new 
FlinkCompletableFuture<>();
+
+               private final Object payload;
+
+               private int numberHeartbeatReports;
+
+               TestingHeartbeatListener(Object payload) {
+                       this.payload = payload;
+               }
+
+               public Future<ResourceID> getTimeoutFuture() {
+                       return future;
+               }
+
+               public int getNumberHeartbeatReports() {
+                       return numberHeartbeatReports;
+               }
+
+               @Override
+               public void notifyHeartbeatTimeout(ResourceID resourceID) {
+                       future.complete(resourceID);
+               }
+
+               @Override
+               public void reportPayload(ResourceID resourceID, Object 
payload) {
+                       numberHeartbeatReports++;
+               }
+
+               @Override
+               public Future<Object> retrievePayload() {
+                       return FlinkCompletableFuture.completed(payload);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 805ea71..a87fe42 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
@@ -43,12 +42,9 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.util.Collections;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;

Reply via email to