This is an automated email from the ASF dual-hosted git repository.
mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new b004f1f NIFI-6779: When resolving node identifier, if NodeIdentifier
has a different value for hostname/port for web api or cluster protocol, assume
that node is correct about itself instead of assuming that Cluster Coordinator
knows best about the other node NIFI-6779: Remove any conflicting Node
Identifiers when a new Node ID is encountered
b004f1f is described below
commit b004f1f94c5857e89d4410153f43dc0000709ff5
Author: Mark Payne <[email protected]>
AuthorDate: Tue Oct 15 15:51:46 2019 -0400
NIFI-6779: When resolving node identifier, if NodeIdentifier has a
different value for hostname/port for web api or cluster protocol, assume that
node is correct about itself instead of assuming that Cluster Coordinator knows
best about the other node
NIFI-6779: Remove any conflicting Node Identifiers when a new Node ID is
encountered
This closes #3819
---
.../heartbeat/AbstractHeartbeatMonitor.java | 28 ++++-
.../coordination/node/NodeClusterCoordinator.java | 114 +++++++++++----------
.../node/TestNodeClusterCoordinator.java | 3 +-
3 files changed, 90 insertions(+), 55 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 86b4cc1..53fefb4 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -17,6 +17,7 @@
package org.apache.nifi.cluster.coordination.heartbeat;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@@ -49,6 +50,11 @@ public abstract class AbstractHeartbeatMonitor implements
HeartbeatMonitor {
final String heartbeatInterval =
nifiProperties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL,
NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL);
this.heartbeatIntervalMillis = (int)
FormatUtils.getTimeDuration(heartbeatInterval, TimeUnit.MILLISECONDS);
+
+ // Register an event listener so that if any nodes are removed, we
also remove the heartbeat.
+ // Otherwise, we'll have a condition where a node is removed from the
Cluster Coordinator, but its heartbeat has already been received.
+ // As a result, when it is processed, we will ask the node to
reconnect, adding it back to the cluster.
+ clusterCoordinator.registerEventListener(new
ClusterChangeEventListener());
}
@Override
@@ -207,7 +213,8 @@ public abstract class AbstractHeartbeatMonitor implements
HeartbeatMonitor {
final NodeConnectionStatus connectionStatus =
clusterCoordinator.getConnectionStatus(nodeId);
if (connectionStatus == null) {
// Unknown node. Issue reconnect request
- clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received
heartbeat from unknown node. Removing heartbeat and requesting that node
connect to cluster.");
+ clusterCoordinator.reportEvent(nodeId, Severity.INFO,
+ "Received heartbeat from unknown node " +
nodeId.getFullDescription() + ". Removing heartbeat and requesting that node
connect to cluster.");
removeHeartbeat(nodeId);
clusterCoordinator.requestNodeConnect(nodeId, null);
@@ -311,4 +318,23 @@ public abstract class AbstractHeartbeatMonitor implements
HeartbeatMonitor {
*/
protected void onStop() {
}
+
+ private class ClusterChangeEventListener implements
ClusterTopologyEventListener {
+ @Override
+ public void onNodeAdded(final NodeIdentifier nodeId) {
+ }
+
+ @Override
+ public void onNodeRemoved(final NodeIdentifier nodeId) {
+ AbstractHeartbeatMonitor.this.removeHeartbeat(nodeId);
+ }
+
+ @Override
+ public void onLocalNodeIdentifierSet(final NodeIdentifier localNodeId)
{
+ }
+
+ @Override
+ public void onNodeStateChange(final NodeIdentifier nodeId, final
NodeConnectionState newState) {
+ }
+ }
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index aec3a7a..888f970 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
import org.apache.nifi.cluster.coordination.flow.FlowElection;
@@ -66,6 +65,7 @@ import
org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.services.FlowService;
@@ -118,8 +118,8 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
private volatile boolean closed = false;
private volatile boolean requireElection = true;
- private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus>
nodeStatuses = new ConcurrentHashMap<>();
- private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>>
nodeEvents = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, NodeConnectionStatus> nodeStatuses =
new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, CircularFifoQueue<NodeEvent>>
nodeEvents = new ConcurrentHashMap<>();
private final List<ClusterTopologyEventListener> eventListeners = new
CopyOnWriteArrayList<>();
@@ -257,7 +257,7 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
}
this.nodeId = nodeId;
- nodeStatuses.computeIfAbsent(nodeId, id -> new
NodeConnectionStatus(id, DisconnectionCode.NOT_YET_CONNECTED));
+ nodeStatuses.computeIfAbsent(nodeId.getId(), id -> new
NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));
eventListeners.forEach(listener ->
listener.onLocalNodeIdentifierSet(nodeId));
}
@@ -316,8 +316,8 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
}
private NodeConnectionStatus removeNode(final NodeIdentifier nodeId) {
- final NodeConnectionStatus status = nodeStatuses.remove(nodeId);
- nodeEvents.remove(nodeId);
+ final NodeConnectionStatus status =
nodeStatuses.remove(nodeId.getId());
+ nodeEvents.remove(nodeId.getId());
if (status != null) {
onNodeRemoved(nodeId);
}
@@ -326,9 +326,9 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
}
private boolean removeNodeConditionally(final NodeIdentifier nodeId, final
NodeConnectionStatus expectedStatus) {
- final boolean removed = nodeStatuses.remove(nodeId, expectedStatus);
+ final boolean removed = nodeStatuses.remove(nodeId.getId(),
expectedStatus);
if (removed) {
- nodeEvents.remove(nodeId);
+ nodeEvents.remove(nodeId.getId());
onNodeRemoved(nodeId);
}
@@ -340,7 +340,7 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
}
private NodeConnectionStatus updateNodeStatus(final NodeIdentifier nodeId,
final NodeConnectionStatus updatedStatus, final boolean storeState) {
- final NodeConnectionStatus evictedStatus = nodeStatuses.put(nodeId,
updatedStatus);
+ final NodeConnectionStatus evictedStatus =
nodeStatuses.put(nodeId.getId(), updatedStatus);
if (evictedStatus == null) {
onNodeAdded(nodeId, storeState);
} else {
@@ -353,14 +353,14 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
private boolean updateNodeStatusConditionally(final NodeIdentifier nodeId,
final NodeConnectionStatus expectedStatus, final NodeConnectionStatus
updatedStatus) {
final boolean updated;
if (expectedStatus == null) {
- final NodeConnectionStatus existingValue =
nodeStatuses.putIfAbsent(nodeId, updatedStatus);
+ final NodeConnectionStatus existingValue =
nodeStatuses.putIfAbsent(nodeId.getId(), updatedStatus);
updated = existingValue == null;
if (updated) {
onNodeAdded(nodeId, true);
}
} else {
- updated = nodeStatuses.replace(nodeId, expectedStatus,
updatedStatus);
+ updated = nodeStatuses.replace(nodeId.getId(), expectedStatus,
updatedStatus);
}
if (updated) {
@@ -597,7 +597,7 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
@Override
public NodeConnectionStatus getConnectionStatus(final NodeIdentifier
nodeId) {
- return nodeStatuses.get(nodeId);
+ return nodeStatuses.get(nodeId.getId());
}
private NodeConnectionState getConnectionState(final NodeIdentifier
nodeId) {
@@ -613,10 +613,10 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
@Override
public Map<NodeConnectionState, List<NodeIdentifier>>
getConnectionStates() {
final Map<NodeConnectionState, List<NodeIdentifier>> connectionStates
= new HashMap<>();
- for (final Map.Entry<NodeIdentifier, NodeConnectionStatus> entry :
nodeStatuses.entrySet()) {
+ for (final Map.Entry<String, NodeConnectionStatus> entry :
nodeStatuses.entrySet()) {
final NodeConnectionState state = entry.getValue().getState();
final List<NodeIdentifier> nodeIds =
connectionStates.computeIfAbsent(state, s -> new ArrayList<>());
- nodeIds.add(entry.getKey());
+ nodeIds.add(entry.getValue().getNodeIdentifier());
}
return connectionStates;
@@ -660,13 +660,8 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
@Override
public NodeIdentifier getNodeIdentifier(final String uuid) {
- for (final NodeIdentifier nodeId : nodeStatuses.keySet()) {
- if (nodeId.getId().equals(uuid)) {
- return nodeId;
- }
- }
-
- return null;
+ final NodeConnectionStatus status = nodeStatuses.get(uuid);
+ return status == null ? null : status.getNodeIdentifier();
}
@Override
@@ -684,7 +679,7 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
return nodeStatuses.entrySet().stream()
.filter(entry ->
statesOfInterest.contains(entry.getValue().getState()))
- .map(entry -> entry.getKey())
+ .map(entry -> entry.getValue().getNodeIdentifier())
.collect(Collectors.toSet());
}
@@ -695,7 +690,8 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
return null;
}
- return nodeStatuses.keySet().stream()
+ return nodeStatuses.values().stream()
+ .map(NodeConnectionStatus::getNodeIdentifier)
.filter(nodeId ->
primaryNodeAddress.equals(nodeId.getSocketAddress() + ":" +
nodeId.getSocketPort()))
.findFirst()
.orElse(null);
@@ -762,7 +758,7 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
return null;
}
- final NodeConnectionStatus existingStatus =
this.nodeStatuses.putIfAbsent(connectionStatus.getNodeIdentifier(),
connectionStatus);
+ final NodeConnectionStatus existingStatus =
this.nodeStatuses.putIfAbsent(connectionStatus.getNodeIdentifier().getId(),
connectionStatus);
if (existingStatus == null) {
onNodeAdded(connectionStatus.getNodeIdentifier(), true);
return connectionStatus.getNodeIdentifier();
@@ -789,7 +785,7 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
@Override
public List<NodeEvent> getNodeEvents(final NodeIdentifier nodeId) {
- final CircularFifoQueue<NodeEvent> eventQueue = nodeEvents.get(nodeId);
+ final CircularFifoQueue<NodeEvent> eventQueue =
nodeEvents.get(nodeId.getId());
if (eventQueue == null) {
return Collections.emptyList();
}
@@ -813,7 +809,7 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
private void addNodeEvent(final NodeIdentifier nodeId, final Severity
severity, final String message) {
final NodeEvent event = new Event(nodeId.toString(), message,
severity);
- final CircularFifoQueue<NodeEvent> eventQueue =
nodeEvents.computeIfAbsent(nodeId, id -> new CircularFifoQueue<>());
+ final CircularFifoQueue<NodeEvent> eventQueue =
nodeEvents.computeIfAbsent(nodeId.getId(), id -> new CircularFifoQueue<>());
synchronized (eventQueue) {
eventQueue.add(event);
}
@@ -1032,7 +1028,7 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
final NodeConnectionStatusResponseMessage msg = new
NodeConnectionStatusResponseMessage();
final NodeIdentifier self = getLocalNodeIdentifier();
if (self != null) {
- final NodeConnectionStatus connectionStatus =
nodeStatuses.get(self);
+ final NodeConnectionStatus connectionStatus =
nodeStatuses.get(self.getId());
msg.setNodeConnectionStatus(connectionStatus);
}
@@ -1059,7 +1055,7 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
final NodeIdentifier nodeId = statusChangeMessage.getNodeId();
logger.debug("Handling request {}", statusChangeMessage);
- final NodeConnectionStatus oldStatus =
nodeStatuses.get(statusChangeMessage.getNodeId());
+ final NodeConnectionStatus oldStatus =
nodeStatuses.get(statusChangeMessage.getNodeId().getId());
// Either remove the value from the map or update the map depending on
the connection state
if (statusChangeMessage.getNodeConnectionStatus().getState() ==
NodeConnectionState.REMOVED) {
@@ -1104,45 +1100,59 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
return !requireElection || flowElection.isElectionComplete();
}
- private NodeIdentifier resolveNodeId(final NodeIdentifier
proposedIdentifier) {
- final NodeConnectionStatus proposedConnectionStatus = new
NodeConnectionStatus(proposedIdentifier, DisconnectionCode.NOT_YET_CONNECTED);
- final NodeConnectionStatus existingStatus =
nodeStatuses.putIfAbsent(proposedIdentifier, proposedConnectionStatus);
+ private void registerNodeId(final NodeIdentifier nodeIdentifier) {
+ final NodeConnectionStatus proposedConnectionStatus = new
NodeConnectionStatus(nodeIdentifier, DisconnectionCode.NOT_YET_CONNECTED);
+ final NodeConnectionStatus existingStatus =
nodeStatuses.putIfAbsent(nodeIdentifier.getId(), proposedConnectionStatus);
+
+ removeConflictingNodeIds(nodeIdentifier);
- NodeIdentifier resolvedNodeId = proposedIdentifier;
if (existingStatus == null) {
// there is no node with that ID
- resolvedNodeId = proposedIdentifier;
- logger.debug("No existing node with ID {}; resolved node ID is
as-proposed", proposedIdentifier.getFullDescription());
- onNodeAdded(resolvedNodeId, true);
- } else if
(existingStatus.getNodeIdentifier().logicallyEquals(proposedIdentifier)) {
- // there is a node with that ID but it's the same node.
- resolvedNodeId = proposedIdentifier;
- logger.debug("A node already exists with ID {} and is logically
equivalent; resolved node ID is as-proposed: {}", proposedIdentifier.getId(),
proposedIdentifier.getFullDescription());
+ logger.info("No existing node with ID {}; will add Node as {}",
nodeIdentifier.getId(), nodeIdentifier.getFullDescription());
+ logger.debug("After adding node {}, node statuses are {}",
nodeIdentifier, nodeStatuses);
+
+ onNodeAdded(nodeIdentifier, true);
} else {
- // there is a node with that ID and it's a different node
- resolvedNodeId = new NodeIdentifier(UUID.randomUUID().toString(),
proposedIdentifier.getApiAddress(), proposedIdentifier.getApiPort(),
- proposedIdentifier.getSocketAddress(),
proposedIdentifier.getSocketPort(), proposedIdentifier.getLoadBalanceAddress(),
proposedIdentifier.getLoadBalancePort(),
- proposedIdentifier.getSiteToSiteAddress(),
proposedIdentifier.getSiteToSitePort(),
proposedIdentifier.getSiteToSiteHttpApiPort(),
proposedIdentifier.isSiteToSiteSecure());
+ // there is a node with that ID but it's the same node.
+ logger.debug("A node already exists with ID {}; existing Node
Identifier is: {}", nodeIdentifier.getId(),
existingStatus.getNodeIdentifier().getFullDescription());
+ }
+ }
- logger.debug("A node already exists with ID {}. Proposed Node
Identifier was {}; existing Node Identifier is {}; Resolved Node Identifier is
{}",
- proposedIdentifier.getId(),
proposedIdentifier.getFullDescription(),
getNodeIdentifier(proposedIdentifier.getId()).getFullDescription(),
resolvedNodeId.getFullDescription());
+ private void removeConflictingNodeIds(final NodeIdentifier nodeIdentifier)
{
+ final Set<NodeIdentifier> conflictingNodeIds =
findConflictingNodeIds(nodeIdentifier);
+ if (!conflictingNodeIds.isEmpty()) {
+ final Set<String> fullNodeIdDescriptions =
conflictingNodeIds.stream()
+ .map(NodeIdentifier::getFullDescription)
+ .collect(Collectors.toSet());
+
+ logger.warn("New Node {} was registered for this cluster, but this
Node Identifier conflicts with {} others: {}; " +
+ "each of these conflicting Node Identifiers will be removed
from the cluster",
+ nodeIdentifier.getFullDescription(),
fullNodeIdDescriptions.size(), fullNodeIdDescriptions);
+
+ conflictingNodeIds.forEach(uuid -> removeNode(uuid));
}
+ }
- return resolvedNodeId;
+ private Set<NodeIdentifier> findConflictingNodeIds(final NodeIdentifier
nodeId) {
+ return nodeStatuses.values().stream()
+ .map(NodeConnectionStatus::getNodeIdentifier)
+ .filter(potential -> !potential.equals(nodeId))
+ .filter(nodeId::logicallyEquals)
+ .collect(Collectors.toSet());
}
private ConnectionResponseMessage handleConnectionRequest(final
ConnectionRequestMessage requestMessage, final Set<String> nodeIdentities) {
- final NodeIdentifier proposedIdentifier =
requestMessage.getConnectionRequest().getProposedNodeIdentifier();
- final NodeIdentifier withNodeIdentities =
addNodeIdentities(proposedIdentifier, nodeIdentities);
+ final NodeIdentifier nodeIdentifier =
requestMessage.getConnectionRequest().getProposedNodeIdentifier();
+ final NodeIdentifier withNodeIdentities =
addNodeIdentities(nodeIdentifier, nodeIdentities);
final DataFlow dataFlow =
requestMessage.getConnectionRequest().getDataFlow();
final ConnectionRequest requestWithNodeIdentities = new
ConnectionRequest(withNodeIdentities, dataFlow);
// Resolve Node identifier.
- final NodeIdentifier resolvedNodeId =
resolveNodeId(proposedIdentifier);
+ registerNodeId(nodeIdentifier);
if (isBlockedByFirewall(nodeIdentities)) {
// if the socket address is not listed in the firewall, then
return a null response
- logger.info("Firewall blocked connection request from node " +
resolvedNodeId + " with Node Identities " + nodeIdentities);
+ logger.info("Firewall blocked connection request from node " +
nodeIdentifier + " with Node Identities " + nodeIdentities);
final ConnectionResponse response =
ConnectionResponse.createBlockedByFirewallResponse();
final ConnectionResponseMessage responseMessage = new
ConnectionResponseMessage();
responseMessage.setConnectionResponse(response);
@@ -1156,12 +1166,12 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
return createFlowElectionInProgressResponse();
} else {
logger.info("Received Connection Request from {}; responding
with DataFlow that was elected", withNodeIdentities);
- return createConnectionResponse(requestWithNodeIdentities,
resolvedNodeId, electedDataFlow);
+ return createConnectionResponse(requestWithNodeIdentities,
nodeIdentifier, electedDataFlow);
}
}
logger.info("Received Connection Request from {}; responding with my
DataFlow", withNodeIdentities);
- return createConnectionResponse(requestWithNodeIdentities,
resolvedNodeId);
+ return createConnectionResponse(requestWithNodeIdentities,
nodeIdentifier);
}
private ConnectionResponseMessage createFlowElectionInProgressResponse() {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index b266ba9..1bca6fb 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -58,7 +58,6 @@ import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -443,7 +442,7 @@ public class TestNodeClusterCoordinator {
assertTrue(conflictingResponse instanceof ConnectionResponseMessage);
final ConnectionResponseMessage conflictingResponseMessage =
(ConnectionResponseMessage) conflictingResponse;
final NodeIdentifier conflictingNodeId =
conflictingResponseMessage.getConnectionResponse().getNodeIdentifier();
- assertNotSame(id1.getId(), conflictingNodeId.getId());
+ assertEquals(id1.getId(), conflictingNodeId.getId());
assertEquals(conflictingId.getApiAddress(),
conflictingNodeId.getApiAddress());
assertEquals(conflictingId.getApiPort(),
conflictingNodeId.getApiPort());
assertEquals(conflictingId.getSiteToSiteAddress(),
conflictingNodeId.getSiteToSiteAddress());