This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 42b18d49fe7 IGNITE-22377 Fail one of nodes if a stale node detected (#7248)
42b18d49fe7 is described below
commit 42b18d49fe725b1d0e8b79c841623421b8797308
Author: Ivan Bessonov <[email protected]>
AuthorDate: Mon Jan 12 10:13:01 2026 +0300
IGNITE-22377 Fail one of nodes if a stale node detected (#7248)
---
.../management/topology/ItLogicalTopologyTest.java | 6 +++
.../internal/network/file/TestTopologyService.java | 9 +++-
...des.java => LogicalTopologyEventsListener.java} | 8 ++--
.../ignite/internal/network/TopologyService.java | 7 ++-
modules/network/build.gradle | 1 +
.../node/ItNodeStalenessAndRestartTest.java | 39 ++++++++++++++++-
.../internal/network/DefaultMessagingService.java | 6 ++-
.../internal/network/netty/ConnectionManager.java | 3 +-
.../network/recovery/HandshakeManagerUtils.java | 29 +++++++++++++
.../recovery/RecoveryAcceptorHandshakeManager.java | 50 ++++++++++++++++------
.../RecoveryInitiatorHandshakeManager.java | 32 ++++++++------
.../recovery/StaleNodeHandlingParametersImpl.java} | 29 ++++++-------
.../recovery/message/HandshakeStartMessage.java | 5 ++-
.../message/HandshakeStartResponseMessage.java | 5 ++-
.../message/StaleNodeHandlingParameters.java} | 22 +++-------
.../scalecube/ScaleCubeTopologyService.java | 14 +++++-
.../network/netty/RecoveryHandshakeTest.java | 3 +-
.../RecoveryAcceptorHandshakeManagerTest.java | 4 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 8 ++--
.../engine/framework/ClusterServiceFactory.java | 9 +++-
20 files changed, 211 insertions(+), 78 deletions(-)
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/topology/ItLogicalTopologyTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/topology/ItLogicalTopologyTest.java
index d8ce414dbd0..57661230558 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/topology/ItLogicalTopologyTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/topology/ItLogicalTopologyTest.java
@@ -47,15 +47,19 @@ import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import
org.apache.ignite.internal.testframework.failure.FailureManagerExtension;
+import
org.apache.ignite.internal.testframework.failure.MuteFailureManagerLogging;
import org.apache.ignite.internal.tostring.S;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
/**
* Integration tests for functionality of logical topology events subscription.
*/
@SuppressWarnings("resource")
+@ExtendWith(FailureManagerExtension.class)
class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
@@ -254,6 +258,7 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
}
@Test
+ @MuteFailureManagerLogging
void nodeReturnedToPhysicalTopologyDoesNotReturnToLogicalTopology() throws
Exception {
cluster.startAndInit(1);
@@ -318,6 +323,7 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
}
@Test
+ @MuteFailureManagerLogging
void nodeThatCouldNotJoinShouldBeInvalidated(TestInfo testInfo) throws
Exception {
cluster.startAndInit(1);
diff --git a/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java b/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java
index 6479eab06fa..d1275b92cfc 100644
--- a/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java
+++ b/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java
@@ -44,6 +44,11 @@ public class TestTopologyService extends AbstractTopologyService {
throw new UnsupportedOperationException();
}
+ @Override
+ public long logicalTopologyVersion() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public @Nullable InternalClusterNode getByAddress(NetworkAddress addr) {
throw new UnsupportedOperationException();
@@ -78,10 +83,10 @@ public class TestTopologyService extends AbstractTopologyService {
}
@Override
- public void onJoined(InternalClusterNode node) {
+ public void onJoined(InternalClusterNode node, long topologyVersion) {
}
@Override
- public void onLeft(InternalClusterNode node) {
+ public void onLeft(InternalClusterNode node, long topologyVersion) {
}
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java b/modules/network-api/src/main/java/org/apache/ignite/internal/network/LogicalTopologyEventsListener.java
similarity index 79%
copy from modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java
copy to modules/network-api/src/main/java/org/apache/ignite/internal/network/LogicalTopologyEventsListener.java
index d31c31f5b61..d223a94f88b 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/internal/network/LogicalTopologyEventsListener.java
@@ -20,19 +20,21 @@ package org.apache.ignite.internal.network;
/**
* Allows reacting to logical topology changes.
*/
-public interface JoinedNodes {
+public interface LogicalTopologyEventsListener {
/**
* Called when the node joins logical topology.
*
* @param node Node.
+ * @param topologyVersion Logical topology version.
*/
- void onJoined(InternalClusterNode node);
+ void onJoined(InternalClusterNode node, long topologyVersion);
/**
* Called when the node leaves logical topology.
*
* @param node Node.
+ * @param topologyVersion Logical topology version.
*/
- void onLeft(InternalClusterNode node);
+ void onLeft(InternalClusterNode node, long topologyVersion);
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/TopologyService.java b/modules/network-api/src/main/java/org/apache/ignite/internal/network/TopologyService.java
index 2bcc05fcadc..4896787d5d9 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/internal/network/TopologyService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/internal/network/TopologyService.java
@@ -25,7 +25,7 @@ import org.jetbrains.annotations.Nullable;
* Entry point for obtaining physical cluster topology information.
*/
// TODO: allow removing event handlers, see
https://issues.apache.org/jira/browse/IGNITE-14519
-public interface TopologyService extends ClusterNodeResolver, JoinedNodes {
+public interface TopologyService extends ClusterNodeResolver,
LogicalTopologyEventsListener {
/**
* Returns information about the current node.
*
@@ -47,6 +47,11 @@ public interface TopologyService extends ClusterNodeResolver, JoinedNodes {
*/
Collection<InternalClusterNode> logicalTopologyMembers();
+ /**
+ * Returns the logical topology version.
+ */
+ long logicalTopologyVersion();
+
/**
* Registers a handler for physical topology change events.
*
diff --git a/modules/network/build.gradle b/modules/network/build.gradle
index 2945aff22a4..310b1303cfe 100644
--- a/modules/network/build.gradle
+++ b/modules/network/build.gradle
@@ -86,6 +86,7 @@ dependencies {
integrationTestImplementation
testFixtures(project(':ignite-configuration'))
integrationTestImplementation testFixtures(project(':ignite-runner'))
integrationTestImplementation
testFixtures(project(':ignite-failure-handler:'))
+ integrationTestImplementation libs.awaitility
integrationTestImplementation libs.compileTesting
integrationTestImplementation libs.netty.handler
integrationTestImplementation libs.scalecube.cluster
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/node/ItNodeStalenessAndRestartTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/node/ItNodeStalenessAndRestartTest.java
index a7ef269561f..064737bf271 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/node/ItNodeStalenessAndRestartTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/node/ItNodeStalenessAndRestartTest.java
@@ -21,17 +21,29 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.ConfigTemplates.FAST_FAILURE_DETECTION_NODE_BOOTSTRAP_CFG_TEMPLATE;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.ConfigOverride;
import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.TopologyEventHandler;
import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import
org.apache.ignite.internal.testframework.failure.FailureManagerExtension;
+import
org.apache.ignite.internal.testframework.failure.MuteFailureManagerLogging;
+import org.apache.ignite.internal.testframework.log4j2.LogInspector;
+import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+@ExtendWith(FailureManagerExtension.class)
class ItNodeStalenessAndRestartTest extends ClusterPerTestIntegrationTest {
+
+ private static final String FAILURE_MESSAGE = "Cluster segmentation
detected, current node will be shut down";
+
@Override
protected int initialNodes() {
return 2;
@@ -43,6 +55,7 @@ class ItNodeStalenessAndRestartTest extends ClusterPerTestIntegrationTest {
}
@Test
+ @MuteFailureManagerLogging
void nodeStalenessStatusIsClearedOnRestart() throws Exception {
IgniteImpl ignite0 = unwrapIgniteImpl(cluster.node(0));
@@ -59,13 +72,37 @@ class ItNodeStalenessAndRestartTest extends ClusterPerTestIntegrationTest {
);
}
+ @Test
+ @ConfigOverride(name = "ignite.failureHandler.handler.type", value =
"stop")
+ @MuteFailureManagerLogging
+ void staleNodeIsShutDown() throws Exception {
+ IgniteImpl ignite0 = unwrapIgniteImpl(cluster.node(0));
+
+ LogInspector logInspector = new LogInspector(
+ FailureManager.class.getName(),
+ evt -> evt.getLevel() == Level.ERROR
+ && evt.getThrown() != null
+ &&
evt.getThrown().getMessage().contains(FAILURE_MESSAGE)
+ &&
Thread.currentThread().getName().contains(cluster.nodeName(1))
+ );
+
+ logInspector.start();
+ try {
+ simulateNetworkPartition(ignite0);
+
+ await().timeout(10, SECONDS).until(logInspector::isMatched);
+ } finally {
+ logInspector.stop();
+ }
+ }
+
private void simulateNetworkPartition(IgniteImpl ignite0) throws
InterruptedException {
CountDownLatch ignite1Left = new CountDownLatch(1);
ignite0.clusterService().topologyService().addEventHandler(new
TopologyEventHandler() {
@Override
public void onDisappeared(InternalClusterNode member) {
- if (member.name().equals(cluster.node(1).name())) {
+ if (member.name().equals(cluster.nodeName(1))) {
ignite1Left.countDown();
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
index a41f053db78..f6e0926f11a 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
@@ -415,7 +415,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
"Handshake failed [destNodeId={}, channelType={},
destAddr={}, localBindAddr={}]", ex,
nodeId, type, addr,
connectionManager.localBindAddress()
);
- } else if (!hasCause(ex, NodeStoppingException.class) &&
LOG.isInfoEnabled()) {
+ } else if (!ignorableHandshakeException(ex) &&
LOG.isInfoEnabled()) {
// TODO IGNITE-25802 Detect a LOOP rejection reason and retry
the connection.
LOG.info(
"Handshake failed [message={}, destNodeId={},
channelType={}, destAddr={}, localBindAddr={}]",
@@ -425,6 +425,10 @@ public class DefaultMessagingService extends AbstractMessagingService {
}
}
+ private static boolean ignorableHandshakeException(Throwable ex) {
+ return hasCause(ex, NodeStoppingException.class,
RecipientLeftException.class);
+ }
+
private void triggerChannelCreation(UUID nodeId, ChannelType type,
InetSocketAddress addr) {
connectionManager.channel(nodeId, type, addr);
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 55ce2153bdd..b6eb5b34113 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -537,7 +537,8 @@ public class ConnectionManager implements ChannelCreationListener {
this,
stopping::get,
productVersionSource,
- topologyService
+ topologyService,
+ failureProcessor
);
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
index 842ecc5454a..b4fc5b674ce 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
@@ -23,6 +23,9 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.failure.FailureType;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.InternalClusterNode;
@@ -35,6 +38,8 @@ import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.internal.network.netty.NettyUtils;
import
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
import
org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason;
+import
org.apache.ignite.internal.network.recovery.message.StaleNodeHandlingParameters;
+import org.apache.ignite.internal.tostring.S;
class HandshakeManagerUtils {
private static final IgniteLogger LOG =
Loggers.forClass(HandshakeManagerUtils.class);
@@ -88,4 +93,28 @@ class HandshakeManagerUtils {
? new RecipientLeftException(msg.message())
: new HandshakeException(msg.message());
}
+
+ static void maybeFailOnStaleNodeDetection(
+ FailureProcessor failureProcessor,
+ StaleNodeHandlingParameters local,
+ StaleNodeHandlingParameters remote,
+ ClusterNodeMessage remoteNode
+ ) {
+ long localTopologyVersion = local.topologyVersion();
+ long remoteTopologyVersion = remote.topologyVersion();
+
+ if (localTopologyVersion >= remoteTopologyVersion) {
+ return;
+ }
+
+ String message = S.toString(
+ "Cluster segmentation detected, current node will be shut
down",
+ "logicalTopologyVersion", localTopologyVersion, false,
+ "remoteLogicalTopologyVersion", remoteTopologyVersion, false,
+ "remoteNodeId", remoteNode.id(), false,
+ "remoteNodeName", remoteNode.name(), false
+ );
+
+ failureProcessor.process(new
FailureContext(FailureType.CRITICAL_ERROR, null, message));
+ }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
index ceef409062b..d8d0b7e81cb 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.network.recovery;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.network.recovery.HandshakeManagerUtils.maybeFailOnStaleNodeDetection;
import static
org.apache.ignite.internal.network.recovery.HandshakeTieBreaker.shouldCloseChannel;
import io.netty.channel.Channel;
@@ -29,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
+import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -54,6 +56,7 @@ import org.apache.ignite.internal.network.recovery.message.HandshakeRejectionRea
import
org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
import
org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
import org.apache.ignite.internal.network.recovery.message.ProbeMessage;
+import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.version.IgniteProductVersionSource;
/**
@@ -107,9 +110,11 @@ public class RecoveryAcceptorHandshakeManager implements HandshakeManager {
private RecoveryDescriptor recoveryDescriptor;
/** Cluster topology service. */
- @SuppressWarnings("FieldCanBeLocal")
private final TopologyService topologyService;
+ /** Failure processor. */
+ private final FailureProcessor failureProcessor;
+
/**
* Constructor.
*
@@ -130,7 +135,8 @@ public class RecoveryAcceptorHandshakeManager implements HandshakeManager {
ChannelCreationListener channelCreationListener,
BooleanSupplier stopping,
IgniteProductVersionSource productVersionSource,
- TopologyService topologyService
+ TopologyService topologyService,
+ FailureProcessor failureProcessor
) {
this.localNode = localNode;
this.messageFactory = messageFactory;
@@ -141,6 +147,7 @@ public class RecoveryAcceptorHandshakeManager implements HandshakeManager {
this.stopping = stopping;
this.productVersionSource = productVersionSource;
this.topologyService = topologyService;
+ this.failureProcessor = failureProcessor;
this.handshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
if (throwable != null) {
@@ -174,12 +181,19 @@ public class RecoveryAcceptorHandshakeManager implements HandshakeManager {
/** {@inheritDoc} */
@Override
public void onConnectionOpen() {
- HandshakeStartMessage handshakeStartMessage =
messageFactory.handshakeStartMessage()
-
.serverNode(HandshakeManagerUtils.clusterNodeToMessage(localNode))
- .serverClusterId(clusterIdSupplier.clusterId())
- .productName(productVersionSource.productName())
-
.productVersion(productVersionSource.productVersion().toString())
- .build();
+ if (stopping.getAsBoolean()) {
+ sendRejectionMessageAndFailHandshake(
+ S.toString("The node is stopping", "name",
localNode.name(), false),
+ HandshakeRejectionReason.STOPPING,
+ m -> new NodeStoppingException()
+ );
+ } else {
+ sendHandshakeStartMessage();
+ }
+ }
+
+ private void sendHandshakeStartMessage() {
+ HandshakeStartMessage handshakeStartMessage =
createHandshakeStartMessage();
ChannelFuture sendFuture = channel.writeAndFlush(new
OutNetworkObject(handshakeStartMessage, emptyList()));
@@ -192,6 +206,16 @@ public class RecoveryAcceptorHandshakeManager implements HandshakeManager {
});
}
+ private HandshakeStartMessage createHandshakeStartMessage() {
+ return messageFactory.handshakeStartMessage()
+
.serverNode(HandshakeManagerUtils.clusterNodeToMessage(localNode))
+ .serverClusterId(clusterIdSupplier.clusterId())
+ .productName(productVersionSource.productName())
+
.productVersion(productVersionSource.productVersion().toString())
+ .topologyVersion(topologyService.logicalTopologyVersion())
+ .build();
+ }
+
/** {@inheritDoc} */
@Override
public void onMessage(NetworkMessage message) {
@@ -241,14 +265,14 @@ public class RecoveryAcceptorHandshakeManager implements HandshakeManager {
}
private boolean
possiblyRejectHandshakeStartResponse(HandshakeStartResponseMessage message) {
- if (staleIdDetector.isIdStale(message.clientNode().id())) {
- handleStaleInitiatorId(message);
+ if (stopping.getAsBoolean()) {
+ handleRefusalToEstablishConnectionDueToStopping(message);
return true;
}
- if (stopping.getAsBoolean()) {
- handleRefusalToEstablishConnectionDueToStopping(message);
+ if (staleIdDetector.isIdStale(message.clientNode().id())) {
+ handleStaleInitiatorId(message);
return true;
}
@@ -262,6 +286,8 @@ public class RecoveryAcceptorHandshakeManager implements HandshakeManager {
);
sendRejectionMessageAndFailHandshake(message,
HandshakeRejectionReason.STALE_LAUNCH_ID, HandshakeException::new);
+
+ maybeFailOnStaleNodeDetection(failureProcessor, new
StaleNodeHandlingParametersImpl(topologyService), msg, msg.clientNode());
}
private void
handleRefusalToEstablishConnectionDueToStopping(HandshakeStartResponseMessage
msg) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
index db34a4ead3a..c42d901c6f7 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
@@ -22,6 +22,7 @@ import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.network.netty.NettyUtils.toCompletableFuture;
import static
org.apache.ignite.internal.network.recovery.HandshakeManagerUtils.clusterNodeToMessage;
+import static
org.apache.ignite.internal.network.recovery.HandshakeManagerUtils.maybeFailOnStaleNodeDetection;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -117,11 +118,9 @@ public class RecoveryInitiatorHandshakeManager implements HandshakeManager {
private RecoveryDescriptor recoveryDescriptor;
/** Cluster topology service. */
- @SuppressWarnings("FieldCanBeLocal")
private final TopologyService topologyService;
/** Failure processor. */
- @SuppressWarnings("FieldCanBeLocal")
private final FailureProcessor failureProcessor;
/**
@@ -324,12 +323,6 @@ public class RecoveryInitiatorHandshakeManager implements HandshakeManager {
return true;
}
- if (staleIdDetector.isIdStale(message.serverNode().id())) {
- handleStaleAcceptorId(message);
-
- return true;
- }
-
if (clusterIdMismatch(message.serverClusterId(),
clusterIdSupplier.clusterId())) {
handleClusterIdMismatch(message);
@@ -354,6 +347,12 @@ public class RecoveryInitiatorHandshakeManager implements HandshakeManager {
return true;
}
+ if (staleIdDetector.isIdStale(message.serverNode().id())) {
+ handleStaleAcceptorId(message);
+
+ return true;
+ }
+
return false;
}
@@ -389,6 +388,8 @@ public class RecoveryInitiatorHandshakeManager implements HandshakeManager {
HandshakeRejectionReason.STALE_LAUNCH_ID,
unused -> new RecipientLeftException("Recipient is stale: " +
msg.serverNode().id())
);
+
+ maybeFailOnStaleNodeDetection(failureProcessor, new
StaleNodeHandlingParametersImpl(topologyService), msg, msg.serverNode());
}
private void handleClusterIdMismatch(HandshakeStartMessage msg) {
@@ -491,11 +492,7 @@ public class RecoveryInitiatorHandshakeManager implements HandshakeManager {
private void handshake(RecoveryDescriptor descriptor) {
PipelineUtils.afterHandshake(ctx.pipeline(), descriptor,
createMessageHandler(), MESSAGE_FACTORY);
- HandshakeStartResponseMessage response =
MESSAGE_FACTORY.handshakeStartResponseMessage()
- .clientNode(clusterNodeToMessage(localNode))
- .receivedCount(descriptor.receivedCount())
- .connectionId(connectionId)
- .build();
+ HandshakeStartResponseMessage response =
createHandshakeStartResponseMessage(descriptor);
ChannelFuture sendFuture = ctx.channel().writeAndFlush(new
OutNetworkObject(response, emptyList()));
@@ -508,6 +505,15 @@ public class RecoveryInitiatorHandshakeManager implements HandshakeManager {
});
}
+ protected HandshakeStartResponseMessage
createHandshakeStartResponseMessage(RecoveryDescriptor descriptor) {
+ return MESSAGE_FACTORY.handshakeStartResponseMessage()
+ .clientNode(clusterNodeToMessage(localNode))
+ .receivedCount(descriptor.receivedCount())
+ .connectionId(connectionId)
+ .topologyVersion(topologyService.logicalTopologyVersion())
+ .build();
+ }
+
/**
* Creates a message handler using the consistent id of a remote node.
*
diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleNodeHandlingParametersImpl.java
similarity index 54%
copy from modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java
copy to modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleNodeHandlingParametersImpl.java
index d31c31f5b61..d3710357415 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleNodeHandlingParametersImpl.java
@@ -15,24 +15,23 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.network;
+package org.apache.ignite.internal.network.recovery;
+
+import org.apache.ignite.internal.network.TopologyService;
+import
org.apache.ignite.internal.network.recovery.message.StaleNodeHandlingParameters;
/**
- * Allows reacting to logical topology changes.
+ * Basic implementation of {@link StaleNodeHandlingParameters}. Exists only to
simplify the code and avoid code duplication.
*/
-public interface JoinedNodes {
+class StaleNodeHandlingParametersImpl implements StaleNodeHandlingParameters {
+ private final TopologyService topologyService;
- /**
- * Called when the node joins logical topology.
- *
- * @param node Node.
- */
- void onJoined(InternalClusterNode node);
+ StaleNodeHandlingParametersImpl(TopologyService topologyService) {
+ this.topologyService = topologyService;
+ }
- /**
- * Called when the node leaves logical topology.
- *
- * @param node Node.
- */
- void onLeft(InternalClusterNode node);
+ @Override
+ public long topologyVersion() {
+ return topologyService.logicalTopologyVersion();
+ }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
index 2bd7f956bc3..722c0b80b22 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
@@ -28,7 +28,7 @@ import org.jetbrains.annotations.Nullable;
* This message is sent from an acceptor to an initiator at the connection
opening.
*/
@Transferable(NetworkMessageTypes.HANDSHAKE_START)
-public interface HandshakeStartMessage extends InternalMessage {
+public interface HandshakeStartMessage extends StaleNodeHandlingParameters,
InternalMessage {
/** Returns the acceptor node that sends this. */
ClusterNodeMessage serverNode();
@@ -41,4 +41,7 @@ public interface HandshakeStartMessage extends InternalMessage {
/** Product version of the node that sends the message. */
String productVersion();
+
+ @Override
+ long topologyVersion();
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartResponseMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartResponseMessage.java
index 7b6761f213a..83e9cd793ae 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartResponseMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartResponseMessage.java
@@ -26,7 +26,7 @@ import org.apache.ignite.internal.network.message.ClusterNodeMessage;
* This message is sent from an initiator to an acceptor as a response to the
{@link HandshakeStartMessage}.
*/
@Transferable(NetworkMessageTypes.HANDSHAKE_START_RESPONSE)
-public interface HandshakeStartResponseMessage extends InternalMessage {
+public interface HandshakeStartResponseMessage extends
StaleNodeHandlingParameters, InternalMessage {
/** Returns the initiator node that sends this. */
ClusterNodeMessage clientNode();
@@ -43,4 +43,7 @@ public interface HandshakeStartResponseMessage extends InternalMessage {
* @return Number of received messages.
*/
long receivedCount();
+
+ @Override
+ long topologyVersion();
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/StaleNodeHandlingParameters.java
similarity index 66%
rename from modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java
rename to modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/StaleNodeHandlingParameters.java
index d31c31f5b61..a8409f3d6b8 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/StaleNodeHandlingParameters.java
@@ -15,24 +15,12 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.network;
+package org.apache.ignite.internal.network.recovery.message;
/**
- * Allows reacting to logical topology changes.
+ * Parameters required for handling stale state of a node.
*/
-public interface JoinedNodes {
-
- /**
- * Called when the node joins logical topology.
- *
- * @param node Node.
- */
- void onJoined(InternalClusterNode node);
-
- /**
- * Called when the node leaves logical topology.
- *
- * @param node Node.
- */
- void onLeft(InternalClusterNode node);
+public interface StaleNodeHandlingParameters {
+ /** Returns the logical topology version. */
+ long topologyVersion();
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeTopologyService.java
index e79a818c8c2..f9949236a6c 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeTopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeTopologyService.java
@@ -59,6 +59,8 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
*/
private volatile Cluster cluster;
+ private volatile long topologyVersion;
+
/** Topology members from the network address to the cluster node.. */
private final ConcurrentMap<NetworkAddress, InternalClusterNode> members =
new ConcurrentHashMap<>();
@@ -71,6 +73,11 @@ final class ScaleCubeTopologyService extends
AbstractTopologyService {
/** Topology members map from the id to the cluster node. */
private final ConcurrentMap<UUID, InternalClusterNode> idToMemberMap = new
ConcurrentHashMap<>();
+ /**
+ * Returns the logical topology version last delivered through {@code onJoined} or {@code onLeft}.
+ *
+ * @return Last recorded logical topology version, or {@code 0} if no logical topology event was received yet.
+ */
+ @Override
+ public long logicalTopologyVersion() {
+ return topologyVersion;
+ }
+
/**
* Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency
injection.
*
@@ -339,18 +346,21 @@ final class ScaleCubeTopologyService extends
AbstractTopologyService {
}
@Override
- public void onJoined(InternalClusterNode node) {
+ public void onJoined(InternalClusterNode node, long topologyVersion) {
if (LOG.isDebugEnabled()) {
LOG.debug("Node joined logical topology [node={}]", node);
}
membersByConsistentIdInLogicalTopology.put(node.name(), node);
+ // Volatile write placed after the map update: a thread that reads this version also sees the membership change.
+ // NOTE(review): overwritten unconditionally, so the value follows event delivery order rather than the max seen.
+ this.topologyVersion = topologyVersion;
}
@Override
- public void onLeft(InternalClusterNode node) {
+ public void onLeft(InternalClusterNode node, long topologyVersion) {
if (LOG.isDebugEnabled()) {
LOG.debug("Node left logical topology [node={}]", node);
}
+
membersByConsistentIdInLogicalTopology.remove(node.name());
+ // Volatile write placed after the map update: a thread that reads this version also sees the membership change.
+ // NOTE(review): overwritten unconditionally, so the value follows event delivery order rather than the max seen.
+ this.topologyVersion = topologyVersion;
}
}
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index c5f85e2233a..07d1771573c 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -810,7 +810,8 @@ public class RecoveryHandshakeTest extends
BaseIgniteAbstractTest {
channel -> {},
() -> false,
new DefaultIgniteProductVersionSource(),
- topologyService
+ topologyService,
+ new NoOpFailureManager()
);
}
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
index 80187692436..1424af9a75f 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
@@ -45,6 +45,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
@@ -180,7 +181,8 @@ class RecoveryAcceptorHandshakeManagerTest extends
HandshakeManagerTest {
channelCreationListener,
stopping,
new DefaultIgniteProductVersionSource(),
- topologyService
+ topologyService,
+ new NoOpFailureManager()
);
manager.onInit(context);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 122ce290a7a..01882dc23d3 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -178,7 +178,7 @@ import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.DefaultMessagingService;
import org.apache.ignite.internal.network.IgniteClusterImpl;
import org.apache.ignite.internal.network.InternalClusterNode;
-import org.apache.ignite.internal.network.JoinedNodes;
+import org.apache.ignite.internal.network.LogicalTopologyEventsListener;
import org.apache.ignite.internal.network.MessageSerializationRegistryImpl;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
@@ -1347,16 +1347,16 @@ public class IgniteImpl implements Ignite {
);
}
- private static LogicalTopologyEventListener
logicalTopologyJoinedNodesListener(JoinedNodes joinedNodes) {
+ /**
+ * Creates a {@link LogicalTopologyEventListener} that forwards node join and leave events to {@code listener},
+ * passing along the version of the new logical topology snapshot.
+ *
+ * <p>NOTE(review): the method name still refers to the removed {@code JoinedNodes} interface; consider renaming it
+ * together with its call site.
+ *
+ * @param listener Receiver of the forwarded events.
+ * @return Adapter listener to register with the logical topology.
+ */
+ private static LogicalTopologyEventListener
logicalTopologyJoinedNodesListener(LogicalTopologyEventsListener listener) {
return new LogicalTopologyEventListener() {
@Override
public void onNodeJoined(LogicalNode joinedNode,
LogicalTopologySnapshot newTopology) {
- joinedNodes.onJoined(joinedNode);
+ listener.onJoined(joinedNode, newTopology.version());
}
@Override
public void onNodeLeft(LogicalNode leftNode,
LogicalTopologySnapshot newTopology) {
- joinedNodes.onLeft(leftNode);
+ listener.onLeft(leftNode, newTopology.version());
}
};
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
index 88fee61fa5f..d989ce29c7c 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
@@ -176,6 +176,11 @@ public class ClusterServiceFactory {
return Commons.cast(logicalTopology.getLogicalTopology().nodes());
}
+ /** {@inheritDoc} */
+ @Override
+ public long logicalTopologyVersion() {
+ return logicalTopology.getLogicalTopology().version();
+ }
+
/** {@inheritDoc} */
@Override
public @Nullable InternalClusterNode getByAddress(NetworkAddress addr)
{
@@ -197,11 +202,11 @@ public class ClusterServiceFactory {
}
@Override
- public void onJoined(InternalClusterNode node) {
+ public void onJoined(InternalClusterNode node, long topologyVersion) {
+ // Empty: this test implementation does not track join events; logicalTopologyVersion() reads the version
+ // directly from logicalTopology instead.
}
@Override
- public void onLeft(InternalClusterNode node) {
+ public void onLeft(InternalClusterNode node, long topologyVersion) {
+ // Empty: this test implementation does not track leave events; logicalTopologyVersion() reads the version
+ // directly from logicalTopology instead.
}
}