[ https://issues.apache.org/jira/browse/NIFI-1678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15244977#comment-15244977 ]
ASF GitHub Bot commented on NIFI-1678: -------------------------------------- Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/323#discussion_r59995310 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java --- @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.cluster.coordination.heartbeat; + +import java.io.ByteArrayInputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.Unmarshaller; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryForever; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.node.DisconnectionCode; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; +import org.apache.nifi.controller.cluster.ZooKeeperClientConfig; +import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.StopWatch; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Uses Apache Curator to monitor heartbeats from nodes + */ +public class CuratorHeartbeatMonitor implements HeartbeatMonitor { + private static final Logger logger = LoggerFactory.getLogger(CuratorHeartbeatMonitor.class); + private static final Unmarshaller unmarshaller; + + private final ClusterCoordinator clusterCoordinator; + private final ZooKeeperClientConfig zkClientConfig; + private final String heartbeatPath; + private final int heartbeatIntervalMillis; + + private volatile CuratorFramework curatorClient; + private volatile ScheduledFuture<?> future; + 
private volatile Map<NodeIdentifier, NodeHeartbeat> latestHeartbeatMessages; + private volatile long latestHeartbeatTime; + + private final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat Monitor", true); + + static { + try { + final JAXBContext jaxbContext = JAXBContext.newInstance(HeartbeatMessage.class); + unmarshaller = jaxbContext.createUnmarshaller(); + } catch (final Exception e) { + throw new RuntimeException("Failed to create an Unmarshaller for unmarshalling Heartbeat Messages", e); + } + } + + public CuratorHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final Properties properties) { + this.clusterCoordinator = clusterCoordinator; + this.zkClientConfig = ZooKeeperClientConfig.createConfig(properties); + this.heartbeatPath = zkClientConfig.resolvePath("cluster/heartbeats"); + + final String heartbeatInterval = properties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, + NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL); + + this.heartbeatIntervalMillis = (int) FormatUtils.getTimeDuration(heartbeatInterval, TimeUnit.MILLISECONDS); + } + + @Override + public void start() { + final RetryPolicy retryPolicy = new RetryForever(5000); + curatorClient = CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(), + zkClientConfig.getSessionTimeoutMillis(), zkClientConfig.getConnectionTimeoutMillis(), retryPolicy); + curatorClient.start(); + + this.future = flowEngine.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + monitorHeartbeats(); + } catch (final Exception e) { + clusterCoordinator.reportEvent(null, Severity.ERROR, "Failed to process heartbeats from nodes due to " + e.toString()); + logger.error("Failed to process heartbeats", e); + } + } + }, heartbeatIntervalMillis, heartbeatIntervalMillis, TimeUnit.MILLISECONDS); + } + + private CuratorFramework getClient() { + return curatorClient; + } + + @Override + public void stop() { + final CuratorFramework client = 
getClient(); + if (client != null) { + client.close(); + } + + if (future != null) { + future.cancel(true); + } + } + + @Override + public NodeHeartbeat getLatestHeartbeat(final NodeIdentifier nodeId) { + return latestHeartbeatMessages.get(nodeId); + } + + + /** + * Fetches all of the latest heartbeats from ZooKeeper + * and updates the Cluster Coordinator as appropriate, + * based on the heartbeats received. + * + * Visible for testing. + */ + synchronized void monitorHeartbeats() { + final StopWatch fetchStopWatch = new StopWatch(true); + final Map<NodeIdentifier, NodeHeartbeat> latestHeartbeats = fetchHeartbeats(); + if (latestHeartbeats == null || latestHeartbeats.isEmpty()) { + // failed to fetch heartbeats from ZooKeeper; don't change anything. + clusterCoordinator.reportEvent(null, Severity.WARNING, "Failed to retrieve any new heartbeat information for nodes from ZooKeeper. " + + "Will not make any decisions based on heartbeats."); + return; + } + + this.latestHeartbeatMessages = new HashMap<>(latestHeartbeats); + fetchStopWatch.stop(); + + final StopWatch procStopWatch = new StopWatch(true); + for (final NodeHeartbeat heartbeat : latestHeartbeats.values()) { + try { + processHeartbeat(heartbeat); + } catch (final Exception e) { + clusterCoordinator.reportEvent(null, Severity.ERROR, + "Received heartbeat from " + heartbeat.getNodeIdentifier() + " but failed to process heartbeat due to " + e); + logger.error("Failed to process heartbeat from {} due to {}", heartbeat.getNodeIdentifier(), e.toString()); + logger.error("", e); + } + } + + procStopWatch.stop(); + logger.info("Finished processing {} heartbeats in {} (fetch took an additional {})", + latestHeartbeats.size(), procStopWatch.getDuration(), fetchStopWatch.getDuration()); + + // Disconnect any node that hasn't sent a heartbeat in a long time (8 times the heartbeat interval) + for (final NodeHeartbeat heartbeat : latestHeartbeats.values()) { + final long maxMillis = heartbeatIntervalMillis * 1000L * 8; + 
final long threshold = latestHeartbeatTime - maxMillis; --- End diff -- Do those (maxMillis and threshold) need to be recomputed for each iteration? > Nodes in cluster should use ZooKeeper to store heartbeat messages instead of > sending to NCM > ------------------------------------------------------------------------------------------- > > Key: NIFI-1678 > URL: https://issues.apache.org/jira/browse/NIFI-1678 > Project: Apache NiFi > Issue Type: Improvement > Components: Core Framework > Reporter: Mark Payne > Assignee: Mark Payne > Fix For: 1.0.0 > > > Currently, nodes send heartbeats to the NCM periodically in order to indicate > that they are actively participating in the cluster. As we move away from > using an NCM, we need these heartbeats to go somewhere else. ZooKeeper is a > reasonable location to push the heartbeats to, as it provides the HA that we > need. -- This message was sent by Atlassian JIRA (v6.3.4#6332)