NIFI-2566: Refactored to allow just the Leader Election Manager to be 
responsible for determining who is the Cluster Coordinator

NIFI-2566: Removed storage of cluster roles from heartbeats and 
NodeConnectionStatus; use LeaderElectionManager to determine roles instead

NIFI-2566: Updated Heartbeats so that if a node is out-of-sync with cluster 
topology, cluster coordinator will provide updated information back to the nodes

NIFI-2566: Fixed issue that prevented standalone instance from starting by 
creating a standalone-instance version of the Leader Election Manager. Also 
added Controller Service enabled/disabled state to fingerprint rather than 
attempting to update the state when joining the cluster, as the implementation 
was incorrect and the correct implementation will be a rather significant 
effort that doesn't have to happen for 1.0.0 release

This closes #866

Signed-off-by: jpercivall <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e42ea9ad
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e42ea9ad
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e42ea9ad

Branch: refs/heads/master
Commit: e42ea9ad457c5b5a1d2da5fa3d3494aacb0cb8d4
Parents: b3f3648
Author: Mark Payne <[email protected]>
Authored: Sat Aug 13 19:38:07 2016 -0400
Committer: jpercivall <[email protected]>
Committed: Tue Aug 16 20:39:04 2016 -0400

----------------------------------------------------------------------
 .../coordination/ClusterCoordinator.java        |  44 ++-
 .../heartbeat/HeartbeatMonitor.java             |   5 +
 .../coordination/heartbeat/NodeHeartbeat.java   |   7 -
 .../cluster/coordination/node/ClusterRoles.java |  10 +
 .../coordination/node/NodeConnectionStatus.java |  36 +-
 .../protocol/AbstractNodeProtocolSender.java    |  24 +-
 .../apache/nifi/cluster/protocol/Heartbeat.java |  11 +-
 .../nifi/cluster/protocol/HeartbeatPayload.java | 128 ++++++++
 .../cluster/protocol/NodeProtocolSender.java    |   5 +-
 .../impl/NodeProtocolSenderListener.java        |   5 +-
 .../protocol/jaxb/message/AdaptedHeartbeat.java |  11 -
 .../message/AdaptedNodeConnectionStatus.java    |  11 -
 .../protocol/jaxb/message/HeartbeatAdapter.java |   5 +-
 .../message/NodeConnectionStatusAdapter.java    |   4 +-
 .../protocol/jaxb/message/ObjectFactory.java    |   4 +
 .../message/HeartbeatResponseMessage.java       |  45 +++
 .../protocol/message/ProtocolMessage.java       |   1 +
 .../resources/nifi-cluster-protocol-context.xml |   7 +
 .../jaxb/message/TestJaxbProtocolUtils.java     |  28 ++
 .../heartbeat/AbstractHeartbeatMonitor.java     |   2 -
 .../ClusterProtocolHeartbeatMonitor.java        | 137 +++-----
 .../heartbeat/StandardNodeHeartbeat.java        |  18 +-
 .../node/CuratorNodeProtocolSender.java         | 124 -------
 .../node/LeaderElectionNodeProtocolSender.java  |  80 +++++
 .../node/NodeClusterCoordinator.java            | 325 +++++-------------
 .../NodeClusterCoordinatorFactoryBean.java      |   4 +-
 .../resources/nifi-cluster-manager-context.xml  |   6 +
 .../heartbeat/TestAbstractHeartbeatMonitor.java |  31 +-
 .../TestThreadPoolRequestReplicator.java        |   5 +-
 .../node/TestNodeClusterCoordinator.java        | 105 ++----
 .../nifi/cluster/integration/Cluster.java       |  34 +-
 .../integration/ClusterConnectionIT.java        | 327 +++++++++----------
 .../nifi/cluster/integration/ClusterUtils.java  |  10 +-
 .../apache/nifi/cluster/integration/Node.java   |  51 ++-
 .../apache/nifi/cluster/HeartbeatPayload.java   | 118 -------
 .../apache/nifi/controller/FlowController.java  |  46 ++-
 .../nifi/controller/StandardFlowService.java    |   5 +-
 .../controller/StandardFlowSynchronizer.java    |  83 ++---
 .../cluster/ClusterProtocolHeartbeater.java     | 107 +++---
 .../election/CuratorLeaderElectionManager.java  |  66 +++-
 .../leader/election/LeaderElectionManager.java  |  24 +-
 .../StandaloneLeaderElectionManager.java        |  65 ++++
 .../nifi/fingerprint/FingerprintFactory.java    |   1 +
 .../nifi/spring/FlowControllerFactoryBean.java  |  14 +-
 .../LeaderElectionManagerFactoryBean.java       |  57 ++++
 .../src/main/resources/nifi-context.xml         |   1 +
 .../nifi/cluster/HeartbeatPayloadTest.java      |   1 +
 .../java/org/apache/nifi/nar/NarCloseable.java  |  29 ++
 .../nifi/web/StandardNiFiServiceFacade.java     |  42 ++-
 .../src/main/resources/nifi-web-api-context.xml |   1 +
 50 files changed, 1170 insertions(+), 1140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
index 4894fc5..49c6142 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -107,13 +107,20 @@ public interface ClusterCoordinator {
     Set<NodeIdentifier> getNodeIdentifiers(NodeConnectionState... states);
 
     /**
-     * Returns a Map of NodeConnectionStatus to all Node Identifiers that have that status.
+     * Returns a Map of NodeConnectionStates to all Node Identifiers that have that state.
      *
-     * @return the NodeConnectionStatus for each Node in the cluster, grouped by the Connection Status
+     * @return the NodeConnectionState for each Node in the cluster, grouped by the Connection State
      */
     Map<NodeConnectionState, List<NodeIdentifier>> getConnectionStates();
 
     /**
+     * Returns a List of the NodeConnectionStatus for each node in the cluster
+     *
+     * @return a List of the NodeConnectionStatus for each node in the cluster
+     */
+    List<NodeConnectionStatus> getConnectionStatuses();
+
+    /**
      * Checks if the given hostname is blocked by the configured firewall, returning
      * <code>true</code> if the node is blocked, <code>false</code> if the node is
      * allowed through the firewall or if there is no firewall configured
@@ -135,14 +142,6 @@ public interface ClusterCoordinator {
     void reportEvent(NodeIdentifier nodeId, Severity severity, String event);
 
     /**
-     * Updates the roles held by the given node
-     *
-     * @param nodeId the id of the node to update
-     * @param roles the new roles that the node possesses
-     */
-    void updateNodeRoles(NodeIdentifier nodeId, Set<String> roles);
-
-    /**
      * Returns the NodeIdentifier that exists that has the given UUID, or <code>null</code> if no NodeIdentifier
      * exists for the given UUID
      *
@@ -197,6 +196,17 @@ public interface ClusterCoordinator {
     void resetNodeStatuses(Map<NodeIdentifier, NodeConnectionStatus> statusMap);
 
     /**
+     * Resets the status of the node to be in accordance with the given NodeConnectionStatus if and only if the
+     * currently held status for this node has an Update ID equal to the given <code>qualifyingUpdateId</code>
+     *
+     * @param connectionStatus the new status of the node
+     * @param qualifyingUpdateId the Update ID to compare the current ID with. If the current ID for the node described by the provided
+     *            NodeConnectionStatus is not equal to this value, the value will not be updated
+     * @return <code>true</code> if the node status was updated, <code>false</code> if the <code>qualifyingUpdateId</code> is out of date.
+     */
+    boolean resetNodeStatus(NodeConnectionStatus connectionStatus, long qualifyingUpdateId);
+
+    /**
      * Notifies the Cluster Coordinator of the Node Identifier that the coordinator is currently running on
      *
      * @param nodeId the ID of the current node
@@ -216,18 +226,4 @@ public interface ClusterCoordinator {
      * @return <code>true</code> if connected, <code>false</code> otherwise
      */
     boolean isConnected();
-
-    /**
-     * Notifies the cluster coordinator that this node has been granted the given role
-     *
-     * @param clusterRole the role that this node has been granted
-     */
-    void addRole(String clusterRole);
-
-    /**
-     * Notifies the cluster coordinator that this node is no longer responsible for the given role
-     *
-     * @param clusterRole the role that this node is no longer responsible for
-     */
-    void removeRole(String clusterRole);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
index c151382..988ba75 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
@@ -52,4 +52,9 @@ public interface HeartbeatMonitor {
      * @param nodeId the id of the node whose heartbeat should be removed
      */
     void removeHeartbeat(NodeIdentifier nodeId);
+
+    /**
+     * @return the address that heartbeats should be sent to when this node is elected coordinator.
+     */
+    String getHeartbeatAddress();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java
index 2ddda79..c4413a2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java
@@ -17,8 +17,6 @@
 
 package org.apache.nifi.cluster.coordination.heartbeat;
 
-import java.util.Set;
-
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 
@@ -40,11 +38,6 @@ public interface NodeHeartbeat {
     NodeConnectionStatus getConnectionStatus();
 
     /**
-     * @return the set of Roles that the node currently possesses.
-     */
-    Set<String> getRoles();
-
-    /**
      * @return the number of FlowFiles that are queued up on the node
      */
     int getFlowFileCount();

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterRoles.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterRoles.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterRoles.java
index 611faa4..1425c9d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterRoles.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterRoles.java
@@ -17,9 +17,19 @@
 
 package org.apache.nifi.cluster.coordination.node;
 
+import java.util.HashSet;
+import java.util.Set;
+
 public class ClusterRoles {
 
     public static final String PRIMARY_NODE = "Primary Node";
 
     public static final String CLUSTER_COORDINATOR = "Cluster Coordinator";
+
+    public static Set<String> getAllRoles() {
+        final Set<String> roles = new HashSet<>();
+        roles.add(PRIMARY_NODE);
+        roles.add(CLUSTER_COORDINATOR);
+        return roles;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
index 23e509d..34bd127 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
@@ -17,10 +17,7 @@
 
 package org.apache.nifi.cluster.coordination.node;
 
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
@@ -41,40 +38,38 @@ public class NodeConnectionStatus {
     private final DisconnectionCode disconnectCode;
     private final String disconnectReason;
     private final Long connectionRequestTime;
-    private final Set<String> roles;
 
 
-    public NodeConnectionStatus(final NodeIdentifier nodeId, final 
NodeConnectionState state, final Set<String> roles) {
-        this(nodeId, state, null, null, null, roles);
+    public NodeConnectionStatus(final NodeIdentifier nodeId, final 
NodeConnectionState state) {
+        this(nodeId, state, null, null, null);
     }
 
     public NodeConnectionStatus(final NodeIdentifier nodeId, final 
DisconnectionCode disconnectionCode) {
-        this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, 
disconnectionCode.name(), null, null);
+        this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, 
disconnectionCode.toString(), null);
     }
 
     public NodeConnectionStatus(final NodeIdentifier nodeId, final 
DisconnectionCode disconnectionCode, final String disconnectionExplanation) {
-        this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, 
disconnectionExplanation, null, null);
+        this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, 
disconnectionExplanation, null);
     }
 
-    public NodeConnectionStatus(final NodeIdentifier nodeId, final 
NodeConnectionState state, final DisconnectionCode disconnectionCode, final 
Set<String> roles) {
-        this(nodeId, state, disconnectionCode, disconnectionCode.name(), null, 
roles);
+    public NodeConnectionStatus(final NodeIdentifier nodeId, final 
NodeConnectionState state, final DisconnectionCode disconnectionCode) {
+        this(nodeId, state, disconnectionCode, disconnectionCode == null ? 
null : disconnectionCode.toString(), null);
     }
 
-    public NodeConnectionStatus(final NodeConnectionStatus status, final 
Set<String> roles) {
-        this(status.getNodeIdentifier(), status.getState(), 
status.getDisconnectCode(), status.getDisconnectReason(), 
status.getConnectionRequestTime(), roles);
+    public NodeConnectionStatus(final NodeConnectionStatus status) {
+        this(status.getNodeIdentifier(), status.getState(), 
status.getDisconnectCode(), status.getDisconnectReason(), 
status.getConnectionRequestTime());
     }
 
     public NodeConnectionStatus(final NodeIdentifier nodeId, final 
NodeConnectionState state, final DisconnectionCode disconnectCode,
-        final String disconnectReason, final Long connectionRequestTime, final 
Set<String> roles) {
-        this(idGenerator.getAndIncrement(), nodeId, state, disconnectCode, 
disconnectReason, connectionRequestTime, roles);
+        final String disconnectReason, final Long connectionRequestTime) {
+        this(idGenerator.getAndIncrement(), nodeId, state, disconnectCode, 
disconnectReason, connectionRequestTime);
     }
 
     public NodeConnectionStatus(final long updateId, final NodeIdentifier 
nodeId, final NodeConnectionState state, final DisconnectionCode disconnectCode,
-        final String disconnectReason, final Long connectionRequestTime, final 
Set<String> roles) {
+        final String disconnectReason, final Long connectionRequestTime) {
         this.updateId = updateId;
         this.nodeId = nodeId;
         this.state = state;
-        this.roles = roles == null ? Collections.emptySet() : 
Collections.unmodifiableSet(new HashSet<>(roles));
         if (state == NodeConnectionState.DISCONNECTED && disconnectCode == 
null) {
             this.disconnectCode = DisconnectionCode.UNKNOWN;
             this.disconnectReason = this.disconnectCode.toString();
@@ -90,10 +85,6 @@ public class NodeConnectionStatus {
         return updateId;
     }
 
-    public Set<String> getRoles() {
-        return roles;
-    }
-
     public NodeIdentifier getNodeIdentifier() {
         return nodeId;
     }
@@ -118,11 +109,10 @@ public class NodeConnectionStatus {
     public String toString() {
         final StringBuilder sb = new StringBuilder();
         final NodeConnectionState state = getState();
-        sb.append("NodeConnectionStatus[state=").append(state);
+        sb.append("NodeConnectionStatus[nodeId=").append(nodeId).append(", 
state=").append(state);
         if (state == NodeConnectionState.DISCONNECTED || state == 
NodeConnectionState.DISCONNECTING) {
             sb.append(", Disconnect 
Code=").append(getDisconnectCode()).append(", Disconnect 
Reason=").append(getDisconnectReason());
         }
-        sb.append(", roles=").append(getRoles());
         sb.append(", updateId=").append(getUpdateIdentifier());
         sb.append("]");
         return sb.toString();
@@ -142,7 +132,6 @@ public class NodeConnectionStatus {
         final int prime = 31;
         int result = 1;
         result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
-        result = prime * result + ((roles == null) ? 0 : roles.hashCode());
         result = prime * result + ((state == null) ? 0 : state.hashCode());
         return result;
     }
@@ -163,7 +152,6 @@ public class NodeConnectionStatus {
 
         NodeConnectionStatus other = (NodeConnectionStatus) obj;
         return Objects.deepEquals(getNodeIdentifier(), 
other.getNodeIdentifier())
-            && Objects.deepEquals(getRoles(), other.getRoles())
             && Objects.deepEquals(getState(), other.getState());
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
index 4131dc5..22d6ebc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
@@ -24,6 +24,7 @@ import java.net.Socket;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
 import org.apache.nifi.io.socket.SocketConfiguration;
@@ -74,7 +75,7 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
     }
 
     @Override
-    public void heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException {
+    public HeartbeatResponseMessage heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException {
         final String hostname;
         final int port;
         try {
@@ -85,7 +86,12 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
             throw new IllegalArgumentException("Cannot send heartbeat to address [" + address + "]. Address must be in <hostname>:<port> format");
         }
 
-        sendProtocolMessage(msg, hostname, port);
+        final ProtocolMessage responseMessage = sendProtocolMessage(msg, hostname, port);
+        if (MessageType.HEARTBEAT_RESPONSE == responseMessage.getType()) {
+            return (HeartbeatResponseMessage) responseMessage;
+        }
+
+        throw new ProtocolException("Expected message type '" + MessageType.HEARTBEAT_RESPONSE + "' but found '" + responseMessage.getType() + "'");
     }
 
 
@@ -108,7 +114,7 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
         return socketConfiguration;
     }
 
-    private void sendProtocolMessage(final ProtocolMessage msg, final String hostname, final int port) {
+    private ProtocolMessage sendProtocolMessage(final ProtocolMessage msg, final String hostname, final int port) {
         Socket socket = null;
         try {
             try {
@@ -124,6 +130,18 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
             } catch (final IOException ioe) {
                 throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
             }
+
+            final ProtocolMessage response;
+            try {
+                // unmarshall response and return
+                final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
+                response = unmarshaller.unmarshal(socket.getInputStream());
+            } catch (final IOException ioe) {
+                throw new ProtocolException("Failed unmarshalling '" + MessageType.CONNECTION_RESPONSE + "' protocol message from "
+                    + socket.getRemoteSocketAddress() + " due to: " + ioe, ioe);
+            }
+
+            return response;
         } finally {
             SocketUtils.closeQuietly(socket);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
index 2135f20..a028c8e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
@@ -16,10 +16,7 @@
  */
 package org.apache.nifi.cluster.protocol;
 
-import java.util.Collections;
 import java.util.Date;
-import java.util.HashSet;
-import java.util.Set;
 
 import javax.xml.bind.annotation.XmlTransient;
 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
@@ -35,17 +32,15 @@ import 
org.apache.nifi.cluster.protocol.jaxb.message.HeartbeatAdapter;
 public class Heartbeat {
 
     private final NodeIdentifier nodeIdentifier;
-    private final Set<String> roles;
     private final NodeConnectionStatus connectionStatus;
     private final long createdTimestamp;
     private final byte[] payload;
 
-    public Heartbeat(final NodeIdentifier nodeIdentifier, final Set<String> 
roles, final NodeConnectionStatus connectionStatus, final byte[] payload) {
+    public Heartbeat(final NodeIdentifier nodeIdentifier, final 
NodeConnectionStatus connectionStatus, final byte[] payload) {
         if (nodeIdentifier == null) {
             throw new IllegalArgumentException("Node Identifier may not be 
null.");
         }
         this.nodeIdentifier = nodeIdentifier;
-        this.roles = roles == null ? Collections.emptySet() : 
Collections.unmodifiableSet(new HashSet<>(roles));
         this.connectionStatus = connectionStatus;
         this.payload = payload;
         this.createdTimestamp = new Date().getTime();
@@ -59,10 +54,6 @@ public class Heartbeat {
         return payload;
     }
 
-    public Set<String> getRoles() {
-        return roles;
-    }
-
     public NodeConnectionStatus getConnectionStatus() {
         return connectionStatus;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/HeartbeatPayload.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/HeartbeatPayload.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/HeartbeatPayload.java
new file mode 100644
index 0000000..8363a20
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/HeartbeatPayload.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+
+/**
+ * The payload of the heartbeat. The payload contains status to inform the cluster manager the current workload of this node.
+ *
+ */
+@XmlRootElement
+public class HeartbeatPayload {
+
+    private static final JAXBContext JAXB_CONTEXT;
+
+    static {
+        try {
+            JAXB_CONTEXT = JAXBContext.newInstance(HeartbeatPayload.class);
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXBContext.");
+        }
+    }
+
+    private int activeThreadCount;
+    private long totalFlowFileCount;
+    private long totalFlowFileBytes;
+    private long systemStartTime;
+    private List<NodeConnectionStatus> clusterStatus;
+
+    public int getActiveThreadCount() {
+        return activeThreadCount;
+    }
+
+    public void setActiveThreadCount(final int activeThreadCount) {
+        this.activeThreadCount = activeThreadCount;
+    }
+
+    public long getTotalFlowFileCount() {
+        return totalFlowFileCount;
+    }
+
+    public void setTotalFlowFileCount(final long totalFlowFileCount) {
+        this.totalFlowFileCount = totalFlowFileCount;
+    }
+
+    public long getTotalFlowFileBytes() {
+        return totalFlowFileBytes;
+    }
+
+    public void setTotalFlowFileBytes(final long totalFlowFileBytes) {
+        this.totalFlowFileBytes = totalFlowFileBytes;
+    }
+
+    public long getSystemStartTime() {
+        return systemStartTime;
+    }
+
+    public void setSystemStartTime(final long systemStartTime) {
+        this.systemStartTime = systemStartTime;
+    }
+
+    public List<NodeConnectionStatus> getClusterStatus() {
+        return clusterStatus;
+    }
+
+    public void setClusterStatus(final List<NodeConnectionStatus> clusterStatus) {
+        this.clusterStatus = clusterStatus;
+    }
+
+    public byte[] marshal() throws ProtocolException {
+        final ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream();
+        marshal(this, payloadBytes);
+        return payloadBytes.toByteArray();
+    }
+
+    public static void marshal(final HeartbeatPayload payload, final OutputStream os) throws ProtocolException {
+        try {
+            final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
+            marshaller.marshal(payload, os);
+        } catch (final JAXBException je) {
+            throw new ProtocolException(je);
+        }
+    }
+
+    public static HeartbeatPayload unmarshal(final InputStream is) throws ProtocolException {
+        try {
+            final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+            return (HeartbeatPayload) unmarshaller.unmarshal(is);
+        } catch (final JAXBException je) {
+            throw new ProtocolException(je);
+        }
+    }
+
+    public static HeartbeatPayload unmarshal(final byte[] bytes) throws ProtocolException {
+        try {
+            final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+            return (HeartbeatPayload) unmarshaller.unmarshal(new ByteArrayInputStream(bytes));
+        } catch (final JAXBException je) {
+            throw new ProtocolException(je);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
index 432a03d..fcf5195 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
@@ -19,6 +19,7 @@ package org.apache.nifi.cluster.protocol;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
 
 /**
  * An interface for sending protocol messages from a node to the cluster
@@ -44,6 +45,8 @@ public interface NodeProtocolSender {
      * @param msg the heartbeat message to send
      * @param address the address of the Cluster Coordinator in 
&lt;hostname&gt;:&lt;port&gt; format
      * @throws ProtocolException if unable to send the heartbeat
+     *
+     * @return the response from the Cluster Coordinator
      */
-    void heartbeat(HeartbeatMessage msg, String address) throws 
ProtocolException;
+    HeartbeatResponseMessage heartbeat(HeartbeatMessage msg, String address) 
throws ProtocolException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
index 0fd2517..1b0aeea 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
@@ -27,6 +27,7 @@ import 
org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
 import org.apache.nifi.reporting.BulletinRepository;
 
 public class NodeProtocolSenderListener implements NodeProtocolSender, 
ProtocolListener {
@@ -92,7 +93,7 @@ public class NodeProtocolSenderListener implements 
NodeProtocolSender, ProtocolL
     }
 
     @Override
-    public void heartbeat(HeartbeatMessage msg, String address) throws 
ProtocolException {
-        sender.heartbeat(msg, address);
+    public HeartbeatResponseMessage heartbeat(final HeartbeatMessage msg, 
final String address) throws ProtocolException {
+        return sender.heartbeat(msg, address);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
index f1eba52..6f718fa 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
@@ -16,8 +16,6 @@
  */
 package org.apache.nifi.cluster.protocol.jaxb.message;
 
-import java.util.Set;
-
 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
 
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@@ -29,7 +27,6 @@ public class AdaptedHeartbeat {
 
     private NodeIdentifier nodeIdentifier;
     private byte[] payload;
-    private Set<String> roles;
     private NodeConnectionStatus connectionStatus;
 
     public AdaptedHeartbeat() {
@@ -44,14 +41,6 @@ public class AdaptedHeartbeat {
         this.nodeIdentifier = nodeIdentifier;
     }
 
-    public Set<String> getRoles() {
-        return roles;
-    }
-
-    public void setRoles(Set<String> roles) {
-        this.roles = roles;
-    }
-
     public void setConnectionStatus(NodeConnectionStatus connectionStatus) {
         this.connectionStatus = connectionStatus;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java
index 9cfac2c..c8c4acf 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java
@@ -17,8 +17,6 @@
 
 package org.apache.nifi.cluster.protocol.jaxb.message;
 
-import java.util.Set;
-
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
@@ -30,7 +28,6 @@ public class AdaptedNodeConnectionStatus {
     private DisconnectionCode disconnectCode;
     private String disconnectReason;
     private Long connectionRequestTime;
-    private Set<String> roles;
 
     public Long getUpdateId() {
         return updateId;
@@ -79,12 +76,4 @@ public class AdaptedNodeConnectionStatus {
     public void setConnectionRequestTime(Long connectionRequestTime) {
         this.connectionRequestTime = connectionRequestTime;
     }
-
-    public Set<String> getRoles() {
-        return roles;
-    }
-
-    public void setRoles(Set<String> roles) {
-        this.roles = roles;
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
index 94d26ce..58e675b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
@@ -34,9 +34,6 @@ public class HeartbeatAdapter extends 
XmlAdapter<AdaptedHeartbeat, Heartbeat> {
             // set payload
             aHb.setPayload(hb.getPayload());
 
-            // set leader flag
-            aHb.setRoles(hb.getRoles());
-
             // set connected flag
             aHb.setConnectionStatus(hb.getConnectionStatus());
         }
@@ -46,7 +43,7 @@ public class HeartbeatAdapter extends 
XmlAdapter<AdaptedHeartbeat, Heartbeat> {
 
     @Override
     public Heartbeat unmarshal(final AdaptedHeartbeat aHb) {
-        return new Heartbeat(aHb.getNodeIdentifier(), aHb.getRoles(), 
aHb.getConnectionStatus(), aHb.getPayload());
+        return new Heartbeat(aHb.getNodeIdentifier(), 
aHb.getConnectionStatus(), aHb.getPayload());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
index 21d0bda..ec209de 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
@@ -30,8 +30,7 @@ public class NodeConnectionStatusAdapter extends 
XmlAdapter<AdaptedNodeConnectio
             adapted.getState(),
             adapted.getDisconnectCode(),
             adapted.getDisconnectReason(),
-            adapted.getConnectionRequestTime(),
-            adapted.getRoles());
+            adapted.getConnectionRequestTime());
     }
 
     @Override
@@ -44,7 +43,6 @@ public class NodeConnectionStatusAdapter extends 
XmlAdapter<AdaptedNodeConnectio
             adapted.setDisconnectCode(toAdapt.getDisconnectCode());
             adapted.setDisconnectReason(toAdapt.getDisconnectReason());
             adapted.setState(toAdapt.getState());
-            adapted.setRoles(toAdapt.getRoles());
         }
         return adapted;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
index da13b02..afa87b9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
@@ -24,6 +24,7 @@ import 
org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
 import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
 import 
org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
 import 
org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
@@ -97,4 +98,7 @@ public class ObjectFactory {
         return new NodeConnectionStatusResponseMessage();
     }
 
+    /**
+     * Factory method for the heartbeat response protocol message.
+     *
+     * @return a newly constructed {@link HeartbeatResponseMessage}
+     */
+    public HeartbeatResponseMessage createHeartbeatResponse() {
+        return new HeartbeatResponseMessage();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java
new file mode 100644
index 0000000..cbb8b48
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.protocol.message;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+
+/**
+ * Protocol message sent back in reply to a heartbeat. It carries the connection statuses that the
+ * responder wants the heartbeating node to adopt, and is bound to the XML root element
+ * {@code heartbeatResponse}.
+ */
+@XmlRootElement(name = "heartbeatResponse")
+public class HeartbeatResponseMessage extends ProtocolMessage {
+
+    // Never null: starts out empty and the setter always installs a fresh list.
+    private List<NodeConnectionStatus> updatedNodeStatuses = new ArrayList<>();
+
+
+    /**
+     * @return {@link MessageType#HEARTBEAT_RESPONSE}
+     */
+    @Override
+    public MessageType getType() {
+        return MessageType.HEARTBEAT_RESPONSE;
+    }
+
+    /**
+     * @return the list of updated node statuses held by this message; this is the message's own list, not a copy
+     */
+    public List<NodeConnectionStatus> getUpdatedNodeStatuses() {
+        return updatedNodeStatuses;
+    }
+
+    /**
+     * Replaces the updated node statuses with a copy of the given list, so later changes to the
+     * caller's list do not affect this message.
+     *
+     * @param nodeStatuses the statuses to carry in this message; {@code null} is treated as an empty list
+     */
+    public void setUpdatedNodeStatuses(final List<NodeConnectionStatus> nodeStatuses) {
+        final List<NodeConnectionStatus> copy = new ArrayList<>();
+        if (nodeStatuses != null) {
+            copy.addAll(nodeStatuses);
+        }
+        this.updatedNodeStatuses = copy;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
index 2e74689..1d0d115 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
@@ -32,6 +32,7 @@ public abstract class ProtocolMessage {
         RECONNECTION_RESPONSE,
         SERVICE_BROADCAST,
         HEARTBEAT,
+        HEARTBEAT_RESPONSE,
         NODE_CONNECTION_STATUS_REQUEST,
         NODE_CONNECTION_STATUS_RESPONSE,
         NODE_STATUS_CHANGE;

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml
index 04332a5..032081b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml
@@ -54,11 +54,18 @@
     </bean>
     
     <!-- node protocol sender -->
+    <!--
     <bean id="nodeProtocolSender" 
class="org.apache.nifi.cluster.coordination.node.CuratorNodeProtocolSender">
         <constructor-arg ref="protocolSocketConfiguration"/>
         <constructor-arg ref="protocolContext"/>
         <constructor-arg ref="nifiProperties"/>
     </bean>
+    -->
+    <bean id="nodeProtocolSender" 
class="org.apache.nifi.cluster.coordination.node.LeaderElectionNodeProtocolSender">
+        <constructor-arg ref="protocolSocketConfiguration"/>
+        <constructor-arg ref="protocolContext"/>
+        <constructor-arg ref="leaderElectionManager"/>
+    </bean>
     
     <!-- protocol listener -->
     <bean id="protocolListener" 
class="org.apache.nifi.cluster.protocol.impl.SocketProtocolListener">

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
index 21636a4..4fa53e8 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
@@ -32,9 +32,12 @@ import 
org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.ComponentRevision;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
 import org.apache.nifi.cluster.protocol.DataFlow;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.HeartbeatPayload;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import 
org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
 import 
org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
 import org.apache.nifi.web.Revision;
@@ -96,4 +99,29 @@ public class TestJaxbProtocolUtils {
         final NodeConnectionStatus unmarshalledStatus = 
unmarshalledMsg.getNodeConnectionStatus();
         assertEquals(nodeStatus, unmarshalledStatus);
     }
+
+    /**
+     * Marshals a HeartbeatMessage whose heartbeat embeds a marshalled HeartbeatPayload, then verifies that
+     * unmarshalling the resulting XML yields a HeartbeatMessage again.
+     */
+    @Test
+    public void testRoundTripHeartbeat() throws JAXBException {
+        final NodeIdentifier nodeIdentifier = new NodeIdentifier("id", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, true);
+        final NodeConnectionStatus connectionStatus = new NodeConnectionStatus(nodeIdentifier, DisconnectionCode.NOT_YET_CONNECTED);
+
+        // Payload that travels inside the heartbeat in its marshalled form.
+        final HeartbeatPayload heartbeatPayload = new HeartbeatPayload();
+        heartbeatPayload.setActiveThreadCount(1);
+        heartbeatPayload.setSystemStartTime(System.currentTimeMillis());
+        heartbeatPayload.setTotalFlowFileBytes(83L);
+        heartbeatPayload.setTotalFlowFileCount(4);
+        heartbeatPayload.setClusterStatus(Collections.singletonList(connectionStatus));
+
+        final HeartbeatMessage message = new HeartbeatMessage();
+        message.setHeartbeat(new Heartbeat(nodeIdentifier, connectionStatus, heartbeatPayload.marshal()));
+
+        // Write the message out as XML in memory, then read it straight back.
+        final ByteArrayOutputStream xmlOut = new ByteArrayOutputStream();
+        JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(message, xmlOut);
+
+        final ByteArrayInputStream xmlIn = new ByteArrayInputStream(xmlOut.toByteArray());
+        final Object roundTripped = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(xmlIn);
+        assertTrue(roundTripped instanceof HeartbeatMessage);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index c216ed3..e304c23 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -256,8 +256,6 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
             clusterCoordinator.finishNodeConnection(nodeId);
             clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received 
first heartbeat from connecting node. Node connected.");
         }
-
-        clusterCoordinator.updateNodeRoles(nodeId, heartbeat.getRoles());
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
index d2d81d1..b955bd0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -17,39 +17,34 @@
 
 package org.apache.nifi.cluster.coordination.heartbeat;
 
-import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.Unmarshaller;
 
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryForever;
-import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.HeartbeatPayload;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolHandler;
 import org.apache.nifi.cluster.protocol.ProtocolListener;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
-import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
 import org.apache.nifi.util.NiFiProperties;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,13 +54,6 @@ import org.slf4j.LoggerFactory;
  */
 public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor 
implements HeartbeatMonitor, ProtocolHandler {
     protected static final Logger logger = 
LoggerFactory.getLogger(ClusterProtocolHeartbeatMonitor.class);
-    private static final String COORDINATOR_ZNODE_NAME = "coordinator";
-
-    private final ZooKeeperClientConfig zkClientConfig;
-    private final String clusterNodesPath;
-
-    private volatile Map<String, NodeIdentifier> clusterNodeIds = new 
HashMap<>();
-    private volatile CuratorFramework curatorClient;
 
     private final String heartbeatAddress;
     private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> 
heartbeatMessages = new ConcurrentHashMap<>();
@@ -86,8 +74,6 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
         super(clusterCoordinator, properties);
 
         protocolListener.addHandler(this);
-        this.zkClientConfig = ZooKeeperClientConfig.createConfig(properties);
-        this.clusterNodesPath = zkClientConfig.resolvePath("cluster/nodes");
 
         String hostname = 
properties.getProperty(NiFiProperties.CLUSTER_NODE_ADDRESS);
         if (hostname == null || hostname.trim().isEmpty()) {
@@ -111,17 +97,12 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
     }
 
     @Override
-    public void onStart() {
-        final RetryPolicy retryPolicy = new RetryForever(5000);
-        curatorClient = CuratorFrameworkFactory.builder()
-            .connectString(zkClientConfig.getConnectString())
-            .sessionTimeoutMs(zkClientConfig.getSessionTimeoutMillis())
-            .connectionTimeoutMs(zkClientConfig.getConnectionTimeoutMillis())
-            .retryPolicy(retryPolicy)
-            .defaultData(new byte[0])
-            .build();
-        curatorClient.start();
+    public String getHeartbeatAddress() {
+        // Hands back the heartbeatAddress field unchanged.
+        // NOTE(review): presumably callers use this to learn where heartbeats should be sent - confirm against the interface.
+        return heartbeatAddress;
+    }
 
+    @Override
+    public void onStart() {
         // We don't know what the heartbeats look like for the nodes, since we 
were just elected to monitoring
         // them. However, the map may be filled with old heartbeats. So we 
clear the heartbeats and populate the
         // map with new heartbeats set to the current time and using the 
currently known status. We do this so
@@ -130,58 +111,13 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
         heartbeatMessages.clear();
         for (final NodeIdentifier nodeId : 
clusterCoordinator.getNodeIdentifiers()) {
             final NodeHeartbeat heartbeat = new StandardNodeHeartbeat(nodeId, 
System.currentTimeMillis(),
-                clusterCoordinator.getConnectionStatus(nodeId), 
Collections.emptySet(), 0, 0L, 0, System.currentTimeMillis());
+                clusterCoordinator.getConnectionStatus(nodeId), 0, 0L, 0, 
System.currentTimeMillis());
             heartbeatMessages.put(nodeId, heartbeat);
         }
-
-        final Thread publishAddress = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                while (!isStopped()) {
-                    final String path = clusterNodesPath + "/" + 
COORDINATOR_ZNODE_NAME;
-                    try {
-                        try {
-                            curatorClient.setData().forPath(path, 
heartbeatAddress.getBytes(StandardCharsets.UTF_8));
-                            logger.info("Successfully published Cluster 
Heartbeat Monitor Address of {} to ZooKeeper", heartbeatAddress);
-                            return;
-                        } catch (final NoNodeException nne) {
-                            // ensure that parents are created, using a 
wide-open ACL because the parents contain no data
-                            // and the path is not in any way sensitive.
-                            try {
-                                
curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
-                            } catch (final NodeExistsException nee) {
-                                // This is okay. Node already exists.
-                            }
-
-                            
curatorClient.create().withMode(CreateMode.EPHEMERAL).forPath(path, 
heartbeatAddress.getBytes(StandardCharsets.UTF_8));
-                            logger.info("Successfully published address as 
heartbeat monitor address at path {} with value {}", path, heartbeatAddress);
-
-                            return;
-                        }
-                    } catch (Exception e) {
-                        logger.warn("Failed to update ZooKeeper to notify 
nodes of the heartbeat address. Will continue to retry.");
-
-                        try {
-                            Thread.sleep(2000L);
-                        } catch (final InterruptedException ie) {
-                            Thread.currentThread().interrupt();
-                            return;
-                        }
-                    }
-                }
-            }
-        });
-
-        publishAddress.setName("Publish Heartbeat Address");
-        publishAddress.setDaemon(true);
-        publishAddress.start();
     }
 
     @Override
     public void onStop() {
-        if (curatorClient != null) {
-            curatorClient.close();
-        }
     }
 
     @Override
@@ -195,10 +131,6 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
         heartbeatMessages.remove(nodeId);
     }
 
-    protected Set<NodeIdentifier> getClusterNodeIds() {
-        return new HashSet<>(clusterNodeIds.values());
-    }
-
 
     @Override
     public ProtocolMessage handle(final ProtocolMessage msg) throws 
ProtocolException {
@@ -211,7 +143,6 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
 
         final NodeIdentifier nodeId = heartbeat.getNodeIdentifier();
         final NodeConnectionStatus connectionStatus = 
heartbeat.getConnectionStatus();
-        final Set<String> roles = heartbeat.getRoles();
         final byte[] payloadBytes = heartbeat.getPayload();
         final HeartbeatPayload payload = HeartbeatPayload.unmarshal(payloadBytes);
         final int activeThreadCount = payload.getActiveThreadCount();
@@ -220,11 +151,49 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
         final long systemStartTime = payload.getSystemStartTime();
 
         final NodeHeartbeat nodeHeartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(),
-            connectionStatus, roles, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime);
+            connectionStatus, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime);
         heartbeatMessages.put(heartbeat.getNodeIdentifier(), nodeHeartbeat);
         logger.debug("Received new heartbeat from {}", nodeId);
 
-        return null;
+        // Formulate a List of differences between our view of the cluster topology and the node's view
+        // and send that back to the node so that it is in-sync with us
+        final List<NodeConnectionStatus> nodeStatusList = payload.getClusterStatus();
+        final List<NodeConnectionStatus> updatedStatuses = getUpdatedStatuses(nodeStatusList);
+
+        final HeartbeatResponseMessage responseMessage = new HeartbeatResponseMessage();
+        responseMessage.setUpdatedNodeStatuses(updatedStatuses);
+        return responseMessage;
+    }
+
+
+    private List<NodeConnectionStatus> getUpdatedStatuses(final List<NodeConnectionStatus> nodeStatusList) {
+        // Map node's statuses by NodeIdentifier for quick & easy lookup
+        final Map<NodeIdentifier, NodeConnectionStatus> nodeStatusMap = nodeStatusList.stream()
+            .collect(Collectors.toMap(status -> status.getNodeIdentifier(), Function.identity()));
+
+        // Check if our connection status is the same for each Node Identifier and if not, add our version of the status
+        // to a List of updated statuses.
+        final List<NodeConnectionStatus> currentStatuses = clusterCoordinator.getConnectionStatuses();
+        final List<NodeConnectionStatus> updatedStatuses = new ArrayList<>();
+        for (final NodeConnectionStatus currentStatus : currentStatuses) {
+            final NodeConnectionStatus nodeStatus = nodeStatusMap.get(currentStatus.getNodeIdentifier());
+            if (!currentStatus.equals(nodeStatus)) {
+                updatedStatuses.add(currentStatus);
+            }
+        }
+
+        // If the node has any statuses that we do not have, add a REMOVED status to the update list
+        final Set<NodeIdentifier> nodeIds = currentStatuses.stream().map(status -> status.getNodeIdentifier()).collect(Collectors.toSet());
+        for (final NodeConnectionStatus nodeStatus : nodeStatusList) {
+            if (!nodeIds.contains(nodeStatus.getNodeIdentifier())) {
+                updatedStatuses.add(new NodeConnectionStatus(nodeStatus.getNodeIdentifier(), NodeConnectionState.REMOVED, null));
+            }
+        }
+
+        logger.debug("\n\nCalculated diff between current cluster status and node cluster status as follows:\nNode: {}\nSelf: {}\nDifference: {}\n\n",
+            nodeStatusList, currentStatuses, updatedStatuses);
+
+        return updatedStatuses;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java
index e455a76..a63db44 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java
@@ -17,13 +17,9 @@
 
 package org.apache.nifi.cluster.coordination.heartbeat;
 
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.HeartbeatPayload;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 
@@ -32,18 +28,16 @@ public class StandardNodeHeartbeat implements NodeHeartbeat {
     private final NodeIdentifier nodeId;
     private final long timestamp;
     private final NodeConnectionStatus connectionStatus;
-    private final Set<String> roles;
     private final int flowFileCount;
     private final long flowFileBytes;
     private final int activeThreadCount;
     private final long systemStartTime;
 
     public StandardNodeHeartbeat(final NodeIdentifier nodeId, final long timestamp, final NodeConnectionStatus connectionStatus,
-        final Set<String> roles, final int flowFileCount, final long flowFileBytes, final int activeThreadCount, final long systemStartTime) {
+        final int flowFileCount, final long flowFileBytes, final int activeThreadCount, final long systemStartTime) {
         this.timestamp = timestamp;
         this.nodeId = nodeId;
         this.connectionStatus = connectionStatus;
-        this.roles = roles == null ? Collections.emptySet() : Collections.unmodifiableSet(new HashSet<>(roles));
         this.flowFileCount = flowFileCount;
         this.flowFileBytes = flowFileBytes;
         this.activeThreadCount = activeThreadCount;
@@ -66,11 +60,6 @@ public class StandardNodeHeartbeat implements NodeHeartbeat {
     }
 
     @Override
-    public Set<String> getRoles() {
-        return roles;
-    }
-
-    @Override
     public int getFlowFileCount() {
         return flowFileCount;
     }
@@ -85,7 +74,6 @@ public class StandardNodeHeartbeat implements NodeHeartbeat {
         return activeThreadCount;
     }
 
-
     @Override
     public long getSystemStartTime() {
         return systemStartTime;
@@ -96,7 +84,7 @@ public class StandardNodeHeartbeat implements NodeHeartbeat {
         final HeartbeatPayload payload = HeartbeatPayload.unmarshal(heartbeat.getPayload());
 
         return new StandardNodeHeartbeat(heartbeat.getNodeIdentifier(), timestamp, heartbeat.getConnectionStatus(),
-            heartbeat.getRoles(), (int) payload.getTotalFlowFileCount(), payload.getTotalFlowFileBytes(),
+            (int) payload.getTotalFlowFileCount(), payload.getTotalFlowFileBytes(),
             payload.getActiveThreadCount(), payload.getSystemStartTime());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java
deleted file mode 100644
index daa3e5c..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.cluster.coordination.node;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.util.Properties;
-
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
-import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
-import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
-import org.apache.nifi.io.socket.SocketConfiguration;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Uses Apache Curator to determine the address of the current cluster coordinator
- */
-public class CuratorNodeProtocolSender extends AbstractNodeProtocolSender {
-    private static final Logger logger = LoggerFactory.getLogger(CuratorNodeProtocolSender.class);
-
-    private final String coordinatorPath;
-    private final ZooKeeperClientConfig zkConfig;
-    private InetSocketAddress coordinatorAddress;
-
-
-    public CuratorNodeProtocolSender(final SocketConfiguration socketConfig, final ProtocolContext<ProtocolMessage> protocolContext, final Properties properties) {
-        super(socketConfig, protocolContext);
-        zkConfig = ZooKeeperClientConfig.createConfig(properties);
-        coordinatorPath = zkConfig.resolvePath("cluster/nodes/coordinator");
-    }
-
-    @Override
-    protected synchronized InetSocketAddress getServiceAddress() throws IOException {
-        if (coordinatorAddress != null) {
-            return coordinatorAddress;
-        }
-
-        final RetryPolicy retryPolicy = new RetryNTimes(0, 0);
-        final CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
-            zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy);
-        curatorClient.start();
-
-        try {
-            // Get coordinator address and add watcher to change who we are heartbeating to if the value changes.
-            final byte[] coordinatorAddressBytes = curatorClient.getData().usingWatcher(new Watcher() {
-                @Override
-                public void process(final WatchedEvent event) {
-                    coordinatorAddress = null;
-                }
-            }).forPath(coordinatorPath);
-
-            if (coordinatorAddressBytes == null || coordinatorAddressBytes.length == 0) {
-                throw new NoClusterCoordinatorException("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet.");
-            }
-
-            final String address = new String(coordinatorAddressBytes, StandardCharsets.UTF_8);
-
-            final String[] splits = address.split(":");
-            if (splits.length != 2) {
-                final String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates "
-                    + "that address is %s, but this is not in the expected format of <hostname>:<port>", address);
-                logger.error(message);
-                throw new ProtocolException(message);
-            }
-
-            logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address);
-
-            final String hostname = splits[0];
-            final int port;
-            try {
-                port = Integer.parseInt(splits[1]);
-                if (port < 1 || port > 65535) {
-                    throw new NumberFormatException("Port must be in the range of 1 - 65535 but got " + port);
-                }
-            } catch (final NumberFormatException nfe) {
-                final String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates "
-                    + "that address is %s, but the port is not a valid port number", address);
-                logger.error(message);
-                throw new ProtocolException(message);
-            }
-
-            final InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(hostname, port);
-            coordinatorAddress = socketAddress;
-            return socketAddress;
-        } catch (final NoNodeException nne) {
-            logger.info("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet.");
-            throw new NoClusterCoordinatorException("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet.");
-        } catch (final NoClusterCoordinatorException ncce) {
-            throw ncce;
-        } catch (Exception e) {
-            throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e);
-        } finally {
-            curatorClient.close();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java
new file mode 100644
index 0000000..03de329
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.node;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
+import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.controller.leader.election.LeaderElectionManager;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LeaderElectionNodeProtocolSender extends AbstractNodeProtocolSender {
+    private static final Logger logger = LoggerFactory.getLogger(LeaderElectionNodeProtocolSender.class);
+
+    private final LeaderElectionManager electionManager;
+
+    public LeaderElectionNodeProtocolSender(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext, final LeaderElectionManager electionManager) {
+        super(socketConfiguration, protocolContext);
+        this.electionManager = electionManager;
+    }
+
+    @Override
+    protected InetSocketAddress getServiceAddress() throws IOException {
+        final String address = electionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR);
+
+        if (StringUtils.isEmpty(address)) {
+            throw new NoClusterCoordinatorException("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet.");
+        }
+
+        final String[] splits = address.split(":");
+        if (splits.length != 2) {
+            final String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates "
+                + "that address is %s, but this is not in the expected format of <hostname>:<port>", address);
+            logger.error(message);
+            throw new ProtocolException(message);
+        }
+
+        logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address);
+
+        final String hostname = splits[0];
+        final int port;
+        try {
+            port = Integer.parseInt(splits[1]);
+            if (port < 1 || port > 65535) {
+                throw new NumberFormatException("Port must be in the range of 1 - 65535 but got " + port);
+            }
+        } catch (final NumberFormatException nfe) {
+            final String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates "
+                + "that address is %s, but the port is not a valid port number", address);
+            logger.error(message);
+            throw new ProtocolException(message);
+        }
+
+        final InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(hostname, port);
+        return socketAddress;
+    }
+
+}

Reply via email to