This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 08ae00f5a41 IGNITE-28518 Throw RecipientLeftException if acceptor says
initiator is stale (#7977)
08ae00f5a41 is described below
commit 08ae00f5a4175babf5e28100f077990d276e5b48
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Apr 13 11:06:03 2026 +0400
IGNITE-28518 Throw RecipientLeftException if acceptor says initiator is
stale (#7977)
---
.../network/recovery/HandshakeManagerUtils.java | 2 +-
.../network/DefaultMessagingServiceTest.java | 46 +++++++++++++++++++++-
2 files changed, 45 insertions(+), 3 deletions(-)
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
index b4fc5b674ce..f4f882292b7 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
@@ -89,7 +89,7 @@ class HandshakeManagerUtils {
}
static Exception
createExceptionFromRejectionMessage(HandshakeRejectedMessage msg) {
- return msg.reason() == HandshakeRejectionReason.STOPPING
+ return msg.reason() == HandshakeRejectionReason.STOPPING ||
msg.reason() == HandshakeRejectionReason.STALE_LAUNCH_ID
? new RecipientLeftException(msg.message())
: new HandshakeException(msg.message());
}
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
index 326ad9a7bed..d2f77ca0525 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
@@ -604,6 +604,34 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
}
}
+ /**
+ * If the acceptor thinks we are stale, a send()/invoke() call at
initiator side should get a {@link RecipientLeftException}
+ * (as the initiator will not be able to send anything to it before one of
the nodes gets restarted).
+ */
+ @ParameterizedTest
+ @EnumSource(SendByClusterNodeOperation.class)
+ @EnumSource(SendByConsistentCoordinateOperation.class)
+ void sendByClusterNodeToNodeThinkingSenderIsStale(AsyncSendOperation
operation) throws Exception {
+ var receiverSideStaleIdDetector = new InMemoryStaleIds();
+ receiverSideStaleIdDetector.markAsStale(senderNode.id());
+
+ try (
+ Services senderServices = createMessagingService(senderNode,
senderNetworkConfig);
+ Services ignoredReceiverServices = createMessagingService(
+ receiverNode,
+ receiverNetworkConfig,
+ () -> {},
+ messageSerializationRegistry,
+ receiverSideStaleIdDetector
+ )
+ ) {
+ assertThat(
+ operation.send(senderServices.messagingService,
testMessage("test"), receiverNode),
+ willThrow(RecipientLeftException.class)
+ );
+ }
+ }
+
private ClusterNodeImpl copyWithDifferentId() {
return new ClusterNodeImpl(
randomUUID(),
@@ -906,12 +934,16 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
}
}
+ private interface AsyncSendOperation {
+ CompletableFuture<?> send(MessagingService service, TestMessage
message, InternalClusterNode recipient);
+ }
+
@FunctionalInterface
private interface AsyncSendAction {
CompletableFuture<?> send(MessagingService service, TestMessage
message, InternalClusterNode recipient);
}
- private enum SendByClusterNodeOperation {
+ private enum SendByClusterNodeOperation implements AsyncSendOperation {
SEND_DEFAULT_CHANNEL((service, message, to) -> service.send(to,
message)),
SEND_SPECIFIC_CHANNEL((service, message, to) -> service.send(to,
TEST_CHANNEL, message)),
RESPOND_DEFAULT_CHANNEL((service, message, to) -> service.respond(to,
message, 123L)),
@@ -924,9 +956,14 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
SendByClusterNodeOperation(AsyncSendAction sendAction) {
this.sendAction = sendAction;
}
+
+ @Override
+ public CompletableFuture<?> send(MessagingService service, TestMessage
message, InternalClusterNode recipient) {
+ return sendAction.send(service, message, recipient);
+ }
}
- private enum SendByConsistentCoordinateOperation {
+ private enum SendByConsistentCoordinateOperation implements
AsyncSendOperation {
SEND_BY_CONSISTENT_ID((service, message, to) ->
service.send(to.name(), ChannelType.DEFAULT, message)),
SEND_BY_ADDRESS((service, message, to) -> service.send(to.address(),
ChannelType.DEFAULT, message)),
RESPOND_BY_CONSISTENT_ID_DEFAULT_CHANNEL((service, message, to) ->
service.respond(to.name(), message, 123L)),
@@ -940,6 +977,11 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
this.sendAction = sendAction;
}
+ @Override
+ public CompletableFuture<?> send(MessagingService service, TestMessage
message, InternalClusterNode recipient) {
+ return sendAction.send(service, message, recipient);
+ }
+
private boolean notRespondOperation() {
return this != RESPOND_BY_CONSISTENT_ID_DEFAULT_CHANNEL && this !=
RESPOND_BY_CONSISTENT_ID_SPECIFIC_CHANNEL;
}