NIFI-2574 merging master with cluster changes to updated NiFiProperties approach
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6bf7e7f3 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6bf7e7f3 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6bf7e7f3 Branch: refs/heads/master Commit: 6bf7e7f3251068d245903b1740e821457fb14d6c Parents: 7d7401a e42ea9a Author: joewitt <joew...@apache.org> Authored: Wed Aug 17 01:23:54 2016 -0700 Committer: joewitt <joew...@apache.org> Committed: Wed Aug 17 01:23:54 2016 -0700 ---------------------------------------------------------------------- .../coordination/ClusterCoordinator.java | 44 ++- .../heartbeat/HeartbeatMonitor.java | 5 + .../coordination/heartbeat/NodeHeartbeat.java | 7 - .../cluster/coordination/node/ClusterRoles.java | 10 + .../coordination/node/NodeConnectionStatus.java | 36 +- .../protocol/AbstractNodeProtocolSender.java | 24 +- .../apache/nifi/cluster/protocol/Heartbeat.java | 11 +- .../nifi/cluster/protocol/HeartbeatPayload.java | 128 ++++++++ .../cluster/protocol/NodeProtocolSender.java | 5 +- .../impl/NodeProtocolSenderListener.java | 5 +- .../protocol/jaxb/message/AdaptedHeartbeat.java | 11 - .../message/AdaptedNodeConnectionStatus.java | 11 - .../protocol/jaxb/message/HeartbeatAdapter.java | 5 +- .../message/NodeConnectionStatusAdapter.java | 4 +- .../protocol/jaxb/message/ObjectFactory.java | 4 + .../message/HeartbeatResponseMessage.java | 45 +++ .../protocol/message/ProtocolMessage.java | 1 + .../resources/nifi-cluster-protocol-context.xml | 7 + .../jaxb/message/TestJaxbProtocolUtils.java | 28 ++ .../heartbeat/AbstractHeartbeatMonitor.java | 2 - .../ClusterProtocolHeartbeatMonitor.java | 136 +++----- .../heartbeat/StandardNodeHeartbeat.java | 18 +- .../node/LeaderElectionNodeProtocolSender.java | 80 +++++ .../node/NodeClusterCoordinator.java | 322 +++++------------- .../NodeClusterCoordinatorFactoryBean.java | 4 +- .../resources/nifi-cluster-manager-context.xml | 6 + .../heartbeat/TestAbstractHeartbeatMonitor.java | 31 +- .../TestThreadPoolRequestReplicator.java | 5 +- .../node/TestNodeClusterCoordinator.java | 99 ++---- .../nifi/cluster/integration/Cluster.java | 34 +- .../integration/ClusterConnectionIT.java | 327 +++++++++---------- .../nifi/cluster/integration/ClusterUtils.java | 10 +- .../apache/nifi/cluster/integration/Node.java | 50 ++- .../apache/nifi/cluster/HeartbeatPayload.java | 118 ------- .../apache/nifi/controller/FlowController.java | 45 ++- .../nifi/controller/StandardFlowService.java | 5 +- .../controller/StandardFlowSynchronizer.java | 83 ++--- .../cluster/ClusterProtocolHeartbeater.java | 105 +++--- .../election/CuratorLeaderElectionManager.java | 69 +++- .../leader/election/LeaderElectionManager.java | 24 +- .../StandaloneLeaderElectionManager.java | 65 ++++ .../nifi/fingerprint/FingerprintFactory.java | 1 + .../nifi/spring/FlowControllerFactoryBean.java | 14 +- .../LeaderElectionManagerFactoryBean.java | 57 ++++ .../src/main/resources/nifi-context.xml | 1 + .../nifi/cluster/HeartbeatPayloadTest.java | 1 + .../java/org/apache/nifi/nar/NarCloseable.java | 29 ++ .../nifi/web/StandardNiFiServiceFacade.java | 42 ++- .../src/main/resources/nifi-web-api-context.xml | 1 + 49 files changed, 1169 insertions(+), 1006 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java ---------------------------------------------------------------------- diff --cc nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java index d2db59e,e304c23..8b0dc97 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java @@@ -253,13 -256,11 +253,11 @@@ public abstract class AbstractHeartbeat clusterCoordinator.finishNodeConnection(nodeId); clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received first heartbeat from connecting node. Node connected."); } - - clusterCoordinator.updateNodeRoles(nodeId, heartbeat.getRoles()); } - /** - * @return the most recent heartbeat information for each node in the cluster + * @return the most recent heartbeat information for each node in the + * cluster */ protected abstract Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats(); http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java ---------------------------------------------------------------------- diff --cc nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java index 95b2045,b955bd0..20fbfd2 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java @@@ -14,13 -14,14 +14,12 @@@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.cluster.coordination.heartbeat; - import java.nio.charset.StandardCharsets; + import java.util.ArrayList; import java.util.Collections; - import java.util.HashMap; - import java.util.HashSet; + import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@@ -52,20 -49,11 +47,13 @@@ import org.slf4j.Logger import org.slf4j.LoggerFactory; /** - * Uses Apache ZooKeeper to advertise the address to send heartbeats to, and then relies on the NiFi Cluster - * Protocol to receive heartbeat messages from nodes in the cluster. + * Uses Apache ZooKeeper to advertise the address to send heartbeats to, and + * then relies on the NiFi Cluster Protocol to receive heartbeat messages from + * nodes in the cluster. */ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor implements HeartbeatMonitor, ProtocolHandler { + protected static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeatMonitor.class); - private static final String COORDINATOR_ZNODE_NAME = "coordinator"; - - private final ZooKeeperClientConfig zkClientConfig; - private final String clusterNodesPath; - - private volatile Map<String, NodeIdentifier> clusterNodeIds = new HashMap<>(); - private volatile CuratorFramework curatorClient; private final String heartbeatAddress; private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> heartbeatMessages = new ConcurrentHashMap<>(); @@@ -81,14 -69,13 +69,12 @@@ } } - - public ClusterProtocolHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final ProtocolListener protocolListener, final Properties properties) { - super(clusterCoordinator, properties); + public ClusterProtocolHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final ProtocolListener protocolListener, final NiFiProperties nifiProperties) { + super(clusterCoordinator, nifiProperties); protocolListener.addHandler(this); - this.zkClientConfig = ZooKeeperClientConfig.createConfig(nifiProperties); - this.clusterNodesPath = zkClientConfig.resolvePath("cluster/nodes"); - String hostname = properties.getProperty(NiFiProperties.CLUSTER_NODE_ADDRESS); + String hostname = nifiProperties.getProperty(NiFiProperties.CLUSTER_NODE_ADDRESS); if (hostname == null || hostname.trim().isEmpty()) { hostname = "localhost"; } @@@ -129,51 -111,9 +110,9 @@@ heartbeatMessages.clear(); for (final NodeIdentifier nodeId : clusterCoordinator.getNodeIdentifiers()) { final NodeHeartbeat heartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), - clusterCoordinator.getConnectionStatus(nodeId), Collections.emptySet(), 0, 0L, 0, System.currentTimeMillis()); - clusterCoordinator.getConnectionStatus(nodeId), 0, 0L, 0, System.currentTimeMillis()); ++ clusterCoordinator.getConnectionStatus(nodeId), 0, 0L, 0, System.currentTimeMillis()); heartbeatMessages.put(nodeId, heartbeat); } - - final Thread publishAddress = new Thread(new Runnable() { - @Override - public void run() { - while (!isStopped()) { - final String path = clusterNodesPath + "/" + COORDINATOR_ZNODE_NAME; - try { - try { - curatorClient.setData().forPath(path, heartbeatAddress.getBytes(StandardCharsets.UTF_8)); - logger.info("Successfully published Cluster Heartbeat Monitor Address of {} to ZooKeeper", heartbeatAddress); - return; - } catch (final NoNodeException nne) { - // ensure that parents are created, using a wide-open ACL because the parents contain no data - // and the path is not in any way sensitive. - try { - curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path); - } catch (final NodeExistsException nee) { - // This is okay. Node already exists. - } - - curatorClient.create().withMode(CreateMode.EPHEMERAL).forPath(path, heartbeatAddress.getBytes(StandardCharsets.UTF_8)); - logger.info("Successfully published address as heartbeat monitor address at path {} with value {}", path, heartbeatAddress); - - return; - } - } catch (Exception e) { - logger.warn("Failed to update ZooKeeper to notify nodes of the heartbeat address. Will continue to retry."); - - try { - Thread.sleep(2000L); - } catch (final InterruptedException ie) { - Thread.currentThread().interrupt(); - return; - } - } - } - } - }); - - publishAddress.setName("Publish Heartbeat Address"); - publishAddress.setDaemon(true); - publishAddress.start(); } @Override @@@ -194,10 -131,7 +130,6 @@@ heartbeatMessages.remove(nodeId); } - protected Set<NodeIdentifier> getClusterNodeIds() { - return new HashSet<>(clusterNodeIds.values()); - } -- @Override public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException { if (msg.getType() != MessageType.HEARTBEAT) { @@@ -218,11 -151,49 +149,48 @@@ final long systemStartTime = payload.getSystemStartTime(); final NodeHeartbeat nodeHeartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), - connectionStatus, roles, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime); - connectionStatus, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime); ++ connectionStatus, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime); heartbeatMessages.put(heartbeat.getNodeIdentifier(), nodeHeartbeat); logger.debug("Received new heartbeat from {}", nodeId); - return null; + // Formulate a List of differences between our view of the cluster topology and the node's view + // and send that back to the node so that it is in-sync with us + final List<NodeConnectionStatus> nodeStatusList = payload.getClusterStatus(); + final List<NodeConnectionStatus> updatedStatuses = getUpdatedStatuses(nodeStatusList); + + final HeartbeatResponseMessage responseMessage = new HeartbeatResponseMessage(); + responseMessage.setUpdatedNodeStatuses(updatedStatuses); + return responseMessage; + } + - + private List<NodeConnectionStatus> getUpdatedStatuses(final List<NodeConnectionStatus> nodeStatusList) { + // Map node's statuses by NodeIdentifier for quick & easy lookup + final Map<NodeIdentifier, NodeConnectionStatus> nodeStatusMap = nodeStatusList.stream() - .collect(Collectors.toMap(status -> status.getNodeIdentifier(), Function.identity())); ++ .collect(Collectors.toMap(status -> status.getNodeIdentifier(), Function.identity())); + + // Check if our connection status is the same for each Node Identifier and if not, add our version of the status + // to a List of updated statuses. + final List<NodeConnectionStatus> currentStatuses = clusterCoordinator.getConnectionStatuses(); + final List<NodeConnectionStatus> updatedStatuses = new ArrayList<>(); + for (final NodeConnectionStatus currentStatus : currentStatuses) { + final NodeConnectionStatus nodeStatus = nodeStatusMap.get(currentStatus.getNodeIdentifier()); + if (!currentStatus.equals(nodeStatus)) { + updatedStatuses.add(currentStatus); + } + } + + // If the node has any statuses that we do not have, add a REMOVED status to the update list + final Set<NodeIdentifier> nodeIds = currentStatuses.stream().map(status -> status.getNodeIdentifier()).collect(Collectors.toSet()); + for (final NodeConnectionStatus nodeStatus : nodeStatusList) { + if (!nodeIds.contains(nodeStatus.getNodeIdentifier())) { + updatedStatuses.add(new NodeConnectionStatus(nodeStatus.getNodeIdentifier(), NodeConnectionState.REMOVED, null)); + } + } + + logger.debug("\n\nCalculated diff between current cluster status and node cluster status as follows:\nNode: {}\nSelf: {}\nDifference: {}\n\n", - nodeStatusList, currentStatuses, updatedStatuses); ++ nodeStatusList, currentStatuses, updatedStatuses); + + return updatedStatuses; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java ---------------------------------------------------------------------- diff --cc nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index 70338c1,8e580db..2d25c8e --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@@ -70,11 -66,7 +65,8 @@@ import org.apache.nifi.controller.leade import org.apache.nifi.events.EventReporter; import org.apache.nifi.reporting.Severity; import org.apache.nifi.services.FlowService; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.revision.RevisionManager; - import org.apache.zookeeper.KeeperException; - import org.apache.zookeeper.WatchedEvent; - import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -92,12 -83,8 +84,9 @@@ public class NodeClusterCoordinator imp private final EventReporter eventReporter; private final ClusterNodeFirewall firewall; private final RevisionManager revisionManager; - - // Curator used to determine which node is coordinator - private final CuratorFramework curatorClient; - private final String nodesPathPrefix; - private final String coordinatorPath; + private final NiFiProperties nifiProperties; + private final LeaderElectionManager leaderElectionManager; + private final AtomicLong latestUpdateId = new AtomicLong(-1); private volatile FlowService flowService; private volatile boolean connected; @@@ -107,24 -93,14 +95,15 @@@ private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> nodeStatuses = new ConcurrentHashMap<>(); private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> nodeEvents = new ConcurrentHashMap<>(); - public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, + public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, final LeaderElectionManager leaderElectionManager, - final ClusterNodeFirewall firewall, final RevisionManager revisionManager) { + final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties) { this.senderListener = senderListener; this.flowService = null; this.eventReporter = eventReporter; this.firewall = firewall; this.revisionManager = revisionManager; + this.nifiProperties = nifiProperties; - - final RetryPolicy retryPolicy = new RetryNTimes(10, 500); - final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(nifiProperties); - - curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(), - zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy); - - curatorClient.start(); - nodesPathPrefix = zkConfig.resolvePath("cluster/nodes"); - coordinatorPath = nodesPathPrefix + "/coordinator"; + this.leaderElectionManager = leaderElectionManager; senderListener.addHandler(this); } @@@ -140,10 -116,9 +119,8 @@@ final NodeConnectionStatus shutdownStatus = new NodeConnectionStatus(getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN); updateNodeStatus(shutdownStatus, false); logger.info("Successfully notified other nodes that I am shutting down"); - - curatorClient.close(); } - @Override public void setLocalNodeIdentifier(final NodeIdentifier nodeId) { this.nodeId = nodeId; @@@ -227,16 -177,30 +179,31 @@@ } } + @Override + public boolean resetNodeStatus(final NodeConnectionStatus connectionStatus, final long qualifyingUpdateId) { + final NodeIdentifier nodeId = connectionStatus.getNodeIdentifier(); + final NodeConnectionStatus currentStatus = getConnectionStatus(nodeId); + + if (currentStatus == null) { + return replaceNodeStatus(nodeId, null, connectionStatus); - } else { - if (currentStatus.getUpdateIdentifier() == qualifyingUpdateId) { - return replaceNodeStatus(nodeId, currentStatus, connectionStatus); - } ++ } else if (currentStatus.getUpdateIdentifier() == qualifyingUpdateId) { ++ return replaceNodeStatus(nodeId, currentStatus, connectionStatus); + } + + // The update identifier is not the same. We will not replace the value + return false; + } + /** - * Attempts to update the nodeStatuses map by changing the value for the given node id from the current status to the new status, as in - * ConcurrentMap.replace(nodeId, currentStatus, newStatus) but with the difference that this method can handle a <code>null</code> value - * for currentStatus + * Attempts to update the nodeStatuses map by changing the value for the + * given node id from the current status to the new status, as in + * ConcurrentMap.replace(nodeId, currentStatus, newStatus) but with the + * difference that this method can handle a <code>null</code> value for + * currentStatus * * @param nodeId the node id - * @param currentStatus the current status, or <code>null</code> if there is no value currently + * @param currentStatus the current status, or <code>null</code> if there is + * no value currently * @param newStatus the new status to set * @return <code>true</code> if the map was updated, false otherwise */ @@@ -272,11 -244,7 +247,6 @@@ requestReconnectionAsynchronously(request, 10, 5); } - private Set<String> getRoles(final NodeIdentifier nodeId) { - final NodeConnectionStatus status = getConnectionStatus(nodeId); - return status == null ? Collections.emptySet() : status.getRoles(); - } -- @Override public void finishNodeConnection(final NodeIdentifier nodeId) { final NodeConnectionState state = getConnectionState(nodeId); @@@ -298,9 -266,10 +268,9 @@@ } logger.info("{} is now connected", nodeId); - updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, getRoles(nodeId))); + updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED)); } - @Override public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) { final Set<NodeIdentifier> connectedNodeIds = getNodeIdentifiers(NodeConnectionState.CONNECTED); @@@ -405,59 -379,8 +380,7 @@@ } } - @Override - public synchronized void updateNodeRoles(final NodeIdentifier nodeId, final Set<String> roles) { - boolean updated = false; - while (!updated) { - final NodeConnectionStatus currentStatus = nodeStatuses.get(nodeId); - if (currentStatus == null) { - throw new UnknownNodeException("Cannot update roles for " + nodeId + " to " + roles + " because the node is not part of this cluster"); - } - - if (currentStatus.getRoles().equals(roles)) { - logger.debug("Roles for {} already up-to-date as {}", nodeId, roles); - return; - } - - final NodeConnectionStatus updatedStatus = new NodeConnectionStatus(currentStatus, roles); - updated = replaceNodeStatus(nodeId, currentStatus, updatedStatus); - - if (updated) { - logger.info("Updated Roles of {} from {} to {}", nodeId, currentStatus, updatedStatus); - notifyOthersOfNodeStatusChange(updatedStatus); - } - } - - // If any other node contains any of the given roles, revoke the role from the other node. - for (final String role : roles) { - for (final Map.Entry<NodeIdentifier, NodeConnectionStatus> entry : nodeStatuses.entrySet()) { - if (entry.getKey().equals(nodeId)) { - continue; - } - - updated = false; - while (!updated) { - final NodeConnectionStatus status = entry.getValue(); - if (status.getRoles().contains(role)) { - final Set<String> newRoles = new HashSet<>(status.getRoles()); - newRoles.remove(role); - - final NodeConnectionStatus updatedStatus = new NodeConnectionStatus(status, newRoles); - updated = replaceNodeStatus(entry.getKey(), status, updatedStatus); - - if (updated) { - logger.info("Updated Roles of {} from {} to {}", nodeId, status, updatedStatus); - notifyOthersOfNodeStatusChange(updatedStatus); - } - } else { - updated = true; - } - } - } - } - } - - @Override public NodeIdentifier getNodeIdentifier(final String uuid) { for (final NodeIdentifier nodeId : nodeStatuses.keySet()) { if (nodeId.getId().equals(uuid)) { @@@ -468,48 -391,7 +391,6 @@@ return null; } - // method is synchronized because it modifies local node state and then broadcasts the change. We synchronize any time that this - // is done so that we don't have an issue where we create a NodeConnectionStatus, then another thread creates one and sends it - // before the first one is sent (as this results in the first status having a larger id, which means that the first status is never - // seen by other nodes). - @Override - public synchronized void addRole(final String clusterRole) { - final NodeIdentifier localNodeId = waitForLocalNodeIdentifier(); - final NodeConnectionStatus status = getConnectionStatus(localNodeId); - final Set<String> roles = new HashSet<>(); - if (status != null) { - roles.addAll(status.getRoles()); - } - - final boolean roleAdded = roles.add(clusterRole); - - if (roleAdded) { - updateNodeRoles(localNodeId, roles); - logger.info("Cluster role {} added. This node is now responsible for the following roles: {}", clusterRole, roles); - } - } - - // method is synchronized because it modifies local node state and then broadcasts the change. We synchronize any time that this - // is done so that we don't have an issue where we create a NodeConnectionStatus, then another thread creates one and sends it - // before the first one is sent (as this results in the first status having a larger id, which means that the first status is never - // seen by other nodes). - @Override - public synchronized void removeRole(final String clusterRole) { - final NodeIdentifier localNodeId = waitForLocalNodeIdentifier(); - final NodeConnectionStatus status = getConnectionStatus(localNodeId); - final Set<String> roles = new HashSet<>(); - if (status != null) { - roles.addAll(status.getRoles()); - } - - final boolean roleRemoved = roles.remove(clusterRole); - - if (roleRemoved) { - updateNodeRoles(localNodeId, roles); - logger.info("Cluster role {} removed. This node is now responsible for the following roles: {}", clusterRole, roles); - } - } -- @Override public Set<NodeIdentifier> getNodeIdentifiers(final NodeConnectionState... states) { final Set<NodeConnectionState> statesOfInterest = new HashSet<>(); @@@ -531,11 -413,15 +412,15 @@@ @Override public NodeIdentifier getPrimaryNode() { - return nodeStatuses.values().stream() - .filter(status -> status.getRoles().contains(ClusterRoles.PRIMARY_NODE)) + final String primaryNodeAddress = leaderElectionManager.getLeader(ClusterRoles.PRIMARY_NODE); + if (primaryNodeAddress == null) { + return null; + } + + return nodeStatuses.keySet().stream() - .filter(nodeId -> primaryNodeAddress.equals(nodeId.getSocketAddress() + ":" + nodeId.getSocketPort())) - .findFirst() - .orElse(null); ++ .filter(nodeId -> primaryNodeAddress.equals(nodeId.getSocketAddress() + ":" + nodeId.getSocketPort())) + .findFirst() - .map(status -> status.getNodeIdentifier()) + .orElse(null); } @Override @@@ -899,35 -753,30 +753,30 @@@ final NodeIdentifier nodeId = statusChangeMessage.getNodeId(); logger.debug("Handling request {}", statusChangeMessage); - boolean updated = false; - while (!updated) { - final NodeConnectionStatus oldStatus = nodeStatuses.get(statusChangeMessage.getNodeId()); - - // Either remove the value from the map or update the map depending on the connection state - if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) { - updated = nodeStatuses.remove(nodeId, oldStatus); - } else { - updated = replaceNodeStatus(nodeId, oldStatus, updatedStatus); - } + final NodeConnectionStatus oldStatus = nodeStatuses.get(statusChangeMessage.getNodeId()); - if (updated) { - logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus); - logger.debug("State of cluster nodes is now {}", nodeStatuses); + // Either remove the value from the map or update the map depending on the connection state + if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) { - nodeStatuses.remove(nodeId, oldStatus); ++ nodeStatuses.remove(nodeId, oldStatus); + } else { - nodeStatuses.put(nodeId, updatedStatus); ++ nodeStatuses.put(nodeId, updatedStatus); + } - final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus(); - final String summary = summarizeStatusChange(oldStatus, status); - if (!StringUtils.isEmpty(summary)) { - addNodeEvent(nodeId, summary); - } + logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus); + logger.debug("State of cluster nodes is now {}", nodeStatuses); - // Update our counter so that we are in-sync with the cluster on the - // most up-to-date version of the NodeConnectionStatus' Update Identifier. - // We do this so that we can accurately compare status updates that are generated - // locally against those generated from other nodes in the cluster. - NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier()); - } + final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus(); + final String summary = summarizeStatusChange(oldStatus, status); + if (!StringUtils.isEmpty(summary)) { + addNodeEvent(nodeId, summary); } + // Update our counter so that we are in-sync with the cluster on the + // most up-to-date version of the NodeConnectionStatus' Update Identifier. + // We do this so that we can accurately compare status updates that are generated + // locally against those generated from other nodes in the cluster. + NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier()); + if (isActiveClusterCoordinator()) { notifyOthersOfNodeStatusChange(statusChangeMessage.getNodeConnectionStatus()); } @@@ -1009,8 -858,8 +858,8 @@@ return new ConnectionResponse(tryAgainSeconds); } - return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, instanceId, new ArrayList<>(nodeStatuses.values()), + return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, instanceId, getConnectionStatuses(), - revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList())); + revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList())); } private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final String dn) { http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java ---------------------------------------------------------------------- diff --cc nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java index b414e0d,0b83c23..fc52ee1 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java @@@ -42,8 -43,9 +43,9 @@@ public class NodeClusterCoordinatorFact final EventReporter eventReporter = applicationContext.getBean("eventReporter", EventReporter.class); final ClusterNodeFirewall clusterFirewall = applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class); final RevisionManager revisionManager = applicationContext.getBean("revisionManager", RevisionManager.class); + final LeaderElectionManager electionManager = applicationContext.getBean("leaderElectionManager", LeaderElectionManager.class); - nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, clusterFirewall, revisionManager, properties); - nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, clusterFirewall, revisionManager); ++ nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, clusterFirewall, revisionManager, properties); } return nodeClusterCoordinator; http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --cc nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index ebefce2,f18d589..9c56fb9 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@@ -162,11 -163,10 +161,11 @@@ public class TestThreadPoolRequestRepli nodeIds.add(nodeId); final ClusterCoordinator coordinator = Mockito.mock(ClusterCoordinator.class); - Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, Collections.emptySet())); + Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED)); final AtomicInteger requestCount = new AtomicInteger(0); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null) { + final ThreadPoolRequestReplicator replicator + = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) { // the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them. http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java ---------------------------------------------------------------------- diff --cc nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java index aaa9dca,91174ca..00edbc4 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java @@@ -78,7 -69,7 +78,7 @@@ public class TestNodeClusterCoordinato final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) { - coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager) { ++ coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager, createProperties()) { @Override void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) { nodeStatuses.add(updatedStatus); @@@ -133,7 -124,7 +133,7 @@@ final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) { - final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager) { ++ final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager, createProperties()) { @Override void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) { } @@@ -171,7 -162,7 +171,7 @@@ final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) { - final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager) { ++ final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager, createProperties()) { @Override void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) { } @@@ -396,61 -391,7 +396,6 @@@ assertTrue(nodeStatuses.isEmpty()); } - @Test(timeout = 5000) - public void testUpdateNodeRoles() throws InterruptedException { - // Add a connected node - final NodeIdentifier nodeId1 = createNodeId(1); - final NodeIdentifier nodeId2 = createNodeId(2); - - coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet())); - // wait for the status change message and clear it - while (nodeStatuses.isEmpty()) { - Thread.sleep(10L); - } - nodeStatuses.clear(); - - coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet())); - // wait for the status change message and clear it - while (nodeStatuses.isEmpty()) { - Thread.sleep(10L); - } - nodeStatuses.clear(); - - // Update role of node 1 to primary node - coordinator.updateNodeRoles(nodeId1, Collections.singleton(ClusterRoles.PRIMARY_NODE)); - - // wait for the status change message - while (nodeStatuses.isEmpty()) { - Thread.sleep(10L); - } - // verify the message - final NodeConnectionStatus status = nodeStatuses.get(0); - assertNotNull(status); - assertEquals(nodeId1, status.getNodeIdentifier()); - assertEquals(NodeConnectionState.CONNECTED, status.getState()); - assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), status.getRoles()); - nodeStatuses.clear(); - - // Update role of node 2 to primary node. This should trigger 2 status changes - - // node 1 should lose primary role & node 2 should gain it - coordinator.updateNodeRoles(nodeId2, Collections.singleton(ClusterRoles.PRIMARY_NODE)); - - // wait for the status change message - while (nodeStatuses.size() < 2) { - Thread.sleep(10L); - } - - final NodeConnectionStatus status1 = nodeStatuses.get(0); - final NodeConnectionStatus status2 = nodeStatuses.get(1); - final NodeConnectionStatus id1Msg = (status1.getNodeIdentifier().equals(nodeId1)) ? status1 : status2; - final NodeConnectionStatus id2Msg = (status1.getNodeIdentifier().equals(nodeId2)) ? status1 : status2; - - assertNotSame(id1Msg, id2Msg); - - assertTrue(id1Msg.getRoles().isEmpty()); - assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), id2Msg.getRoles()); - } -- @Test public void testProposedIdentifierResolvedIfConflict() { final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 8000, "localhost", 9000, "localhost", 10000, 11000, false); http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java ---------------------------------------------------------------------- diff --cc nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java index 5809625,fd54203..a0c8648 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java @@@ -91,13 -93,26 +95,26 @@@ public class Cluster return Collections.unmodifiableSet(nodes); } + public CuratorFramework createCuratorClient() { + final RetryPolicy retryPolicy = new RetryNTimes(20, 500); + final CuratorFramework curatorClient = CuratorFrameworkFactory.builder() + .connectString(getZooKeeperConnectString()) + .sessionTimeoutMs(3000) + .connectionTimeoutMs(3000) + .retryPolicy(retryPolicy) + .defaultData(new byte[0]) + .build(); + + curatorClient.start(); + return curatorClient; + } public Node createNode() { - NiFiProperties.getInstance().setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, getZooKeeperConnectString()); - NiFiProperties.getInstance().setProperty(NiFiProperties.CLUSTER_IS_NODE, "true"); + final Map<String, String> addProps = new HashMap<>(); + addProps.put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, getZooKeeperConnectString()); + addProps.put(NiFiProperties.CLUSTER_IS_NODE, "true"); - final NiFiProperties properties = NiFiProperties.getInstance().copy(); - final Node node = new Node(properties); + final Node node = new Node(NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties", addProps)); node.start(); nodes.add(node); http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java ---------------------------------------------------------------------- diff --cc nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java index 899f312,2996442..a8b51f2 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java @@@ -127,7 -115,8 +133,8 @@@ public class Node final HeartbeatMonitor heartbeatMonitor = createHeartbeatMonitor(); flowController = FlowController.createClusteredInstance(Mockito.mock(FlowFileEventRepository.class), nodeProperties, - null, null, StringEncryptor.createEncryptor(nodeProperties), protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator, heartbeatMonitor, VariableRegistry.EMPTY_REGISTRY); - null, null, StringEncryptor.createEncryptor(), protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator, ++ null, null, StringEncryptor.createEncryptor(nodeProperties), protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator, + heartbeatMonitor, electionManager, VariableRegistry.EMPTY_REGISTRY); try { flowController.initializeFlow(); @@@ -267,7 -251,7 +269,7 @@@ } final ClusterCoordinationProtocolSenderListener protocolSenderListener = new ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), protocolListener); - return new NodeClusterCoordinator(protocolSenderListener, eventReporter, null, revisionManager, nodeProperties); - return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, null, revisionManager); ++ return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, null, revisionManager, nodeProperties); } http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --cc nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index abb4b78,efdf152..8fb2305 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@@ -388,6 -387,7 +387,7 @@@ public class FlowController implements bulletinRepo, /* cluster coordinator */ null, /* heartbeat monitor */ null, - /* leader election manager */ null, ++ /* leader election manager */ null, /* variable registry */ variableRegistry); } @@@ -401,7 -401,8 +401,8 @@@ final BulletinRepository bulletinRepo, final ClusterCoordinator clusterCoordinator, final HeartbeatMonitor heartbeatMonitor, - VariableRegistry variableRegistry) { - final LeaderElectionManager leaderElectionManager, - final VariableRegistry variableRegistry) { ++ final LeaderElectionManager leaderElectionManager, ++ final VariableRegistry variableRegistry) { final FlowController flowController = new FlowController( flowFileEventRepo, @@@ -413,7 -414,9 +414,9 @@@ protocolSender, bulletinRepo, clusterCoordinator, - heartbeatMonitor, variableRegistry); - heartbeatMonitor, - leaderElectionManager, - variableRegistry); ++ heartbeatMonitor, ++ leaderElectionManager, ++ variableRegistry); return flowController; } @@@ -429,6 -432,7 +432,7 @@@ final BulletinRepository bulletinRepo, final ClusterCoordinator clusterCoordinator, final HeartbeatMonitor heartbeatMonitor, - final LeaderElectionManager leaderElectionManager, ++ final LeaderElectionManager leaderElectionManager, final VariableRegistry variableRegistry) { maxTimerDrivenThreads = new AtomicInteger(10); @@@ -3304,7 -3308,10 +3307,9 @@@ return configuredForClustering; } - private void registerForClusterCoordinator() { + final String participantId = heartbeatMonitor.getHeartbeatAddress(); + leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() { @Override public synchronized void onLeaderRelinquish() { http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java ---------------------------------------------------------------------- diff --cc nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java index 275fd3e,d675d0c..efc0588 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java @@@ -17,12 -17,16 +17,16 @@@ package org.apache.nifi.controller.cluster; import java.io.IOException; - import java.nio.charset.StandardCharsets; + - import org.apache.curator.RetryPolicy; - import org.apache.curator.framework.CuratorFramework; - import org.apache.curator.framework.CuratorFrameworkFactory; - import org.apache.curator.retry.RetryNTimes; + import java.util.List; + import java.util.Map; + import java.util.stream.Collectors; + + import org.apache.nifi.cluster.coordination.ClusterCoordinator; + import org.apache.nifi.cluster.coordination.node.ClusterRoles; + import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; + import org.apache.nifi.cluster.protocol.HeartbeatPayload; + import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.NodeProtocolSender; import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; @@@ -33,13 -36,11 +36,13 @@@ import org.slf4j.Logger import org.slf4j.LoggerFactory; /** - * Uses ZooKeeper in order to determine which node is the elected Cluster - * Coordinator and to indicate that this node is part of the cluster. However, - * once the Cluster Coordinator is known, heartbeats are sent directly to the - * Uses Leader Election Manager in order to determine which node is the elected Cluster Coordinator and to indicate - * that this node is part of the cluster. Once the Cluster Coordinator is known, heartbeats are - * sent directly to the Cluster Coordinator. ++ * Uses Leader Election Manager in order to determine which node is the elected ++ * Cluster Coordinator and to indicate that this node is part of the cluster. ++ * Once the Cluster Coordinator is known, heartbeats are sent directly to the + * Cluster Coordinator. */ public class ClusterProtocolHeartbeater implements Heartbeater { + private static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeater.class); private final NodeProtocolSender protocolSender; @@@ -90,19 -66,27 +68,27 @@@ @Override public synchronized void send(final HeartbeatMessage heartbeatMessage) throws IOException { final String heartbeatAddress = getHeartbeatAddress(); - - try { - protocolSender.heartbeat(heartbeatMessage, heartbeatAddress); - } catch (final ProtocolException pe) { - // a ProtocolException is likely the result of not being able to communicate - // with the coordinator. If we do get an IOException communicating with the coordinator, - // it will be the cause of the Protocol Exception. In this case, set coordinatorAddress - // to null so that we double-check next time that the coordinator has not changed. - if (pe.getCause() instanceof IOException) { - coordinatorAddress = null; + final HeartbeatResponseMessage responseMessage = protocolSender.heartbeat(heartbeatMessage, heartbeatAddress); + + final byte[] payloadBytes = heartbeatMessage.getHeartbeat().getPayload(); + final HeartbeatPayload payload = HeartbeatPayload.unmarshal(payloadBytes); + final List<NodeConnectionStatus> nodeStatusList = payload.getClusterStatus(); + final Map<NodeIdentifier, Long> updateIdMap = nodeStatusList.stream().collect( - Collectors.toMap(status -> status.getNodeIdentifier(), status -> status.getUpdateIdentifier())); ++ Collectors.toMap(status -> status.getNodeIdentifier(), status -> status.getUpdateIdentifier())); + + final List<NodeConnectionStatus> updatedStatuses = responseMessage.getUpdatedNodeStatuses(); + if (updatedStatuses != null) { + for (final NodeConnectionStatus updatedStatus : updatedStatuses) { + final NodeIdentifier nodeId = updatedStatus.getNodeIdentifier(); + final Long updateId = updateIdMap.get(nodeId); + + final boolean updated = clusterCoordinator.resetNodeStatus(updatedStatus, updateId == null ? -1L : updateId); + if (updated) { + logger.info("After receiving heartbeat response, updated status of {} to {}", updatedStatus.getNodeIdentifier(), updatedStatus); + } else { + logger.debug("After receiving heartbeat response, did not update status of {} to {} because the update is out-of-date", updatedStatus.getNodeIdentifier(), updatedStatus); + } } - - throw pe; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6bf7e7f3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java ---------------------------------------------------------------------- diff --cc nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java index 3ef2b8b,1435182..8c84771 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java @@@ -18,7 -19,9 +18,8 @@@ package org.apache.nifi.controller.lead import java.util.HashMap; import java.util.Map; -import java.util.Properties; + import org.apache.commons.lang3.StringUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@@ -46,9 -49,14 +48,9 @@@ public class CuratorLeaderElectionManag private volatile boolean stopped = true; private final Map<String, LeaderRole> leaderRoles = new HashMap<>(); - private final Map<String, LeaderElectionStateChangeListener> registeredRoles = new HashMap<>(); + private final Map<String, RegisteredRole> registeredRoles = new HashMap<>(); - - public CuratorLeaderElectionManager(final int threadPoolSize) { - this(threadPoolSize, NiFiProperties.getInstance()); - } - - public CuratorLeaderElectionManager(final int threadPoolSize, final Properties properties) { + public CuratorLeaderElectionManager(final int threadPoolSize, final NiFiProperties properties) { leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader Election Notification", true); zkConfig = ZooKeeperClientConfig.createConfig(properties); } @@@ -87,8 -98,14 +90,13 @@@ register(roleName, null); } - @Override - public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener) { + public void register(String roleName, LeaderElectionStateChangeListener listener) { + register(roleName, listener, null); + } + + @Override + public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) { logger.debug("{} Registering new Leader Selector for role {}", this, roleName); if (leaderRoles.containsKey(roleName)) { @@@ -174,8 -198,34 +187,35 @@@ return role.isLeader(); } + @Override + public synchronized String getLeader(final String roleName) { + final LeaderRole role = leaderRoles.get(roleName); + if (role == null) { + return null; + } + + Participant participant; + try { + participant = role.getLeaderSelector().getLeader(); + } catch (Exception e) { + logger.debug("Unable to determine leader for role '{}'; returning null", roleName); + return null; + } + + if (participant == null) { + return null; + } + + final String participantId = participant.getId(); + if (StringUtils.isEmpty(participantId)) { + return null; + } + + return participantId; + } + private static class LeaderRole { + private final LeaderSelector leaderSelector; private final ElectionListener electionListener; @@@ -193,8 -243,25 +233,27 @@@ } } + private static class RegisteredRole { ++ + private final LeaderElectionStateChangeListener listener; + private final String participantId; + + public RegisteredRole(final String participantId, final LeaderElectionStateChangeListener listener) { + this.participantId = participantId; + this.listener = listener; + } + + public LeaderElectionStateChangeListener getListener() { + return listener; + } + + public String getParticipantId() { + return participantId; + } + } + private class ElectionListener extends LeaderSelectorListenerAdapter implements LeaderSelectorListener { + private final String roleName; private final LeaderElectionStateChangeListener listener;