This is an automated email from the ASF dual-hosted git repository.

mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new b004f1f  NIFI-6779: When resolving node identifier, if NodeIdentifier
has a different value for hostname/port for web api or cluster protocol, assume
that node is correct about itself instead of assuming that Cluster Coordinator
knows best about the other node; NIFI-6779: Remove any conflicting Node
Identifiers when a new Node ID is encountered
b004f1f is described below

commit b004f1f94c5857e89d4410153f43dc0000709ff5
Author: Mark Payne <[email protected]>
AuthorDate: Tue Oct 15 15:51:46 2019 -0400

    NIFI-6779: When resolving node identifier, if NodeIdentifier has a
    different value for hostname/port for web api or cluster protocol, assume that
    node is correct about itself instead of assuming that Cluster Coordinator knows
    best about the other node
    NIFI-6779: Remove any conflicting Node Identifiers when a new Node ID is
    encountered
    
    This closes #3819
---
 .../heartbeat/AbstractHeartbeatMonitor.java        |  28 ++++-
 .../coordination/node/NodeClusterCoordinator.java  | 114 +++++++++++----------
 .../node/TestNodeClusterCoordinator.java           |   3 +-
 3 files changed, 90 insertions(+), 55 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 86b4cc1..53fefb4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.cluster.coordination.heartbeat;
 
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@@ -49,6 +50,11 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
         final String heartbeatInterval = 
nifiProperties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL,
                 NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL);
         this.heartbeatIntervalMillis = (int) 
FormatUtils.getTimeDuration(heartbeatInterval, TimeUnit.MILLISECONDS);
+
+        // Register an event listener so that if any nodes are removed, we 
also remove the heartbeat.
+        // Otherwise, we'll have a condition where a node is removed from the 
Cluster Coordinator, but its heartbeat has already been received.
+        // As a result, when it is processed, we will ask the node to 
reconnect, adding it back to the cluster.
+        clusterCoordinator.registerEventListener(new 
ClusterChangeEventListener());
     }
 
     @Override
@@ -207,7 +213,8 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
         final NodeConnectionStatus connectionStatus = 
clusterCoordinator.getConnectionStatus(nodeId);
         if (connectionStatus == null) {
             // Unknown node. Issue reconnect request
-            clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received 
heartbeat from unknown node. Removing heartbeat and requesting that node 
connect to cluster.");
+            clusterCoordinator.reportEvent(nodeId, Severity.INFO,
+                "Received heartbeat from unknown node " + 
nodeId.getFullDescription() + ". Removing heartbeat and requesting that node 
connect to cluster.");
             removeHeartbeat(nodeId);
 
             clusterCoordinator.requestNodeConnect(nodeId, null);
@@ -311,4 +318,23 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
      */
     protected void onStop() {
     }
+
+    private class ClusterChangeEventListener implements 
ClusterTopologyEventListener {
+        @Override
+        public void onNodeAdded(final NodeIdentifier nodeId) {
+        }
+
+        @Override
+        public void onNodeRemoved(final NodeIdentifier nodeId) {
+            AbstractHeartbeatMonitor.this.removeHeartbeat(nodeId);
+        }
+
+        @Override
+        public void onLocalNodeIdentifierSet(final NodeIdentifier localNodeId) 
{
+        }
+
+        @Override
+        public void onNodeStateChange(final NodeIdentifier nodeId, final 
NodeConnectionState newState) {
+        }
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index aec3a7a..888f970 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.parameter.ParameterLookup;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
 import org.apache.nifi.cluster.coordination.flow.FlowElection;
@@ -66,6 +65,7 @@ import 
org.apache.nifi.controller.leader.election.LeaderElectionManager;
 import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.parameter.ParameterLookup;
 import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.services.FlowService;
@@ -118,8 +118,8 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
     private volatile boolean closed = false;
     private volatile boolean requireElection = true;
 
-    private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> 
nodeStatuses = new ConcurrentHashMap<>();
-    private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> 
nodeEvents = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, NodeConnectionStatus> nodeStatuses = 
new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, CircularFifoQueue<NodeEvent>> 
nodeEvents = new ConcurrentHashMap<>();
 
     private final List<ClusterTopologyEventListener> eventListeners = new 
CopyOnWriteArrayList<>();
 
@@ -257,7 +257,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         }
 
         this.nodeId = nodeId;
-        nodeStatuses.computeIfAbsent(nodeId, id -> new 
NodeConnectionStatus(id, DisconnectionCode.NOT_YET_CONNECTED));
+        nodeStatuses.computeIfAbsent(nodeId.getId(), id -> new 
NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));
         eventListeners.forEach(listener -> 
listener.onLocalNodeIdentifierSet(nodeId));
     }
 
@@ -316,8 +316,8 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
     }
 
     private NodeConnectionStatus removeNode(final NodeIdentifier nodeId) {
-        final NodeConnectionStatus status = nodeStatuses.remove(nodeId);
-        nodeEvents.remove(nodeId);
+        final NodeConnectionStatus status = 
nodeStatuses.remove(nodeId.getId());
+        nodeEvents.remove(nodeId.getId());
         if (status != null) {
             onNodeRemoved(nodeId);
         }
@@ -326,9 +326,9 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
     }
 
     private boolean removeNodeConditionally(final NodeIdentifier nodeId, final 
NodeConnectionStatus expectedStatus) {
-        final boolean removed = nodeStatuses.remove(nodeId, expectedStatus);
+        final boolean removed = nodeStatuses.remove(nodeId.getId(), 
expectedStatus);
         if (removed) {
-            nodeEvents.remove(nodeId);
+            nodeEvents.remove(nodeId.getId());
             onNodeRemoved(nodeId);
         }
 
@@ -340,7 +340,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
     }
 
     private NodeConnectionStatus updateNodeStatus(final NodeIdentifier nodeId, 
final NodeConnectionStatus updatedStatus, final boolean storeState) {
-        final NodeConnectionStatus evictedStatus = nodeStatuses.put(nodeId, 
updatedStatus);
+        final NodeConnectionStatus evictedStatus = 
nodeStatuses.put(nodeId.getId(), updatedStatus);
         if (evictedStatus == null) {
             onNodeAdded(nodeId, storeState);
         } else {
@@ -353,14 +353,14 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
     private boolean updateNodeStatusConditionally(final NodeIdentifier nodeId, 
final NodeConnectionStatus expectedStatus, final NodeConnectionStatus 
updatedStatus) {
         final boolean updated;
         if (expectedStatus == null) {
-            final NodeConnectionStatus existingValue = 
nodeStatuses.putIfAbsent(nodeId, updatedStatus);
+            final NodeConnectionStatus existingValue = 
nodeStatuses.putIfAbsent(nodeId.getId(), updatedStatus);
             updated = existingValue == null;
 
             if (updated) {
                 onNodeAdded(nodeId, true);
             }
         } else {
-            updated = nodeStatuses.replace(nodeId, expectedStatus, 
updatedStatus);
+            updated = nodeStatuses.replace(nodeId.getId(), expectedStatus, 
updatedStatus);
         }
 
         if (updated) {
@@ -597,7 +597,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
     @Override
     public NodeConnectionStatus getConnectionStatus(final NodeIdentifier 
nodeId) {
-        return nodeStatuses.get(nodeId);
+        return nodeStatuses.get(nodeId.getId());
     }
 
     private NodeConnectionState getConnectionState(final NodeIdentifier 
nodeId) {
@@ -613,10 +613,10 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
     @Override
     public Map<NodeConnectionState, List<NodeIdentifier>> 
getConnectionStates() {
         final Map<NodeConnectionState, List<NodeIdentifier>> connectionStates 
= new HashMap<>();
-        for (final Map.Entry<NodeIdentifier, NodeConnectionStatus> entry : 
nodeStatuses.entrySet()) {
+        for (final Map.Entry<String, NodeConnectionStatus> entry : 
nodeStatuses.entrySet()) {
             final NodeConnectionState state = entry.getValue().getState();
             final List<NodeIdentifier> nodeIds = 
connectionStates.computeIfAbsent(state, s -> new ArrayList<>());
-            nodeIds.add(entry.getKey());
+            nodeIds.add(entry.getValue().getNodeIdentifier());
         }
 
         return connectionStates;
@@ -660,13 +660,8 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
     @Override
     public NodeIdentifier getNodeIdentifier(final String uuid) {
-        for (final NodeIdentifier nodeId : nodeStatuses.keySet()) {
-            if (nodeId.getId().equals(uuid)) {
-                return nodeId;
-            }
-        }
-
-        return null;
+        final NodeConnectionStatus status = nodeStatuses.get(uuid);
+        return status == null ? null : status.getNodeIdentifier();
     }
 
     @Override
@@ -684,7 +679,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
         return nodeStatuses.entrySet().stream()
                 .filter(entry -> 
statesOfInterest.contains(entry.getValue().getState()))
-                .map(entry -> entry.getKey())
+                .map(entry -> entry.getValue().getNodeIdentifier())
                 .collect(Collectors.toSet());
     }
 
@@ -695,7 +690,8 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
             return null;
         }
 
-        return nodeStatuses.keySet().stream()
+        return nodeStatuses.values().stream()
+                .map(NodeConnectionStatus::getNodeIdentifier)
                 .filter(nodeId -> 
primaryNodeAddress.equals(nodeId.getSocketAddress() + ":" + 
nodeId.getSocketPort()))
                 .findFirst()
                 .orElse(null);
@@ -762,7 +758,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
                     return null;
                 }
 
-                final NodeConnectionStatus existingStatus = 
this.nodeStatuses.putIfAbsent(connectionStatus.getNodeIdentifier(), 
connectionStatus);
+                final NodeConnectionStatus existingStatus = 
this.nodeStatuses.putIfAbsent(connectionStatus.getNodeIdentifier().getId(), 
connectionStatus);
                 if (existingStatus == null) {
                     onNodeAdded(connectionStatus.getNodeIdentifier(), true);
                     return connectionStatus.getNodeIdentifier();
@@ -789,7 +785,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
     @Override
     public List<NodeEvent> getNodeEvents(final NodeIdentifier nodeId) {
-        final CircularFifoQueue<NodeEvent> eventQueue = nodeEvents.get(nodeId);
+        final CircularFifoQueue<NodeEvent> eventQueue = 
nodeEvents.get(nodeId.getId());
         if (eventQueue == null) {
             return Collections.emptyList();
         }
@@ -813,7 +809,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
     private void addNodeEvent(final NodeIdentifier nodeId, final Severity 
severity, final String message) {
         final NodeEvent event = new Event(nodeId.toString(), message, 
severity);
-        final CircularFifoQueue<NodeEvent> eventQueue = 
nodeEvents.computeIfAbsent(nodeId, id -> new CircularFifoQueue<>());
+        final CircularFifoQueue<NodeEvent> eventQueue = 
nodeEvents.computeIfAbsent(nodeId.getId(), id -> new CircularFifoQueue<>());
         synchronized (eventQueue) {
             eventQueue.add(event);
         }
@@ -1032,7 +1028,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         final NodeConnectionStatusResponseMessage msg = new 
NodeConnectionStatusResponseMessage();
         final NodeIdentifier self = getLocalNodeIdentifier();
         if (self != null) {
-            final NodeConnectionStatus connectionStatus = 
nodeStatuses.get(self);
+            final NodeConnectionStatus connectionStatus = 
nodeStatuses.get(self.getId());
             msg.setNodeConnectionStatus(connectionStatus);
         }
 
@@ -1059,7 +1055,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         final NodeIdentifier nodeId = statusChangeMessage.getNodeId();
         logger.debug("Handling request {}", statusChangeMessage);
 
-        final NodeConnectionStatus oldStatus = 
nodeStatuses.get(statusChangeMessage.getNodeId());
+        final NodeConnectionStatus oldStatus = 
nodeStatuses.get(statusChangeMessage.getNodeId().getId());
 
         // Either remove the value from the map or update the map depending on 
the connection state
         if (statusChangeMessage.getNodeConnectionStatus().getState() == 
NodeConnectionState.REMOVED) {
@@ -1104,45 +1100,59 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         return !requireElection || flowElection.isElectionComplete();
     }
 
-    private NodeIdentifier resolveNodeId(final NodeIdentifier 
proposedIdentifier) {
-        final NodeConnectionStatus proposedConnectionStatus = new 
NodeConnectionStatus(proposedIdentifier, DisconnectionCode.NOT_YET_CONNECTED);
-        final NodeConnectionStatus existingStatus = 
nodeStatuses.putIfAbsent(proposedIdentifier, proposedConnectionStatus);
+    private void registerNodeId(final NodeIdentifier nodeIdentifier) {
+        final NodeConnectionStatus proposedConnectionStatus = new 
NodeConnectionStatus(nodeIdentifier, DisconnectionCode.NOT_YET_CONNECTED);
+        final NodeConnectionStatus existingStatus = 
nodeStatuses.putIfAbsent(nodeIdentifier.getId(), proposedConnectionStatus);
+
+        removeConflictingNodeIds(nodeIdentifier);
 
-        NodeIdentifier resolvedNodeId = proposedIdentifier;
         if (existingStatus == null) {
             // there is no node with that ID
-            resolvedNodeId = proposedIdentifier;
-            logger.debug("No existing node with ID {}; resolved node ID is 
as-proposed", proposedIdentifier.getFullDescription());
-            onNodeAdded(resolvedNodeId, true);
-        } else if 
(existingStatus.getNodeIdentifier().logicallyEquals(proposedIdentifier)) {
-            // there is a node with that ID but it's the same node.
-            resolvedNodeId = proposedIdentifier;
-            logger.debug("A node already exists with ID {} and is logically 
equivalent; resolved node ID is as-proposed: {}", proposedIdentifier.getId(), 
proposedIdentifier.getFullDescription());
+            logger.info("No existing node with ID {}; will add Node as {}", 
nodeIdentifier.getId(), nodeIdentifier.getFullDescription());
+            logger.debug("After adding node {}, node statuses are {}", 
nodeIdentifier, nodeStatuses);
+
+            onNodeAdded(nodeIdentifier, true);
         } else {
-            // there is a node with that ID and it's a different node
-            resolvedNodeId = new NodeIdentifier(UUID.randomUUID().toString(), 
proposedIdentifier.getApiAddress(), proposedIdentifier.getApiPort(),
-                    proposedIdentifier.getSocketAddress(), 
proposedIdentifier.getSocketPort(), proposedIdentifier.getLoadBalanceAddress(), 
proposedIdentifier.getLoadBalancePort(),
-                    proposedIdentifier.getSiteToSiteAddress(), 
proposedIdentifier.getSiteToSitePort(), 
proposedIdentifier.getSiteToSiteHttpApiPort(), 
proposedIdentifier.isSiteToSiteSecure());
+            // there is a node with that ID but it's the same node.
+            logger.debug("A node already exists with ID {}; existing Node 
Identifier is: {}", nodeIdentifier.getId(), 
existingStatus.getNodeIdentifier().getFullDescription());
+        }
+    }
 
-            logger.debug("A node already exists with ID {}. Proposed Node 
Identifier was {}; existing Node Identifier is {}; Resolved Node Identifier is 
{}",
-                    proposedIdentifier.getId(), 
proposedIdentifier.getFullDescription(), 
getNodeIdentifier(proposedIdentifier.getId()).getFullDescription(), 
resolvedNodeId.getFullDescription());
+    private void removeConflictingNodeIds(final NodeIdentifier nodeIdentifier) 
{
+        final Set<NodeIdentifier> conflictingNodeIds = 
findConflictingNodeIds(nodeIdentifier);
+        if (!conflictingNodeIds.isEmpty()) {
+            final Set<String> fullNodeIdDescriptions = 
conflictingNodeIds.stream()
+                .map(NodeIdentifier::getFullDescription)
+                .collect(Collectors.toSet());
+
+            logger.warn("New Node {} was registered for this cluster, but this 
Node Identifier conflicts with {} others: {}; " +
+                "each of these conflicting Node Identifiers will be removed 
from the cluster",
+                nodeIdentifier.getFullDescription(), 
fullNodeIdDescriptions.size(), fullNodeIdDescriptions);
+
+            conflictingNodeIds.forEach(uuid -> removeNode(uuid));
         }
+    }
 
-        return resolvedNodeId;
+    private Set<NodeIdentifier> findConflictingNodeIds(final NodeIdentifier 
nodeId) {
+        return nodeStatuses.values().stream()
+            .map(NodeConnectionStatus::getNodeIdentifier)
+            .filter(potential -> !potential.equals(nodeId))
+            .filter(nodeId::logicallyEquals)
+            .collect(Collectors.toSet());
     }
 
     private ConnectionResponseMessage handleConnectionRequest(final 
ConnectionRequestMessage requestMessage, final Set<String> nodeIdentities) {
-        final NodeIdentifier proposedIdentifier = 
requestMessage.getConnectionRequest().getProposedNodeIdentifier();
-        final NodeIdentifier withNodeIdentities = 
addNodeIdentities(proposedIdentifier, nodeIdentities);
+        final NodeIdentifier nodeIdentifier = 
requestMessage.getConnectionRequest().getProposedNodeIdentifier();
+        final NodeIdentifier withNodeIdentities = 
addNodeIdentities(nodeIdentifier, nodeIdentities);
         final DataFlow dataFlow = 
requestMessage.getConnectionRequest().getDataFlow();
         final ConnectionRequest requestWithNodeIdentities = new 
ConnectionRequest(withNodeIdentities, dataFlow);
 
         // Resolve Node identifier.
-        final NodeIdentifier resolvedNodeId = 
resolveNodeId(proposedIdentifier);
+        registerNodeId(nodeIdentifier);
 
         if (isBlockedByFirewall(nodeIdentities)) {
             // if the socket address is not listed in the firewall, then 
return a null response
-            logger.info("Firewall blocked connection request from node " + 
resolvedNodeId + " with Node Identities " + nodeIdentities);
+            logger.info("Firewall blocked connection request from node " + 
nodeIdentifier + " with Node Identities " + nodeIdentities);
             final ConnectionResponse response = 
ConnectionResponse.createBlockedByFirewallResponse();
             final ConnectionResponseMessage responseMessage = new 
ConnectionResponseMessage();
             responseMessage.setConnectionResponse(response);
@@ -1156,12 +1166,12 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
                 return createFlowElectionInProgressResponse();
             } else {
                 logger.info("Received Connection Request from {}; responding 
with DataFlow that was elected", withNodeIdentities);
-                return createConnectionResponse(requestWithNodeIdentities, 
resolvedNodeId, electedDataFlow);
+                return createConnectionResponse(requestWithNodeIdentities, 
nodeIdentifier, electedDataFlow);
             }
         }
 
         logger.info("Received Connection Request from {}; responding with my 
DataFlow", withNodeIdentities);
-        return createConnectionResponse(requestWithNodeIdentities, 
resolvedNodeId);
+        return createConnectionResponse(requestWithNodeIdentities, 
nodeIdentifier);
     }
 
     private ConnectionResponseMessage createFlowElectionInProgressResponse() {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index b266ba9..1bca6fb 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -58,7 +58,6 @@ import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -443,7 +442,7 @@ public class TestNodeClusterCoordinator {
         assertTrue(conflictingResponse instanceof ConnectionResponseMessage);
         final ConnectionResponseMessage conflictingResponseMessage = 
(ConnectionResponseMessage) conflictingResponse;
         final NodeIdentifier conflictingNodeId = 
conflictingResponseMessage.getConnectionResponse().getNodeIdentifier();
-        assertNotSame(id1.getId(), conflictingNodeId.getId());
+        assertEquals(id1.getId(), conflictingNodeId.getId());
         assertEquals(conflictingId.getApiAddress(), 
conflictingNodeId.getApiAddress());
         assertEquals(conflictingId.getApiPort(), 
conflictingNodeId.getApiPort());
         assertEquals(conflictingId.getSiteToSiteAddress(), 
conflictingNodeId.getSiteToSiteAddress());

Reply via email to