[ 
https://issues.apache.org/jira/browse/NIFI-1678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254378#comment-15254378
 ] 

ASF GitHub Bot commented on NIFI-1678:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/323#discussion_r60781637
  
    --- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/CuratorHeartbeatMonitor.java
 ---
    @@ -0,0 +1,376 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.cluster.coordination.heartbeat;
    +
    +import java.io.ByteArrayInputStream;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.xml.bind.JAXBContext;
    +import javax.xml.bind.Unmarshaller;
    +
    +import org.apache.curator.RetryPolicy;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.CuratorFrameworkFactory;
    +import org.apache.curator.retry.RetryForever;
    +import org.apache.nifi.cluster.coordination.ClusterCoordinator;
    +import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
    +import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
    +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
    +import org.apache.nifi.cluster.protocol.NodeIdentifier;
    +import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
    +import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
    +import org.apache.nifi.engine.FlowEngine;
    +import org.apache.nifi.reporting.Severity;
    +import org.apache.nifi.util.FormatUtils;
    +import org.apache.nifi.util.NiFiProperties;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.zookeeper.KeeperException.NoNodeException;
    +import org.apache.zookeeper.data.Stat;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Uses Apache Curator to monitor heartbeats from nodes
    + */
    +public class CuratorHeartbeatMonitor implements HeartbeatMonitor {
    +    private static final Logger logger = 
LoggerFactory.getLogger(CuratorHeartbeatMonitor.class);
    +    private static final Unmarshaller unmarshaller;
    +
    +    private final ClusterCoordinator clusterCoordinator;
    +    private final ZooKeeperClientConfig zkClientConfig;
    +    private final String heartbeatPath;
    +    private final int heartbeatIntervalMillis;
    +
    +    private volatile CuratorFramework curatorClient;
    +    private volatile ScheduledFuture<?> future;
    +    private volatile Map<NodeIdentifier, NodeHeartbeat> 
latestHeartbeatMessages;
    +    private volatile long latestHeartbeatTime;
    +
    +    private final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat 
Monitor", true);
    +
    +    static {
    +        try {
    +            final JAXBContext jaxbContext = 
JAXBContext.newInstance(HeartbeatMessage.class);
    +            unmarshaller = jaxbContext.createUnmarshaller();
    +        } catch (final Exception e) {
    +            throw new RuntimeException("Failed to create an Unmarshaller 
for unmarshalling Heartbeat Messages", e);
    +        }
    +    }
    +
    +    public CuratorHeartbeatMonitor(final ClusterCoordinator 
clusterCoordinator, final Properties properties) {
    +        this.clusterCoordinator = clusterCoordinator;
    +        this.zkClientConfig = 
ZooKeeperClientConfig.createConfig(properties);
    +        this.heartbeatPath = 
zkClientConfig.resolvePath("cluster/heartbeats");
    +
    +        final String heartbeatInterval = 
properties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL,
    +            NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL);
    +
    +        this.heartbeatIntervalMillis = (int) 
FormatUtils.getTimeDuration(heartbeatInterval, TimeUnit.MILLISECONDS);
    +    }
    +
    +    @Override
    +    public void start() {
    +        final RetryPolicy retryPolicy = new RetryForever(5000);
    +        curatorClient = 
CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(),
    +            zkClientConfig.getSessionTimeoutMillis(), 
zkClientConfig.getConnectionTimeoutMillis(), retryPolicy);
    +        curatorClient.start();
    +
    +        this.future = flowEngine.scheduleWithFixedDelay(new Runnable() {
    +            @Override
    +            public void run() {
    +                try {
    +                    monitorHeartbeats();
    +                } catch (final Exception e) {
    +                    clusterCoordinator.reportEvent(null, Severity.ERROR, 
"Failed to process heartbeats from nodes due to " + e.toString());
    +                    logger.error("Failed to process heartbeats", e);
    +                }
    +            }
    +        }, heartbeatIntervalMillis, heartbeatIntervalMillis, 
TimeUnit.MILLISECONDS);
    +    }
    +
    +    private CuratorFramework getClient() {
    +        return curatorClient;
    +    }
    +
    +    @Override
    +    public void stop() {
    +        final CuratorFramework client = getClient();
    +        if (client != null) {
    +            client.close();
    +        }
    +
    +        if (future != null) {
    +            future.cancel(true);
    +        }
    +    }
    --- End diff --
    
    Actually, in the refactoring I did, both of these issues are gone, as I removed
both the future and the FlowEngine :)


> Nodes in cluster should use ZooKeeper to store heartbeat messages instead of 
> sending to NCM
> -------------------------------------------------------------------------------------------
>
>                 Key: NIFI-1678
>                 URL: https://issues.apache.org/jira/browse/NIFI-1678
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>             Fix For: 1.0.0
>
>
> Currently, nodes periodically send heartbeats to the NCM to indicate
> that they are actively participating in the cluster. As we move away from
> using an NCM, these heartbeats need to go somewhere else. ZooKeeper is a
> reasonable place to push the heartbeats, as it provides the high
> availability (HA) that we need.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to