NIFI-2574 merging master with cluster changes to updated NiFiProperties approach


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6bf7e7f3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6bf7e7f3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6bf7e7f3

Branch: refs/heads/master
Commit: 6bf7e7f3251068d245903b1740e821457fb14d6c
Parents: 7d7401a e42ea9a
Author: joewitt <joew...@apache.org>
Authored: Wed Aug 17 01:23:54 2016 -0700
Committer: joewitt <joew...@apache.org>
Committed: Wed Aug 17 01:23:54 2016 -0700

----------------------------------------------------------------------
 .../coordination/ClusterCoordinator.java        |  44 ++-
 .../heartbeat/HeartbeatMonitor.java             |   5 +
 .../coordination/heartbeat/NodeHeartbeat.java   |   7 -
 .../cluster/coordination/node/ClusterRoles.java |  10 +
 .../coordination/node/NodeConnectionStatus.java |  36 +-
 .../protocol/AbstractNodeProtocolSender.java    |  24 +-
 .../apache/nifi/cluster/protocol/Heartbeat.java |  11 +-
 .../nifi/cluster/protocol/HeartbeatPayload.java | 128 ++++++++
 .../cluster/protocol/NodeProtocolSender.java    |   5 +-
 .../impl/NodeProtocolSenderListener.java        |   5 +-
 .../protocol/jaxb/message/AdaptedHeartbeat.java |  11 -
 .../message/AdaptedNodeConnectionStatus.java    |  11 -
 .../protocol/jaxb/message/HeartbeatAdapter.java |   5 +-
 .../message/NodeConnectionStatusAdapter.java    |   4 +-
 .../protocol/jaxb/message/ObjectFactory.java    |   4 +
 .../message/HeartbeatResponseMessage.java       |  45 +++
 .../protocol/message/ProtocolMessage.java       |   1 +
 .../resources/nifi-cluster-protocol-context.xml |   7 +
 .../jaxb/message/TestJaxbProtocolUtils.java     |  28 ++
 .../heartbeat/AbstractHeartbeatMonitor.java     |   2 -
 .../ClusterProtocolHeartbeatMonitor.java        | 136 +++-----
 .../heartbeat/StandardNodeHeartbeat.java        |  18 +-
 .../node/LeaderElectionNodeProtocolSender.java  |  80 +++++
 .../node/NodeClusterCoordinator.java            | 322 +++++-------------
 .../NodeClusterCoordinatorFactoryBean.java      |   4 +-
 .../resources/nifi-cluster-manager-context.xml  |   6 +
 .../heartbeat/TestAbstractHeartbeatMonitor.java |  31 +-
 .../TestThreadPoolRequestReplicator.java        |   5 +-
 .../node/TestNodeClusterCoordinator.java        |  99 ++----
 .../nifi/cluster/integration/Cluster.java       |  34 +-
 .../integration/ClusterConnectionIT.java        | 327 +++++++++----------
 .../nifi/cluster/integration/ClusterUtils.java  |  10 +-
 .../apache/nifi/cluster/integration/Node.java   |  50 ++-
 .../apache/nifi/cluster/HeartbeatPayload.java   | 118 -------
 .../apache/nifi/controller/FlowController.java  |  45 ++-
 .../nifi/controller/StandardFlowService.java    |   5 +-
 .../controller/StandardFlowSynchronizer.java    |  83 ++---
 .../cluster/ClusterProtocolHeartbeater.java     | 105 +++---
 .../election/CuratorLeaderElectionManager.java  |  69 +++-
 .../leader/election/LeaderElectionManager.java  |  24 +-
 .../StandaloneLeaderElectionManager.java        |  65 ++++
 .../nifi/fingerprint/FingerprintFactory.java    |   1 +
 .../nifi/spring/FlowControllerFactoryBean.java  |  14 +-
 .../LeaderElectionManagerFactoryBean.java       |  57 ++++
 .../src/main/resources/nifi-context.xml         |   1 +
 .../nifi/cluster/HeartbeatPayloadTest.java      |   1 +
 .../java/org/apache/nifi/nar/NarCloseable.java  |  29 ++
 .../nifi/web/StandardNiFiServiceFacade.java     |  42 ++-
 .../src/main/resources/nifi-web-api-context.xml |   1 +
 49 files changed, 1169 insertions(+), 1006 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --cc 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index d2db59e,e304c23..8b0dc97
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@@ -253,13 -256,11 +253,11 @@@ public abstract class AbstractHeartbeat
              clusterCoordinator.finishNodeConnection(nodeId);
              clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received 
first heartbeat from connecting node. Node connected.");
          }
- 
-         clusterCoordinator.updateNodeRoles(nodeId, heartbeat.getRoles());
      }
  
 -
      /**
 -     * @return the most recent heartbeat information for each node in the 
cluster
 +     * @return the most recent heartbeat information for each node in the
 +     * cluster
       */
      protected abstract Map<NodeIdentifier, NodeHeartbeat> 
getLatestHeartbeats();
  

http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
----------------------------------------------------------------------
diff --cc 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
index 95b2045,b955bd0..20fbfd2
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@@ -14,13 -14,14 +14,12 @@@
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
 -
  package org.apache.nifi.cluster.coordination.heartbeat;
  
- import java.nio.charset.StandardCharsets;
+ import java.util.ArrayList;
  import java.util.Collections;
- import java.util.HashMap;
- import java.util.HashSet;
+ import java.util.List;
  import java.util.Map;
 -import java.util.Properties;
  import java.util.Set;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ConcurrentMap;
@@@ -52,20 -49,11 +47,13 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  /**
 - * Uses Apache ZooKeeper to advertise the address to send heartbeats to, and 
then relies on the NiFi Cluster
 - * Protocol to receive heartbeat messages from nodes in the cluster.
 + * Uses Apache ZooKeeper to advertise the address to send heartbeats to, and
 + * then relies on the NiFi Cluster Protocol to receive heartbeat messages from
 + * nodes in the cluster.
   */
  public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor 
implements HeartbeatMonitor, ProtocolHandler {
 +
      protected static final Logger logger = 
LoggerFactory.getLogger(ClusterProtocolHeartbeatMonitor.class);
-     private static final String COORDINATOR_ZNODE_NAME = "coordinator";
- 
-     private final ZooKeeperClientConfig zkClientConfig;
-     private final String clusterNodesPath;
- 
-     private volatile Map<String, NodeIdentifier> clusterNodeIds = new 
HashMap<>();
-     private volatile CuratorFramework curatorClient;
  
      private final String heartbeatAddress;
      private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> 
heartbeatMessages = new ConcurrentHashMap<>();
@@@ -81,14 -69,13 +69,12 @@@
          }
      }
  
 -
 -    public ClusterProtocolHeartbeatMonitor(final ClusterCoordinator 
clusterCoordinator, final ProtocolListener protocolListener, final Properties 
properties) {
 -        super(clusterCoordinator, properties);
 +    public ClusterProtocolHeartbeatMonitor(final ClusterCoordinator 
clusterCoordinator, final ProtocolListener protocolListener, final 
NiFiProperties nifiProperties) {
 +        super(clusterCoordinator, nifiProperties);
  
          protocolListener.addHandler(this);
-         this.zkClientConfig = 
ZooKeeperClientConfig.createConfig(nifiProperties);
-         this.clusterNodesPath = zkClientConfig.resolvePath("cluster/nodes");
  
 -        String hostname = 
properties.getProperty(NiFiProperties.CLUSTER_NODE_ADDRESS);
 +        String hostname = 
nifiProperties.getProperty(NiFiProperties.CLUSTER_NODE_ADDRESS);
          if (hostname == null || hostname.trim().isEmpty()) {
              hostname = "localhost";
          }
@@@ -129,51 -111,9 +110,9 @@@
          heartbeatMessages.clear();
          for (final NodeIdentifier nodeId : 
clusterCoordinator.getNodeIdentifiers()) {
              final NodeHeartbeat heartbeat = new StandardNodeHeartbeat(nodeId, 
System.currentTimeMillis(),
-                     clusterCoordinator.getConnectionStatus(nodeId), 
Collections.emptySet(), 0, 0L, 0, System.currentTimeMillis());
 -                clusterCoordinator.getConnectionStatus(nodeId), 0, 0L, 0, 
System.currentTimeMillis());
++                    clusterCoordinator.getConnectionStatus(nodeId), 0, 0L, 0, 
System.currentTimeMillis());
              heartbeatMessages.put(nodeId, heartbeat);
          }
- 
-         final Thread publishAddress = new Thread(new Runnable() {
-             @Override
-             public void run() {
-                 while (!isStopped()) {
-                     final String path = clusterNodesPath + "/" + 
COORDINATOR_ZNODE_NAME;
-                     try {
-                         try {
-                             curatorClient.setData().forPath(path, 
heartbeatAddress.getBytes(StandardCharsets.UTF_8));
-                             logger.info("Successfully published Cluster 
Heartbeat Monitor Address of {} to ZooKeeper", heartbeatAddress);
-                             return;
-                         } catch (final NoNodeException nne) {
-                             // ensure that parents are created, using a 
wide-open ACL because the parents contain no data
-                             // and the path is not in any way sensitive.
-                             try {
-                                 
curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
-                             } catch (final NodeExistsException nee) {
-                                 // This is okay. Node already exists.
-                             }
- 
-                             
curatorClient.create().withMode(CreateMode.EPHEMERAL).forPath(path, 
heartbeatAddress.getBytes(StandardCharsets.UTF_8));
-                             logger.info("Successfully published address as 
heartbeat monitor address at path {} with value {}", path, heartbeatAddress);
- 
-                             return;
-                         }
-                     } catch (Exception e) {
-                         logger.warn("Failed to update ZooKeeper to notify 
nodes of the heartbeat address. Will continue to retry.");
- 
-                         try {
-                             Thread.sleep(2000L);
-                         } catch (final InterruptedException ie) {
-                             Thread.currentThread().interrupt();
-                             return;
-                         }
-                     }
-                 }
-             }
-         });
- 
-         publishAddress.setName("Publish Heartbeat Address");
-         publishAddress.setDaemon(true);
-         publishAddress.start();
      }
  
      @Override
@@@ -194,10 -131,7 +130,6 @@@
          heartbeatMessages.remove(nodeId);
      }
  
-     protected Set<NodeIdentifier> getClusterNodeIds() {
-         return new HashSet<>(clusterNodeIds.values());
-     }
--
      @Override
      public ProtocolMessage handle(final ProtocolMessage msg) throws 
ProtocolException {
          if (msg.getType() != MessageType.HEARTBEAT) {
@@@ -218,11 -151,49 +149,48 @@@
          final long systemStartTime = payload.getSystemStartTime();
  
          final NodeHeartbeat nodeHeartbeat = new StandardNodeHeartbeat(nodeId, 
System.currentTimeMillis(),
-                 connectionStatus, roles, flowFileCount, flowFileBytes, 
activeThreadCount, systemStartTime);
 -            connectionStatus, flowFileCount, flowFileBytes, 
activeThreadCount, systemStartTime);
++                connectionStatus, flowFileCount, flowFileBytes, 
activeThreadCount, systemStartTime);
          heartbeatMessages.put(heartbeat.getNodeIdentifier(), nodeHeartbeat);
          logger.debug("Received new heartbeat from {}", nodeId);
  
-         return null;
+         // Formulate a List of differences between our view of the cluster 
topology and the node's view
+         // and send that back to the node so that it is in-sync with us
+         final List<NodeConnectionStatus> nodeStatusList = 
payload.getClusterStatus();
+         final List<NodeConnectionStatus> updatedStatuses = 
getUpdatedStatuses(nodeStatusList);
+ 
+         final HeartbeatResponseMessage responseMessage = new 
HeartbeatResponseMessage();
+         responseMessage.setUpdatedNodeStatuses(updatedStatuses);
+         return responseMessage;
+     }
+ 
 -
+     private List<NodeConnectionStatus> getUpdatedStatuses(final 
List<NodeConnectionStatus> nodeStatusList) {
+         // Map node's statuses by NodeIdentifier for quick & easy lookup
+         final Map<NodeIdentifier, NodeConnectionStatus> nodeStatusMap = 
nodeStatusList.stream()
 -            .collect(Collectors.toMap(status -> status.getNodeIdentifier(), 
Function.identity()));
++                .collect(Collectors.toMap(status -> 
status.getNodeIdentifier(), Function.identity()));
+ 
+         // Check if our connection status is the same for each Node 
Identifier and if not, add our version of the status
+         // to a List of updated statuses.
+         final List<NodeConnectionStatus> currentStatuses = 
clusterCoordinator.getConnectionStatuses();
+         final List<NodeConnectionStatus> updatedStatuses = new ArrayList<>();
+         for (final NodeConnectionStatus currentStatus : currentStatuses) {
+             final NodeConnectionStatus nodeStatus = 
nodeStatusMap.get(currentStatus.getNodeIdentifier());
+             if (!currentStatus.equals(nodeStatus)) {
+                 updatedStatuses.add(currentStatus);
+             }
+         }
+ 
+         // If the node has any statuses that we do not have, add a REMOVED 
status to the update list
+         final Set<NodeIdentifier> nodeIds = 
currentStatuses.stream().map(status -> 
status.getNodeIdentifier()).collect(Collectors.toSet());
+         for (final NodeConnectionStatus nodeStatus : nodeStatusList) {
+             if (!nodeIds.contains(nodeStatus.getNodeIdentifier())) {
+                 updatedStatuses.add(new 
NodeConnectionStatus(nodeStatus.getNodeIdentifier(), 
NodeConnectionState.REMOVED, null));
+             }
+         }
+ 
+         logger.debug("\n\nCalculated diff between current cluster status and 
node cluster status as follows:\nNode: {}\nSelf: {}\nDifference: {}\n\n",
 -            nodeStatusList, currentStatuses, updatedStatuses);
++                nodeStatusList, currentStatuses, updatedStatuses);
+ 
+         return updatedStatuses;
      }
  
      @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --cc 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 70338c1,8e580db..2d25c8e
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@@ -70,11 -66,7 +65,8 @@@ import org.apache.nifi.controller.leade
  import org.apache.nifi.events.EventReporter;
  import org.apache.nifi.reporting.Severity;
  import org.apache.nifi.services.FlowService;
 +import org.apache.nifi.util.NiFiProperties;
  import org.apache.nifi.web.revision.RevisionManager;
- import org.apache.zookeeper.KeeperException;
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -92,12 -83,8 +84,9 @@@ public class NodeClusterCoordinator imp
      private final EventReporter eventReporter;
      private final ClusterNodeFirewall firewall;
      private final RevisionManager revisionManager;
- 
-     // Curator used to determine which node is coordinator
-     private final CuratorFramework curatorClient;
-     private final String nodesPathPrefix;
-     private final String coordinatorPath;
 +    private final NiFiProperties nifiProperties;
+     private final LeaderElectionManager leaderElectionManager;
+     private final AtomicLong latestUpdateId = new AtomicLong(-1);
  
      private volatile FlowService flowService;
      private volatile boolean connected;
@@@ -107,24 -93,14 +95,15 @@@
      private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> 
nodeStatuses = new ConcurrentHashMap<>();
      private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> 
nodeEvents = new ConcurrentHashMap<>();
  
-     public NodeClusterCoordinator(final 
ClusterCoordinationProtocolSenderListener senderListener, final EventReporter 
eventReporter,
+     public NodeClusterCoordinator(final 
ClusterCoordinationProtocolSenderListener senderListener, final EventReporter 
eventReporter, final LeaderElectionManager leaderElectionManager,
 -        final ClusterNodeFirewall firewall, final RevisionManager 
revisionManager) {
 +            final ClusterNodeFirewall firewall, final RevisionManager 
revisionManager, final NiFiProperties nifiProperties) {
          this.senderListener = senderListener;
          this.flowService = null;
          this.eventReporter = eventReporter;
          this.firewall = firewall;
          this.revisionManager = revisionManager;
 +        this.nifiProperties = nifiProperties;
- 
-         final RetryPolicy retryPolicy = new RetryNTimes(10, 500);
-         final ZooKeeperClientConfig zkConfig = 
ZooKeeperClientConfig.createConfig(nifiProperties);
- 
-         curatorClient = 
CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
-                 zkConfig.getSessionTimeoutMillis(), 
zkConfig.getConnectionTimeoutMillis(), retryPolicy);
- 
-         curatorClient.start();
-         nodesPathPrefix = zkConfig.resolvePath("cluster/nodes");
-         coordinatorPath = nodesPathPrefix + "/coordinator";
+         this.leaderElectionManager = leaderElectionManager;
  
          senderListener.addHandler(this);
      }
@@@ -140,10 -116,9 +119,8 @@@
          final NodeConnectionStatus shutdownStatus = new 
NodeConnectionStatus(getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN);
          updateNodeStatus(shutdownStatus, false);
          logger.info("Successfully notified other nodes that I am shutting 
down");
- 
-         curatorClient.close();
      }
  
 -
      @Override
      public void setLocalNodeIdentifier(final NodeIdentifier nodeId) {
          this.nodeId = nodeId;
@@@ -227,16 -177,30 +179,31 @@@
          }
      }
  
+     @Override
+     public boolean resetNodeStatus(final NodeConnectionStatus 
connectionStatus, final long qualifyingUpdateId) {
+         final NodeIdentifier nodeId = connectionStatus.getNodeIdentifier();
+         final NodeConnectionStatus currentStatus = 
getConnectionStatus(nodeId);
+ 
+         if (currentStatus == null) {
+             return replaceNodeStatus(nodeId, null, connectionStatus);
 -        } else {
 -            if (currentStatus.getUpdateIdentifier() == qualifyingUpdateId) {
 -                return replaceNodeStatus(nodeId, currentStatus, 
connectionStatus);
 -            }
++        } else if (currentStatus.getUpdateIdentifier() == qualifyingUpdateId) 
{
++            return replaceNodeStatus(nodeId, currentStatus, connectionStatus);
+         }
+ 
+         // The update identifier is not the same. We will not replace the 
value
+         return false;
+     }
+ 
      /**
 -     * Attempts to update the nodeStatuses map by changing the value for the 
given node id from the current status to the new status, as in
 -     * ConcurrentMap.replace(nodeId, currentStatus, newStatus) but with the 
difference that this method can handle a <code>null</code> value
 -     * for currentStatus
 +     * Attempts to update the nodeStatuses map by changing the value for the
 +     * given node id from the current status to the new status, as in
 +     * ConcurrentMap.replace(nodeId, currentStatus, newStatus) but with the
 +     * difference that this method can handle a <code>null</code> value for
 +     * currentStatus
       *
       * @param nodeId the node id
 -     * @param currentStatus the current status, or <code>null</code> if there 
is no value currently
 +     * @param currentStatus the current status, or <code>null</code> if there 
is
 +     * no value currently
       * @param newStatus the new status to set
       * @return <code>true</code> if the map was updated, false otherwise
       */
@@@ -272,11 -244,7 +247,6 @@@
          requestReconnectionAsynchronously(request, 10, 5);
      }
  
-     private Set<String> getRoles(final NodeIdentifier nodeId) {
-         final NodeConnectionStatus status = getConnectionStatus(nodeId);
-         return status == null ? Collections.emptySet() : status.getRoles();
-     }
--
      @Override
      public void finishNodeConnection(final NodeIdentifier nodeId) {
          final NodeConnectionState state = getConnectionState(nodeId);
@@@ -298,9 -266,10 +268,9 @@@
          }
  
          logger.info("{} is now connected", nodeId);
-         updateNodeStatus(new NodeConnectionStatus(nodeId, 
NodeConnectionState.CONNECTED, getRoles(nodeId)));
+         updateNodeStatus(new NodeConnectionStatus(nodeId, 
NodeConnectionState.CONNECTED));
      }
  
 -
      @Override
      public void requestNodeDisconnect(final NodeIdentifier nodeId, final 
DisconnectionCode disconnectionCode, final String explanation) {
          final Set<NodeIdentifier> connectedNodeIds = 
getNodeIdentifiers(NodeConnectionState.CONNECTED);
@@@ -405,59 -379,8 +380,7 @@@
          }
      }
  
 -
      @Override
-     public synchronized void updateNodeRoles(final NodeIdentifier nodeId, 
final Set<String> roles) {
-         boolean updated = false;
-         while (!updated) {
-             final NodeConnectionStatus currentStatus = 
nodeStatuses.get(nodeId);
-             if (currentStatus == null) {
-                 throw new UnknownNodeException("Cannot update roles for " + 
nodeId + " to " + roles + " because the node is not part of this cluster");
-             }
- 
-             if (currentStatus.getRoles().equals(roles)) {
-                 logger.debug("Roles for {} already up-to-date as {}", nodeId, 
roles);
-                 return;
-             }
- 
-             final NodeConnectionStatus updatedStatus = new 
NodeConnectionStatus(currentStatus, roles);
-             updated = replaceNodeStatus(nodeId, currentStatus, updatedStatus);
- 
-             if (updated) {
-                 logger.info("Updated Roles of {} from {} to {}", nodeId, 
currentStatus, updatedStatus);
-                 notifyOthersOfNodeStatusChange(updatedStatus);
-             }
-         }
- 
-         // If any other node contains any of the given roles, revoke the role 
from the other node.
-         for (final String role : roles) {
-             for (final Map.Entry<NodeIdentifier, NodeConnectionStatus> entry 
: nodeStatuses.entrySet()) {
-                 if (entry.getKey().equals(nodeId)) {
-                     continue;
-                 }
- 
-                 updated = false;
-                 while (!updated) {
-                     final NodeConnectionStatus status = entry.getValue();
-                     if (status.getRoles().contains(role)) {
-                         final Set<String> newRoles = new 
HashSet<>(status.getRoles());
-                         newRoles.remove(role);
- 
-                         final NodeConnectionStatus updatedStatus = new 
NodeConnectionStatus(status, newRoles);
-                         updated = replaceNodeStatus(entry.getKey(), status, 
updatedStatus);
- 
-                         if (updated) {
-                             logger.info("Updated Roles of {} from {} to {}", 
nodeId, status, updatedStatus);
-                             notifyOthersOfNodeStatusChange(updatedStatus);
-                         }
-                     } else {
-                         updated = true;
-                     }
-                 }
-             }
-         }
-     }
- 
-     @Override
      public NodeIdentifier getNodeIdentifier(final String uuid) {
          for (final NodeIdentifier nodeId : nodeStatuses.keySet()) {
              if (nodeId.getId().equals(uuid)) {
@@@ -468,48 -391,7 +391,6 @@@
          return null;
      }
  
-     // method is synchronized because it modifies local node state and then 
broadcasts the change. We synchronize any time that this
-     // is done so that we don't have an issue where we create a 
NodeConnectionStatus, then another thread creates one and sends it
-     // before the first one is sent (as this results in the first status 
having a larger id, which means that the first status is never
-     // seen by other nodes).
-     @Override
-     public synchronized void addRole(final String clusterRole) {
-         final NodeIdentifier localNodeId = waitForLocalNodeIdentifier();
-         final NodeConnectionStatus status = getConnectionStatus(localNodeId);
-         final Set<String> roles = new HashSet<>();
-         if (status != null) {
-             roles.addAll(status.getRoles());
-         }
- 
-         final boolean roleAdded = roles.add(clusterRole);
- 
-         if (roleAdded) {
-             updateNodeRoles(localNodeId, roles);
-             logger.info("Cluster role {} added. This node is now responsible 
for the following roles: {}", clusterRole, roles);
-         }
-     }
- 
-     // method is synchronized because it modifies local node state and then 
broadcasts the change. We synchronize any time that this
-     // is done so that we don't have an issue where we create a 
NodeConnectionStatus, then another thread creates one and sends it
-     // before the first one is sent (as this results in the first status 
having a larger id, which means that the first status is never
-     // seen by other nodes).
-     @Override
-     public synchronized void removeRole(final String clusterRole) {
-         final NodeIdentifier localNodeId = waitForLocalNodeIdentifier();
-         final NodeConnectionStatus status = getConnectionStatus(localNodeId);
-         final Set<String> roles = new HashSet<>();
-         if (status != null) {
-             roles.addAll(status.getRoles());
-         }
- 
-         final boolean roleRemoved = roles.remove(clusterRole);
- 
-         if (roleRemoved) {
-             updateNodeRoles(localNodeId, roles);
-             logger.info("Cluster role {} removed. This node is now 
responsible for the following roles: {}", clusterRole, roles);
-         }
-     }
--
      @Override
      public Set<NodeIdentifier> getNodeIdentifiers(final 
NodeConnectionState... states) {
          final Set<NodeConnectionState> statesOfInterest = new HashSet<>();
@@@ -531,11 -413,15 +412,15 @@@
  
      @Override
      public NodeIdentifier getPrimaryNode() {
-         return nodeStatuses.values().stream()
-                 .filter(status -> 
status.getRoles().contains(ClusterRoles.PRIMARY_NODE))
+         final String primaryNodeAddress = 
leaderElectionManager.getLeader(ClusterRoles.PRIMARY_NODE);
+         if (primaryNodeAddress == null) {
+             return null;
+         }
+ 
+         return nodeStatuses.keySet().stream()
 -            .filter(nodeId -> 
primaryNodeAddress.equals(nodeId.getSocketAddress() + ":" + 
nodeId.getSocketPort()))
 -            .findFirst()
 -            .orElse(null);
++                .filter(nodeId -> 
primaryNodeAddress.equals(nodeId.getSocketAddress() + ":" + 
nodeId.getSocketPort()))
 +                .findFirst()
-                 .map(status -> status.getNodeIdentifier())
 +                .orElse(null);
      }
  
      @Override
@@@ -899,35 -753,30 +753,30 @@@
          final NodeIdentifier nodeId = statusChangeMessage.getNodeId();
          logger.debug("Handling request {}", statusChangeMessage);
  
-         boolean updated = false;
-         while (!updated) {
-             final NodeConnectionStatus oldStatus = 
nodeStatuses.get(statusChangeMessage.getNodeId());
- 
-             // Either remove the value from the map or update the map 
depending on the connection state
-             if (statusChangeMessage.getNodeConnectionStatus().getState() == 
NodeConnectionState.REMOVED) {
-                 updated = nodeStatuses.remove(nodeId, oldStatus);
-             } else {
-                 updated = replaceNodeStatus(nodeId, oldStatus, updatedStatus);
-             }
+         final NodeConnectionStatus oldStatus = 
nodeStatuses.get(statusChangeMessage.getNodeId());
  
-             if (updated) {
-                 logger.info("Status of {} changed from {} to {}", 
statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
-                 logger.debug("State of cluster nodes is now {}", 
nodeStatuses);
+         // Either remove the value from the map or update the map depending 
on the connection state
+         if (statusChangeMessage.getNodeConnectionStatus().getState() == 
NodeConnectionState.REMOVED) {
 -        nodeStatuses.remove(nodeId, oldStatus);
++            nodeStatuses.remove(nodeId, oldStatus);
+         } else {
 -        nodeStatuses.put(nodeId, updatedStatus);
++            nodeStatuses.put(nodeId, updatedStatus);
+         }
  
-                 final NodeConnectionStatus status = 
statusChangeMessage.getNodeConnectionStatus();
-                 final String summary = summarizeStatusChange(oldStatus, 
status);
-                 if (!StringUtils.isEmpty(summary)) {
-                     addNodeEvent(nodeId, summary);
-                 }
+         logger.info("Status of {} changed from {} to {}", 
statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
+         logger.debug("State of cluster nodes is now {}", nodeStatuses);
  
-                 // Update our counter so that we are in-sync with the cluster 
on the
-                 // most up-to-date version of the NodeConnectionStatus' 
Update Identifier.
-                 // We do this so that we can accurately compare status 
updates that are generated
-                 // locally against those generated from other nodes in the 
cluster.
-                 
NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier());
-             }
+         final NodeConnectionStatus status = 
statusChangeMessage.getNodeConnectionStatus();
+         final String summary = summarizeStatusChange(oldStatus, status);
+         if (!StringUtils.isEmpty(summary)) {
+             addNodeEvent(nodeId, summary);
          }
  
+         // Update our counter so that we are in-sync with the cluster on the
+         // most up-to-date version of the NodeConnectionStatus' Update 
Identifier.
+         // We do this so that we can accurately compare status updates that 
are generated
+         // locally against those generated from other nodes in the cluster.
+         
NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier());
+ 
          if (isActiveClusterCoordinator()) {
              
notifyOthersOfNodeStatusChange(statusChangeMessage.getNodeConnectionStatus());
          }
@@@ -1009,8 -858,8 +858,8 @@@
              return new ConnectionResponse(tryAgainSeconds);
          }
  
-         return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, 
instanceId, new ArrayList<>(nodeStatuses.values()),
+         return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, 
instanceId, getConnectionStatuses(),
 -            revisionManager.getAllRevisions().stream().map(rev -> 
ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
 +                revisionManager.getAllRevisions().stream().map(rev -> 
ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
      }
  
      private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final 
String dn) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
----------------------------------------------------------------------
diff --cc 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
index b414e0d,0b83c23..fc52ee1
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
@@@ -42,8 -43,9 +43,9 @@@ public class NodeClusterCoordinatorFact
              final EventReporter eventReporter = 
applicationContext.getBean("eventReporter", EventReporter.class);
              final ClusterNodeFirewall clusterFirewall = 
applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class);
              final RevisionManager revisionManager = 
applicationContext.getBean("revisionManager", RevisionManager.class);
+             final LeaderElectionManager electionManager = 
applicationContext.getBean("leaderElectionManager", 
LeaderElectionManager.class);
  
-             nodeClusterCoordinator = new 
NodeClusterCoordinator(protocolSenderListener, eventReporter, clusterFirewall, 
revisionManager, properties);
 -            nodeClusterCoordinator = new 
NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, 
clusterFirewall, revisionManager);
++            nodeClusterCoordinator = new 
NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, 
clusterFirewall, revisionManager, properties);
          }
  
          return nodeClusterCoordinator;

http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --cc 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index ebefce2,f18d589..9c56fb9
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@@ -162,11 -163,10 +161,11 @@@ public class TestThreadPoolRequestRepli
          nodeIds.add(nodeId);
  
          final ClusterCoordinator coordinator = 
Mockito.mock(ClusterCoordinator.class);
-         
Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new
 NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, 
Collections.emptySet()));
+         
Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new
 NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
  
          final AtomicInteger requestCount = new AtomicInteger(0);
 -        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", 
null, null) {
 +        final ThreadPoolRequestReplicator replicator
 +                = new ThreadPoolRequestReplicator(2, new Client(), 
coordinator, "1 sec", "1 sec", null, null, 
NiFiProperties.createBasicNiFiProperties(null, null)) {
              @Override
              protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method, final URI 
uri, final String requestId) {
                  // the resource builder will not expose its headers to us, so 
we are using Mockito's Whitebox class to extract them.

http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --cc 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index aaa9dca,91174ca..00edbc4
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@@ -78,7 -69,7 +78,7 @@@ public class TestNodeClusterCoordinato
          final RevisionManager revisionManager = 
Mockito.mock(RevisionManager.class);
          
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
  
-         coordinator = new NodeClusterCoordinator(senderListener, 
eventReporter, null, revisionManager, createProperties()) {
 -        coordinator = new NodeClusterCoordinator(senderListener, 
eventReporter, null, null, revisionManager) {
++        coordinator = new NodeClusterCoordinator(senderListener, 
eventReporter, null, null, revisionManager, createProperties()) {
              @Override
              void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
                  nodeStatuses.add(updatedStatus);
@@@ -133,7 -124,7 +133,7 @@@
          final RevisionManager revisionManager = 
Mockito.mock(RevisionManager.class);
          
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
  
-         final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, 
createProperties()) {
 -        final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, null, 
revisionManager) {
++        final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, null, 
revisionManager, createProperties()) {
              @Override
              void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
              }
@@@ -171,7 -162,7 +171,7 @@@
          final RevisionManager revisionManager = 
Mockito.mock(RevisionManager.class);
          
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
  
-         final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, 
createProperties()) {
 -        final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, null, 
revisionManager) {
++        final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, null, 
revisionManager, createProperties()) {
              @Override
              void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
              }
@@@ -396,61 -391,7 +396,6 @@@
          assertTrue(nodeStatuses.isEmpty());
      }
  
-     @Test(timeout = 5000)
-     public void testUpdateNodeRoles() throws InterruptedException {
-         // Add a connected node
-         final NodeIdentifier nodeId1 = createNodeId(1);
-         final NodeIdentifier nodeId2 = createNodeId(2);
- 
-         coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, 
NodeConnectionState.CONNECTED, Collections.emptySet()));
-         // wait for the status change message and clear it
-         while (nodeStatuses.isEmpty()) {
-             Thread.sleep(10L);
-         }
-         nodeStatuses.clear();
- 
-         coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, 
NodeConnectionState.CONNECTED, Collections.emptySet()));
-         // wait for the status change message and clear it
-         while (nodeStatuses.isEmpty()) {
-             Thread.sleep(10L);
-         }
-         nodeStatuses.clear();
- 
-         // Update role of node 1 to primary node
-         coordinator.updateNodeRoles(nodeId1, 
Collections.singleton(ClusterRoles.PRIMARY_NODE));
- 
-         // wait for the status change message
-         while (nodeStatuses.isEmpty()) {
-             Thread.sleep(10L);
-         }
-         // verify the message
-         final NodeConnectionStatus status = nodeStatuses.get(0);
-         assertNotNull(status);
-         assertEquals(nodeId1, status.getNodeIdentifier());
-         assertEquals(NodeConnectionState.CONNECTED, status.getState());
-         assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), 
status.getRoles());
-         nodeStatuses.clear();
- 
-         // Update role of node 2 to primary node. This should trigger 2 
status changes -
-         // node 1 should lose primary role & node 2 should gain it
-         coordinator.updateNodeRoles(nodeId2, 
Collections.singleton(ClusterRoles.PRIMARY_NODE));
- 
-         // wait for the status change message
-         while (nodeStatuses.size() < 2) {
-             Thread.sleep(10L);
-         }
- 
-         final NodeConnectionStatus status1 = nodeStatuses.get(0);
-         final NodeConnectionStatus status2 = nodeStatuses.get(1);
-         final NodeConnectionStatus id1Msg = 
(status1.getNodeIdentifier().equals(nodeId1)) ? status1 : status2;
-         final NodeConnectionStatus id2Msg = 
(status1.getNodeIdentifier().equals(nodeId2)) ? status1 : status2;
- 
-         assertNotSame(id1Msg, id2Msg);
- 
-         assertTrue(id1Msg.getRoles().isEmpty());
-         assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), 
id2Msg.getRoles());
-     }
--
      @Test
      public void testProposedIdentifierResolvedIfConflict() {
          final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 
8000, "localhost", 9000, "localhost", 10000, 11000, false);

http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
----------------------------------------------------------------------
diff --cc 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
index 5809625,fd54203..a0c8648
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
@@@ -91,13 -93,26 +95,26 @@@ public class Cluster 
          return Collections.unmodifiableSet(nodes);
      }
  
+     public CuratorFramework createCuratorClient() {
+         final RetryPolicy retryPolicy = new RetryNTimes(20, 500);
+         final CuratorFramework curatorClient = 
CuratorFrameworkFactory.builder()
+             .connectString(getZooKeeperConnectString())
+             .sessionTimeoutMs(3000)
+             .connectionTimeoutMs(3000)
+             .retryPolicy(retryPolicy)
+             .defaultData(new byte[0])
+             .build();
+ 
+         curatorClient.start();
+         return curatorClient;
+     }
  
      public Node createNode() {
 -        
NiFiProperties.getInstance().setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING,
 getZooKeeperConnectString());
 -        
NiFiProperties.getInstance().setProperty(NiFiProperties.CLUSTER_IS_NODE, 
"true");
 +        final Map<String, String> addProps = new HashMap<>();
 +        addProps.put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, 
getZooKeeperConnectString());
 +        addProps.put(NiFiProperties.CLUSTER_IS_NODE, "true");
  
 -        final NiFiProperties properties = NiFiProperties.getInstance().copy();
 -        final Node node = new Node(properties);
 +        final Node node = new 
Node(NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties",
 addProps));
          node.start();
          nodes.add(node);
  

http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
----------------------------------------------------------------------
diff --cc 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
index 899f312,2996442..a8b51f2
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
@@@ -127,7 -115,8 +133,8 @@@ public class Node 
  
          final HeartbeatMonitor heartbeatMonitor = createHeartbeatMonitor();
          flowController = 
FlowController.createClusteredInstance(Mockito.mock(FlowFileEventRepository.class),
 nodeProperties,
-             null, null, StringEncryptor.createEncryptor(nodeProperties), 
protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator, 
heartbeatMonitor, VariableRegistry.EMPTY_REGISTRY);
 -            null, null, StringEncryptor.createEncryptor(), protocolSender, 
Mockito.mock(BulletinRepository.class), clusterCoordinator,
++            null, null, StringEncryptor.createEncryptor(nodeProperties), 
protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator,
+             heartbeatMonitor, electionManager, 
VariableRegistry.EMPTY_REGISTRY);
  
          try {
              flowController.initializeFlow();
@@@ -267,7 -251,7 +269,7 @@@
          }
  
          final ClusterCoordinationProtocolSenderListener 
protocolSenderListener = new 
ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), 
protocolListener);
-         return new NodeClusterCoordinator(protocolSenderListener, 
eventReporter, null, revisionManager, nodeProperties);
 -        return new NodeClusterCoordinator(protocolSenderListener, 
eventReporter, electionManager, null, revisionManager);
++        return new NodeClusterCoordinator(protocolSenderListener, 
eventReporter, electionManager, null, revisionManager, nodeProperties);
      }
  
  

http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --cc 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index abb4b78,efdf152..8fb2305
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@@ -388,6 -387,7 +387,7 @@@ public class FlowController implements 
                  bulletinRepo,
                  /* cluster coordinator */ null,
                  /* heartbeat monitor */ null,
 -            /* leader election manager */ null,
++                /* leader election manager */ null,
                  /* variable registry */ variableRegistry);
      }
  
@@@ -401,7 -401,8 +401,8 @@@
              final BulletinRepository bulletinRepo,
              final ClusterCoordinator clusterCoordinator,
              final HeartbeatMonitor heartbeatMonitor,
-             VariableRegistry variableRegistry) {
 -        final LeaderElectionManager leaderElectionManager,
 -        final VariableRegistry variableRegistry) {
++            final LeaderElectionManager leaderElectionManager,
++            final VariableRegistry variableRegistry) {
  
          final FlowController flowController = new FlowController(
                  flowFileEventRepo,
@@@ -413,7 -414,9 +414,9 @@@
                  protocolSender,
                  bulletinRepo,
                  clusterCoordinator,
-                 heartbeatMonitor, variableRegistry);
 -            heartbeatMonitor,
 -            leaderElectionManager,
 -            variableRegistry);
++                heartbeatMonitor,
++                leaderElectionManager,
++                variableRegistry);
  
          return flowController;
      }
@@@ -429,6 -432,7 +432,7 @@@
              final BulletinRepository bulletinRepo,
              final ClusterCoordinator clusterCoordinator,
              final HeartbeatMonitor heartbeatMonitor,
 -        final LeaderElectionManager leaderElectionManager,
++            final LeaderElectionManager leaderElectionManager,
              final VariableRegistry variableRegistry) {
  
          maxTimerDrivenThreads = new AtomicInteger(10);
@@@ -3304,7 -3308,10 +3307,9 @@@
          return configuredForClustering;
      }
  
 -
      private void registerForClusterCoordinator() {
+         final String participantId = heartbeatMonitor.getHeartbeatAddress();
+ 
          leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new 
LeaderElectionStateChangeListener() {
              @Override
              public synchronized void onLeaderRelinquish() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
----------------------------------------------------------------------
diff --cc 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
index 275fd3e,d675d0c..efc0588
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
@@@ -17,12 -17,16 +17,16 @@@
  package org.apache.nifi.controller.cluster;
  
  import java.io.IOException;
- import java.nio.charset.StandardCharsets;
 +
- import org.apache.curator.RetryPolicy;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.retry.RetryNTimes;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.stream.Collectors;
+ 
+ import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+ import org.apache.nifi.cluster.coordination.node.ClusterRoles;
+ import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+ import org.apache.nifi.cluster.protocol.HeartbeatPayload;
+ import org.apache.nifi.cluster.protocol.NodeIdentifier;
  import org.apache.nifi.cluster.protocol.NodeProtocolSender;
  import org.apache.nifi.cluster.protocol.ProtocolException;
  import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
@@@ -33,13 -36,11 +36,13 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  /**
-  * Uses ZooKeeper in order to determine which node is the elected Cluster
-  * Coordinator and to indicate that this node is part of the cluster. However,
-  * once the Cluster Coordinator is known, heartbeats are sent directly to the
 - * Uses Leader Election Manager in order to determine which node is the 
elected Cluster Coordinator and to indicate
 - * that this node is part of the cluster. Once the Cluster Coordinator is 
known, heartbeats are
 - * sent directly to the Cluster Coordinator.
++ * Uses Leader Election Manager in order to determine which node is the 
elected
++ * Cluster Coordinator and to indicate that this node is part of the cluster.
++ * Once the Cluster Coordinator is known, heartbeats are sent directly to the
 + * Cluster Coordinator.
   */
  public class ClusterProtocolHeartbeater implements Heartbeater {
 +
      private static final Logger logger = 
LoggerFactory.getLogger(ClusterProtocolHeartbeater.class);
  
      private final NodeProtocolSender protocolSender;
@@@ -90,19 -66,27 +68,27 @@@
      @Override
      public synchronized void send(final HeartbeatMessage heartbeatMessage) 
throws IOException {
          final String heartbeatAddress = getHeartbeatAddress();
- 
-         try {
-             protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
-         } catch (final ProtocolException pe) {
-             // a ProtocolException is likely the result of not being able to 
communicate
-             // with the coordinator. If we do get an IOException 
communicating with the coordinator,
-             // it will be the cause of the Protocol Exception. In this case, 
set coordinatorAddress
-             // to null so that we double-check next time that the coordinator 
has not changed.
-             if (pe.getCause() instanceof IOException) {
-                 coordinatorAddress = null;
+         final HeartbeatResponseMessage responseMessage = 
protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
+ 
+         final byte[] payloadBytes = 
heartbeatMessage.getHeartbeat().getPayload();
+         final HeartbeatPayload payload = 
HeartbeatPayload.unmarshal(payloadBytes);
+         final List<NodeConnectionStatus> nodeStatusList = 
payload.getClusterStatus();
+         final Map<NodeIdentifier, Long> updateIdMap = 
nodeStatusList.stream().collect(
 -            Collectors.toMap(status -> status.getNodeIdentifier(), status -> 
status.getUpdateIdentifier()));
++                Collectors.toMap(status -> status.getNodeIdentifier(), status 
-> status.getUpdateIdentifier()));
+ 
+         final List<NodeConnectionStatus> updatedStatuses = 
responseMessage.getUpdatedNodeStatuses();
+         if (updatedStatuses != null) {
+             for (final NodeConnectionStatus updatedStatus : updatedStatuses) {
+                 final NodeIdentifier nodeId = 
updatedStatus.getNodeIdentifier();
+                 final Long updateId = updateIdMap.get(nodeId);
+ 
+                 final boolean updated = 
clusterCoordinator.resetNodeStatus(updatedStatus, updateId == null ? -1L : 
updateId);
+                 if (updated) {
+                     logger.info("After receiving heartbeat response, updated 
status of {} to {}", updatedStatus.getNodeIdentifier(), updatedStatus);
+                 } else {
+                     logger.debug("After receiving heartbeat response, did not 
update status of {} to {} because the update is out-of-date", 
updatedStatus.getNodeIdentifier(), updatedStatus);
+                 }
              }
- 
-             throw pe;
          }
      }
  

http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
----------------------------------------------------------------------
diff --cc 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
index 3ef2b8b,1435182..8c84771
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
@@@ -18,7 -19,9 +18,8 @@@ package org.apache.nifi.controller.lead
  
  import java.util.HashMap;
  import java.util.Map;
 -import java.util.Properties;
  
+ import org.apache.commons.lang3.StringUtils;
  import org.apache.curator.RetryPolicy;
  import org.apache.curator.framework.CuratorFramework;
  import org.apache.curator.framework.CuratorFrameworkFactory;
@@@ -46,9 -49,14 +48,9 @@@ public class CuratorLeaderElectionManag
      private volatile boolean stopped = true;
  
      private final Map<String, LeaderRole> leaderRoles = new HashMap<>();
-     private final Map<String, LeaderElectionStateChangeListener> 
registeredRoles = new HashMap<>();
+     private final Map<String, RegisteredRole> registeredRoles = new 
HashMap<>();
  
 -
 -    public CuratorLeaderElectionManager(final int threadPoolSize) {
 -        this(threadPoolSize, NiFiProperties.getInstance());
 -    }
 -
 -    public CuratorLeaderElectionManager(final int threadPoolSize, final 
Properties properties) {
 +    public CuratorLeaderElectionManager(final int threadPoolSize, final 
NiFiProperties properties) {
          leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader 
Election Notification", true);
          zkConfig = ZooKeeperClientConfig.createConfig(properties);
      }
@@@ -87,8 -98,14 +90,13 @@@
          register(roleName, null);
      }
  
 -
      @Override
-     public synchronized void register(final String roleName, final 
LeaderElectionStateChangeListener listener) {
+     public void register(String roleName, LeaderElectionStateChangeListener 
listener) {
+         register(roleName, listener, null);
+     }
+ 
+     @Override
+     public synchronized void register(final String roleName, final 
LeaderElectionStateChangeListener listener, final String participantId) {
          logger.debug("{} Registering new Leader Selector for role {}", this, 
roleName);
  
          if (leaderRoles.containsKey(roleName)) {
@@@ -174,8 -198,34 +187,35 @@@
          return role.isLeader();
      }
  
+     @Override
+     public synchronized String getLeader(final String roleName) {
+         final LeaderRole role = leaderRoles.get(roleName);
+         if (role == null) {
+             return null;
+         }
+ 
+         Participant participant;
+         try {
+             participant = role.getLeaderSelector().getLeader();
+         } catch (Exception e) {
+             logger.debug("Unable to determine leader for role '{}'; returning 
null", roleName);
+             return null;
+         }
+ 
+         if (participant == null) {
+             return null;
+         }
+ 
+         final String participantId = participant.getId();
+         if (StringUtils.isEmpty(participantId)) {
+             return null;
+         }
+ 
+         return participantId;
+     }
+ 
      private static class LeaderRole {
 +
          private final LeaderSelector leaderSelector;
          private final ElectionListener electionListener;
  
@@@ -193,8 -243,25 +233,27 @@@
          }
      }
  
+     private static class RegisteredRole {
++
+         private final LeaderElectionStateChangeListener listener;
+         private final String participantId;
+ 
+         public RegisteredRole(final String participantId, final 
LeaderElectionStateChangeListener listener) {
+             this.participantId = participantId;
+             this.listener = listener;
+         }
+ 
+         public LeaderElectionStateChangeListener getListener() {
+             return listener;
+         }
+ 
+         public String getParticipantId() {
+             return participantId;
+         }
+     }
+ 
      private class ElectionListener extends LeaderSelectorListenerAdapter 
implements LeaderSelectorListener {
 +
          private final String roleName;
          private final LeaderElectionStateChangeListener listener;
  

Reply via email to