[ https://issues.apache.org/jira/browse/FLINK-4478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535624#comment-15535624 ]
ASF GitHub Bot commented on FLINK-4478: --------------------------------------- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2435#discussion_r81310859 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java --- @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.heartbeat; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.AcceptFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; + +import javax.annotation.concurrent.ThreadSafe; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Heartbeat manager implementation. The heartbeat manager maintains a map of heartbeat monitors + * and resource IDs. 
Each monitor will be updated when a new heartbeat of the associated machine has + * been received. If the monitor detects that a heartbeat has timed out, it will notify the + * {@link HeartbeatListener} about it. A heartbeat times out iff no heartbeat signal has been + * received within a given timeout interval. + * + * @param <I> Type of the incoming heartbeat payload + * @param <O> Type of the outgoing heartbeat payload + */ +@ThreadSafe +public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O>, HeartbeatTarget<I> { + + /** Heartbeat timeout interval in milli seconds */ + private final long heartbeatTimeoutIntervalMs; + + /** Resource ID which is used to mark one own's heartbeat signals */ + private final ResourceID ownResourceID; + + /** Executor service used to run heartbeat timeout notifications */ + private final ScheduledExecutorService scheduledExecutorService; + + protected final Logger log; + + /** Map containing the heartbeat monitors associated with the respective resource ID */ + private final ConcurrentHashMap<ResourceID, HeartbeatManagerImpl.HeartbeatMonitor<O>> heartbeatTargets; + + /** Execution context used to run future callbacks */ + private final Executor executor; + + /** Heartbeat listener with which the heartbeat manager has been associated */ + private HeartbeatListener<I, O> heartbeatListener; + + /** Running state of the heartbeat manager */ + protected boolean stopped; + + public HeartbeatManagerImpl( + long heartbeatTimeoutIntervalMs, + ResourceID ownResourceID, + Executor executor, + ScheduledExecutorService scheduledExecutorService, + Logger log) { + Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout has to be larger than 0."); + + this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs; + this.ownResourceID = Preconditions.checkNotNull(ownResourceID); + this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService); + this.log = 
Preconditions.checkNotNull(log); + this.executor = Preconditions.checkNotNull(executor); + this.heartbeatTargets = new ConcurrentHashMap<>(16); + + stopped = true; + } + + //---------------------------------------------------------------------------------------------- + // Getters + //---------------------------------------------------------------------------------------------- + + ResourceID getOwnResourceID() { + return ownResourceID; + } + + Executor getExecutor() { + return executor; + } + + HeartbeatListener<I, O> getHeartbeatListener() { + return heartbeatListener; + } + + Collection<HeartbeatManagerImpl.HeartbeatMonitor<O>> getHeartbeatTargets() { + return heartbeatTargets.values(); + } + + //---------------------------------------------------------------------------------------------- + // HeartbeatManager methods + //---------------------------------------------------------------------------------------------- + + @Override + public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) { + if (!stopped) { + if (heartbeatTargets.containsKey(resourceID)) { + log.info("The target with resource ID {} is already been monitored.", resourceID); + } else { + HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>( + resourceID, + heartbeatTarget, + scheduledExecutorService, + heartbeatListener, + heartbeatTimeoutIntervalMs); + + heartbeatTargets.put( + resourceID, + heartbeatMonitor); + + // check if we have stopped in the meantime (concurrent stop operation) + if (stopped) { + heartbeatMonitor.cancel(); + + heartbeatTargets.remove(resourceID); + } --- End diff -- What if the `stopped = true` is set here? It seems kind of arbitrary to check at this point in time. Isn't it enough to put the `heartbeatMonitor` into the ConcurrentHashMap and remove the above lines? 
If we really want to be sure that no concurrency occurs while adding a new `HeartbeatMonitor`, why don't we synchronize on a lock then? > Implement heartbeat logic > ------------------------- > > Key: FLINK-4478 > URL: https://issues.apache.org/jira/browse/FLINK-4478 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination > Affects Versions: 1.1.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Fix For: 1.2.0 > > > With the Flip-6 refactoring, we'll have the need for a dedicated heartbeat > component. The heartbeat component is used to check the liveness of the > distributed components among each other. Furthermore, heartbeats are used to > regularly transmit status updates to another component. For example, the > TaskManager informs the ResourceManager with each heartbeat about the current > slot allocation. > The heartbeat is initiated from one component. This component sends a > heartbeat request to another component which answers with a heartbeat > response. Thus, one can differentiate between a sending and a receiving side. > Apart from the triggering of the heartbeat request, the logic of treating > heartbeats, marking components dead and payload delivery are the same and > should be reusable by different distributed components (JM, TM, RM). > Different models for the heartbeat reporting are conceivable. First of all, > the heartbeat request could be sent as an ask operation where the heartbeat > response is returned as a future on the sending side. Alternatively, the > sending side could request a heartbeat response by sending a tell message. > The heartbeat response is then delivered by an RPC back to the heartbeat > sender. The latter model has the advantage that a heartbeat response is not > tightly coupled to a heartbeat request. Such a tight coupling could cause > heartbeat responses to be ignored after the future has timed out even > though they might still contain valuable information (receiver is still > alive). 
> Furthermore, different strategies for the heartbeat triggering and marking > heartbeat targets as dead are conceivable. For example, we could periodically > (with a fixed period) trigger a heartbeat request and mark all targets as > dead if we didn't receive a heartbeat response in a given time period. > Furthermore, we could adapt the heartbeat interval and heartbeat timeouts > with respect to the latency of previous heartbeat responses. This would > reflect the current load and network conditions better. > For the first version, I would propose to use a fixed period heartbeat with a > maximum heartbeat timeout before a target is marked dead. Furthermore, I > would propose to use tell messages (fire and forget) to request and report > heartbeats because they are the more flexible model imho. -- This message was sent by Atlassian JIRA (v6.3.4#6332)