[FLINK-4478] [flip-6] Add HeartbeatManager. Add a heartbeat manager abstraction which can monitor heartbeat targets. Whenever no heartbeat signal has been received for a heartbeat timeout interval, the heartbeat manager will issue a heartbeat timeout notification.
Add resourceID to HeartbeatListener.reportPayload Replace scala future by Flink's futures Add unmonitoring test This closes #2435. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e4eb4f9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e4eb4f9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e4eb4f9 Branch: refs/heads/flip-6 Commit: 3e4eb4f92012265b6fff27f0544fcd6d1629431f Parents: 214113e Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu Aug 25 14:05:07 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Oct 14 15:14:42 2016 +0200 ---------------------------------------------------------------------- .../runtime/heartbeat/HeartbeatListener.java | 62 ++++ .../runtime/heartbeat/HeartbeatManager.java | 67 ++++ .../runtime/heartbeat/HeartbeatManagerImpl.java | 328 +++++++++++++++++++ .../heartbeat/HeartbeatManagerSenderImpl.java | 81 +++++ .../runtime/heartbeat/HeartbeatTarget.java | 50 +++ .../runtime/heartbeat/HeartbeatManagerTest.java | 315 ++++++++++++++++++ .../slotmanager/SlotProtocolTest.java | 4 - 7 files changed, 903 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java new file mode 100644 index 0000000..8c08251 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.heartbeat; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Future; + +/** + * Interface for the interaction with the {@link HeartbeatManager}. The heartbeat listener is used + * for the following things: + * <p> + * <ul> + * <li>Notifications about heartbeat timeouts</li> + * <li>Payload reports of incoming heartbeats</li> + * <li>Retrieval of payloads for outgoing heartbeats</li> + * </ul> + * @param <I> Type of the incoming payload + * @param <O> Type of the outgoing payload + */ +public interface HeartbeatListener<I, O> { + + /** + * Callback which is called if a heartbeat for the machine identified by the given resource + * ID times out. + * + * @param resourceID Resource ID of the machine whose heartbeat has timed out + */ + void notifyHeartbeatTimeout(ResourceID resourceID); + + /** + * Callback which is called whenever a heartbeat with an associated payload is received. The + * carried payload is given to this method. 
+ * + * @param resourceID Resource ID identifying the sender of the payload + * @param payload Payload of the received heartbeat + */ + void reportPayload(ResourceID resourceID, I payload); + + /** + * Retrieves the payload value for the next heartbeat message. Since the operation can happen + * asynchronously, the result is returned wrapped in a future. + * + * @return Future containing the next payload for heartbeats + */ + Future<O> retrievePayload(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java new file mode 100644 index 0000000..12918ed --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.heartbeat; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; + +/** + * A heartbeat manager has to be able to do the following things: + * + * <ul> + * <li>Monitor {@link HeartbeatTarget} and report heartbeat timeouts for this target</li> + * <li>Stop monitoring a {@link HeartbeatTarget}</li> + * </ul> + * + * + * @param <I> Type of the incoming payload + * @param <O> Type of the outgoing payload + */ +public interface HeartbeatManager<I, O> { + + /** + * Start monitoring a {@link HeartbeatTarget}. Heartbeat timeouts for this target are reported + * to the {@link HeartbeatListener} associated with this heartbeat manager. + * + * @param resourceID Resource ID identifying the heartbeat target + * @param heartbeatTarget Interface to send heartbeat requests and responses to the heartbeat + * target + */ + void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget); + + /** + * Stops monitoring the heartbeat target with the associated resource ID. + * + * @param resourceID Resource ID of the heartbeat target which shall no longer be monitored + */ + void unmonitorTarget(ResourceID resourceID); + + /** + * Starts the heartbeat manager with the given {@link HeartbeatListener}. The heartbeat listener + * is notified about heartbeat timeouts and heartbeat payloads are reported and retrieved to + * and from it. + * + * @param heartbeatListener Heartbeat listener associated with the heartbeat manager + */ + void start(HeartbeatListener<I, O> heartbeatListener); + + /** + * Stops the heartbeat manager. 
+ */ + void stop(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java new file mode 100644 index 0000000..042f95b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.heartbeat; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.AcceptFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; + +import javax.annotation.concurrent.ThreadSafe; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Heartbeat manager implementation. The heartbeat manager maintains a map of heartbeat monitors + * and resource IDs. Each monitor will be updated when a new heartbeat of the associated machine has + * been received. If the monitor detects that a heartbeat has timed out, it will notify the + * {@link HeartbeatListener} about it. A heartbeat times out iff no heartbeat signal has been + * received within a given timeout interval. 
+ * + * @param <I> Type of the incoming heartbeat payload + * @param <O> Type of the outgoing heartbeat payload + */ +@ThreadSafe +public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O>, HeartbeatTarget<I> { + + /** Heartbeat timeout interval in milliseconds */ + private final long heartbeatTimeoutIntervalMs; + + /** Resource ID which is used to mark one's own heartbeat signals */ + private final ResourceID ownResourceID; + + /** Executor service used to run heartbeat timeout notifications */ + private final ScheduledExecutorService scheduledExecutorService; + + protected final Logger log; + + /** Map containing the heartbeat monitors associated with the respective resource ID */ + private final ConcurrentHashMap<ResourceID, HeartbeatManagerImpl.HeartbeatMonitor<O>> heartbeatTargets; + + /** Execution context used to run future callbacks */ + private final Executor executor; + + /** Heartbeat listener with which the heartbeat manager has been associated */ + private HeartbeatListener<I, O> heartbeatListener; + + /** Running state of the heartbeat manager */ + protected volatile boolean stopped; + + public HeartbeatManagerImpl( + long heartbeatTimeoutIntervalMs, + ResourceID ownResourceID, + Executor executor, + ScheduledExecutorService scheduledExecutorService, + Logger log) { + Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout has to be larger than 0."); + + this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs; + this.ownResourceID = Preconditions.checkNotNull(ownResourceID); + this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService); + this.log = Preconditions.checkNotNull(log); + this.executor = Preconditions.checkNotNull(executor); + this.heartbeatTargets = new ConcurrentHashMap<>(16); + + stopped = true; + } + + //---------------------------------------------------------------------------------------------- + // Getters + 
//---------------------------------------------------------------------------------------------- + + ResourceID getOwnResourceID() { + return ownResourceID; + } + + Executor getExecutor() { + return executor; + } + + HeartbeatListener<I, O> getHeartbeatListener() { + return heartbeatListener; + } + + Collection<HeartbeatManagerImpl.HeartbeatMonitor<O>> getHeartbeatTargets() { + return heartbeatTargets.values(); + } + + //---------------------------------------------------------------------------------------------- + // HeartbeatManager methods + //---------------------------------------------------------------------------------------------- + + @Override + public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) { + if (!stopped) { + if (heartbeatTargets.containsKey(resourceID)) { + log.info("The target with resource ID {} is already been monitored.", resourceID); + } else { + HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>( + resourceID, + heartbeatTarget, + scheduledExecutorService, + heartbeatListener, + heartbeatTimeoutIntervalMs); + + heartbeatTargets.put( + resourceID, + heartbeatMonitor); + + // check if we have stopped in the meantime (concurrent stop operation) + if (stopped) { + heartbeatMonitor.cancel(); + + heartbeatTargets.remove(resourceID); + } + } + } + } + + @Override + public void unmonitorTarget(ResourceID resourceID) { + if (!stopped) { + HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = heartbeatTargets.remove(resourceID); + + if (heartbeatMonitor != null) { + heartbeatMonitor.cancel(); + } + } + } + + @Override + public void start(HeartbeatListener<I, O> heartbeatListener) { + Preconditions.checkState(stopped, "Cannot start an already started heartbeat manager."); + + stopped = false; + + this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener); + } + + @Override + public void stop() { + stopped = true; + + for 
(HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor : heartbeatTargets.values()) { + heartbeatMonitor.cancel(); + } + + heartbeatTargets.clear(); + } + + //---------------------------------------------------------------------------------------------- + // HeartbeatTarget methods + //---------------------------------------------------------------------------------------------- + + @Override + public void sendHeartbeat(ResourceID resourceID, I payload) { + if (!stopped) { + log.debug("Received heartbeat from {}.", resourceID); + reportHeartbeat(resourceID); + + if (payload != null) { + heartbeatListener.reportPayload(resourceID, payload); + } + } + } + + @Override + public void requestHeartbeat(ResourceID resourceID, I payload) { + if (!stopped) { + log.debug("Received heartbeat request from {}.", resourceID); + + final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(resourceID); + + if (heartbeatTarget != null) { + if (payload != null) { + heartbeatListener.reportPayload(resourceID, payload); + } + + Future<O> futurePayload = heartbeatListener.retrievePayload(); + + if (futurePayload != null) { + futurePayload.thenAcceptAsync(new AcceptFunction<O>() { + @Override + public void accept(O retrievedPayload) { + heartbeatTarget.sendHeartbeat(getOwnResourceID(), retrievedPayload); + } + }, executor); + } else { + heartbeatTarget.sendHeartbeat(ownResourceID, null); + } + } + } + } + + HeartbeatTarget<O> reportHeartbeat(ResourceID resourceID) { + if (heartbeatTargets.containsKey(resourceID)) { + HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = heartbeatTargets.get(resourceID); + heartbeatMonitor.reportHeartbeat(); + + return heartbeatMonitor.getHeartbeatTarget(); + } else { + return null; + } + } + + //---------------------------------------------------------------------------------------------- + // Utility classes + //---------------------------------------------------------------------------------------------- + + /** + * Heartbeat monitor which 
manages the heartbeat state of the associated heartbeat target. The + * monitor notifies the {@link HeartbeatListener} whenever it has not seen a heartbeat signal + * in the specified heartbeat timeout interval. Each heartbeat signal resets this timer. + * + * @param <O> Type of the payload being sent to the associated heartbeat target + */ + static class HeartbeatMonitor<O> implements Runnable { + + /** Resource ID of the monitored heartbeat target */ + private final ResourceID resourceID; + + /** Associated heartbeat target */ + private final HeartbeatTarget<O> heartbeatTarget; + + private final ScheduledExecutorService scheduledExecutorService; + + /** Listener which is notified about heartbeat timeouts */ + private final HeartbeatListener<?, ?> heartbeatListener; + + /** Maximum heartbeat timeout interval */ + private final long heartbeatTimeoutIntervalMs; + + private volatile ScheduledFuture<?> futureTimeout; + + private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING); + + HeartbeatMonitor( + ResourceID resourceID, + HeartbeatTarget<O> heartbeatTarget, + ScheduledExecutorService scheduledExecutorService, + HeartbeatListener<?, O> heartbeatListener, + long heartbeatTimeoutIntervalMs) { + + this.resourceID = Preconditions.checkNotNull(resourceID); + this.heartbeatTarget = Preconditions.checkNotNull(heartbeatTarget); + this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService); + this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener); + + Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The heartbeat timeout interval has to be larger than 0."); + this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs; + + resetHeartbeatTimeout(heartbeatTimeoutIntervalMs); + } + + HeartbeatTarget<O> getHeartbeatTarget() { + return heartbeatTarget; + } + + void reportHeartbeat() { + resetHeartbeatTimeout(heartbeatTimeoutIntervalMs); + } + + void resetHeartbeatTimeout(long heartbeatTimeout) { + 
if (state.get() == State.RUNNING) { + cancelTimeout(); + + futureTimeout = scheduledExecutorService.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS); + + // Double check for concurrent accesses (e.g. a firing of the scheduled future) + if (state.get() != State.RUNNING) { + cancelTimeout(); + } + } + } + + void cancel() { + // we can only cancel if we are in state running + if (state.compareAndSet(State.RUNNING, State.CANCELED)) { + cancelTimeout(); + } + } + + private void cancelTimeout() { + if (futureTimeout != null) { + futureTimeout.cancel(true); + } + } + + public boolean isCanceled() { + return state.get() == State.CANCELED; + } + + @Override + public void run() { + // The heartbeat has timed out if we're in state running + if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) { + heartbeatListener.notifyHeartbeatTimeout(resourceID); + } + } + + private enum State { + RUNNING, + TIMEOUT, + CANCELED + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java new file mode 100644 index 0000000..588ba7f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.heartbeat; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.AcceptFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.slf4j.Logger; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * {@link HeartbeatManager} implementation which regularly requests a heartbeat response from + * its monitored {@link HeartbeatTarget}. The heartbeat period is configurable. 
+ * + * @param <I> Type of the incoming heartbeat payload + * @param <O> Type of the outgoing heartbeat payload + */ +public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable { + + private final ScheduledFuture<?> triggerFuture; + + public HeartbeatManagerSenderImpl( + long heartbeatPeriod, + long heartbeatTimeout, + ResourceID ownResourceID, + ExecutorService executorService, + ScheduledExecutorService scheduledExecutorService, + Logger log) { + super(heartbeatTimeout, ownResourceID, executorService, scheduledExecutorService, log); + + triggerFuture = scheduledExecutorService.scheduleAtFixedRate(this, 0L, heartbeatPeriod, TimeUnit.MILLISECONDS); + } + + @Override + public void run() { + if (!stopped) { + log.debug("Trigger heartbeat request."); + for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets()) { + Future<O> futurePayload = getHeartbeatListener().retrievePayload(); + final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget(); + + if (futurePayload != null) { + futurePayload.thenAcceptAsync(new AcceptFunction<O>() { + @Override + public void accept(O payload) { + heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload); + } + }, getExecutor()); + } else { + heartbeatTarget.requestHeartbeat(getOwnResourceID(), null); + } + } + } + } + + @Override + public void stop() { + triggerFuture.cancel(true); + super.stop(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java new file mode 100644 index 0000000..ef953de --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java @@ -0,0 +1,50 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.heartbeat; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; + +/** + * Interface for components which can be sent heartbeats and from which one can request a + * heartbeat response. Both the heartbeat response as well as the heartbeat request can carry a + * payload. This payload is reported to the heartbeat target and contains additional information. + * The payload can be empty which is indicated by a null value. + * + * @param <I> Type of the payload which is sent to the heartbeat target + */ +public interface HeartbeatTarget<I> { + + /** + * Sends a heartbeat response to the target. Each heartbeat response can carry a payload which + * contains additional information for the heartbeat target. + * + * @param resourceID Resource ID identifying the machine for which a heartbeat shall be reported. + * @param payload Payload of the heartbeat response. Null indicates an empty payload. + */ + void sendHeartbeat(ResourceID resourceID, I payload); + + /** + * Requests a heartbeat from the target. Each heartbeat request can carry a payload which + * contains additional information for the heartbeat target. 
+ * + * @param resourceID Resource ID identifying the machine issuing the heartbeat request. + * @param payload Payload of the heartbeat request. Null indicates an empty payload. + */ + void requestHeartbeat(ResourceID resourceID, I payload); +} http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java new file mode 100644 index 0000000..1c62f17 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.heartbeat; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.CompletableFuture; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.util.DirectExecutorService; +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +public class HeartbeatManagerTest extends TestLogger { + private static final Logger LOG = LoggerFactory.getLogger(HeartbeatManagerTest.class); + + /** + * Tests that regular heartbeat signal triggers the right callback functions in the + * {@link HeartbeatListener}. 
+ */ + @Test + public void testRegularHeartbeat() { + long heartbeatTimeout = 1000L; + ResourceID ownResourceID = new ResourceID("foobar"); + ResourceID targetResourceID = new ResourceID("barfoo"); + HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class); + ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class); + + Object expectedObject = new Object(); + + when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(expectedObject)); + + HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>( + heartbeatTimeout, + ownResourceID, + new DirectExecutorService(), + scheduledExecutorService, + LOG); + + heartbeatManager.start(heartbeatListener); + + HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class); + + heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget); + + heartbeatManager.requestHeartbeat(targetResourceID, expectedObject); + + verify(heartbeatListener, times(1)).reportPayload(targetResourceID, expectedObject); + verify(heartbeatListener, times(1)).retrievePayload(); + verify(heartbeatTarget, times(1)).sendHeartbeat(ownResourceID, expectedObject); + + heartbeatManager.sendHeartbeat(targetResourceID, expectedObject); + + verify(heartbeatListener, times(2)).reportPayload(targetResourceID, expectedObject); + } + + /** + * Tests that the heartbeat monitors are updated when receiving a new heartbeat signal. 
+ */ + @Test + public void testHeartbeatMonitorUpdate() { + long heartbeatTimeout = 1000L; + ResourceID ownResourceID = new ResourceID("foobar"); + ResourceID targetResourceID = new ResourceID("barfoo"); + HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class); + ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class); + ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class); + + doReturn(scheduledFuture).when(scheduledExecutorService).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); + + Object expectedObject = new Object(); + + when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(expectedObject)); + + HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>( + heartbeatTimeout, + ownResourceID, + new DirectExecutorService(), + scheduledExecutorService, + LOG); + + heartbeatManager.start(heartbeatListener); + + HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class); + + heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget); + + heartbeatManager.sendHeartbeat(targetResourceID, expectedObject); + + verify(scheduledFuture, times(1)).cancel(true); + verify(scheduledExecutorService, times(2)).schedule(any(Runnable.class), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS)); + } + + /** + * Tests that a heartbeat timeout is signaled if the heartbeat is not reported in time. 
+ * + * @throws Exception + */ + @Test + public void testHeartbeatTimeout() throws Exception { + long heartbeatTimeout = 100L; + int numHeartbeats = 10; + long heartbeatInterval = 20L; + Object payload = new Object(); + + ResourceID ownResourceID = new ResourceID("foobar"); + ResourceID targetResourceID = new ResourceID("barfoo"); + TestingHeartbeatListener heartbeatListener = new TestingHeartbeatListener(payload); + ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class); + ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class); + + doReturn(scheduledFuture).when(scheduledExecutorService).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); + + Object expectedObject = new Object(); + + HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>( + heartbeatTimeout, + ownResourceID, + new DirectExecutorService(), + new ScheduledThreadPoolExecutor(1), + LOG); + + heartbeatManager.start(heartbeatListener); + + HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class); + + Future<ResourceID> timeoutFuture = heartbeatListener.getTimeoutFuture(); + + heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget); + + for (int i = 0; i < numHeartbeats; i++) { + heartbeatManager.sendHeartbeat(targetResourceID, expectedObject); + Thread.sleep(heartbeatInterval); + } + + assertFalse(timeoutFuture.isDone()); + + ResourceID timeoutResourceID = timeoutFuture.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS); + + assertEquals(targetResourceID, timeoutResourceID); + } + + /** + * Tests the heartbeat interplay between the {@link HeartbeatManagerImpl} and the + * {@link HeartbeatManagerSenderImpl}. The sender should regularly trigger heartbeat requests + * which are fulfilled by the receiver. Upon stopping the receiver, the sender should notify + * the heartbeat listener about the heartbeat timeout. 
+ * + * @throws Exception + */ + @Test + public void testHeartbeatCluster() throws Exception { + long heartbeatTimeout = 100L; + long heartbeatPeriod = 20L; + Object object = new Object(); + Object object2 = new Object(); + ResourceID resourceID = new ResourceID("foobar"); + ResourceID resourceID2 = new ResourceID("barfoo"); + HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class); + + when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(object)); + + TestingHeartbeatListener heartbeatListener2 = new TestingHeartbeatListener(object2); + + Future<ResourceID> futureTimeout = heartbeatListener2.getTimeoutFuture(); + + HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>( + heartbeatTimeout, + resourceID, + new DirectExecutorService(), + new ScheduledThreadPoolExecutor(1), + LOG); + + HeartbeatManagerSenderImpl<Object, Object> heartbeatManager2 = new HeartbeatManagerSenderImpl<>( + heartbeatPeriod, + heartbeatTimeout, + resourceID2, + new DirectExecutorService(), + new ScheduledThreadPoolExecutor(1), + LOG);; + + heartbeatManager.start(heartbeatListener); + heartbeatManager2.start(heartbeatListener2); + + heartbeatManager.monitorTarget(resourceID2, heartbeatManager2); + heartbeatManager2.monitorTarget(resourceID, heartbeatManager); + + Thread.sleep(2 * heartbeatTimeout); + + assertFalse(futureTimeout.isDone()); + + heartbeatManager.stop(); + + ResourceID timeoutResourceID = futureTimeout.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS); + + assertEquals(resourceID, timeoutResourceID); + + int numberHeartbeats = (int) (2 * heartbeatTimeout / heartbeatPeriod); + + verify(heartbeatListener, atLeast(numberHeartbeats / 2)).reportPayload(resourceID2, object2); + assertTrue(heartbeatListener2.getNumberHeartbeatReports() >= numberHeartbeats / 2); + } + + /** + * Tests that after unmonitoring a target, there won't be a timeout triggered + */ + @Test + public void testTargetUnmonitoring() 
throws InterruptedException, ExecutionException { + // this might be too aggresive for Travis, let's see... + long heartbeatTimeout = 100L; + ResourceID resourceID = new ResourceID("foobar"); + ResourceID targetID = new ResourceID("target"); + Object object = new Object(); + + HeartbeatManager<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>( + heartbeatTimeout, + resourceID, + new DirectExecutorService(), + new ScheduledThreadPoolExecutor(1), + LOG); + + TestingHeartbeatListener heartbeatListener = new TestingHeartbeatListener(object); + + heartbeatManager.start(heartbeatListener); + + heartbeatManager.monitorTarget(targetID, mock(HeartbeatTarget.class)); + + heartbeatManager.unmonitorTarget(targetID); + + Future<ResourceID> timeout = heartbeatListener.getTimeoutFuture(); + + + try { + timeout.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS); + fail("Timeout should time out."); + } catch (TimeoutException e) { + // the timeout should not be completed since we unmonitored the target + } + } + + static class TestingHeartbeatListener implements HeartbeatListener<Object, Object> { + + private final CompletableFuture<ResourceID> future = new FlinkCompletableFuture<>(); + + private final Object payload; + + private int numberHeartbeatReports; + + TestingHeartbeatListener(Object payload) { + this.payload = payload; + } + + public Future<ResourceID> getTimeoutFuture() { + return future; + } + + public int getNumberHeartbeatReports() { + return numberHeartbeatReports; + } + + @Override + public void notifyHeartbeatTimeout(ResourceID resourceID) { + future.complete(resourceID); + } + + @Override + public void reportPayload(ResourceID resourceID, Object payload) { + numberHeartbeatReports++; + } + + @Override + public Future<Object> retrievePayload() { + return FlinkCompletableFuture.completed(payload); + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 805ea71..a87fe42 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; -import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; @@ -43,12 +42,9 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.util.Collections; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import static org.mockito.Matchers.any;