[ https://issues.apache.org/jira/browse/NIFI-1678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254378#comment-15254378 ]
ASF GitHub Bot commented on NIFI-1678: -------------------------------------- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/323#discussion_r60781637 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java --- @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.cluster.coordination.heartbeat; + +import java.io.ByteArrayInputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.Unmarshaller; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryForever; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.node.DisconnectionCode; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; +import org.apache.nifi.controller.cluster.ZooKeeperClientConfig; +import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.StopWatch; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Uses Apache Curator to monitor heartbeats from nodes + */ +public class CuratorHeartbeatMonitor implements HeartbeatMonitor { + private static final Logger logger = LoggerFactory.getLogger(CuratorHeartbeatMonitor.class); + private static final Unmarshaller unmarshaller; + + private final ClusterCoordinator clusterCoordinator; + private final ZooKeeperClientConfig zkClientConfig; + private final String heartbeatPath; + private final int heartbeatIntervalMillis; + + private volatile CuratorFramework curatorClient; + private volatile ScheduledFuture<?> future; + 
private volatile Map<NodeIdentifier, NodeHeartbeat> latestHeartbeatMessages; + private volatile long latestHeartbeatTime; + + private final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat Monitor", true); + + static { + try { + final JAXBContext jaxbContext = JAXBContext.newInstance(HeartbeatMessage.class); + unmarshaller = jaxbContext.createUnmarshaller(); + } catch (final Exception e) { + throw new RuntimeException("Failed to create an Unmarshaller for unmarshalling Heartbeat Messages", e); + } + } + + public CuratorHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final Properties properties) { + this.clusterCoordinator = clusterCoordinator; + this.zkClientConfig = ZooKeeperClientConfig.createConfig(properties); + this.heartbeatPath = zkClientConfig.resolvePath("cluster/heartbeats"); + + final String heartbeatInterval = properties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, + NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL); + + this.heartbeatIntervalMillis = (int) FormatUtils.getTimeDuration(heartbeatInterval, TimeUnit.MILLISECONDS); + } + + @Override + public void start() { + final RetryPolicy retryPolicy = new RetryForever(5000); + curatorClient = CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(), + zkClientConfig.getSessionTimeoutMillis(), zkClientConfig.getConnectionTimeoutMillis(), retryPolicy); + curatorClient.start(); + + this.future = flowEngine.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + monitorHeartbeats(); + } catch (final Exception e) { + clusterCoordinator.reportEvent(null, Severity.ERROR, "Failed to process heartbeats from nodes due to " + e.toString()); + logger.error("Failed to process heartbeats", e); + } + } + }, heartbeatIntervalMillis, heartbeatIntervalMillis, TimeUnit.MILLISECONDS); + } + + private CuratorFramework getClient() { + return curatorClient; + } + + @Override + public void stop() { + final CuratorFramework client = 
getClient(); + if (client != null) { + client.close(); + } + + if (future != null) { + future.cancel(true); + } + } --- End diff -- Actually, in the refactoring I did, these issues are both gone as I removed both the future and the FlowEngine :) > Nodes in cluster should use ZooKeeper to store heartbeat messages instead of > sending to NCM > ------------------------------------------------------------------------------------------- > > Key: NIFI-1678 > URL: https://issues.apache.org/jira/browse/NIFI-1678 > Project: Apache NiFi > Issue Type: Improvement > Components: Core Framework > Reporter: Mark Payne > Assignee: Mark Payne > Fix For: 1.0.0 > > > Currently, nodes send heartbeats to the NCM periodically in order to indicate > that they are actively participating in the cluster. As we move away from > using an NCM, we need these heartbeats to go somewhere else. ZooKeeper is a > reasonable location to push the heartbeats to, as it provides the HA that we > need -- This message was sent by Atlassian JIRA (v6.3.4#6332)