http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index ac39dc5..8e580db 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -18,28 +18,23 @@ package org.apache.nifi.cluster.coordination.node; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.commons.collections4.queue.CircularFifoQueue; import org.apache.commons.lang3.StringUtils; -import org.apache.curator.RetryPolicy; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.RetryNTimes; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import 
org.apache.nifi.cluster.coordination.http.HttpResponseMerger; import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger; @@ -50,7 +45,6 @@ import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException; import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; -import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.protocol.ComponentRevision; import org.apache.nifi.cluster.protocol.ConnectionRequest; import org.apache.nifi.cluster.protocol.ConnectionResponse; @@ -68,14 +62,11 @@ import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; -import org.apache.nifi.controller.cluster.ZooKeeperClientConfig; +import org.apache.nifi.controller.leader.election.LeaderElectionManager; import org.apache.nifi.events.EventReporter; import org.apache.nifi.reporting.Severity; import org.apache.nifi.services.FlowService; import org.apache.nifi.web.revision.RevisionManager; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,37 +83,24 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl private final EventReporter eventReporter; private final ClusterNodeFirewall firewall; private final RevisionManager revisionManager; - - // Curator used to determine which node is coordinator - private final CuratorFramework curatorClient; - private final String nodesPathPrefix; - private final String coordinatorPath; + private final LeaderElectionManager leaderElectionManager; + private 
final AtomicLong latestUpdateId = new AtomicLong(-1); private volatile FlowService flowService; private volatile boolean connected; - private volatile String coordinatorAddress; private volatile boolean closed = false; private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> nodeStatuses = new ConcurrentHashMap<>(); private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> nodeEvents = new ConcurrentHashMap<>(); - public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, - final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final Properties nifiProperties) { + public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, final LeaderElectionManager leaderElectionManager, + final ClusterNodeFirewall firewall, final RevisionManager revisionManager) { this.senderListener = senderListener; this.flowService = null; this.eventReporter = eventReporter; this.firewall = firewall; this.revisionManager = revisionManager; - - final RetryPolicy retryPolicy = new RetryNTimes(10, 500); - final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(nifiProperties); - - curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(), - zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy); - - curatorClient.start(); - nodesPathPrefix = zkConfig.resolvePath("cluster/nodes"); - coordinatorPath = nodesPathPrefix + "/coordinator"; + this.leaderElectionManager = leaderElectionManager; senderListener.addHandler(this); } @@ -138,10 +116,9 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl final NodeConnectionStatus shutdownStatus = new NodeConnectionStatus(getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN); updateNodeStatus(shutdownStatus, false); logger.info("Successfully notified other nodes that I am 
shutting down"); - - curatorClient.close(); } + @Override public void setLocalNodeIdentifier(final NodeIdentifier nodeId) { this.nodeId = nodeId; @@ -153,10 +130,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl return nodeId; } - private NodeIdentifier waitForLocalNodeIdentifier() { - return waitForNodeIdentifier(() -> getLocalNodeIdentifier()); - } - private NodeIdentifier waitForElectedClusterCoordinator() { return waitForNodeIdentifier(() -> getElectedActiveCoordinatorNode(false)); } @@ -174,6 +147,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl Thread.sleep(100L); } catch (final InterruptedException ie) { Thread.currentThread().interrupt(); + return null; } } } @@ -182,34 +156,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } private String getElectedActiveCoordinatorAddress() throws IOException { - final String curAddress = coordinatorAddress; - if (curAddress != null) { - return curAddress; - } - - try { - // Get coordinator address and add watcher to change who we are heartbeating to if the value changes. 
- final byte[] coordinatorAddressBytes = curatorClient.getData().usingWatcher(new Watcher() { - @Override - public void process(final WatchedEvent event) { - coordinatorAddress = null; - } - }).forPath(coordinatorPath); - final String address = coordinatorAddress = new String(coordinatorAddressBytes, StandardCharsets.UTF_8); - - logger.info("Determined that Cluster Coordinator is located at {}", address); - return address; - } catch (final KeeperException.NoNodeException nne) { - throw new NoClusterCoordinatorException(); - } catch (Exception e) { - throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e); - } + return leaderElectionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR); } @Override public void resetNodeStatuses(final Map<NodeIdentifier, NodeConnectionStatus> statusMap) { logger.info("Resetting cluster node statuses from {} to {}", nodeStatuses, statusMap); - coordinatorAddress = null; // For each proposed replacement, update the nodeStatuses map if and only if the replacement // has a larger update id than the current value. 
@@ -217,12 +169,29 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl final NodeIdentifier nodeId = entry.getKey(); final NodeConnectionStatus proposedStatus = entry.getValue(); - boolean updated = false; - while (!updated) { - final NodeConnectionStatus currentStatus = nodeStatuses.get(nodeId); - updated = replaceNodeStatus(nodeId, currentStatus, proposedStatus); + if (proposedStatus.getState() == NodeConnectionState.REMOVED) { + nodeStatuses.remove(nodeId); + } else { + nodeStatuses.put(nodeId, proposedStatus); + } + } + } + + @Override + public boolean resetNodeStatus(final NodeConnectionStatus connectionStatus, final long qualifyingUpdateId) { + final NodeIdentifier nodeId = connectionStatus.getNodeIdentifier(); + final NodeConnectionStatus currentStatus = getConnectionStatus(nodeId); + + if (currentStatus == null) { + return replaceNodeStatus(nodeId, null, connectionStatus); + } else { + if (currentStatus.getUpdateIdentifier() == qualifyingUpdateId) { + return replaceNodeStatus(nodeId, currentStatus, connectionStatus); } } + + // The update identifier is not the same. 
We will not replace the value + return false; } /** @@ -242,11 +211,19 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } if (currentStatus == null) { - final NodeConnectionStatus existingValue = nodeStatuses.putIfAbsent(nodeId, newStatus); - return existingValue == null; + if (newStatus.getState() == NodeConnectionState.REMOVED) { + return nodeStatuses.remove(nodeId, currentStatus); + } else { + final NodeConnectionStatus existingValue = nodeStatuses.putIfAbsent(nodeId, newStatus); + return existingValue == null; + } } - return nodeStatuses.replace(nodeId, currentStatus, newStatus); + if (newStatus.getState() == NodeConnectionState.REMOVED) { + return nodeStatuses.remove(nodeId, currentStatus); + } else { + return nodeStatuses.replace(nodeId, currentStatus, newStatus); + } } @Override @@ -257,7 +234,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl reportEvent(nodeId, Severity.INFO, "Requesting that node connect to cluster on behalf of " + userDn); } - updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis(), getRoles(nodeId))); + updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis())); // create the request final ReconnectionRequestMessage request = new ReconnectionRequestMessage(); @@ -267,10 +244,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl requestReconnectionAsynchronously(request, 10, 5); } - private Set<String> getRoles(final NodeIdentifier nodeId) { - final NodeConnectionStatus status = getConnectionStatus(nodeId); - return status == null ? 
Collections.emptySet() : status.getRoles(); - } @Override public void finishNodeConnection(final NodeIdentifier nodeId) { @@ -293,7 +266,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } logger.info("{} is now connected", nodeId); - updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, getRoles(nodeId))); + updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED)); } @@ -350,7 +323,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl reportEvent(nodeId, Severity.INFO, "User " + userDn + " requested that node be removed from cluster"); nodeStatuses.remove(nodeId); nodeEvents.remove(nodeId); - notifyOthersOfNodeStatusChange(new NodeConnectionStatus(nodeId, NodeConnectionState.REMOVED, Collections.emptySet())); + notifyOthersOfNodeStatusChange(new NodeConnectionStatus(nodeId, NodeConnectionState.REMOVED)); } @Override @@ -363,6 +336,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl return status == null ? 
null : status.getState(); } + @Override + public List<NodeConnectionStatus> getConnectionStatuses() { + return new ArrayList<>(nodeStatuses.values()); + } @Override public Map<NodeConnectionState, List<NodeIdentifier>> getConnectionStates() { @@ -402,57 +379,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } } - @Override - public synchronized void updateNodeRoles(final NodeIdentifier nodeId, final Set<String> roles) { - boolean updated = false; - while (!updated) { - final NodeConnectionStatus currentStatus = nodeStatuses.get(nodeId); - if (currentStatus == null) { - throw new UnknownNodeException("Cannot update roles for " + nodeId + " to " + roles + " because the node is not part of this cluster"); - } - - if (currentStatus.getRoles().equals(roles)) { - logger.debug("Roles for {} already up-to-date as {}", nodeId, roles); - return; - } - - final NodeConnectionStatus updatedStatus = new NodeConnectionStatus(currentStatus, roles); - updated = replaceNodeStatus(nodeId, currentStatus, updatedStatus); - - if (updated) { - logger.info("Updated Roles of {} from {} to {}", nodeId, currentStatus, updatedStatus); - notifyOthersOfNodeStatusChange(updatedStatus); - } - } - - // If any other node contains any of the given roles, revoke the role from the other node. 
- for (final String role : roles) { - for (final Map.Entry<NodeIdentifier, NodeConnectionStatus> entry : nodeStatuses.entrySet()) { - if (entry.getKey().equals(nodeId)) { - continue; - } - - updated = false; - while (!updated) { - final NodeConnectionStatus status = entry.getValue(); - if (status.getRoles().contains(role)) { - final Set<String> newRoles = new HashSet<>(status.getRoles()); - newRoles.remove(role); - - final NodeConnectionStatus updatedStatus = new NodeConnectionStatus(status, newRoles); - updated = replaceNodeStatus(entry.getKey(), status, updatedStatus); - - if (updated) { - logger.info("Updated Roles of {} from {} to {}", nodeId, status, updatedStatus); - notifyOthersOfNodeStatusChange(updatedStatus); - } - } else { - updated = true; - } - } - } - } - } @Override public NodeIdentifier getNodeIdentifier(final String uuid) { @@ -465,47 +391,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl return null; } - // method is synchronized because it modifies local node state and then broadcasts the change. We synchronize any time that this - // is done so that we don't have an issue where we create a NodeConnectionStatus, then another thread creates one and sends it - // before the first one is sent (as this results in the first status having a larger id, which means that the first status is never - // seen by other nodes). - @Override - public synchronized void addRole(final String clusterRole) { - final NodeIdentifier localNodeId = waitForLocalNodeIdentifier(); - final NodeConnectionStatus status = getConnectionStatus(localNodeId); - final Set<String> roles = new HashSet<>(); - if (status != null) { - roles.addAll(status.getRoles()); - } - - final boolean roleAdded = roles.add(clusterRole); - - if (roleAdded) { - updateNodeRoles(localNodeId, roles); - logger.info("Cluster role {} added. 
This node is now responsible for the following roles: {}", clusterRole, roles); - } - } - - // method is synchronized because it modifies local node state and then broadcasts the change. We synchronize any time that this - // is done so that we don't have an issue where we create a NodeConnectionStatus, then another thread creates one and sends it - // before the first one is sent (as this results in the first status having a larger id, which means that the first status is never - // seen by other nodes). - @Override - public synchronized void removeRole(final String clusterRole) { - final NodeIdentifier localNodeId = waitForLocalNodeIdentifier(); - final NodeConnectionStatus status = getConnectionStatus(localNodeId); - final Set<String> roles = new HashSet<>(); - if (status != null) { - roles.addAll(status.getRoles()); - } - - final boolean roleRemoved = roles.remove(clusterRole); - - if (roleRemoved) { - updateNodeRoles(localNodeId, roles); - logger.info("Cluster role {} removed. This node is now responsible for the following roles: {}", clusterRole, roles); - } - } @Override public Set<NodeIdentifier> getNodeIdentifiers(final NodeConnectionState... 
states) { @@ -528,10 +413,14 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl @Override public NodeIdentifier getPrimaryNode() { - return nodeStatuses.values().stream() - .filter(status -> status.getRoles().contains(ClusterRoles.PRIMARY_NODE)) + final String primaryNodeAddress = leaderElectionManager.getLeader(ClusterRoles.PRIMARY_NODE); + if (primaryNodeAddress == null) { + return null; + } + + return nodeStatuses.keySet().stream() + .filter(nodeId -> primaryNodeAddress.equals(nodeId.getSocketAddress() + ":" + nodeId.getSocketPort())) .findFirst() - .map(status -> status.getNodeIdentifier()) .orElse(null); } @@ -558,6 +447,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl return null; } + if (electedNodeAddress == null) { + logger.debug("There is currently no elected active Cluster Coordinator"); + return null; + } + final int colonLoc = electedNodeAddress.indexOf(':'); if (colonLoc < 1) { if (warnOnError) { @@ -679,12 +573,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl logger.info("Status of {} changed from {} to {}", nodeId, currentStatus, status); logger.debug("State of cluster nodes is now {}", nodeStatuses); + latestUpdateId.updateAndGet(curVal -> Math.max(curVal, status.getUpdateIdentifier())); + if (currentState == null || currentState != status.getState()) { - // We notify all nodes of the status change if either this node is the current cluster coordinator, OR if the node was - // the cluster coordinator and no longer is. This is done because if a user disconnects the cluster coordinator, we need - // to broadcast to the cluster that this node is no longer the coordinator. Otherwise, all nodes but this one will still - // believe that this node is connected to the cluster. 
- final boolean notifyAllNodes = isActiveClusterCoordinator() || (currentStatus != null && currentStatus.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR)); + final boolean notifyAllNodes = isActiveClusterCoordinator(); if (notifyAllNodes) { logger.debug("Notifying all nodes that status changed from {} to {}", currentStatus, status); } else { @@ -697,6 +589,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } } + void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus) { notifyOthersOfNodeStatusChange(updatedStatus, isActiveClusterCoordinator(), true); } @@ -784,7 +677,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } request.setDataFlow(new StandardDataFlow(flowService.createDataFlow())); - request.setNodeConnectionStatuses(new ArrayList<>(nodeStatuses.values())); + request.setNodeConnectionStatuses(getConnectionStatuses()); request.setComponentRevisions(revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList())); // Issue a reconnection request to the node. 
@@ -843,43 +736,9 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl private String summarizeStatusChange(final NodeConnectionStatus oldStatus, final NodeConnectionStatus status) { final StringBuilder sb = new StringBuilder(); - if (oldStatus != null && status.getState() == oldStatus.getState()) { - // Check if roles changed - final Set<String> oldRoles = oldStatus.getRoles(); - final Set<String> newRoles = status.getRoles(); - - final Set<String> rolesRemoved = new HashSet<>(oldRoles); - rolesRemoved.removeAll(newRoles); - - final Set<String> rolesAdded = new HashSet<>(newRoles); - rolesAdded.removeAll(oldRoles); - - if (!rolesRemoved.isEmpty()) { - sb.append("Relinquished role"); - if (rolesRemoved.size() != 1) { - sb.append("s"); - } - - sb.append(" ").append(rolesRemoved); - } - - if (!rolesAdded.isEmpty()) { - if (sb.length() > 0) { - sb.append("; "); - } - - sb.append("Acquired role"); - if (rolesAdded.size() != 1) { - sb.append("s"); - } - - sb.append(" ").append(rolesAdded); - } - } else { + if (oldStatus == null || status.getState() != oldStatus.getState()) { sb.append("Node Status changed from ").append(oldStatus == null ? 
"[Unknown Node]" : oldStatus.getState().toString()).append(" to ").append(status.getState().toString()); - if (status.getState() == NodeConnectionState.CONNECTED) { - sb.append(" (Roles=").append(status.getRoles().toString()).append(")"); - } else if (status.getDisconnectReason() != null) { + if (status.getDisconnectReason() != null) { sb.append(" due to ").append(status.getDisconnectReason()); } else if (status.getDisconnectCode() != null) { sb.append(" due to ").append(status.getDisconnectCode().toString()); @@ -894,35 +753,30 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl final NodeIdentifier nodeId = statusChangeMessage.getNodeId(); logger.debug("Handling request {}", statusChangeMessage); - boolean updated = false; - while (!updated) { - final NodeConnectionStatus oldStatus = nodeStatuses.get(statusChangeMessage.getNodeId()); - - // Either remove the value from the map or update the map depending on the connection state - if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) { - updated = nodeStatuses.remove(nodeId, oldStatus); - } else { - updated = replaceNodeStatus(nodeId, oldStatus, updatedStatus); - } + final NodeConnectionStatus oldStatus = nodeStatuses.get(statusChangeMessage.getNodeId()); - if (updated) { - logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus); - logger.debug("State of cluster nodes is now {}", nodeStatuses); + // Either remove the value from the map or update the map depending on the connection state + if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) { + nodeStatuses.remove(nodeId, oldStatus); + } else { + nodeStatuses.put(nodeId, updatedStatus); + } - final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus(); - final String summary = summarizeStatusChange(oldStatus, status); - if (!StringUtils.isEmpty(summary)) { - addNodeEvent(nodeId, 
summary); - } + logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus); + logger.debug("State of cluster nodes is now {}", nodeStatuses); - // Update our counter so that we are in-sync with the cluster on the - // most up-to-date version of the NodeConnectionStatus' Update Identifier. - // We do this so that we can accurately compare status updates that are generated - // locally against those generated from other nodes in the cluster. - NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier()); - } + final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus(); + final String summary = summarizeStatusChange(oldStatus, status); + if (!StringUtils.isEmpty(summary)) { + addNodeEvent(nodeId, summary); } + // Update our counter so that we are in-sync with the cluster on the + // most up-to-date version of the NodeConnectionStatus' Update Identifier. + // We do this so that we can accurately compare status updates that are generated + // locally against those generated from other nodes in the cluster. + NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier()); + if (isActiveClusterCoordinator()) { notifyOthersOfNodeStatusChange(statusChangeMessage.getNodeConnectionStatus()); } @@ -980,7 +834,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl addNodeEvent(resolvedNodeIdentifier, "Connection requested from existing node. 
Setting status to connecting"); } - status = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis(), getRoles(resolvedNodeIdentifier)); + status = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis()); updateNodeStatus(status); DataFlow dataFlow = null; @@ -1004,7 +858,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl return new ConnectionResponse(tryAgainSeconds); } - return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, instanceId, new ArrayList<>(nodeStatuses.values()), + return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, instanceId, getConnectionStatuses(), revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList())); } @@ -1103,7 +957,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl @Override public void setConnected(final boolean connected) { this.connected = connected; - this.coordinatorAddress = null; // if connection state changed, we are not sure about the coordinator. Check for address again. } @Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java index b414e0d..0b83c23 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java @@ -20,6 +20,7 @@ package org.apache.nifi.cluster.spring; import org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator; import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener; +import org.apache.nifi.controller.leader.election.LeaderElectionManager; import org.apache.nifi.events.EventReporter; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.revision.RevisionManager; @@ -42,8 +43,9 @@ public class NodeClusterCoordinatorFactoryBean implements FactoryBean<NodeCluste final EventReporter eventReporter = applicationContext.getBean("eventReporter", EventReporter.class); final ClusterNodeFirewall clusterFirewall = applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class); final RevisionManager revisionManager = applicationContext.getBean("revisionManager", RevisionManager.class); + final LeaderElectionManager electionManager = 
applicationContext.getBean("leaderElectionManager", LeaderElectionManager.class); - nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, clusterFirewall, revisionManager, properties); + nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, clusterFirewall, revisionManager); } return nodeClusterCoordinator; http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml index a95db4b..84c9deb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml @@ -35,6 +35,12 @@ <property name="properties" ref="nifiProperties"/> </bean> + <!-- Leader Election Manager --> + <bean id="leaderElectionManager" class="org.apache.nifi.spring.LeaderElectionManagerFactoryBean"> + <property name="numThreads" value="4" /> + <property name="properties" ref="nifiProperties" /> + </bean> + <!-- Cluster Coordinator --> <bean id="clusterCoordinator" class="org.apache.nifi.cluster.spring.NodeClusterCoordinatorFactoryBean"> <property name="properties" ref="nifiProperties"/> http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java index 5086dc0..46ea49b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java @@ -169,8 +169,8 @@ public class TestAbstractHeartbeatMonitor { private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final NodeConnectionState state) { - final NodeConnectionStatus status = new NodeConnectionStatus(nodeId, state, Collections.emptySet()); - return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), status, Collections.emptySet(), 0, 0, 0, 0); + final NodeConnectionStatus status = new NodeConnectionStatus(nodeId, state); + return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), status, 0, 0, 0, 0); } private TestFriendlyHeartbeatMonitor createMonitor(final ClusterCoordinator coordinator) { @@ -195,7 +195,7 @@ public class TestAbstractHeartbeatMonitor { @Override public synchronized void requestNodeConnect(NodeIdentifier nodeId, String userDn) { - statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, Collections.emptySet())); + statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING)); } @Override @@ -205,17 +205,17 @@ public class TestAbstractHeartbeatMonitor { @Override public synchronized void finishNodeConnection(NodeIdentifier nodeId) { - 
statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, Collections.emptySet())); + statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED)); } @Override public synchronized void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) { - statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED, Collections.emptySet())); + statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED)); } @Override public synchronized void disconnectionRequestedByNode(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) { - statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED, Collections.emptySet())); + statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED)); } @Override @@ -246,10 +246,6 @@ public class TestAbstractHeartbeatMonitor { events.add(new ReportedEvent(nodeId, severity, event)); } - @Override - public void updateNodeRoles(NodeIdentifier nodeId, Set<String> roles) { - } - synchronized List<ReportedEvent> getEvents() { return new ArrayList<>(events); } @@ -310,16 +306,18 @@ public class TestAbstractHeartbeatMonitor { } @Override - public void addRole(String clusterRole) { + public NodeIdentifier getLocalNodeIdentifier() { + return null; } @Override - public void removeRole(String clusterRole) { + public List<NodeConnectionStatus> getConnectionStatuses() { + return Collections.emptyList(); } @Override - public NodeIdentifier getLocalNodeIdentifier() { - return null; + public boolean resetNodeStatus(NodeConnectionStatus connectionStatus, long qualifyingUpdateId) { + return false; } } @@ -360,5 +358,10 @@ public class TestAbstractHeartbeatMonitor { mutex.wait(); } } + + @Override + public String getHeartbeatAddress() { + return "localhost"; + } } } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index 5eac846..f18d589 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -28,7 +28,6 @@ import java.net.SocketTimeoutException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -164,7 +163,7 @@ public class TestThreadPoolRequestReplicator { nodeIds.add(nodeId); final ClusterCoordinator coordinator = Mockito.mock(ClusterCoordinator.class); - Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, Collections.emptySet())); + Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED)); final AtomicInteger requestCount = new AtomicInteger(0); final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new 
Client(), coordinator, "1 sec", "1 sec", null, null) { @@ -210,7 +209,7 @@ public class TestThreadPoolRequestReplicator { Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenAnswer(new Answer<NodeConnectionStatus>() { @Override public NodeConnectionStatus answer(InvocationOnMock invocation) throws Throwable { - return new NodeConnectionStatus(invocation.getArgumentAt(0, NodeIdentifier.class), NodeConnectionState.CONNECTED, Collections.emptySet()); + return new NodeConnectionStatus(invocation.getArgumentAt(0, NodeIdentifier.class), NodeConnectionState.CONNECTED); } }); http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java index 2f034b3..91174ca 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java @@ -29,7 +29,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -60,11 +59,6 @@ public class TestNodeClusterCoordinator { private ClusterCoordinationProtocolSenderListener 
senderListener; private List<NodeConnectionStatus> nodeStatuses; - private Properties createProperties() { - final Properties props = new Properties(); - props.put("nifi.zookeeper.connect.string", "localhost:2181"); - return props; - } @Before public void setup() throws IOException { @@ -75,7 +69,7 @@ public class TestNodeClusterCoordinator { final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) { + coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager) { @Override void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) { nodeStatuses.add(updatedStatus); @@ -92,10 +86,10 @@ public class TestNodeClusterCoordinator { public void testConnectionResponseIndicatesAllNodes() throws IOException { // Add a disconnected node coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), DisconnectionCode.LACK_OF_HEARTBEAT)); - coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING, Collections.emptySet())); - coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING, Collections.emptySet())); - coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED, Collections.emptySet())); - coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED, Collections.emptySet())); + coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING)); + coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING)); + coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), 
NodeConnectionState.CONNECTED)); + coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED)); // Create a connection request message and send to the coordinator final NodeIdentifier requestedNodeId = createNodeId(6); @@ -130,7 +124,7 @@ public class TestNodeClusterCoordinator { final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) { + final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager) { @Override void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) { } @@ -168,7 +162,7 @@ public class TestNodeClusterCoordinator { final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) { + final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, null, revisionManager) { @Override void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) { } @@ -263,10 +257,10 @@ public class TestNodeClusterCoordinator { public void testGetConnectionStates() throws IOException { // Add a disconnected node coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), DisconnectionCode.LACK_OF_HEARTBEAT)); - coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING, Collections.emptySet())); - coordinator.updateNodeStatus(new 
NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING, Collections.emptySet())); - coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED, Collections.emptySet())); - coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED, Collections.emptySet())); + coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING)); + coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING)); + coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED)); + coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED)); final Map<NodeConnectionState, List<NodeIdentifier>> stateMap = coordinator.getConnectionStates(); assertEquals(4, stateMap.size()); @@ -293,10 +287,10 @@ public class TestNodeClusterCoordinator { public void testGetNodeIdentifiers() throws IOException { // Add a disconnected node coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), DisconnectionCode.LACK_OF_HEARTBEAT)); - coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING, Collections.emptySet())); - coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING, Collections.emptySet())); - coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED, Collections.emptySet())); - coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED, Collections.emptySet())); + coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING)); + coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING)); + coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), 
NodeConnectionState.CONNECTED)); + coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED)); final Set<NodeIdentifier> connectedIds = coordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED); assertEquals(2, connectedIds.size()); @@ -321,8 +315,8 @@ public class TestNodeClusterCoordinator { public void testRequestNodeDisconnect() throws InterruptedException { // Add a connected node final NodeIdentifier nodeId1 = createNodeId(1); - coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet())); - coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.CONNECTED, Collections.emptySet())); + coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED)); + coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.CONNECTED)); // wait for the status change message and clear it while (nodeStatuses.isEmpty()) { @@ -347,8 +341,8 @@ public class TestNodeClusterCoordinator { // Add a connected node final NodeIdentifier nodeId1 = createNodeId(1); final NodeIdentifier nodeId2 = createNodeId(2); - coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet())); - coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet())); + coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED)); + coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED)); // wait for the status change message and clear it while (nodeStatuses.isEmpty()) { @@ -376,8 +370,8 @@ public class TestNodeClusterCoordinator { final NodeIdentifier nodeId1 = createNodeId(1); final NodeIdentifier nodeId2 = createNodeId(2); - coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, 
Collections.emptySet())); - coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet())); + coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED)); + coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED)); // wait for the status change message and clear it while (nodeStatuses.size() < 2) { @@ -386,7 +380,7 @@ public class TestNodeClusterCoordinator { nodeStatuses.clear(); final NodeConnectionStatus oldStatus = new NodeConnectionStatus(-1L, nodeId1, NodeConnectionState.DISCONNECTED, - DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L, null); + DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L); final NodeStatusChangeMessage msg = new NodeStatusChangeMessage(); msg.setNodeId(nodeId1); msg.setNodeConnectionStatus(oldStatus); @@ -397,61 +391,6 @@ public class TestNodeClusterCoordinator { assertTrue(nodeStatuses.isEmpty()); } - @Test(timeout = 5000) - public void testUpdateNodeRoles() throws InterruptedException { - // Add a connected node - final NodeIdentifier nodeId1 = createNodeId(1); - final NodeIdentifier nodeId2 = createNodeId(2); - - coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet())); - // wait for the status change message and clear it - while (nodeStatuses.isEmpty()) { - Thread.sleep(10L); - } - nodeStatuses.clear(); - - coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet())); - // wait for the status change message and clear it - while (nodeStatuses.isEmpty()) { - Thread.sleep(10L); - } - nodeStatuses.clear(); - - // Update role of node 1 to primary node - coordinator.updateNodeRoles(nodeId1, Collections.singleton(ClusterRoles.PRIMARY_NODE)); - - // wait for the status change message - while (nodeStatuses.isEmpty()) { - Thread.sleep(10L); - } - // verify the message - final NodeConnectionStatus 
status = nodeStatuses.get(0); - assertNotNull(status); - assertEquals(nodeId1, status.getNodeIdentifier()); - assertEquals(NodeConnectionState.CONNECTED, status.getState()); - assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), status.getRoles()); - nodeStatuses.clear(); - - // Update role of node 2 to primary node. This should trigger 2 status changes - - // node 1 should lose primary role & node 2 should gain it - coordinator.updateNodeRoles(nodeId2, Collections.singleton(ClusterRoles.PRIMARY_NODE)); - - // wait for the status change message - while (nodeStatuses.size() < 2) { - Thread.sleep(10L); - } - - final NodeConnectionStatus status1 = nodeStatuses.get(0); - final NodeConnectionStatus status2 = nodeStatuses.get(1); - final NodeConnectionStatus id1Msg = (status1.getNodeIdentifier().equals(nodeId1)) ? status1 : status2; - final NodeConnectionStatus id2Msg = (status1.getNodeIdentifier().equals(nodeId2)) ? status1 : status2; - - assertNotSame(id1Msg, id2Msg); - - assertTrue(id1Msg.getRoles().isEmpty()); - assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), id2Msg.getRoles()); - } - @Test public void testProposedIdentifierResolvedIfConflict() { http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java index dbd8c00..fd54203 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java @@ -22,6 +22,10 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; import org.apache.curator.test.TestingServer; import org.apache.nifi.cluster.coordination.node.ClusterRoles; import org.apache.nifi.util.NiFiProperties; @@ -89,6 +93,19 @@ public class Cluster { return Collections.unmodifiableSet(nodes); } + public CuratorFramework createCuratorClient() { + final RetryPolicy retryPolicy = new RetryNTimes(20, 500); + final CuratorFramework curatorClient = CuratorFrameworkFactory.builder() + .connectString(getZooKeeperConnectString()) + .sessionTimeoutMs(3000) + .connectionTimeoutMs(3000) + .retryPolicy(retryPolicy) + .defaultData(new byte[0]) + .build(); + + curatorClient.start(); + return curatorClient; + } public Node createNode() { NiFiProperties.getInstance().setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, getZooKeeperConnectString()); @@ -104,11 +121,24 @@ public class Cluster { public Node waitForClusterCoordinator(final long time, final TimeUnit timeUnit) { return ClusterUtils.waitUntilNonNull(time, timeUnit, - () -> getNodes().stream().filter(node -> node.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR)).findFirst().orElse(null)); + () -> getNodes().stream().filter(node -> node.hasRole(ClusterRoles.CLUSTER_COORDINATOR)).findFirst().orElse(null)); } public Node waitForPrimaryNode(final long time, final TimeUnit timeUnit) { return ClusterUtils.waitUntilNonNull(time, timeUnit, - () -> getNodes().stream().filter(node -> node.getRoles().contains(ClusterRoles.PRIMARY_NODE)).findFirst().orElse(null)); + () -> getNodes().stream().filter(node -> 
node.hasRole(ClusterRoles.PRIMARY_NODE)).findFirst().orElse(null)); + } + + /** + * Waits for each node in the cluster to connect. The time given is the maximum amount of time to wait for each node to connect, not for + * the entire cluster to connect. + * + * @param time the max amount of time to wait for a node to connect + * @param timeUnit the unit of time that the given <code>time</code> value represents + */ + public void waitUntilAllNodesConnected(final long time, final TimeUnit timeUnit) { + for (final Node node : nodes) { + node.waitUntilConnected(time, timeUnit); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java index 6881ca2..3439263 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java @@ -17,222 +17,219 @@ package org.apache.nifi.cluster.integration; +import static org.junit.Assert.assertEquals; + import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.apache.nifi.cluster.coordination.node.ClusterRoles; +import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import 
org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.junit.After; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; public class ClusterConnectionIT { + private Cluster cluster; @BeforeClass public static void setup() { System.setProperty("nifi.properties.file.path", "src/test/resources/conf/nifi.properties"); } - @Test(timeout = 20000) - public void testSingleNode() throws InterruptedException { - final Cluster cluster = new Cluster(); + @Before + public void createCluster() { + cluster = new Cluster(); cluster.start(); + } - try { - final Node firstNode = cluster.createNode(); - firstNode.waitUntilConnected(10, TimeUnit.SECONDS); - - firstNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 10, TimeUnit.SECONDS); - firstNode.waitUntilElectedForRole(ClusterRoles.PRIMARY_NODE, 10, TimeUnit.SECONDS); - } finally { + @After + public void destroyCluster() { + if (cluster != null) { cluster.stop(); } } + @Test(timeout = 20000) + public void testSingleNode() throws InterruptedException { + final Node firstNode = cluster.createNode(); + firstNode.waitUntilConnected(10, TimeUnit.SECONDS); + + firstNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 10, TimeUnit.SECONDS); + firstNode.waitUntilElectedForRole(ClusterRoles.PRIMARY_NODE, 10, TimeUnit.SECONDS); + } + @Test(timeout = 60000) public void testThreeNodeCluster() throws InterruptedException { - final Cluster cluster = new Cluster(); - cluster.start(); - - try { - final Node firstNode = cluster.createNode(); - final Node secondNode = cluster.createNode(); - final Node thirdNode = cluster.createNode(); - - firstNode.waitUntilConnected(10, TimeUnit.SECONDS); - System.out.println("**** Node 1 Connected ****"); - secondNode.waitUntilConnected(10, TimeUnit.SECONDS); - System.out.println("**** Node 2 Connected ****"); - 
thirdNode.waitUntilConnected(10, TimeUnit.SECONDS); - System.out.println("**** Node 3 Connected ****"); - - final Node clusterCoordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS); - final Node primaryNode = cluster.waitForPrimaryNode(10, TimeUnit.SECONDS); - System.out.println("\n\n"); - System.out.println("Cluster Coordinator = " + clusterCoordinator); - System.out.println("Primary Node = " + primaryNode); - System.out.println("\n\n"); - } finally { - cluster.stop(); - } + cluster.createNode(); + cluster.createNode(); + cluster.createNode(); + + cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS); + + final Node clusterCoordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS); + final Node primaryNode = cluster.waitForPrimaryNode(10, TimeUnit.SECONDS); + System.out.println("\n\n"); + System.out.println("Cluster Coordinator = " + clusterCoordinator); + System.out.println("Primary Node = " + primaryNode); + System.out.println("\n\n"); } @Test(timeout = 60000) public void testNewCoordinatorElected() throws IOException { - final Cluster cluster = new Cluster(); - cluster.start(); - - try { - final Node firstNode = cluster.createNode(); - final Node secondNode = cluster.createNode(); + final Node firstNode = cluster.createNode(); + final Node secondNode = cluster.createNode(); - firstNode.waitUntilConnected(10, TimeUnit.SECONDS); - System.out.println("**** Node 1 Connected ****"); - secondNode.waitUntilConnected(10, TimeUnit.SECONDS); - System.out.println("**** Node 2 Connected ****"); + cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS); - final Node clusterCoordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS); - clusterCoordinator.stop(); + final Node clusterCoordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS); + clusterCoordinator.stop(); - final Node otherNode = firstNode == clusterCoordinator ? 
secondNode : firstNode; - otherNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 10, TimeUnit.SECONDS); - } finally { - cluster.stop(); - } + final Node otherNode = firstNode == clusterCoordinator ? secondNode : firstNode; + otherNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 10, TimeUnit.SECONDS); } @Test(timeout = 60000) public void testReconnectGetsCorrectClusterTopology() throws IOException { - final Cluster cluster = new Cluster(); - cluster.start(); + final Node firstNode = cluster.createNode(); + final Node secondNode = cluster.createNode(); + final Node thirdNode = cluster.createNode(); - try { - final Node firstNode = cluster.createNode(); - final Node secondNode = cluster.createNode(); - final Node thirdNode = cluster.createNode(); - - firstNode.waitUntilConnected(10, TimeUnit.SECONDS); - System.out.println("**** Node 1 Connected ****"); - secondNode.waitUntilConnected(10, TimeUnit.SECONDS); - System.out.println("**** Node 2 Connected ****"); - thirdNode.waitUntilConnected(10, TimeUnit.SECONDS); - System.out.println("**** Node 3 Connected ****"); - - // shutdown node - secondNode.stop(); - - System.out.println("\n\nNode 2 Shut Down\n\n"); - - // wait for node 1 and 3 to recognize that node 2 is gone - Stream.of(firstNode, thirdNode).forEach(node -> { - node.assertNodeDisconnects(secondNode.getIdentifier(), 5, TimeUnit.SECONDS); - }); - - // restart node - secondNode.start(); - System.out.println("\n\nNode 2 Restarted\n\n"); - - secondNode.waitUntilConnected(20, TimeUnit.SECONDS); - System.out.println("\n\nNode 2 Reconnected\n\n"); - - // wait for all 3 nodes to agree that node 2 is connected - Stream.of(firstNode, secondNode, thirdNode).forEach(node -> { - ClusterUtils.waitUntilConditionMet(5, TimeUnit.SECONDS, - () -> firstNode.getClusterCoordinator().getConnectionStatus(secondNode.getIdentifier()).getState() == NodeConnectionState.CONNECTED); - }); - - // Ensure that all 3 nodes see a cluster of 3 connected nodes. 
- Stream.of(firstNode, secondNode, thirdNode).forEach(node -> { - node.assertNodeIsConnected(firstNode.getIdentifier()); - node.assertNodeIsConnected(secondNode.getIdentifier()); - node.assertNodeIsConnected(thirdNode.getIdentifier()); - }); - - // Ensure that we get both a cluster coordinator and a primary node elected - cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS); - cluster.waitForPrimaryNode(10, TimeUnit.SECONDS); - } finally { - cluster.stop(); - } + cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS); + + // shutdown node + secondNode.stop(); + + System.out.println("\n\nNode 2 Shut Down\n\n"); + + // wait for node 1 and 3 to recognize that node 2 is gone + Stream.of(firstNode, thirdNode).forEach(node -> { + node.assertNodeDisconnects(secondNode.getIdentifier(), 10, TimeUnit.SECONDS); + }); + + // restart node + secondNode.start(); + System.out.println("\n\nNode 2 Restarted\n\n"); + + secondNode.waitUntilConnected(20, TimeUnit.SECONDS); + System.out.println("\n\nNode 2 Reconnected\n\n"); + + // wait for all 3 nodes to agree that node 2 is connected + Stream.of(firstNode, secondNode, thirdNode).forEach(node -> { + ClusterUtils.waitUntilConditionMet(5, TimeUnit.SECONDS, + () -> firstNode.getClusterCoordinator().getConnectionStatus(secondNode.getIdentifier()).getState() == NodeConnectionState.CONNECTED); + }); + + // Ensure that all 3 nodes see a cluster of 3 connected nodes. 
+ Stream.of(firstNode, secondNode, thirdNode).forEach(node -> { + node.assertNodeIsConnected(firstNode.getIdentifier()); + node.assertNodeIsConnected(secondNode.getIdentifier()); + node.assertNodeIsConnected(thirdNode.getIdentifier()); + }); + + // Ensure that we get both a cluster coordinator and a primary node elected + cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS); + cluster.waitForPrimaryNode(10, TimeUnit.SECONDS); } @Test(timeout = 60000) public void testRestartAllNodes() throws IOException { - final Cluster cluster = new Cluster(); - cluster.start(); - - try { - final Node firstNode = cluster.createNode(); - final Node secondNode = cluster.createNode(); - final Node thirdNode = cluster.createNode(); - - firstNode.waitUntilConnected(10, TimeUnit.SECONDS); - System.out.println("**** Node 1 Connected ****"); - secondNode.waitUntilConnected(10, TimeUnit.SECONDS); - System.out.println("**** Node 2 Connected ****"); - thirdNode.waitUntilConnected(10, TimeUnit.SECONDS); - System.out.println("**** Node 3 Connected ****"); - - // shutdown node - firstNode.stop(); - secondNode.stop(); - thirdNode.stop(); - - System.out.println("\n\nRestarting all nodes\n\n"); - thirdNode.start(); - firstNode.start(); - secondNode.start(); - - Stream.of(firstNode, secondNode, thirdNode).forEach(node -> { - node.waitUntilConnected(10, TimeUnit.SECONDS); - }); - - // wait for all 3 nodes to agree that node 2 is connected - Stream.of(firstNode, secondNode, thirdNode).forEach(node -> { - ClusterUtils.waitUntilConditionMet(5, TimeUnit.SECONDS, - () -> firstNode.getClusterCoordinator().getConnectionStatus(secondNode.getIdentifier()).getState() == NodeConnectionState.CONNECTED); - }); - - // Ensure that all 3 nodes see a cluster of 3 connected nodes. 
- Stream.of(firstNode, secondNode, thirdNode).forEach(node -> { - node.assertNodeConnects(firstNode.getIdentifier(), 10, TimeUnit.SECONDS); - node.assertNodeConnects(secondNode.getIdentifier(), 10, TimeUnit.SECONDS); - node.assertNodeConnects(thirdNode.getIdentifier(), 10, TimeUnit.SECONDS); - }); - - // Ensure that we get both a cluster coordinator and a primary node elected - cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS); - cluster.waitForPrimaryNode(10, TimeUnit.SECONDS); - } finally { - cluster.stop(); - } + final Node firstNode = cluster.createNode(); + final Node secondNode = cluster.createNode(); + final Node thirdNode = cluster.createNode(); + + firstNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 1 Connected ****"); + secondNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 2 Connected ****"); + thirdNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 3 Connected ****"); + + // shutdown node + firstNode.stop(); + secondNode.stop(); + thirdNode.stop(); + + System.out.println("\n\nRestarting all nodes\n\n"); + thirdNode.start(); + firstNode.start(); + secondNode.start(); + + cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS); + + // wait for all 3 nodes to agree that node 2 is connected + Stream.of(firstNode, secondNode, thirdNode).forEach(node -> { + ClusterUtils.waitUntilConditionMet(5, TimeUnit.SECONDS, + () -> firstNode.getClusterCoordinator().getConnectionStatus(secondNode.getIdentifier()).getState() == NodeConnectionState.CONNECTED); + }); + + // Ensure that all 3 nodes see a cluster of 3 connected nodes. 
+ Stream.of(firstNode, secondNode, thirdNode).forEach(node -> { + node.assertNodeConnects(firstNode.getIdentifier(), 10, TimeUnit.SECONDS); + node.assertNodeConnects(secondNode.getIdentifier(), 10, TimeUnit.SECONDS); + node.assertNodeConnects(thirdNode.getIdentifier(), 10, TimeUnit.SECONDS); + }); + + // Ensure that we get both a cluster coordinator and a primary node elected + cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS); + cluster.waitForPrimaryNode(10, TimeUnit.SECONDS); } @Test(timeout = 30000) public void testHeartbeatsMonitored() throws IOException { - final Cluster cluster = new Cluster(); - cluster.start(); + final Node firstNode = cluster.createNode(); + final Node secondNode = cluster.createNode(); - try { - final Node firstNode = cluster.createNode(); - final Node secondNode = cluster.createNode(); + cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS); - firstNode.waitUntilConnected(10, TimeUnit.SECONDS); - secondNode.waitUntilConnected(10, TimeUnit.SECONDS); + final Node nodeToSuspend = firstNode; + final Node otherNode = secondNode; - secondNode.suspendHeartbeating(); + nodeToSuspend.suspendHeartbeating(); - // Heartbeat interval in nifi.properties is set to 1 sec. This means that the node should be kicked out - // due to lack of heartbeat after 8 times this amount of time, or 8 seconds. - firstNode.assertNodeDisconnects(secondNode.getIdentifier(), 12, TimeUnit.SECONDS); + // Heartbeat interval in nifi.properties is set to 1 sec. This means that the node should be kicked out + // due to lack of heartbeat after 8 times this amount of time, or 8 seconds. 
+ otherNode.assertNodeDisconnects(nodeToSuspend.getIdentifier(), 12, TimeUnit.SECONDS); - secondNode.resumeHeartbeating(); - firstNode.assertNodeConnects(secondNode.getIdentifier(), 10, TimeUnit.SECONDS); - } finally { - cluster.stop(); - } + nodeToSuspend.resumeHeartbeating(); + otherNode.assertNodeConnects(nodeToSuspend.getIdentifier(), 10, TimeUnit.SECONDS); } + @Test + public void testNodeInheritsClusterTopologyOnHeartbeat() throws InterruptedException { + final Node node1 = cluster.createNode(); + final Node node2 = cluster.createNode(); + final Node node3 = cluster.createNode(); + + cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS); + final Node coordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS); + + final NodeIdentifier node4NotReallyInCluster = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9283, "localhost", 9284, "localhost", 9285, null, false, null); + + final Map<NodeIdentifier, NodeConnectionStatus> replacementStatuses = new HashMap<>(); + replacementStatuses.put(node1.getIdentifier(), new NodeConnectionStatus(node1.getIdentifier(), DisconnectionCode.USER_DISCONNECTED)); + replacementStatuses.put(node4NotReallyInCluster, new NodeConnectionStatus(node4NotReallyInCluster, NodeConnectionState.CONNECTING)); + + // reset coordinator status so that other nodes will get its now-fake view of the cluster + coordinator.getClusterCoordinator().resetNodeStatuses(replacementStatuses); + final List<NodeConnectionStatus> expectedStatuses = coordinator.getClusterCoordinator().getConnectionStatuses(); + + // give nodes a bit to heartbeat in. We need to wait long enough that each node heartbeats. + // But we need to not wait more than 8 seconds because that's when nodes start getting kicked out. 
+ Thread.sleep(6000L); + + for (final Node node : new Node[] {node1, node2, node3}) { + assertEquals(expectedStatuses, node.getClusterCoordinator().getConnectionStatuses()); + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java index 972d2c7..98d5cb3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java @@ -24,13 +24,21 @@ import java.util.function.Supplier; public class ClusterUtils { public static void waitUntilConditionMet(final long time, final TimeUnit timeUnit, final BooleanSupplier test) { + waitUntilConditionMet(time, timeUnit, test, null); + } + + public static void waitUntilConditionMet(final long time, final TimeUnit timeUnit, final BooleanSupplier test, final Supplier<String> errorMessageSupplier) { final long nanosToWait = timeUnit.toNanos(time); final long start = System.nanoTime(); final long maxTime = start + nanosToWait; while (!test.getAsBoolean()) { if (System.nanoTime() > maxTime) { - throw new AssertionError("Condition never occurred after waiting " + time + " " + timeUnit); + if (errorMessageSupplier == null) { + throw new AssertionError("Condition never occurred after waiting " + time + " " + timeUnit); + } else { + throw new AssertionError("Condition never occurred 
after waiting " + time + " " + timeUnit + " : " + errorMessageSupplier.get()); + } } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java index 5bfe83c..2996442 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java @@ -18,11 +18,11 @@ package org.apache.nifi.cluster.integration; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.ServerSocket; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -30,15 +30,15 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.cluster.ReportedEvent; -import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor; import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor; -import org.apache.nifi.cluster.coordination.node.CuratorNodeProtocolSender; +import org.apache.nifi.cluster.coordination.node.LeaderElectionNodeProtocolSender; import 
org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender; import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.NodeProtocolSender; import org.apache.nifi.cluster.protocol.ProtocolContext; import org.apache.nifi.cluster.protocol.ProtocolListener; import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener; @@ -51,6 +51,8 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.components.state.Scope; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.StandardFlowService; +import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager; +import org.apache.nifi.controller.leader.election.LeaderElectionManager; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; @@ -74,9 +76,10 @@ public class Node { private final RevisionManager revisionManager; private NodeClusterCoordinator clusterCoordinator; - private CuratorNodeProtocolSender protocolSender; + private NodeProtocolSender protocolSender; private FlowController flowController; private StandardFlowService flowService; + private LeaderElectionManager electionManager; private ProtocolListener protocolListener; @@ -97,6 +100,8 @@ public class Node { revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.<Revision> emptyList()); + + electionManager = new CuratorLeaderElectionManager(4, nodeProperties); } @@ -110,7 +115,8 @@ public class Node { final HeartbeatMonitor heartbeatMonitor = createHeartbeatMonitor(); flowController = 
FlowController.createClusteredInstance(Mockito.mock(FlowFileEventRepository.class), nodeProperties, - null, null, StringEncryptor.createEncryptor(), protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator, heartbeatMonitor, VariableRegistry.EMPTY_REGISTRY); + null, null, StringEncryptor.createEncryptor(), protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator, + heartbeatMonitor, electionManager, VariableRegistry.EMPTY_REGISTRY); try { flowController.initializeFlow(); @@ -195,23 +201,18 @@ public class Node { } } - public Set<String> getRoles() { - final NodeConnectionStatus status = getConnectionStatus(); - return status == null ? Collections.emptySet() : status.getRoles(); - } - public NodeConnectionStatus getConnectionStatus() { return clusterCoordinator.getConnectionStatus(nodeId); } @SuppressWarnings("unchecked") - private CuratorNodeProtocolSender createNodeProtocolSender() { + private NodeProtocolSender createNodeProtocolSender() { final SocketConfiguration socketConfig = new SocketConfiguration(); socketConfig.setSocketTimeout(3000); socketConfig.setReuseAddress(true); final ProtocolContext<ProtocolMessage> protocolContext = new JaxbProtocolContext<>(JaxbProtocolUtils.JAXB_CONTEXT); - final CuratorNodeProtocolSender protocolSender = new CuratorNodeProtocolSender(socketConfig, protocolContext, nodeProperties); + final NodeProtocolSender protocolSender = new LeaderElectionNodeProtocolSender(socketConfig, protocolContext, electionManager); return protocolSender; } @@ -250,11 +251,11 @@ public class Node { } final ClusterCoordinationProtocolSenderListener protocolSenderListener = new ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), protocolListener); - return new NodeClusterCoordinator(protocolSenderListener, eventReporter, null, revisionManager, nodeProperties); + return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, null, revisionManager); } - public 
ClusterCoordinator getClusterCoordinator() { + public NodeClusterCoordinator getClusterCoordinator() { return clusterCoordinator; } @@ -278,8 +279,22 @@ public class Node { ClusterUtils.waitUntilConditionMet(time, timeUnit, () -> isConnected()); } + private String getClusterAddress() { + final InetSocketAddress address = nodeProperties.getClusterNodeProtocolAddress(); + return address.getHostName() + ":" + address.getPort(); + } + + public boolean hasRole(final String roleName) { + final String leaderAddress = electionManager.getLeader(roleName); + if (leaderAddress == null) { + return false; + } + + return leaderAddress.equals(getClusterAddress()); + } + public void waitUntilElectedForRole(final String roleName, final long time, final TimeUnit timeUnit) { - ClusterUtils.waitUntilConditionMet(time, timeUnit, () -> getRoles().contains(roleName)); + ClusterUtils.waitUntilConditionMet(time, timeUnit, () -> hasRole(roleName)); } // Assertions @@ -292,7 +307,8 @@ public class Node { */ public void assertNodeConnects(final NodeIdentifier nodeId, final long time, final TimeUnit timeUnit) { ClusterUtils.waitUntilConditionMet(time, timeUnit, - () -> getClusterCoordinator().getConnectionStatus(nodeId).getState() == NodeConnectionState.CONNECTED); + () -> getClusterCoordinator().getConnectionStatus(nodeId).getState() == NodeConnectionState.CONNECTED, + () -> "Connection Status is " + getClusterCoordinator().getConnectionStatus(nodeId).toString()); } @@ -305,7 +321,8 @@ public class Node { */ public void assertNodeDisconnects(final NodeIdentifier nodeId, final long time, final TimeUnit timeUnit) { ClusterUtils.waitUntilConditionMet(time, timeUnit, - () -> getClusterCoordinator().getConnectionStatus(nodeId).getState() == NodeConnectionState.DISCONNECTED); + () -> getClusterCoordinator().getConnectionStatus(nodeId).getState() == NodeConnectionState.DISCONNECTED, + () -> "Connection Status is " + getClusterCoordinator().getConnectionStatus(nodeId).toString()); }
