[ https://issues.apache.org/jira/browse/FLINK-4478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535622#comment-15535622 ]
ASF GitHub Bot commented on FLINK-4478: --------------------------------------- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2435#discussion_r81313363 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java --- @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.heartbeat; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.AcceptFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; + +import javax.annotation.concurrent.ThreadSafe; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Heartbeat manager implementation. The heartbeat manager maintains a map of heartbeat monitors + * and resource IDs. 
Each monitor will be updated when a new heartbeat of the associated machine has + * been received. If the monitor detects that a heartbeat has timed out, it will notify the + * {@link HeartbeatListener} about it. A heartbeat times out iff no heartbeat signal has been + * received within a given timeout interval. + * + * @param <I> Type of the incoming heartbeat payload + * @param <O> Type of the outgoing heartbeat payload + */ +@ThreadSafe +public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O>, HeartbeatTarget<I> { + + /** Heartbeat timeout interval in milli seconds */ + private final long heartbeatTimeoutIntervalMs; + + /** Resource ID which is used to mark one own's heartbeat signals */ + private final ResourceID ownResourceID; + + /** Executor service used to run heartbeat timeout notifications */ + private final ScheduledExecutorService scheduledExecutorService; + + protected final Logger log; + + /** Map containing the heartbeat monitors associated with the respective resource ID */ + private final ConcurrentHashMap<ResourceID, HeartbeatManagerImpl.HeartbeatMonitor<O>> heartbeatTargets; + + /** Execution context used to run future callbacks */ + private final Executor executor; + + /** Heartbeat listener with which the heartbeat manager has been associated */ + private HeartbeatListener<I, O> heartbeatListener; + + /** Running state of the heartbeat manager */ + protected boolean stopped; + + public HeartbeatManagerImpl( + long heartbeatTimeoutIntervalMs, + ResourceID ownResourceID, + Executor executor, + ScheduledExecutorService scheduledExecutorService, + Logger log) { + Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout has to be larger than 0."); + + this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs; + this.ownResourceID = Preconditions.checkNotNull(ownResourceID); + this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService); + this.log = 
Preconditions.checkNotNull(log); + this.executor = Preconditions.checkNotNull(executor); + this.heartbeatTargets = new ConcurrentHashMap<>(16); + + stopped = true; + } + + //---------------------------------------------------------------------------------------------- + // Getters + //---------------------------------------------------------------------------------------------- + + ResourceID getOwnResourceID() { + return ownResourceID; + } + + Executor getExecutor() { + return executor; + } + + HeartbeatListener<I, O> getHeartbeatListener() { + return heartbeatListener; + } + + Collection<HeartbeatManagerImpl.HeartbeatMonitor<O>> getHeartbeatTargets() { + return heartbeatTargets.values(); + } + + //---------------------------------------------------------------------------------------------- + // HeartbeatManager methods + //---------------------------------------------------------------------------------------------- + + @Override + public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) { + if (!stopped) { + if (heartbeatTargets.containsKey(resourceID)) { + log.info("The target with resource ID {} is already been monitored.", resourceID); + } else { + HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>( + resourceID, + heartbeatTarget, + scheduledExecutorService, + heartbeatListener, + heartbeatTimeoutIntervalMs); + + heartbeatTargets.put( + resourceID, + heartbeatMonitor); + + // check if we have stopped in the meantime (concurrent stop operation) + if (stopped) { + heartbeatMonitor.cancel(); + + heartbeatTargets.remove(resourceID); + } + } + } + } + + @Override + public void unmonitorTarget(ResourceID resourceID) { + if (!stopped) { + HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = heartbeatTargets.remove(resourceID); + + if (heartbeatMonitor != null) { + heartbeatMonitor.cancel(); + } + } + } + + @Override + public void start(HeartbeatListener<I, O> heartbeatListener) 
{ + Preconditions.checkState(stopped, "Cannot start an already started heartbeat manager."); + + stopped = false; + + this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener); + } + + @Override + public void stop() { + stopped = true; + + for (HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor : heartbeatTargets.values()) { + heartbeatMonitor.cancel(); + } + + heartbeatTargets.clear(); + } + + //---------------------------------------------------------------------------------------------- + // HeartbeatTarget methods + //---------------------------------------------------------------------------------------------- + + @Override + public void sendHeartbeat(ResourceID resourceID, I payload) { + if (!stopped) { + log.debug("Received heartbeat from {}.", resourceID); + reportHeartbeat(resourceID); + + if (payload != null) { + heartbeatListener.reportPayload(resourceID, payload); + } + } + } + + @Override + public void requestHeartbeat(ResourceID resourceID, I payload) { + if (!stopped) { + log.debug("Received heartbeat request from {}.", resourceID); + + final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(resourceID); + + if (heartbeatTarget != null) { + if (payload != null) { + heartbeatListener.reportPayload(resourceID, payload); + } + + Future<O> futurePayload = heartbeatListener.retrievePayload(); + + if (futurePayload != null) { + futurePayload.thenAcceptAsync(new AcceptFunction<O>() { + @Override + public void accept(O retrievedPayload) { + heartbeatTarget.sendHeartbeat(getOwnResourceID(), retrievedPayload); + } + }, executor); + } else { + heartbeatTarget.sendHeartbeat(ownResourceID, null); + } + } + } + } + + HeartbeatTarget<O> reportHeartbeat(ResourceID resourceID) { + if (heartbeatTargets.containsKey(resourceID)) { + HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = heartbeatTargets.get(resourceID); + heartbeatMonitor.reportHeartbeat(); + + return heartbeatMonitor.getHeartbeatTarget(); + } else { + return null; + } + } + 
+ //---------------------------------------------------------------------------------------------- + // Utility classes + //---------------------------------------------------------------------------------------------- + + /** + * Heartbeat monitor which manages the heartbeat state of the associated heartbeat target. The + * monitor notifies the {@link HeartbeatListener} whenever it has not seen a heartbeat signal + * in the specified heartbeat timeout interval. Each heartbeat signal resets this timer. + * + * @param <O> Type of the payload being sent to the associated heartbeat target + */ + static class HeartbeatMonitor<O> implements Runnable { + + /** Resource ID of the monitored heartbeat target */ + private final ResourceID resourceID; + + /** Associated heartbeat target */ + private final HeartbeatTarget<O> heartbeatTarget; + + private final ScheduledExecutorService scheduledExecutorService; + + /** Listener which is notified about heartbeat timeouts */ + private final HeartbeatListener<?, ?> heartbeatListener; + + /** Maximum heartbeat timeout interval */ + private final long heartbeatTimeoutIntervalMs; + + private volatile ScheduledFuture<?> futureTimeout; + + private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING); + + HeartbeatMonitor( + ResourceID resourceID, + HeartbeatTarget<O> heartbeatTarget, + ScheduledExecutorService scheduledExecutorService, + HeartbeatListener<?, O> heartbeatListener, + long heartbeatTimeoutIntervalMs) { + + this.resourceID = Preconditions.checkNotNull(resourceID); + this.heartbeatTarget = Preconditions.checkNotNull(heartbeatTarget); + this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService); + this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener); + + Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The heartbeat timeout interval has to be larger than 0."); + this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs; + + 
resetHeartbeatTimeout(heartbeatTimeoutIntervalMs); + } + + HeartbeatTarget<O> getHeartbeatTarget() { + return heartbeatTarget; + } + + void reportHeartbeat() { + resetHeartbeatTimeout(heartbeatTimeoutIntervalMs); + } + + void resetHeartbeatTimeout(long heartbeatTimeout) { + if (state.get() == State.RUNNING) { + cancelTimeout(); + + futureTimeout = scheduledExecutorService.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS); + + // Double check for concurrent accesses (e.g. a firing of the scheduled future) + if (state.get() != State.RUNNING) { + cancelTimeout(); + } --- End diff -- This looks like an optimization. Is it really necessary? We can still change state after we have checked. > Implement heartbeat logic > ------------------------- > > Key: FLINK-4478 > URL: https://issues.apache.org/jira/browse/FLINK-4478 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination > Affects Versions: 1.1.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Fix For: 1.2.0 > > > With the Flip-6 refactoring, we'll have the need for a dedicated heartbeat > component. The heartbeat component is used to check the liveness of the > distributed components among each other. Furthermore, heartbeats are used to > regularly transmit status updates to another component. For example, the > TaskManager informs the ResourceManager with each heartbeat about the current > slot allocation. > The heartbeat is initiated from one component. This component sends a > heartbeat request to another component which answers with a heartbeat > response. Thus, one can differentiate between a sending and a receiving side. > Apart from the triggering of the heartbeat request, the logic of treating > heartbeats, marking components dead and payload delivery are the same and > should be reusable by different distributed components (JM, TM, RM). > Different models for the heartbeat reporting are conceivable. 
First of all, > the heartbeat request could be sent as an ask operation where the heartbeat > response is returned as a future on the sending side. Alternatively, the > sending side could request a heartbeat response by sending a tell message. > The heartbeat response is then delivered by an RPC back to the heartbeat > sender. The latter model has the advantage that a heartbeat response is not > tightly coupled to a heartbeat request. Such a tight coupling could cause > heartbeat responses to be ignored after the future has timed out even > though they might still contain valuable information (receiver is still > alive). > Furthermore, different strategies for the heartbeat triggering and marking > heartbeat targets as dead are conceivable. For example, we could periodically > (with a fixed period) trigger a heartbeat request and mark all targets as > dead if we didn't receive a heartbeat response in a given time period. > Furthermore, we could adapt the heartbeat interval and heartbeat timeouts > with respect to the latency of previous heartbeat responses. This would > reflect the current load and network conditions better. > For the first version, I would propose to use a fixed period heartbeat with a > maximum heartbeat timeout before a target is marked dead. Furthermore, I > would propose to use tell messages (fire and forget) to request and report > heartbeats because they are the more flexible model imho. -- This message was sent by Atlassian JIRA (v6.3.4#6332)