This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 08ae00f5a41 IGNITE-28518 Throw RecipientLeftException if acceptor says 
initiator is stale (#7977)
08ae00f5a41 is described below

commit 08ae00f5a4175babf5e28100f077990d276e5b48
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Apr 13 11:06:03 2026 +0400

    IGNITE-28518 Throw RecipientLeftException if acceptor says initiator is 
stale (#7977)
---
 .../network/recovery/HandshakeManagerUtils.java    |  2 +-
 .../network/DefaultMessagingServiceTest.java       | 46 +++++++++++++++++++++-
 2 files changed, 45 insertions(+), 3 deletions(-)

diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
index b4fc5b674ce..f4f882292b7 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
@@ -89,7 +89,7 @@ class HandshakeManagerUtils {
     }
 
     static Exception 
createExceptionFromRejectionMessage(HandshakeRejectedMessage msg) {
-        return msg.reason() == HandshakeRejectionReason.STOPPING
+        return msg.reason() == HandshakeRejectionReason.STOPPING || 
msg.reason() == HandshakeRejectionReason.STALE_LAUNCH_ID
                 ? new RecipientLeftException(msg.message())
                 : new HandshakeException(msg.message());
     }
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
index 326ad9a7bed..d2f77ca0525 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
@@ -604,6 +604,34 @@ class DefaultMessagingServiceTest extends 
BaseIgniteAbstractTest {
         }
     }
 
+    /**
+     * If the acceptor thinks we are stale, a send()/invoke() call at 
initiator side should get a {@link RecipientLeftException}
+     * (as the initiator will not be able to send anything to it before one of 
the nodes gets restarted).
+     */
+    @ParameterizedTest
+    @EnumSource(SendByClusterNodeOperation.class)
+    @EnumSource(SendByConsistentCoordinateOperation.class)
+    void sendByClusterNodeToNodeThinkingSenderIsStale(AsyncSendOperation 
operation) throws Exception {
+        var receiverSideStaleIdDetector = new InMemoryStaleIds();
+        receiverSideStaleIdDetector.markAsStale(senderNode.id());
+
+        try (
+                Services senderServices = createMessagingService(senderNode, 
senderNetworkConfig);
+                Services ignoredReceiverServices = createMessagingService(
+                        receiverNode,
+                        receiverNetworkConfig,
+                        () -> {},
+                        messageSerializationRegistry,
+                        receiverSideStaleIdDetector
+                )
+        ) {
+            assertThat(
+                    operation.send(senderServices.messagingService, 
testMessage("test"), receiverNode),
+                    willThrow(RecipientLeftException.class)
+            );
+        }
+    }
+
     private ClusterNodeImpl copyWithDifferentId() {
         return new ClusterNodeImpl(
                 randomUUID(),
@@ -906,12 +934,16 @@ class DefaultMessagingServiceTest extends 
BaseIgniteAbstractTest {
         }
     }
 
+    private interface AsyncSendOperation {
+        CompletableFuture<?> send(MessagingService service, TestMessage 
message, InternalClusterNode recipient);
+    }
+
     @FunctionalInterface
     private interface AsyncSendAction {
         CompletableFuture<?> send(MessagingService service, TestMessage 
message, InternalClusterNode recipient);
     }
 
-    private enum SendByClusterNodeOperation {
+    private enum SendByClusterNodeOperation implements AsyncSendOperation {
         SEND_DEFAULT_CHANNEL((service, message, to) -> service.send(to, 
message)),
         SEND_SPECIFIC_CHANNEL((service, message, to) -> service.send(to, 
TEST_CHANNEL, message)),
         RESPOND_DEFAULT_CHANNEL((service, message, to) -> service.respond(to, 
message, 123L)),
@@ -924,9 +956,14 @@ class DefaultMessagingServiceTest extends 
BaseIgniteAbstractTest {
         SendByClusterNodeOperation(AsyncSendAction sendAction) {
             this.sendAction = sendAction;
         }
+
+        @Override
+        public CompletableFuture<?> send(MessagingService service, TestMessage 
message, InternalClusterNode recipient) {
+            return sendAction.send(service, message, recipient);
+        }
     }
 
-    private enum SendByConsistentCoordinateOperation {
+    private enum SendByConsistentCoordinateOperation implements 
AsyncSendOperation {
         SEND_BY_CONSISTENT_ID((service, message, to) -> 
service.send(to.name(), ChannelType.DEFAULT, message)),
         SEND_BY_ADDRESS((service, message, to) -> service.send(to.address(), 
ChannelType.DEFAULT, message)),
         RESPOND_BY_CONSISTENT_ID_DEFAULT_CHANNEL((service, message, to) -> 
service.respond(to.name(), message, 123L)),
@@ -940,6 +977,11 @@ class DefaultMessagingServiceTest extends 
BaseIgniteAbstractTest {
             this.sendAction = sendAction;
         }
 
+        @Override
+        public CompletableFuture<?> send(MessagingService service, TestMessage 
message, InternalClusterNode recipient) {
+            return sendAction.send(service, message, recipient);
+        }
+
         private boolean notRespondOperation() {
             return this != RESPOND_BY_CONSISTENT_ID_DEFAULT_CHANNEL && this != 
RESPOND_BY_CONSISTENT_ID_SPECIFIC_CHANNEL;
         }

Reply via email to