http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index ac39dc5..8e580db 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -18,28 +18,23 @@
 package org.apache.nifi.cluster.coordination.node;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
 import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
@@ -50,7 +45,6 @@ import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import 
org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
 import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
-import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.protocol.ComponentRevision;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
@@ -68,14 +62,11 @@ import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
-import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
+import org.apache.nifi.controller.leader.election.LeaderElectionManager;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.services.FlowService;
 import org.apache.nifi.web.revision.RevisionManager;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,37 +83,24 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     private final EventReporter eventReporter;
     private final ClusterNodeFirewall firewall;
     private final RevisionManager revisionManager;
-
-    // Curator used to determine which node is coordinator
-    private final CuratorFramework curatorClient;
-    private final String nodesPathPrefix;
-    private final String coordinatorPath;
+    private final LeaderElectionManager leaderElectionManager;
+    private final AtomicLong latestUpdateId = new AtomicLong(-1);
 
     private volatile FlowService flowService;
     private volatile boolean connected;
-    private volatile String coordinatorAddress;
     private volatile boolean closed = false;
 
     private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> 
nodeStatuses = new ConcurrentHashMap<>();
     private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> 
nodeEvents = new ConcurrentHashMap<>();
 
-    public NodeClusterCoordinator(final 
ClusterCoordinationProtocolSenderListener senderListener, final EventReporter 
eventReporter,
-        final ClusterNodeFirewall firewall, final RevisionManager 
revisionManager, final Properties nifiProperties) {
+    public NodeClusterCoordinator(final 
ClusterCoordinationProtocolSenderListener senderListener, final EventReporter 
eventReporter, final LeaderElectionManager leaderElectionManager,
+        final ClusterNodeFirewall firewall, final RevisionManager 
revisionManager) {
         this.senderListener = senderListener;
         this.flowService = null;
         this.eventReporter = eventReporter;
         this.firewall = firewall;
         this.revisionManager = revisionManager;
-
-        final RetryPolicy retryPolicy = new RetryNTimes(10, 500);
-        final ZooKeeperClientConfig zkConfig = 
ZooKeeperClientConfig.createConfig(nifiProperties);
-
-        curatorClient = 
CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
-            zkConfig.getSessionTimeoutMillis(), 
zkConfig.getConnectionTimeoutMillis(), retryPolicy);
-
-        curatorClient.start();
-        nodesPathPrefix = zkConfig.resolvePath("cluster/nodes");
-        coordinatorPath = nodesPathPrefix + "/coordinator";
+        this.leaderElectionManager = leaderElectionManager;
 
         senderListener.addHandler(this);
     }
@@ -138,10 +116,9 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         final NodeConnectionStatus shutdownStatus = new 
NodeConnectionStatus(getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN);
         updateNodeStatus(shutdownStatus, false);
         logger.info("Successfully notified other nodes that I am shutting 
down");
-
-        curatorClient.close();
     }
 
+
     @Override
     public void setLocalNodeIdentifier(final NodeIdentifier nodeId) {
         this.nodeId = nodeId;
@@ -153,10 +130,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         return nodeId;
     }
 
-    private NodeIdentifier waitForLocalNodeIdentifier() {
-        return waitForNodeIdentifier(() -> getLocalNodeIdentifier());
-    }
-
     private NodeIdentifier waitForElectedClusterCoordinator() {
         return waitForNodeIdentifier(() -> 
getElectedActiveCoordinatorNode(false));
     }
@@ -174,6 +147,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
                     Thread.sleep(100L);
                 } catch (final InterruptedException ie) {
                     Thread.currentThread().interrupt();
+                    return null;
                 }
             }
         }
@@ -182,34 +156,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     }
 
     private String getElectedActiveCoordinatorAddress() throws IOException {
-        final String curAddress = coordinatorAddress;
-        if (curAddress != null) {
-            return curAddress;
-        }
-
-        try {
-            // Get coordinator address and add watcher to change who we are 
heartbeating to if the value changes.
-            final byte[] coordinatorAddressBytes = 
curatorClient.getData().usingWatcher(new Watcher() {
-                @Override
-                public void process(final WatchedEvent event) {
-                    coordinatorAddress = null;
-                }
-            }).forPath(coordinatorPath);
-            final String address = coordinatorAddress = new 
String(coordinatorAddressBytes, StandardCharsets.UTF_8);
-
-            logger.info("Determined that Cluster Coordinator is located at 
{}", address);
-            return address;
-        } catch (final KeeperException.NoNodeException nne) {
-            throw new NoClusterCoordinatorException();
-        } catch (Exception e) {
-            throw new IOException("Unable to determine Cluster Coordinator 
from ZooKeeper", e);
-        }
+        return 
leaderElectionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR);
     }
 
     @Override
     public void resetNodeStatuses(final Map<NodeIdentifier, 
NodeConnectionStatus> statusMap) {
         logger.info("Resetting cluster node statuses from {} to {}", 
nodeStatuses, statusMap);
-        coordinatorAddress = null;
 
         // For each proposed replacement, update the nodeStatuses map if and 
only if the replacement
         // has a larger update id than the current value.
@@ -217,12 +169,29 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
             final NodeIdentifier nodeId = entry.getKey();
             final NodeConnectionStatus proposedStatus = entry.getValue();
 
-            boolean updated = false;
-            while (!updated) {
-                final NodeConnectionStatus currentStatus = 
nodeStatuses.get(nodeId);
-                updated = replaceNodeStatus(nodeId, currentStatus, 
proposedStatus);
+            if (proposedStatus.getState() == NodeConnectionState.REMOVED) {
+                nodeStatuses.remove(nodeId);
+            } else {
+                nodeStatuses.put(nodeId, proposedStatus);
+            }
+        }
+    }
+
+    @Override
+    public boolean resetNodeStatus(final NodeConnectionStatus 
connectionStatus, final long qualifyingUpdateId) {
+        final NodeIdentifier nodeId = connectionStatus.getNodeIdentifier();
+        final NodeConnectionStatus currentStatus = getConnectionStatus(nodeId);
+
+        if (currentStatus == null) {
+            return replaceNodeStatus(nodeId, null, connectionStatus);
+        } else {
+            if (currentStatus.getUpdateIdentifier() == qualifyingUpdateId) {
+                return replaceNodeStatus(nodeId, currentStatus, 
connectionStatus);
             }
         }
+
+        // The update identifier is not the same. We will not replace the value
+        return false;
     }
 
     /**
@@ -242,11 +211,19 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         }
 
         if (currentStatus == null) {
-            final NodeConnectionStatus existingValue = 
nodeStatuses.putIfAbsent(nodeId, newStatus);
-            return existingValue == null;
+            if (newStatus.getState() == NodeConnectionState.REMOVED) {
+                return nodeStatuses.remove(nodeId, currentStatus);
+            } else {
+                final NodeConnectionStatus existingValue = 
nodeStatuses.putIfAbsent(nodeId, newStatus);
+                return existingValue == null;
+            }
         }
 
-        return nodeStatuses.replace(nodeId, currentStatus, newStatus);
+        if (newStatus.getState() == NodeConnectionState.REMOVED) {
+            return nodeStatuses.remove(nodeId, currentStatus);
+        } else {
+            return nodeStatuses.replace(nodeId, currentStatus, newStatus);
+        }
     }
 
     @Override
@@ -257,7 +234,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
             reportEvent(nodeId, Severity.INFO, "Requesting that node connect 
to cluster on behalf of " + userDn);
         }
 
-        updateNodeStatus(new NodeConnectionStatus(nodeId, 
NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis(), 
getRoles(nodeId)));
+        updateNodeStatus(new NodeConnectionStatus(nodeId, 
NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis()));
 
         // create the request
         final ReconnectionRequestMessage request = new 
ReconnectionRequestMessage();
@@ -267,10 +244,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         requestReconnectionAsynchronously(request, 10, 5);
     }
 
-    private Set<String> getRoles(final NodeIdentifier nodeId) {
-        final NodeConnectionStatus status = getConnectionStatus(nodeId);
-        return status == null ? Collections.emptySet() : status.getRoles();
-    }
 
     @Override
     public void finishNodeConnection(final NodeIdentifier nodeId) {
@@ -293,7 +266,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         }
 
         logger.info("{} is now connected", nodeId);
-        updateNodeStatus(new NodeConnectionStatus(nodeId, 
NodeConnectionState.CONNECTED, getRoles(nodeId)));
+        updateNodeStatus(new NodeConnectionStatus(nodeId, 
NodeConnectionState.CONNECTED));
     }
 
 
@@ -350,7 +323,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         reportEvent(nodeId, Severity.INFO, "User " + userDn + " requested that 
node be removed from cluster");
         nodeStatuses.remove(nodeId);
         nodeEvents.remove(nodeId);
-        notifyOthersOfNodeStatusChange(new NodeConnectionStatus(nodeId, 
NodeConnectionState.REMOVED, Collections.emptySet()));
+        notifyOthersOfNodeStatusChange(new NodeConnectionStatus(nodeId, 
NodeConnectionState.REMOVED));
     }
 
     @Override
@@ -363,6 +336,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         return status == null ? null : status.getState();
     }
 
+    @Override
+    public List<NodeConnectionStatus> getConnectionStatuses() {
+        return new ArrayList<>(nodeStatuses.values());
+    }
 
     @Override
     public Map<NodeConnectionState, List<NodeIdentifier>> 
getConnectionStates() {
@@ -402,57 +379,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         }
     }
 
-    @Override
-    public synchronized void updateNodeRoles(final NodeIdentifier nodeId, 
final Set<String> roles) {
-        boolean updated = false;
-        while (!updated) {
-            final NodeConnectionStatus currentStatus = 
nodeStatuses.get(nodeId);
-            if (currentStatus == null) {
-                throw new UnknownNodeException("Cannot update roles for " + 
nodeId + " to " + roles + " because the node is not part of this cluster");
-            }
-
-            if (currentStatus.getRoles().equals(roles)) {
-                logger.debug("Roles for {} already up-to-date as {}", nodeId, 
roles);
-                return;
-            }
-
-            final NodeConnectionStatus updatedStatus = new 
NodeConnectionStatus(currentStatus, roles);
-            updated = replaceNodeStatus(nodeId, currentStatus, updatedStatus);
-
-            if (updated) {
-                logger.info("Updated Roles of {} from {} to {}", nodeId, 
currentStatus, updatedStatus);
-                notifyOthersOfNodeStatusChange(updatedStatus);
-            }
-        }
-
-        // If any other node contains any of the given roles, revoke the role 
from the other node.
-        for (final String role : roles) {
-            for (final Map.Entry<NodeIdentifier, NodeConnectionStatus> entry : 
nodeStatuses.entrySet()) {
-                if (entry.getKey().equals(nodeId)) {
-                    continue;
-                }
-
-                updated = false;
-                while (!updated) {
-                    final NodeConnectionStatus status = entry.getValue();
-                    if (status.getRoles().contains(role)) {
-                        final Set<String> newRoles = new 
HashSet<>(status.getRoles());
-                        newRoles.remove(role);
-
-                        final NodeConnectionStatus updatedStatus = new 
NodeConnectionStatus(status, newRoles);
-                        updated = replaceNodeStatus(entry.getKey(), status, 
updatedStatus);
-
-                        if (updated) {
-                            logger.info("Updated Roles of {} from {} to {}", 
nodeId, status, updatedStatus);
-                            notifyOthersOfNodeStatusChange(updatedStatus);
-                        }
-                    } else {
-                        updated = true;
-                    }
-                }
-            }
-        }
-    }
 
     @Override
     public NodeIdentifier getNodeIdentifier(final String uuid) {
@@ -465,47 +391,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         return null;
     }
 
-    // method is synchronized because it modifies local node state and then 
broadcasts the change. We synchronize any time that this
-    // is done so that we don't have an issue where we create a 
NodeConnectionStatus, then another thread creates one and sends it
-    // before the first one is sent (as this results in the first status 
having a larger id, which means that the first status is never
-    // seen by other nodes).
-    @Override
-    public synchronized void addRole(final String clusterRole) {
-        final NodeIdentifier localNodeId = waitForLocalNodeIdentifier();
-        final NodeConnectionStatus status = getConnectionStatus(localNodeId);
-        final Set<String> roles = new HashSet<>();
-        if (status != null) {
-            roles.addAll(status.getRoles());
-        }
-
-        final boolean roleAdded = roles.add(clusterRole);
-
-        if (roleAdded) {
-            updateNodeRoles(localNodeId, roles);
-            logger.info("Cluster role {} added. This node is now responsible 
for the following roles: {}", clusterRole, roles);
-        }
-    }
-
-    // method is synchronized because it modifies local node state and then 
broadcasts the change. We synchronize any time that this
-    // is done so that we don't have an issue where we create a 
NodeConnectionStatus, then another thread creates one and sends it
-    // before the first one is sent (as this results in the first status 
having a larger id, which means that the first status is never
-    // seen by other nodes).
-    @Override
-    public synchronized void removeRole(final String clusterRole) {
-        final NodeIdentifier localNodeId = waitForLocalNodeIdentifier();
-        final NodeConnectionStatus status = getConnectionStatus(localNodeId);
-        final Set<String> roles = new HashSet<>();
-        if (status != null) {
-            roles.addAll(status.getRoles());
-        }
-
-        final boolean roleRemoved = roles.remove(clusterRole);
-
-        if (roleRemoved) {
-            updateNodeRoles(localNodeId, roles);
-            logger.info("Cluster role {} removed. This node is now responsible 
for the following roles: {}", clusterRole, roles);
-        }
-    }
 
     @Override
     public Set<NodeIdentifier> getNodeIdentifiers(final NodeConnectionState... 
states) {
@@ -528,10 +413,14 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
 
     @Override
     public NodeIdentifier getPrimaryNode() {
-        return nodeStatuses.values().stream()
-            .filter(status -> 
status.getRoles().contains(ClusterRoles.PRIMARY_NODE))
+        final String primaryNodeAddress = 
leaderElectionManager.getLeader(ClusterRoles.PRIMARY_NODE);
+        if (primaryNodeAddress == null) {
+            return null;
+        }
+
+        return nodeStatuses.keySet().stream()
+            .filter(nodeId -> 
primaryNodeAddress.equals(nodeId.getSocketAddress() + ":" + 
nodeId.getSocketPort()))
             .findFirst()
-            .map(status -> status.getNodeIdentifier())
             .orElse(null);
     }
 
@@ -558,6 +447,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
             return null;
         }
 
+        if (electedNodeAddress == null) {
+            logger.debug("There is currently no elected active Cluster 
Coordinator");
+            return null;
+        }
+
         final int colonLoc = electedNodeAddress.indexOf(':');
         if (colonLoc < 1) {
             if (warnOnError) {
@@ -679,12 +573,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         logger.info("Status of {} changed from {} to {}", nodeId, 
currentStatus, status);
         logger.debug("State of cluster nodes is now {}", nodeStatuses);
 
+        latestUpdateId.updateAndGet(curVal -> Math.max(curVal, 
status.getUpdateIdentifier()));
+
         if (currentState == null || currentState != status.getState()) {
-            // We notify all nodes of the status change if either this node is 
the current cluster coordinator, OR if the node was
-            // the cluster coordinator and no longer is. This is done because 
if a user disconnects the cluster coordinator, we need
-            // to broadcast to the cluster that this node is no longer the 
coordinator. Otherwise, all nodes but this one will still
-            // believe that this node is connected to the cluster.
-            final boolean notifyAllNodes = isActiveClusterCoordinator() || 
(currentStatus != null && 
currentStatus.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR));
+            final boolean notifyAllNodes = isActiveClusterCoordinator();
             if (notifyAllNodes) {
                 logger.debug("Notifying all nodes that status changed from {} 
to {}", currentStatus, status);
             } else {
@@ -697,6 +589,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         }
     }
 
+
     void notifyOthersOfNodeStatusChange(final NodeConnectionStatus 
updatedStatus) {
         notifyOthersOfNodeStatusChange(updatedStatus, 
isActiveClusterCoordinator(), true);
     }
@@ -784,7 +677,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
                         }
 
                         request.setDataFlow(new 
StandardDataFlow(flowService.createDataFlow()));
-                        request.setNodeConnectionStatuses(new 
ArrayList<>(nodeStatuses.values()));
+                        
request.setNodeConnectionStatuses(getConnectionStatuses());
                         
request.setComponentRevisions(revisionManager.getAllRevisions().stream().map(rev
 -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
 
                         // Issue a reconnection request to the node.
@@ -843,43 +736,9 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     private String summarizeStatusChange(final NodeConnectionStatus oldStatus, 
final NodeConnectionStatus status) {
         final StringBuilder sb = new StringBuilder();
 
-        if (oldStatus != null && status.getState() == oldStatus.getState()) {
-            // Check if roles changed
-            final Set<String> oldRoles = oldStatus.getRoles();
-            final Set<String> newRoles = status.getRoles();
-
-            final Set<String> rolesRemoved = new HashSet<>(oldRoles);
-            rolesRemoved.removeAll(newRoles);
-
-            final Set<String> rolesAdded = new HashSet<>(newRoles);
-            rolesAdded.removeAll(oldRoles);
-
-            if (!rolesRemoved.isEmpty()) {
-                sb.append("Relinquished role");
-                if (rolesRemoved.size() != 1) {
-                    sb.append("s");
-                }
-
-                sb.append(" ").append(rolesRemoved);
-            }
-
-            if (!rolesAdded.isEmpty()) {
-                if (sb.length() > 0) {
-                    sb.append("; ");
-                }
-
-                sb.append("Acquired role");
-                if (rolesAdded.size() != 1) {
-                    sb.append("s");
-                }
-
-                sb.append(" ").append(rolesAdded);
-            }
-        } else {
+        if (oldStatus == null || status.getState() != oldStatus.getState()) {
             sb.append("Node Status changed from ").append(oldStatus == null ? 
"[Unknown Node]" : oldStatus.getState().toString()).append(" to 
").append(status.getState().toString());
-            if (status.getState() == NodeConnectionState.CONNECTED) {
-                sb.append(" 
(Roles=").append(status.getRoles().toString()).append(")");
-            } else if (status.getDisconnectReason() != null) {
+            if (status.getDisconnectReason() != null) {
                 sb.append(" due to ").append(status.getDisconnectReason());
             } else if (status.getDisconnectCode() != null) {
                 sb.append(" due to 
").append(status.getDisconnectCode().toString());
@@ -894,35 +753,30 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         final NodeIdentifier nodeId = statusChangeMessage.getNodeId();
         logger.debug("Handling request {}", statusChangeMessage);
 
-        boolean updated = false;
-        while (!updated) {
-            final NodeConnectionStatus oldStatus = 
nodeStatuses.get(statusChangeMessage.getNodeId());
-
-            // Either remove the value from the map or update the map 
depending on the connection state
-            if (statusChangeMessage.getNodeConnectionStatus().getState() == 
NodeConnectionState.REMOVED) {
-                updated = nodeStatuses.remove(nodeId, oldStatus);
-            } else {
-                updated = replaceNodeStatus(nodeId, oldStatus, updatedStatus);
-            }
+        final NodeConnectionStatus oldStatus = 
nodeStatuses.get(statusChangeMessage.getNodeId());
 
-            if (updated) {
-                logger.info("Status of {} changed from {} to {}", 
statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
-                logger.debug("State of cluster nodes is now {}", nodeStatuses);
+        // Either remove the value from the map or update the map depending on 
the connection state
+        if (statusChangeMessage.getNodeConnectionStatus().getState() == 
NodeConnectionState.REMOVED) {
+        nodeStatuses.remove(nodeId, oldStatus);
+        } else {
+        nodeStatuses.put(nodeId, updatedStatus);
+        }
 
-                final NodeConnectionStatus status = 
statusChangeMessage.getNodeConnectionStatus();
-                final String summary = summarizeStatusChange(oldStatus, 
status);
-                if (!StringUtils.isEmpty(summary)) {
-                    addNodeEvent(nodeId, summary);
-                }
+        logger.info("Status of {} changed from {} to {}", 
statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
+        logger.debug("State of cluster nodes is now {}", nodeStatuses);
 
-                // Update our counter so that we are in-sync with the cluster 
on the
-                // most up-to-date version of the NodeConnectionStatus' Update 
Identifier.
-                // We do this so that we can accurately compare status updates 
that are generated
-                // locally against those generated from other nodes in the 
cluster.
-                
NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier());
-            }
+        final NodeConnectionStatus status = 
statusChangeMessage.getNodeConnectionStatus();
+        final String summary = summarizeStatusChange(oldStatus, status);
+        if (!StringUtils.isEmpty(summary)) {
+            addNodeEvent(nodeId, summary);
         }
 
+        // Update our counter so that we are in-sync with the cluster on the
+        // most up-to-date version of the NodeConnectionStatus' Update 
Identifier.
+        // We do this so that we can accurately compare status updates that 
are generated
+        // locally against those generated from other nodes in the cluster.
+        
NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier());
+
         if (isActiveClusterCoordinator()) {
             
notifyOthersOfNodeStatusChange(statusChangeMessage.getNodeConnectionStatus());
         }
@@ -980,7 +834,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
             addNodeEvent(resolvedNodeIdentifier, "Connection requested from 
existing node.  Setting status to connecting");
         }
 
-        status = new NodeConnectionStatus(resolvedNodeIdentifier, 
NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis(), 
getRoles(resolvedNodeIdentifier));
+        status = new NodeConnectionStatus(resolvedNodeIdentifier, 
NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis());
         updateNodeStatus(status);
 
         DataFlow dataFlow = null;
@@ -1004,7 +858,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
             return new ConnectionResponse(tryAgainSeconds);
         }
 
-        return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, 
instanceId, new ArrayList<>(nodeStatuses.values()),
+        return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, 
instanceId, getConnectionStatuses(),
             revisionManager.getAllRevisions().stream().map(rev -> 
ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
     }
 
@@ -1103,7 +957,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     @Override
     public void setConnected(final boolean connected) {
         this.connected = connected;
-        this.coordinatorAddress = null; // if connection state changed, we are 
not sure about the coordinator. Check for address again.
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
index b414e0d..0b83c23 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
@@ -20,6 +20,7 @@ package org.apache.nifi.cluster.spring;
 import org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator;
 import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
 import 
org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
+import org.apache.nifi.controller.leader.election.LeaderElectionManager;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.revision.RevisionManager;
@@ -42,8 +43,9 @@ public class NodeClusterCoordinatorFactoryBean implements FactoryBean<NodeCluste
             final EventReporter eventReporter = 
applicationContext.getBean("eventReporter", EventReporter.class);
             final ClusterNodeFirewall clusterFirewall = 
applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class);
             final RevisionManager revisionManager = 
applicationContext.getBean("revisionManager", RevisionManager.class);
+            final LeaderElectionManager electionManager = 
applicationContext.getBean("leaderElectionManager", 
LeaderElectionManager.class);
 
-            nodeClusterCoordinator = new 
NodeClusterCoordinator(protocolSenderListener, eventReporter, clusterFirewall, 
revisionManager, properties);
+            nodeClusterCoordinator = new 
NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, 
clusterFirewall, revisionManager);
         }
 
         return nodeClusterCoordinator;

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
index a95db4b..84c9deb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
@@ -35,6 +35,12 @@
         <property name="properties" ref="nifiProperties"/>
     </bean>
 
+    <!-- Leader Election Manager -->
+    <bean id="leaderElectionManager" 
class="org.apache.nifi.spring.LeaderElectionManagerFactoryBean">
+        <property name="numThreads" value="4" />
+        <property name="properties" ref="nifiProperties" />
+    </bean>
+
     <!-- Cluster Coordinator -->
     <bean id="clusterCoordinator" 
class="org.apache.nifi.cluster.spring.NodeClusterCoordinatorFactoryBean">
         <property name="properties" ref="nifiProperties"/>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 5086dc0..46ea49b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -169,8 +169,8 @@ public class TestAbstractHeartbeatMonitor {
 
 
     private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final 
NodeConnectionState state) {
-        final NodeConnectionStatus status = new NodeConnectionStatus(nodeId, 
state, Collections.emptySet());
-        return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), 
status, Collections.emptySet(), 0, 0, 0, 0);
+        final NodeConnectionStatus status = new NodeConnectionStatus(nodeId, 
state);
+        return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), 
status, 0, 0, 0, 0);
     }
 
     private TestFriendlyHeartbeatMonitor createMonitor(final 
ClusterCoordinator coordinator) {
@@ -195,7 +195,7 @@ public class TestAbstractHeartbeatMonitor {
 
         @Override
         public synchronized void requestNodeConnect(NodeIdentifier nodeId, 
String userDn) {
-            statuses.put(nodeId, new NodeConnectionStatus(nodeId, 
NodeConnectionState.CONNECTING, Collections.emptySet()));
+            statuses.put(nodeId, new NodeConnectionStatus(nodeId, 
NodeConnectionState.CONNECTING));
         }
 
         @Override
@@ -205,17 +205,17 @@ public class TestAbstractHeartbeatMonitor {
 
         @Override
         public synchronized void finishNodeConnection(NodeIdentifier nodeId) {
-            statuses.put(nodeId, new NodeConnectionStatus(nodeId, 
NodeConnectionState.CONNECTED, Collections.emptySet()));
+            statuses.put(nodeId, new NodeConnectionStatus(nodeId, 
NodeConnectionState.CONNECTED));
         }
 
         @Override
         public synchronized void requestNodeDisconnect(NodeIdentifier nodeId, 
DisconnectionCode disconnectionCode, String explanation) {
-            statuses.put(nodeId, new NodeConnectionStatus(nodeId, 
NodeConnectionState.DISCONNECTED, Collections.emptySet()));
+            statuses.put(nodeId, new NodeConnectionStatus(nodeId, 
NodeConnectionState.DISCONNECTED));
         }
 
         @Override
         public synchronized void disconnectionRequestedByNode(NodeIdentifier 
nodeId, DisconnectionCode disconnectionCode, String explanation) {
-            statuses.put(nodeId, new NodeConnectionStatus(nodeId, 
NodeConnectionState.DISCONNECTED, Collections.emptySet()));
+            statuses.put(nodeId, new NodeConnectionStatus(nodeId, 
NodeConnectionState.DISCONNECTED));
         }
 
         @Override
@@ -246,10 +246,6 @@ public class TestAbstractHeartbeatMonitor {
             events.add(new ReportedEvent(nodeId, severity, event));
         }
 
-        @Override
-        public void updateNodeRoles(NodeIdentifier nodeId, Set<String> roles) {
-        }
-
         synchronized List<ReportedEvent> getEvents() {
             return new ArrayList<>(events);
         }
@@ -310,16 +306,18 @@ public class TestAbstractHeartbeatMonitor {
         }
 
         @Override
-        public void addRole(String clusterRole) {
+        public NodeIdentifier getLocalNodeIdentifier() {
+            return null;
         }
 
         @Override
-        public void removeRole(String clusterRole) {
+        public List<NodeConnectionStatus> getConnectionStatuses() {
+            return Collections.emptyList();
         }
 
         @Override
-        public NodeIdentifier getLocalNodeIdentifier() {
-            return null;
+        public boolean resetNodeStatus(NodeConnectionStatus connectionStatus, 
long qualifyingUpdateId) {
+            return false;
         }
     }
 
@@ -360,5 +358,10 @@ public class TestAbstractHeartbeatMonitor {
                 mutex.wait();
             }
         }
+
+        @Override
+        public String getHeartbeatAddress() {
+            return "localhost";
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index 5eac846..f18d589 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -28,7 +28,6 @@ import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -164,7 +163,7 @@ public class TestThreadPoolRequestReplicator {
         nodeIds.add(nodeId);
 
         final ClusterCoordinator coordinator = 
Mockito.mock(ClusterCoordinator.class);
-        
Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new
 NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, 
Collections.emptySet()));
+        
Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new
 NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
 
         final AtomicInteger requestCount = new AtomicInteger(0);
         final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", 
null, null) {
@@ -210,7 +209,7 @@ public class TestThreadPoolRequestReplicator {
         
Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenAnswer(new
 Answer<NodeConnectionStatus>() {
             @Override
             public NodeConnectionStatus answer(InvocationOnMock invocation) 
throws Throwable {
-                return new NodeConnectionStatus(invocation.getArgumentAt(0, 
NodeIdentifier.class), NodeConnectionState.CONNECTED, Collections.emptySet());
+                return new NodeConnectionStatus(invocation.getArgumentAt(0, 
NodeIdentifier.class), NodeConnectionState.CONNECTED);
             }
         });
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index 2f034b3..91174ca 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -29,7 +29,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -60,11 +59,6 @@ public class TestNodeClusterCoordinator {
     private ClusterCoordinationProtocolSenderListener senderListener;
     private List<NodeConnectionStatus> nodeStatuses;
 
-    private Properties createProperties() {
-        final Properties props = new Properties();
-        props.put("nifi.zookeeper.connect.string", "localhost:2181");
-        return props;
-    }
 
     @Before
     public void setup() throws IOException {
@@ -75,7 +69,7 @@ public class TestNodeClusterCoordinator {
         final RevisionManager revisionManager = 
Mockito.mock(RevisionManager.class);
         
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        coordinator = new NodeClusterCoordinator(senderListener, 
eventReporter, null, revisionManager, createProperties()) {
+        coordinator = new NodeClusterCoordinator(senderListener, 
eventReporter, null, null, revisionManager) {
             @Override
             void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
                 nodeStatuses.add(updatedStatus);
@@ -92,10 +86,10 @@ public class TestNodeClusterCoordinator {
     public void testConnectionResponseIndicatesAllNodes() throws IOException {
         // Add a disconnected node
         coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), 
DisconnectionCode.LACK_OF_HEARTBEAT));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), 
NodeConnectionState.DISCONNECTING, Collections.emptySet()));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), 
NodeConnectionState.CONNECTING, Collections.emptySet()));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), 
NodeConnectionState.CONNECTED, Collections.emptySet()));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), 
NodeConnectionState.CONNECTED, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), 
NodeConnectionState.DISCONNECTING));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), 
NodeConnectionState.CONNECTING));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), 
NodeConnectionState.CONNECTED));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), 
NodeConnectionState.CONNECTED));
 
         // Create a connection request message and send to the coordinator
         final NodeIdentifier requestedNodeId = createNodeId(6);
@@ -130,7 +124,7 @@ public class TestNodeClusterCoordinator {
         final RevisionManager revisionManager = 
Mockito.mock(RevisionManager.class);
         
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, 
createProperties()) {
+        final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, null, 
revisionManager) {
             @Override
             void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
             }
@@ -168,7 +162,7 @@ public class TestNodeClusterCoordinator {
         final RevisionManager revisionManager = 
Mockito.mock(RevisionManager.class);
         
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, 
createProperties()) {
+        final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, null, 
revisionManager) {
             @Override
             void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
             }
@@ -263,10 +257,10 @@ public class TestNodeClusterCoordinator {
     public void testGetConnectionStates() throws IOException {
         // Add a disconnected node
         coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), 
DisconnectionCode.LACK_OF_HEARTBEAT));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), 
NodeConnectionState.DISCONNECTING, Collections.emptySet()));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), 
NodeConnectionState.CONNECTING, Collections.emptySet()));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), 
NodeConnectionState.CONNECTED, Collections.emptySet()));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), 
NodeConnectionState.CONNECTED, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), 
NodeConnectionState.DISCONNECTING));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), 
NodeConnectionState.CONNECTING));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), 
NodeConnectionState.CONNECTED));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), 
NodeConnectionState.CONNECTED));
 
         final Map<NodeConnectionState, List<NodeIdentifier>> stateMap = 
coordinator.getConnectionStates();
         assertEquals(4, stateMap.size());
@@ -293,10 +287,10 @@ public class TestNodeClusterCoordinator {
     public void testGetNodeIdentifiers() throws IOException {
         // Add a disconnected node
         coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), 
DisconnectionCode.LACK_OF_HEARTBEAT));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), 
NodeConnectionState.DISCONNECTING, Collections.emptySet()));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), 
NodeConnectionState.CONNECTING, Collections.emptySet()));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), 
NodeConnectionState.CONNECTED, Collections.emptySet()));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), 
NodeConnectionState.CONNECTED, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), 
NodeConnectionState.DISCONNECTING));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), 
NodeConnectionState.CONNECTING));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), 
NodeConnectionState.CONNECTED));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), 
NodeConnectionState.CONNECTED));
 
         final Set<NodeIdentifier> connectedIds = 
coordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED);
         assertEquals(2, connectedIds.size());
@@ -321,8 +315,8 @@ public class TestNodeClusterCoordinator {
     public void testRequestNodeDisconnect() throws InterruptedException {
         // Add a connected node
         final NodeIdentifier nodeId1 = createNodeId(1);
-        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, 
NodeConnectionState.CONNECTED, Collections.emptySet()));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), 
NodeConnectionState.CONNECTED, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, 
NodeConnectionState.CONNECTED));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), 
NodeConnectionState.CONNECTED));
 
         // wait for the status change message and clear it
         while (nodeStatuses.isEmpty()) {
@@ -347,8 +341,8 @@ public class TestNodeClusterCoordinator {
         // Add a connected node
         final NodeIdentifier nodeId1 = createNodeId(1);
         final NodeIdentifier nodeId2 = createNodeId(2);
-        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, 
NodeConnectionState.CONNECTED, Collections.emptySet()));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, 
NodeConnectionState.CONNECTED, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, 
NodeConnectionState.CONNECTED));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, 
NodeConnectionState.CONNECTED));
 
         // wait for the status change message and clear it
         while (nodeStatuses.isEmpty()) {
@@ -376,8 +370,8 @@ public class TestNodeClusterCoordinator {
         final NodeIdentifier nodeId1 = createNodeId(1);
         final NodeIdentifier nodeId2 = createNodeId(2);
 
-        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, 
NodeConnectionState.CONNECTED, Collections.emptySet()));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, 
NodeConnectionState.CONNECTED, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, 
NodeConnectionState.CONNECTED));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, 
NodeConnectionState.CONNECTED));
 
         // wait for the status change message and clear it
         while (nodeStatuses.size() < 2) {
@@ -386,7 +380,7 @@ public class TestNodeClusterCoordinator {
         nodeStatuses.clear();
 
         final NodeConnectionStatus oldStatus = new NodeConnectionStatus(-1L, 
nodeId1, NodeConnectionState.DISCONNECTED,
-            DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L, null);
+            DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L);
         final NodeStatusChangeMessage msg = new NodeStatusChangeMessage();
         msg.setNodeId(nodeId1);
         msg.setNodeConnectionStatus(oldStatus);
@@ -397,61 +391,6 @@ public class TestNodeClusterCoordinator {
         assertTrue(nodeStatuses.isEmpty());
     }
 
-    @Test(timeout = 5000)
-    public void testUpdateNodeRoles() throws InterruptedException {
-        // Add a connected node
-        final NodeIdentifier nodeId1 = createNodeId(1);
-        final NodeIdentifier nodeId2 = createNodeId(2);
-
-        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, 
NodeConnectionState.CONNECTED, Collections.emptySet()));
-        // wait for the status change message and clear it
-        while (nodeStatuses.isEmpty()) {
-            Thread.sleep(10L);
-        }
-        nodeStatuses.clear();
-
-        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, 
NodeConnectionState.CONNECTED, Collections.emptySet()));
-        // wait for the status change message and clear it
-        while (nodeStatuses.isEmpty()) {
-            Thread.sleep(10L);
-        }
-        nodeStatuses.clear();
-
-        // Update role of node 1 to primary node
-        coordinator.updateNodeRoles(nodeId1, 
Collections.singleton(ClusterRoles.PRIMARY_NODE));
-
-        // wait for the status change message
-        while (nodeStatuses.isEmpty()) {
-            Thread.sleep(10L);
-        }
-        // verify the message
-        final NodeConnectionStatus status = nodeStatuses.get(0);
-        assertNotNull(status);
-        assertEquals(nodeId1, status.getNodeIdentifier());
-        assertEquals(NodeConnectionState.CONNECTED, status.getState());
-        assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), 
status.getRoles());
-        nodeStatuses.clear();
-
-        // Update role of node 2 to primary node. This should trigger 2 status changes -
-        // node 1 should lose primary role & node 2 should gain it
-        coordinator.updateNodeRoles(nodeId2, 
Collections.singleton(ClusterRoles.PRIMARY_NODE));
-
-        // wait for the status change message
-        while (nodeStatuses.size() < 2) {
-            Thread.sleep(10L);
-        }
-
-        final NodeConnectionStatus status1 = nodeStatuses.get(0);
-        final NodeConnectionStatus status2 = nodeStatuses.get(1);
-        final NodeConnectionStatus id1Msg = 
(status1.getNodeIdentifier().equals(nodeId1)) ? status1 : status2;
-        final NodeConnectionStatus id2Msg = 
(status1.getNodeIdentifier().equals(nodeId2)) ? status1 : status2;
-
-        assertNotSame(id1Msg, id2Msg);
-
-        assertTrue(id1Msg.getRoles().isEmpty());
-        assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), 
id2Msg.getRoles());
-    }
-
 
     @Test
     public void testProposedIdentifierResolvedIfConflict() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
index dbd8c00..fd54203 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
@@ -22,6 +22,10 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.test.TestingServer;
 import org.apache.nifi.cluster.coordination.node.ClusterRoles;
 import org.apache.nifi.util.NiFiProperties;
@@ -89,6 +93,19 @@ public class Cluster {
         return Collections.unmodifiableSet(nodes);
     }
 
+    public CuratorFramework createCuratorClient() {
+        final RetryPolicy retryPolicy = new RetryNTimes(20, 500);
+        final CuratorFramework curatorClient = 
CuratorFrameworkFactory.builder()
+            .connectString(getZooKeeperConnectString())
+            .sessionTimeoutMs(3000)
+            .connectionTimeoutMs(3000)
+            .retryPolicy(retryPolicy)
+            .defaultData(new byte[0])
+            .build();
+
+        curatorClient.start();
+        return curatorClient;
+    }
 
     public Node createNode() {
         
NiFiProperties.getInstance().setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING,
 getZooKeeperConnectString());
@@ -104,11 +121,24 @@ public class Cluster {
 
     public Node waitForClusterCoordinator(final long time, final TimeUnit 
timeUnit) {
         return ClusterUtils.waitUntilNonNull(time, timeUnit,
-            () -> getNodes().stream().filter(node -> 
node.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR)).findFirst().orElse(null));
+            () -> getNodes().stream().filter(node -> 
node.hasRole(ClusterRoles.CLUSTER_COORDINATOR)).findFirst().orElse(null));
     }
 
     public Node waitForPrimaryNode(final long time, final TimeUnit timeUnit) {
         return ClusterUtils.waitUntilNonNull(time, timeUnit,
-            () -> getNodes().stream().filter(node -> 
node.getRoles().contains(ClusterRoles.PRIMARY_NODE)).findFirst().orElse(null));
+            () -> getNodes().stream().filter(node -> 
node.hasRole(ClusterRoles.PRIMARY_NODE)).findFirst().orElse(null));
+    }
+
+    /**
+     * Waits for each node in the cluster to connect. The time given is the maximum amount of time to wait for each node to connect, not for
+     * the entire cluster to connect.
+     *
+     * @param time the max amount of time to wait for a node to connect
+     * @param timeUnit the unit of time that the given <code>time</code> value represents
+     */
+    public void waitUntilAllNodesConnected(final long time, final TimeUnit 
timeUnit) {
+        for (final Node node : nodes) {
+            node.waitUntilConnected(time, timeUnit);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
index 6881ca2..3439263 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
@@ -17,222 +17,219 @@
 
 package org.apache.nifi.cluster.integration;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import org.apache.nifi.cluster.coordination.node.ClusterRoles;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class ClusterConnectionIT {
+    private Cluster cluster;
 
     @BeforeClass
     public static void setup() {
         System.setProperty("nifi.properties.file.path", 
"src/test/resources/conf/nifi.properties");
     }
 
-    @Test(timeout = 20000)
-    public void testSingleNode() throws InterruptedException {
-        final Cluster cluster = new Cluster();
+    @Before
+    public void createCluster() {
+        cluster = new Cluster();
         cluster.start();
+    }
 
-        try {
-            final Node firstNode = cluster.createNode();
-            firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
-
-            
firstNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 10, 
TimeUnit.SECONDS);
-            firstNode.waitUntilElectedForRole(ClusterRoles.PRIMARY_NODE, 10, 
TimeUnit.SECONDS);
-        } finally {
+    @After
+    public void destroyCluster() {
+        if (cluster != null) {
             cluster.stop();
         }
     }
 
+    @Test(timeout = 20000)
+    public void testSingleNode() throws InterruptedException {
+        final Node firstNode = cluster.createNode();
+        firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
+
+        firstNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 
10, TimeUnit.SECONDS);
+        firstNode.waitUntilElectedForRole(ClusterRoles.PRIMARY_NODE, 10, 
TimeUnit.SECONDS);
+    }
+
     @Test(timeout = 60000)
     public void testThreeNodeCluster() throws InterruptedException {
-        final Cluster cluster = new Cluster();
-        cluster.start();
-
-        try {
-            final Node firstNode = cluster.createNode();
-            final Node secondNode = cluster.createNode();
-            final Node thirdNode = cluster.createNode();
-
-            firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
-            System.out.println("**** Node 1 Connected ****");
-            secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
-            System.out.println("**** Node 2 Connected ****");
-            thirdNode.waitUntilConnected(10, TimeUnit.SECONDS);
-            System.out.println("**** Node 3 Connected ****");
-
-            final Node clusterCoordinator = 
cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
-            final Node primaryNode = cluster.waitForPrimaryNode(10, 
TimeUnit.SECONDS);
-            System.out.println("\n\n");
-            System.out.println("Cluster Coordinator = " + clusterCoordinator);
-            System.out.println("Primary Node = " + primaryNode);
-            System.out.println("\n\n");
-        } finally {
-            cluster.stop();
-        }
+        cluster.createNode();
+        cluster.createNode();
+        cluster.createNode();
+
+        cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS);
+
+        final Node clusterCoordinator = cluster.waitForClusterCoordinator(10, 
TimeUnit.SECONDS);
+        final Node primaryNode = cluster.waitForPrimaryNode(10, 
TimeUnit.SECONDS);
+        System.out.println("\n\n");
+        System.out.println("Cluster Coordinator = " + clusterCoordinator);
+        System.out.println("Primary Node = " + primaryNode);
+        System.out.println("\n\n");
     }
 
     @Test(timeout = 60000)
     public void testNewCoordinatorElected() throws IOException {
-        final Cluster cluster = new Cluster();
-        cluster.start();
-
-        try {
-            final Node firstNode = cluster.createNode();
-            final Node secondNode = cluster.createNode();
+        final Node firstNode = cluster.createNode();
+        final Node secondNode = cluster.createNode();
 
-            firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
-            System.out.println("**** Node 1 Connected ****");
-            secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
-            System.out.println("**** Node 2 Connected ****");
+        cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS);
 
-            final Node clusterCoordinator = 
cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
-            clusterCoordinator.stop();
+        final Node clusterCoordinator = cluster.waitForClusterCoordinator(10, 
TimeUnit.SECONDS);
+        clusterCoordinator.stop();
 
-            final Node otherNode = firstNode == clusterCoordinator ? 
secondNode : firstNode;
-            
otherNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 10, 
TimeUnit.SECONDS);
-        } finally {
-            cluster.stop();
-        }
+        final Node otherNode = firstNode == clusterCoordinator ? secondNode : 
firstNode;
+        otherNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 
10, TimeUnit.SECONDS);
     }
 
     @Test(timeout = 60000)
     public void testReconnectGetsCorrectClusterTopology() throws IOException {
-        final Cluster cluster = new Cluster();
-        cluster.start();
+        final Node firstNode = cluster.createNode();
+        final Node secondNode = cluster.createNode();
+        final Node thirdNode = cluster.createNode();
 
-        try {
-            final Node firstNode = cluster.createNode();
-            final Node secondNode = cluster.createNode();
-            final Node thirdNode = cluster.createNode();
-
-            firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
-            System.out.println("**** Node 1 Connected ****");
-            secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
-            System.out.println("**** Node 2 Connected ****");
-            thirdNode.waitUntilConnected(10, TimeUnit.SECONDS);
-            System.out.println("**** Node 3 Connected ****");
-
-            // shutdown node
-            secondNode.stop();
-
-            System.out.println("\n\nNode 2 Shut Down\n\n");
-
-            // wait for node 1 and 3 to recognize that node 2 is gone
-            Stream.of(firstNode, thirdNode).forEach(node -> {
-                node.assertNodeDisconnects(secondNode.getIdentifier(), 5, 
TimeUnit.SECONDS);
-            });
-
-            // restart node
-            secondNode.start();
-            System.out.println("\n\nNode 2 Restarted\n\n");
-
-            secondNode.waitUntilConnected(20, TimeUnit.SECONDS);
-            System.out.println("\n\nNode 2 Reconnected\n\n");
-
-            // wait for all 3 nodes to agree that node 2 is connected
-            Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
-                ClusterUtils.waitUntilConditionMet(5, TimeUnit.SECONDS,
-                    () -> 
firstNode.getClusterCoordinator().getConnectionStatus(secondNode.getIdentifier()).getState()
 == NodeConnectionState.CONNECTED);
-            });
-
-            // Ensure that all 3 nodes see a cluster of 3 connected nodes.
-            Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
-                node.assertNodeIsConnected(firstNode.getIdentifier());
-                node.assertNodeIsConnected(secondNode.getIdentifier());
-                node.assertNodeIsConnected(thirdNode.getIdentifier());
-            });
-
-            // Ensure that we get both a cluster coordinator and a primary 
node elected
-            cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
-            cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
-        } finally {
-            cluster.stop();
-        }
+        cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS);
+
+        // shutdown node
+        secondNode.stop();
+
+        System.out.println("\n\nNode 2 Shut Down\n\n");
+
+        // wait for node 1 and 3 to recognize that node 2 is gone
+        Stream.of(firstNode, thirdNode).forEach(node -> {
+            node.assertNodeDisconnects(secondNode.getIdentifier(), 10, 
TimeUnit.SECONDS);
+        });
+
+        // restart node
+        secondNode.start();
+        System.out.println("\n\nNode 2 Restarted\n\n");
+
+        secondNode.waitUntilConnected(20, TimeUnit.SECONDS);
+        System.out.println("\n\nNode 2 Reconnected\n\n");
+
+        // wait for all 3 nodes to agree that node 2 is connected
+        Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
+            ClusterUtils.waitUntilConditionMet(5, TimeUnit.SECONDS,
+                () -> 
firstNode.getClusterCoordinator().getConnectionStatus(secondNode.getIdentifier()).getState()
 == NodeConnectionState.CONNECTED);
+        });
+
+        // Ensure that all 3 nodes see a cluster of 3 connected nodes.
+        Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
+            node.assertNodeIsConnected(firstNode.getIdentifier());
+            node.assertNodeIsConnected(secondNode.getIdentifier());
+            node.assertNodeIsConnected(thirdNode.getIdentifier());
+        });
+
+        // Ensure that we get both a cluster coordinator and a primary node 
elected
+        cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
+        cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
     }
 
 
     @Test(timeout = 60000)
     public void testRestartAllNodes() throws IOException {
-        final Cluster cluster = new Cluster();
-        cluster.start();
-
-        try {
-            final Node firstNode = cluster.createNode();
-            final Node secondNode = cluster.createNode();
-            final Node thirdNode = cluster.createNode();
-
-            firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
-            System.out.println("**** Node 1 Connected ****");
-            secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
-            System.out.println("**** Node 2 Connected ****");
-            thirdNode.waitUntilConnected(10, TimeUnit.SECONDS);
-            System.out.println("**** Node 3 Connected ****");
-
-            // shutdown node
-            firstNode.stop();
-            secondNode.stop();
-            thirdNode.stop();
-
-            System.out.println("\n\nRestarting all nodes\n\n");
-            thirdNode.start();
-            firstNode.start();
-            secondNode.start();
-
-            Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
-                node.waitUntilConnected(10, TimeUnit.SECONDS);
-            });
-
-            // wait for all 3 nodes to agree that node 2 is connected
-            Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
-                ClusterUtils.waitUntilConditionMet(5, TimeUnit.SECONDS,
-                    () -> 
firstNode.getClusterCoordinator().getConnectionStatus(secondNode.getIdentifier()).getState()
 == NodeConnectionState.CONNECTED);
-            });
-
-            // Ensure that all 3 nodes see a cluster of 3 connected nodes.
-            Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
-                node.assertNodeConnects(firstNode.getIdentifier(), 10, 
TimeUnit.SECONDS);
-                node.assertNodeConnects(secondNode.getIdentifier(), 10, 
TimeUnit.SECONDS);
-                node.assertNodeConnects(thirdNode.getIdentifier(), 10, 
TimeUnit.SECONDS);
-            });
-
-            // Ensure that we get both a cluster coordinator and a primary 
node elected
-            cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
-            cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
-        } finally {
-            cluster.stop();
-        }
+        final Node firstNode = cluster.createNode();
+        final Node secondNode = cluster.createNode();
+        final Node thirdNode = cluster.createNode();
+
+        firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
+        System.out.println("**** Node 1 Connected ****");
+        secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
+        System.out.println("**** Node 2 Connected ****");
+        thirdNode.waitUntilConnected(10, TimeUnit.SECONDS);
+        System.out.println("**** Node 3 Connected ****");
+
+        // shut down all nodes
+        firstNode.stop();
+        secondNode.stop();
+        thirdNode.stop();
+
+        System.out.println("\n\nRestarting all nodes\n\n");
+        thirdNode.start();
+        firstNode.start();
+        secondNode.start();
+
+        cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS);
+
+        // wait for all 3 nodes to agree that node 2 is connected
+        Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
+            ClusterUtils.waitUntilConditionMet(5, TimeUnit.SECONDS,
+                () -> 
firstNode.getClusterCoordinator().getConnectionStatus(secondNode.getIdentifier()).getState()
 == NodeConnectionState.CONNECTED);
+        });
+
+        // Ensure that all 3 nodes see a cluster of 3 connected nodes.
+        Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
+            node.assertNodeConnects(firstNode.getIdentifier(), 10, 
TimeUnit.SECONDS);
+            node.assertNodeConnects(secondNode.getIdentifier(), 10, 
TimeUnit.SECONDS);
+            node.assertNodeConnects(thirdNode.getIdentifier(), 10, 
TimeUnit.SECONDS);
+        });
+
+        // Ensure that we get both a cluster coordinator and a primary node 
elected
+        cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
+        cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
     }
 
 
     @Test(timeout = 30000)
     public void testHeartbeatsMonitored() throws IOException {
-        final Cluster cluster = new Cluster();
-        cluster.start();
+        final Node firstNode = cluster.createNode();
+        final Node secondNode = cluster.createNode();
 
-        try {
-            final Node firstNode = cluster.createNode();
-            final Node secondNode = cluster.createNode();
+        cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS);
 
-            firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
-            secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
+        final Node nodeToSuspend = firstNode;
+        final Node otherNode = secondNode;
 
-            secondNode.suspendHeartbeating();
+        nodeToSuspend.suspendHeartbeating();
 
-            // Heartbeat interval in nifi.properties is set to 1 sec. This 
means that the node should be kicked out
-            // due to lack of heartbeat after 8 times this amount of time, or 
8 seconds.
-            firstNode.assertNodeDisconnects(secondNode.getIdentifier(), 12, 
TimeUnit.SECONDS);
+        // Heartbeat interval in nifi.properties is set to 1 sec. This means 
that the node should be kicked out
+        // due to lack of heartbeat after 8 times this amount of time, or 8 
seconds.
+        otherNode.assertNodeDisconnects(nodeToSuspend.getIdentifier(), 12, 
TimeUnit.SECONDS);
 
-            secondNode.resumeHeartbeating();
-            firstNode.assertNodeConnects(secondNode.getIdentifier(), 10, 
TimeUnit.SECONDS);
-        } finally {
-            cluster.stop();
-        }
+        nodeToSuspend.resumeHeartbeating();
+        otherNode.assertNodeConnects(nodeToSuspend.getIdentifier(), 10, 
TimeUnit.SECONDS);
     }
 
+    @Test
+    public void testNodeInheritsClusterTopologyOnHeartbeat() throws 
InterruptedException {
+        final Node node1 = cluster.createNode();
+        final Node node2 = cluster.createNode();
+        final Node node3 = cluster.createNode();
+
+        cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS);
+        final Node coordinator = cluster.waitForClusterCoordinator(10, 
TimeUnit.SECONDS);
+
+        final NodeIdentifier node4NotReallyInCluster = new 
NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9283, "localhost", 
9284, "localhost", 9285, null, false, null);
+
+        final Map<NodeIdentifier, NodeConnectionStatus> replacementStatuses = 
new HashMap<>();
+        replacementStatuses.put(node1.getIdentifier(), new 
NodeConnectionStatus(node1.getIdentifier(), 
DisconnectionCode.USER_DISCONNECTED));
+        replacementStatuses.put(node4NotReallyInCluster, new 
NodeConnectionStatus(node4NotReallyInCluster, NodeConnectionState.CONNECTING));
+
+        // reset coordinator status so that other nodes will get its now-fake 
view of the cluster
+        
coordinator.getClusterCoordinator().resetNodeStatuses(replacementStatuses);
+        final List<NodeConnectionStatus> expectedStatuses = 
coordinator.getClusterCoordinator().getConnectionStatuses();
+
+        // Give the nodes time to heartbeat in. We need to wait long enough that 
each node heartbeats,
+        // but must not wait more than 8 seconds, because that's when 
nodes start getting kicked out.
+        Thread.sleep(6000L);
+
+        for (final Node node : new Node[] {node1, node2, node3}) {
+            assertEquals(expectedStatuses, 
node.getClusterCoordinator().getConnectionStatuses());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java
index 972d2c7..98d5cb3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java
@@ -24,13 +24,21 @@ import java.util.function.Supplier;
 public class ClusterUtils {
 
     public static void waitUntilConditionMet(final long time, final TimeUnit 
timeUnit, final BooleanSupplier test) {
+        waitUntilConditionMet(time, timeUnit, test, null);
+    }
+
+    public static void waitUntilConditionMet(final long time, final TimeUnit 
timeUnit, final BooleanSupplier test, final Supplier<String> 
errorMessageSupplier) {
         final long nanosToWait = timeUnit.toNanos(time);
         final long start = System.nanoTime();
         final long maxTime = start + nanosToWait;
 
         while (!test.getAsBoolean()) {
             if (System.nanoTime() > maxTime) {
-                throw new AssertionError("Condition never occurred after 
waiting " + time + " " + timeUnit);
+                if (errorMessageSupplier == null) {
+                    throw new AssertionError("Condition never occurred after 
waiting " + time + " " + timeUnit);
+                } else {
+                    throw new AssertionError("Condition never occurred after 
waiting " + time + " " + timeUnit + " : " + errorMessageSupplier.get());
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
index 5bfe83c..2996442 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
@@ -18,11 +18,11 @@
 package org.apache.nifi.cluster.integration;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -30,15 +30,15 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.cluster.ReportedEvent;
-import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import 
org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor;
 import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
-import org.apache.nifi.cluster.coordination.node.CuratorNodeProtocolSender;
+import 
org.apache.nifi.cluster.coordination.node.LeaderElectionNodeProtocolSender;
 import org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.NodeProtocolSender;
 import org.apache.nifi.cluster.protocol.ProtocolContext;
 import org.apache.nifi.cluster.protocol.ProtocolListener;
 import 
org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
@@ -51,6 +51,8 @@ import 
org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.StandardFlowService;
+import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager;
+import org.apache.nifi.controller.leader.election.LeaderElectionManager;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
@@ -74,9 +76,10 @@ public class Node {
     private final RevisionManager revisionManager;
 
     private NodeClusterCoordinator clusterCoordinator;
-    private CuratorNodeProtocolSender protocolSender;
+    private NodeProtocolSender protocolSender;
     private FlowController flowController;
     private StandardFlowService flowService;
+    private LeaderElectionManager electionManager;
 
     private ProtocolListener protocolListener;
 
@@ -97,6 +100,8 @@ public class Node {
 
         revisionManager = Mockito.mock(RevisionManager.class);
         
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.<Revision>
 emptyList());
+
+        electionManager = new CuratorLeaderElectionManager(4, nodeProperties);
     }
 
 
@@ -110,7 +115,8 @@ public class Node {
 
         final HeartbeatMonitor heartbeatMonitor = createHeartbeatMonitor();
         flowController = 
FlowController.createClusteredInstance(Mockito.mock(FlowFileEventRepository.class),
 nodeProperties,
-            null, null, StringEncryptor.createEncryptor(), protocolSender, 
Mockito.mock(BulletinRepository.class), clusterCoordinator, heartbeatMonitor, 
VariableRegistry.EMPTY_REGISTRY);
+            null, null, StringEncryptor.createEncryptor(), protocolSender, 
Mockito.mock(BulletinRepository.class), clusterCoordinator,
+            heartbeatMonitor, electionManager, 
VariableRegistry.EMPTY_REGISTRY);
 
         try {
             flowController.initializeFlow();
@@ -195,23 +201,18 @@ public class Node {
         }
     }
 
-    public Set<String> getRoles() {
-        final NodeConnectionStatus status = getConnectionStatus();
-        return status == null ? Collections.emptySet() : status.getRoles();
-    }
-
     public NodeConnectionStatus getConnectionStatus() {
         return clusterCoordinator.getConnectionStatus(nodeId);
     }
 
     @SuppressWarnings("unchecked")
-    private CuratorNodeProtocolSender createNodeProtocolSender() {
+    private NodeProtocolSender createNodeProtocolSender() {
         final SocketConfiguration socketConfig = new SocketConfiguration();
         socketConfig.setSocketTimeout(3000);
         socketConfig.setReuseAddress(true);
 
         final ProtocolContext<ProtocolMessage> protocolContext = new 
JaxbProtocolContext<>(JaxbProtocolUtils.JAXB_CONTEXT);
-        final CuratorNodeProtocolSender protocolSender = new 
CuratorNodeProtocolSender(socketConfig, protocolContext, nodeProperties);
+        final NodeProtocolSender protocolSender = new 
LeaderElectionNodeProtocolSender(socketConfig, protocolContext, 
electionManager);
         return protocolSender;
     }
 
@@ -250,11 +251,11 @@ public class Node {
         }
 
         final ClusterCoordinationProtocolSenderListener protocolSenderListener 
= new 
ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), 
protocolListener);
-        return new NodeClusterCoordinator(protocolSenderListener, 
eventReporter, null, revisionManager, nodeProperties);
+        return new NodeClusterCoordinator(protocolSenderListener, 
eventReporter, electionManager, null, revisionManager);
     }
 
 
-    public ClusterCoordinator getClusterCoordinator() {
+    public NodeClusterCoordinator getClusterCoordinator() {
         return clusterCoordinator;
     }
 
@@ -278,8 +279,22 @@ public class Node {
         ClusterUtils.waitUntilConditionMet(time, timeUnit, () -> 
isConnected());
     }
 
+    private String getClusterAddress() {
+        final InetSocketAddress address = 
nodeProperties.getClusterNodeProtocolAddress();
+        return address.getHostName() + ":" + address.getPort();
+    }
+
+    public boolean hasRole(final String roleName) {
+        final String leaderAddress = electionManager.getLeader(roleName);
+        if (leaderAddress == null) {
+            return false;
+        }
+
+        return leaderAddress.equals(getClusterAddress());
+    }
+
     public void waitUntilElectedForRole(final String roleName, final long 
time, final TimeUnit timeUnit) {
-        ClusterUtils.waitUntilConditionMet(time, timeUnit, () -> 
getRoles().contains(roleName));
+        ClusterUtils.waitUntilConditionMet(time, timeUnit, () -> 
hasRole(roleName));
     }
 
     // Assertions
@@ -292,7 +307,8 @@ public class Node {
      */
     public void assertNodeConnects(final NodeIdentifier nodeId, final long 
time, final TimeUnit timeUnit) {
         ClusterUtils.waitUntilConditionMet(time, timeUnit,
-            () -> 
getClusterCoordinator().getConnectionStatus(nodeId).getState() == 
NodeConnectionState.CONNECTED);
+            () -> 
getClusterCoordinator().getConnectionStatus(nodeId).getState() == 
NodeConnectionState.CONNECTED,
+            () -> "Connection Status is " + 
getClusterCoordinator().getConnectionStatus(nodeId).toString());
     }
 
 
@@ -305,7 +321,8 @@ public class Node {
      */
     public void assertNodeDisconnects(final NodeIdentifier nodeId, final long 
time, final TimeUnit timeUnit) {
         ClusterUtils.waitUntilConditionMet(time, timeUnit,
-            () -> 
getClusterCoordinator().getConnectionStatus(nodeId).getState() == 
NodeConnectionState.DISCONNECTED);
+            () -> 
getClusterCoordinator().getConnectionStatus(nodeId).getState() == 
NodeConnectionState.DISCONNECTED,
+            () -> "Connection Status is " + 
getClusterCoordinator().getConnectionStatus(nodeId).toString());
     }
 
 

Reply via email to