This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 35346f2bbb3 IGNITE-28501 Retry channel open attempt in most cases 
(#7964)
35346f2bbb3 is described below

commit 35346f2bbb3e7f60d74719cb57379d722fd1f32d
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Apr 14 11:57:56 2026 +0400

    IGNITE-28501 Retry channel open attempt in most cases (#7964)
    
    * Retry on non-terminal I/O exceptions in all send/invoke modes
    * Retry on terminal I/O exceptions (like 'Connection refused') in 
send/invoke-by-name (but only until the recipient falls off the physical topology)
    * Take invocation timeout into consideration
---
 .../network/scalecube/ItNodeRestartsTest.java      | 132 +++++++--
 .../internal/network/DefaultMessagingService.java  | 302 +++++++++++++++++++--
 .../handshake/BrokenHandshakeException.java        |  29 ++
 .../internal/network/netty/HandshakeHandler.java   |   6 +-
 .../network/DefaultMessagingServiceTest.java       | 259 +++++++++++++++++-
 5 files changed, 664 insertions(+), 64 deletions(-)

diff --git 
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
 
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
index 80370612f3f..d0a2f0d467e 100644
--- 
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
+++ 
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
@@ -38,25 +38,34 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.network.ChannelType;
 import org.apache.ignite.internal.network.ClusterIdSupplier;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
 import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.NodeFinder;
 import org.apache.ignite.internal.network.RecipientLeftException;
 import org.apache.ignite.internal.network.StaticNodeFinder;
-import org.apache.ignite.internal.network.handshake.HandshakeException;
+import org.apache.ignite.internal.network.UnresolvableConsistentIdException;
+import org.apache.ignite.internal.network.handshake.BrokenHandshakeException;
 import org.apache.ignite.internal.network.messages.TestMessage;
+import org.apache.ignite.internal.network.messages.TestMessageTypes;
 import org.apache.ignite.internal.network.messages.TestMessagesFactory;
 import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
 import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 /**
  * Tests if a topology size is correct after some nodes are restarted in quick 
succession.
@@ -70,6 +79,13 @@ class ItNodeRestartsTest {
     /** Created {@link ClusterService}s. Needed for resource management. */
     private List<ClusterService> services;
 
+    private TestInfo testInfo;
+
+    @BeforeEach
+    void setUp(TestInfo testInfo) {
+        this.testInfo = testInfo;
+    }
+
     /** Tear down method. */
     @AfterEach
     void tearDown() {
@@ -80,7 +96,7 @@ class ItNodeRestartsTest {
      * Tests that restarting nodes get discovered in an established topology.
      */
     @Test
-    public void testRestarts(TestInfo testInfo) {
+    void testRestarts() {
         final int initPort = 3344;
 
         List<NetworkAddress> addresses = findLocalAddresses(initPort, initPort 
+ 5);
@@ -140,9 +156,19 @@ class ItNodeRestartsTest {
 
         assertThat(clusterService.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
 
+        echoOnInvokeRequests(clusterService);
+
         return clusterService;
     }
 
+    private static void echoOnInvokeRequests(ClusterService clusterService) {
+        
clusterService.messagingService().addMessageHandler(TestMessageTypes.class, 
(message, sender, correlationId) -> {
+            if (message instanceof TestMessage && correlationId != null) {
+                clusterService.messagingService().respond(sender, message, 
correlationId);
+            }
+        });
+    }
+
     /**
      * Blocks until the given topology reaches {@code expected} amount of 
members.
      *
@@ -173,8 +199,9 @@ class ItNodeRestartsTest {
     /**
      * Tests that message sends only end with expected exceptions during 
recipient node restart.
      */
-    @Test
-    public void testRestartDuringSends(TestInfo testInfo) {
+    @ParameterizedTest
+    @EnumSource(SendOperation.class)
+    void testRestartDuringSends(SendOperation operation) {
         final int initPort = 3344;
 
         List<NetworkAddress> addresses = findLocalAddresses(initPort, initPort 
+ 2);
@@ -191,45 +218,37 @@ class ItNodeRestartsTest {
         }
 
         ClusterService sender = services.get(0);
-        ClusterService receiver = services.get(1);
 
         AtomicBoolean sending = new AtomicBoolean(true);
 
-        CompletableFuture<Void> sendingFuture = runAsync(() -> {
-            InternalClusterNode receiverNode = receiver.staticLocalNode();
+        int receiverIndex = 1;
+        AtomicReference<InternalClusterNode> receiverNodeRef = new 
AtomicReference<>(services.get(receiverIndex).staticLocalNode());
 
+        CompletableFuture<Void> sendingFuture = runAsync(() -> {
             while (sending.get()) {
+                InternalClusterNode receiverNode = receiverNodeRef.get();
+
                 TestMessage message = new 
TestMessagesFactory().testMessage().build();
-                CompletableFuture<Void> future = 
sender.messagingService().send(receiverNode, message);
+                CompletableFuture<Void> future = 
operation.send(sender.messagingService(), receiverNode, message);
 
                 try {
                     future.get(10, SECONDS);
                 } catch (InterruptedException | TimeoutException e) {
                     throw new RuntimeException(e);
                 } catch (ExecutionException e) {
-                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-21364 - remove everything except 
RecipientLeftException.
-                    if (!hasCause(e, RecipientLeftException.class, 
ConnectException.class, SocketException.class, IOException.class)) {
-                        if (!hasCause(e, "Channel has been closed before 
handshake has finished", HandshakeException.class)) {
-                            fail("Not an expected exception", e);
-                        }
+                    if (!operation.isAllowed(e)) {
+                        fail("Not an expected exception", e);
                     }
                 }
-
-                try {
-                    Thread.sleep(10);
-                } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
             }
         });
 
-        int receiverIndex = 1;
-
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < 100; i++) {
             stopClusterService(services.get(receiverIndex));
 
             ClusterService restartedReceiver = startNetwork(testInfo, 
addresses.get(receiverIndex), nodeFinder);
             services.set(receiverIndex, restartedReceiver);
+            receiverNodeRef.set(restartedReceiver.staticLocalNode());
         }
 
         sending.set(false);
@@ -240,4 +259,73 @@ class ItNodeRestartsTest {
     private static void stopClusterService(ClusterService clusterService) {
         assertThat(stopAsync(new ComponentContext(), clusterService), 
willCompleteSuccessfully());
     }
+
+    private enum SendOperation {
+        SEND_BY_NODE {
+            @Override
+            CompletableFuture<Void> send(MessagingService senderService, 
InternalClusterNode receiverNode, NetworkMessage message) {
+                return senderService.send(receiverNode, message);
+            }
+
+            @Override
+            boolean isAllowed(ExecutionException ex) {
+                // TODO: https://issues.apache.org/jira/browse/IGNITE-21364 - 
remove everything except RecipientLeftException.
+                return hasCause(
+                        ex,
+                        RecipientLeftException.class,
+                        ConnectException.class,
+                        SocketException.class,
+                        IOException.class,
+                        BrokenHandshakeException.class
+                );
+            }
+        },
+        INVOKE_BY_NODE {
+            @Override
+            CompletableFuture<Void> send(MessagingService senderService, 
InternalClusterNode receiverNode, NetworkMessage message) {
+                return senderService.invoke(receiverNode, message, 
1000).thenApply(unused -> null);
+            }
+
+            @Override
+            boolean isAllowed(ExecutionException ex) {
+                // TODO: https://issues.apache.org/jira/browse/IGNITE-21364 - 
remove everything except RecipientLeftException
+                // and TimeoutException.
+                return hasCause(
+                        ex,
+                        RecipientLeftException.class,
+                        TimeoutException.class,
+                        ConnectException.class,
+                        SocketException.class,
+                        IOException.class,
+                        BrokenHandshakeException.class
+                );
+            }
+        },
+        SEND_BY_NAME {
+            @Override
+            CompletableFuture<Void> send(MessagingService senderService, 
InternalClusterNode receiverNode, NetworkMessage message) {
+                return senderService.send(receiverNode.name(), 
ChannelType.DEFAULT, message);
+            }
+
+            @Override
+            boolean isAllowed(ExecutionException ex) {
+                return hasCause(ex, RecipientLeftException.class, 
UnresolvableConsistentIdException.class);
+            }
+        },
+        INVOKE_BY_NAME {
+            @Override
+            CompletableFuture<Void> send(MessagingService senderService, 
InternalClusterNode receiverNode, NetworkMessage message) {
+                return senderService.invoke(receiverNode.name(), message, 
1000).thenApply(unused -> null);
+            }
+
+            @Override
+            boolean isAllowed(ExecutionException ex) {
+                return hasCause(ex, RecipientLeftException.class, 
UnresolvableConsistentIdException.class, TimeoutException.class);
+            }
+        };
+
+        abstract CompletableFuture<Void> send(MessagingService senderService, 
InternalClusterNode receiverNode, NetworkMessage message);
+
+        abstract boolean isAllowed(ExecutionException ex);
+    }
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
index 3f47e6c0f3d..b93112a54ff 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
@@ -17,7 +17,10 @@
 
 package org.apache.ignite.internal.network;
 
+import static java.util.concurrent.CompletableFuture.delayedExecutor;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.function.Function.identity;
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.LONG_HANDLING_LOGGING_ENABLED;
 import static 
org.apache.ignite.internal.metrics.sources.ThreadPoolMetricSource.THREAD_POOLS_METRICS_SOURCE_NAME;
@@ -37,7 +40,9 @@ import static 
org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
+import java.net.NoRouteToHostException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -48,6 +53,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiPredicate;
 import org.apache.ignite.internal.failure.FailureContext;
@@ -60,6 +66,7 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.network.handshake.BrokenHandshakeException;
 import org.apache.ignite.internal.network.handshake.CriticalHandshakeException;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
@@ -85,6 +92,14 @@ import org.jetbrains.annotations.TestOnly;
 public class DefaultMessagingService extends AbstractMessagingService {
     private static final IgniteLogger LOG = 
Loggers.forClass(DefaultMessagingService.class);
 
+    private static final long NO_TIMEOUT = -1;
+
+    private static final TerminalIoErrorRetryStrategy NEVER_RETRY = new 
NeverRetryStrategy();
+
+    private static final TerminalIoErrorRetryStrategy BY_EPHEMERAL_ID = new 
ByEphemeralIdRetryStrategy();
+
+    private static final long CONNECTION_RETRY_DELAY_MS = 100;
+
     private final boolean longHandlingLoggingEnabled = 
IgniteSystemProperties.getBoolean(LONG_HANDLING_LOGGING_ENABLED, false);
 
     /** Network messages factory. */
@@ -145,6 +160,10 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
      */
     private final Map<UUID, RecipientInetAddress> recipientInetAddrByNodeId = 
new ConcurrentHashMap<>();
 
+    private final Executor connectionRetryExecutor;
+
+    private final TerminalIoErrorRetryStrategy byConsistentId;
+
     /**
      * Constructor.
      *
@@ -211,6 +230,10 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
                 requestsMap,
                 failureProcessor
         );
+
+        connectionRetryExecutor = delayedExecutor(CONNECTION_RETRY_DELAY_MS, 
MILLISECONDS, outboundExecutor);
+
+        byConsistentId = new ByConsistentIdRetryStrategy(topologyService);
     }
 
     @Override
@@ -220,7 +243,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
 
     @Override
     public CompletableFuture<Void> send(InternalClusterNode recipient, 
ChannelType channelType, NetworkMessage msg) {
-        return send0(recipient, channelType, msg, null, true);
+        return send0(recipient, channelType, msg, null, true, BY_EPHEMERAL_ID);
     }
 
     @Override
@@ -235,7 +258,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
             );
         }
 
-        return send0(recipient, channelType, msg, null, false);
+        return send0(recipient, channelType, msg, null, false, byConsistentId);
     }
 
     @Override
@@ -247,12 +270,12 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
             recipient = new ClusterNodeImpl(null, null, 
recipientNetworkAddress);
         }
 
-        return send0(recipient, channelType, msg, null, false);
+        return send0(recipient, channelType, msg, null, false, NEVER_RETRY);
     }
 
     @Override
     public CompletableFuture<Void> respond(InternalClusterNode recipient, 
ChannelType type, NetworkMessage msg, long correlationId) {
-        return send0(recipient, type, msg, correlationId, true);
+        return send0(recipient, type, msg, correlationId, true, 
BY_EPHEMERAL_ID);
     }
 
     @Override
@@ -267,12 +290,12 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
             );
         }
 
-        return send0(recipient, type, msg, correlationId, false);
+        return send0(recipient, type, msg, correlationId, false, 
byConsistentId);
     }
 
     @Override
     public CompletableFuture<NetworkMessage> invoke(InternalClusterNode 
recipient, ChannelType type, NetworkMessage msg, long timeout) {
-        return invoke0(recipient, type, msg, timeout, true);
+        return invoke0(recipient, type, msg, timeout, true, BY_EPHEMERAL_ID);
     }
 
     /** {@inheritDoc} */
@@ -288,7 +311,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
             );
         }
 
-        return invoke0(recipient, type, msg, timeout, false);
+        return invoke0(recipient, type, msg, timeout, false, byConsistentId);
     }
 
     /**
@@ -299,6 +322,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
      * @param correlationId Correlation id. Not null iff the message is a 
response to a {@link #invoke} request.
      * @param strictIdCheck Whether {@link RecipientLeftException} is to be 
thrown if the node at the other side of the channel
      *     actually has ID different from the ID in the recipient object (that 
is, that the recipient has been restarted).
+     * @param retryStrategy Retry strategy.
      * @return Future of the send operation.
      */
     private CompletableFuture<Void> send0(
@@ -306,7 +330,8 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
             ChannelType type,
             NetworkMessage msg,
             @Nullable Long correlationId,
-            boolean strictIdCheck
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy
     ) {
         if (connectionManager.isStopped()) {
             return failedFuture(new NodeStoppingException());
@@ -343,7 +368,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
 
         NetworkMessage message = correlationId != null ? 
responseFromMessage(msg, correlationId) : msg;
 
-        return sendViaNetwork(recipient.id(), type, recipientAddress, message, 
strictIdCheck);
+        return sendViaNetwork(recipient.id(), type, recipientAddress, message, 
strictIdCheck, retryStrategy, recipient.name(), NO_TIMEOUT);
     }
 
     private <U> CompletableFuture<U> 
recipientIsStaleFuture(InternalClusterNode recipient) {
@@ -368,6 +393,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
      * @param timeout Invocation timeout.
      * @param strictIdCheck Whether {@link RecipientLeftException} is to be 
thrown if the node at the other side of the channel
      *     actually has ID different from the ID in the recipient object (that 
is, that the recipient has been restarted).
+     * @param retryStrategy Retry strategy.
      * @return A future holding the response or error if the expected response 
was not received.
      */
     private CompletableFuture<NetworkMessage> invoke0(
@@ -375,7 +401,8 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
             ChannelType type,
             NetworkMessage msg,
             long timeout,
-            boolean strictIdCheck
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy
     ) {
         if (connectionManager.isStopped()) {
             return failedFuture(new NodeStoppingException());
@@ -391,14 +418,17 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
 
         // TODO: IGNITE-18493 - remove/move this
         if (shouldDropMessage) {
-            return new CompletableFuture<NetworkMessage>().orTimeout(10, 
TimeUnit.MILLISECONDS);
+            return new CompletableFuture<NetworkMessage>().orTimeout(10, 
MILLISECONDS);
         }
 
         long correlationId = createCorrelationId();
 
         CompletableFuture<NetworkMessage> responseFuture = new 
CompletableFuture<>();
 
-        requestsMap.put(correlationId, new TimeoutObjectImpl(timeout > 0 ? 
coarseCurrentTimeMillis() + timeout : 0, responseFuture, msg));
+        requestsMap.put(
+                correlationId,
+                new TimeoutObjectImpl(timeoutDefined(timeout) ? 
coarseCurrentTimeMillis() + timeout : 0, responseFuture, msg)
+        );
 
         InetSocketAddress recipientAddress = 
resolveRecipientAddress(recipient);
 
@@ -414,31 +444,42 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
 
         InvokeRequest message = requestFromMessage(msg, correlationId);
 
-        return sendViaNetwork(recipient.id(), type, recipientAddress, message, 
strictIdCheck)
+        return sendViaNetwork(recipient.id(), type, recipientAddress, message, 
strictIdCheck, retryStrategy, recipient.name(), timeout)
                 .thenCompose(unused -> responseFuture);
     }
 
+    private static boolean timeoutDefined(long timeout) {
+        return timeout > 0;
+    }
+
     /**
      * Sends network object.
      *
-     * @param nodeId Target node ID.
+     * @param nodeId Target node ID (only {@code null} if sending by address).
      * @param type Channel type for send.
      * @param addr Target address.
      * @param message Message.
      * @param strictIdCheck Whether {@link RecipientLeftException} is to be 
thrown if the node at the other side of the channel
      *     actually has ID different from the ID in the recipient object (that 
is, that the recipient has been restarted).
+     * @param retryStrategy Retry strategy.
+     * @param consistentId Consistent ID of the recipient (if known).
+     * @param channelTimeoutMillis Timeout for channel creation in 
milliseconds. If -1, then no timeout.
      * @return Future of the send operation.
      */
     private CompletableFuture<Void> sendViaNetwork(
-            UUID nodeId,
+            @Nullable UUID nodeId,
             ChannelType type,
             InetSocketAddress addr,
             NetworkMessage message,
-            boolean strictIdCheck
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            @Nullable String consistentId,
+            long channelTimeoutMillis
     ) {
         if (isInNetworkThread()) {
-            return CompletableFuture.supplyAsync(() -> sendViaNetwork(nodeId, 
type, addr, message, strictIdCheck), outboundExecutor)
-                    .thenCompose(identity());
+            return supplyAsync(() -> {
+                return sendViaNetwork(nodeId, type, addr, message, 
strictIdCheck, retryStrategy, consistentId, channelTimeoutMillis);
+            }, outboundExecutor).thenCompose(identity());
         }
 
         List<ClassDescriptorMessage> descriptors;
@@ -449,8 +490,15 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
             return failedFuture(new IgniteException(INTERNAL_ERR, "Failed to 
marshal message: " + e.getMessage(), e));
         }
 
-        // TODO IGNITE-28225 Retry channel creation in case of network issues.
-        OrderingFuture<NettySender> channelFuture = 
connectionManager.channel(nodeId, type, addr);
+        OrderingFuture<NettySender> channelFuture = openChannelWithRetries(
+                nodeId,
+                type,
+                addr,
+                strictIdCheck,
+                retryStrategy,
+                consistentId,
+                channelTimeoutMillis
+        );
 
         channelFuture.whenComplete((sender, ex) -> maybeLogHandshakeError(ex, 
nodeId, type, addr));
 
@@ -465,12 +513,159 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
 
                     return sender.send(
                             new OutNetworkObject(message, descriptors),
-                            () -> triggerChannelCreation(nodeId, type, addr)
+                            () -> triggerChannelCreation(
+                                    nodeId,
+                                    type,
+                                    addr,
+                                    strictIdCheck,
+                                    retryStrategy,
+                                    consistentId,
+                                    channelTimeoutMillis
+                            )
                     );
                 });
     }
 
-    private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType 
type, InetSocketAddress addr) {
+    private OrderingFuture<NettySender> openChannelWithRetries(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            @Nullable String consistentId,
+            long timeoutMillis
+    ) {
+        Long deadlineMillis = timeoutMillis == NO_TIMEOUT || 
!timeoutDefined(timeoutMillis) ? null
+                : coarseCurrentTimeMillis() + timeoutMillis;
+        if (deadlineMillis != null && deadlineMillis < 0) {
+            deadlineMillis = Long.MAX_VALUE;
+        }
+
+        return openChannelWithRetriesInternal(nodeId, type, addr, 
strictIdCheck, retryStrategy, consistentId, deadlineMillis, 0);
+    }
+
+    private OrderingFuture<NettySender> openChannelWithRetriesInternal(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            @Nullable String consistentId,
+            @Nullable Long deadlineMillis,
+            int attemptOrdinal
+    ) {
+        if (attemptOrdinal >= 1000) {
+            return OrderingFuture.failedFuture(new IllegalStateException(
+                    "Too many channel creation attempts [attempts=" + 
attemptOrdinal + ", nodeId=" + nodeId + "]."
+            ));
+        }
+
+        return connectionManager.channel(nodeId, type, addr)
+                .handle((res, ex) -> {
+                    if (ex == null) {
+                        return OrderingFuture.completedFuture(res);
+                    }
+
+                    // Only I/O exceptions and BrokenHandshakeException are 
retriable.
+                    if (!causedByIoException(ex) && !hasCause(ex, 
BrokenHandshakeException.class)) {
+                        return OrderingFuture.<NettySender>failedFuture(ex);
+                    }
+
+                    if (strictIdCheck && nodeId != null && 
staleIdDetector.isIdStale(nodeId)) {
+                        metrics.incrementMessageRecipientNotFound();
+
+                        return OrderingFuture.<NettySender>failedFuture(
+                                new RecipientLeftException("Recipient is stale 
[id=" + nodeId + "]")
+                        );
+                    }
+
+                    // We always retry 'temporary I/O failures' (those that 
are not ConnectException or NoRouteToHostException).
+                    boolean temporaryFailure = !recipientIsNotThere(ex);
+
+                    if (!temporaryFailure && 
!retryStrategy.stillRetriable(nodeId, consistentId)) {
+                        Throwable exToReturn = 
retryStrategy.retriesExhaustedMeansRecipientLeft()
+                                ? new RecipientLeftException("Recipient left 
[id=" + nodeId + "].") : ex;
+                        return 
OrderingFuture.<NettySender>failedFuture(exToReturn);
+                    }
+
+                    if (deadlineReached(deadlineMillis)) {
+                        return OrderingFuture.<NettySender>failedFuture(
+                                new TimeoutException("Channel creation timed 
out [id=" + nodeId + "].")
+                        );
+                    }
+
+                    return attemptToOpenChannelAfterDelay(
+                            nodeId,
+                            type,
+                            addr,
+                            strictIdCheck,
+                            retryStrategy,
+                            consistentId,
+                            deadlineMillis,
+                            attemptOrdinal + 1
+                    );
+                })
+                .thenCompose(identity());
+    }
+
+    private static boolean causedByIoException(Throwable ex) {
+        return hasCause(ex, IOException.class);
+    }
+
+    private static boolean recipientIsNotThere(Throwable ex) {
+        return hasCause(ex, ConnectException.class, 
NoRouteToHostException.class);
+    }
+
+    private static boolean deadlineReached(@Nullable Long deadlineMillis) {
+        return deadlineMillis != null && coarseCurrentTimeMillis() >= 
deadlineMillis;
+    }
+
+    private OrderingFuture<NettySender> attemptToOpenChannelAfterDelay(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            @Nullable String consistentId,
+            @Nullable Long deadlineMillis,
+            int attemptOrdinal
+    ) {
+        OrderingFuture<NettySender> nextAttemptFuture = new OrderingFuture<>();
+
+        // Formally, we break ordering guarantees here, but it's not a problem 
as we are only concerned about order of messages,
+        // and the corresponding callbacks will be added to a future we return 
from here, so ordering of message sends will be maintained.
+
+        connectionRetryExecutor.execute(() -> {
+            try {
+                openChannelWithRetriesInternal(
+                        nodeId,
+                        type,
+                        addr,
+                        strictIdCheck,
+                        retryStrategy,
+                        consistentId,
+                        deadlineMillis,
+                        attemptOrdinal
+                ).whenComplete((nextRes, nextEx) -> {
+                    if (nextEx != null) {
+                        nextAttemptFuture.completeExceptionally(nextEx);
+                    } else {
+                        nextAttemptFuture.complete(nextRes);
+                    }
+                });
+            } catch (Throwable e) {
+                nextAttemptFuture.completeExceptionally(e);
+
+                if (e instanceof Error) {
+                    throw e;
+                }
+            }
+        });
+
+        return nextAttemptFuture;
+    }
+
+    private void maybeLogHandshakeError(Throwable ex, @Nullable UUID nodeId, 
ChannelType type, InetSocketAddress addr) {
         if (ex != null) {
             if (hasCause(ex, CriticalHandshakeException.class)) {
                 LOG.error(
@@ -487,8 +682,16 @@ public class DefaultMessagingService extends AbstractMessagingService {
         }
     }
 
-    private void triggerChannelCreation(UUID nodeId, ChannelType type, InetSocketAddress addr) {
-        connectionManager.channel(nodeId, type, addr);
+    private void triggerChannelCreation(
+            @Nullable UUID nodeId,
+            ChannelType type,
+            InetSocketAddress addr,
+            boolean strictIdCheck,
+            TerminalIoErrorRetryStrategy retryStrategy,
+            @Nullable String consistentId,
+            long timeoutMillis
+    ) {
+        openChannelWithRetries(nodeId, type, addr, strictIdCheck, retryStrategy, consistentId, timeoutMillis);
     }
 
     private List<ClassDescriptorMessage> prepareMarshal(NetworkMessage msg) throws Exception {
@@ -918,4 +1121,55 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         return address.address();
     }
+
+    private interface TerminalIoErrorRetryStrategy {
+        boolean stillRetriable(@Nullable UUID id, @Nullable String consistentId);
+
+        boolean retriesExhaustedMeansRecipientLeft();
+    }
+
+    private static class ByEphemeralIdRetryStrategy implements TerminalIoErrorRetryStrategy {
+        @Override
+        public boolean stillRetriable(@Nullable UUID id, @Nullable String consistentId) {
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-28225 - make retriable.
+            return false;
+        }
+
+        @Override
+        public boolean retriesExhaustedMeansRecipientLeft() {
+            return false;
+        }
+    }
+
+    private static class ByConsistentIdRetryStrategy implements TerminalIoErrorRetryStrategy {
+        private final TopologyService topologyService;
+
+        private ByConsistentIdRetryStrategy(TopologyService topologyService) {
+            this.topologyService = topologyService;
+        }
+
+        @Override
+        public boolean stillRetriable(@Nullable UUID id, @Nullable String consistentId) {
+            assert consistentId != null;
+
+            return topologyService.getByConsistentId(consistentId) != null;
+        }
+
+        @Override
+        public boolean retriesExhaustedMeansRecipientLeft() {
+            return true;
+        }
+    }
+
+    private static class NeverRetryStrategy implements TerminalIoErrorRetryStrategy {
+        @Override
+        public boolean stillRetriable(@Nullable UUID id, @Nullable String consistentId) {
+            return false;
+        }
+
+        @Override
+        public boolean retriesExhaustedMeansRecipientLeft() {
+            return false;
+        }
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/BrokenHandshakeException.java b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/BrokenHandshakeException.java
new file mode 100644
index 00000000000..005ba0e8b8e
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/BrokenHandshakeException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.handshake;
+
+/**
+ * Exception that notifies of handshake that failed because a channel was closed or some internal error happened.
+ */
+public class BrokenHandshakeException extends HandshakeException {
+    private static final long serialVersionUID = 0L;
+
+    public BrokenHandshakeException() {
+        super("Channel has been closed before handshake has finished or handshake has failed");
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
index 5097c44e460..3a62bd9cc13 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
@@ -25,7 +25,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.network.handshake.HandshakeException;
+import org.apache.ignite.internal.network.handshake.BrokenHandshakeException;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 
@@ -111,9 +111,7 @@ public class HandshakeHandler extends ChannelInboundHandlerAdapter {
     public void channelInactive(ChannelHandlerContext ctx) {
         // If this method is called that means channel has been closed before handshake has finished or handshake
         // has failed.
-        manager.localHandshakeFuture().completeExceptionally(
-                new HandshakeException("Channel has been closed before handshake has finished or handshake has failed")
-        );
+        manager.localHandshakeFuture().completeExceptionally(new BrokenHandshakeException());
 
         ctx.fireChannelInactive();
     }
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
index d2f77ca0525..82414f9a343 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
@@ -38,14 +38,22 @@ import static org.junit.Assume.assumeThat;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.AdditionalMatchers.or;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.NoRouteToHostException;
+import java.net.SocketException;
 import java.net.UnknownHostException;
 import java.time.Instant;
 import java.util.List;
@@ -67,9 +75,11 @@ import org.apache.ignite.internal.configuration.testframework.ConfigurationExten
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
+import org.apache.ignite.internal.future.OrderingFuture;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metrics.NoOpMetricManager;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
+import org.apache.ignite.internal.network.handshake.BrokenHandshakeException;
 import org.apache.ignite.internal.network.messages.AllTypesMessageImpl;
 import org.apache.ignite.internal.network.messages.InstantContainer;
 import org.apache.ignite.internal.network.messages.MessageWithInstant;
@@ -655,21 +665,78 @@ class DefaultMessagingServiceTest extends BaseIgniteAbstractTest {
         ) {
             CompletableFuture<Void> messageDelivered = new CompletableFuture<>();
 
-            receiverServices.messagingService.addMessageHandler(
-                    TestMessageTypes.class,
-                    (message, sender, correlationId) -> {
-                        if (message instanceof TestMessage) {
-                            messageDelivered.complete(null);
+            echoTestMessagesFromInvokes(receiverServices, messageDelivered);
+
+            assertThat(
+                    operation.sendAction.send(senderServices.messagingService, testMessage("test"), receiverWithAnotherId),
+                    willCompleteSuccessfully()
+            );
+            if (operation.notRespondOperation()) {
+                assertThat(messageDelivered, willCompleteSuccessfully());
+            }
+        }
+    }
+
+    private static InternalClusterNode copyWithoutName(InternalClusterNode node) {
+        return new ClusterNodeImpl(node.id(), null, node.address());
+    }
+
+    private static void echoTestMessagesFromInvokes(Services receiverServices, CompletableFuture<Void> messageDelivered) {
+        receiverServices.messagingService.addMessageHandler(
+                TestMessageTypes.class,
+                (message, sender, correlationId) -> {
+                    if (message instanceof TestMessage) {
+                        messageDelivered.complete(null);
 
-                            if (correlationId != null) {
-                                receiverServices.messagingService.respond(sender, message, correlationId);
-                            }
+                        if (correlationId != null) {
+                            receiverServices.messagingService.respond(sender, message, correlationId);
                         }
                     }
+                }
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(SendByConsistentCoordinateOperation.class)
+    @EnumSource(SendByClusterNodeOperation.class)
+    void sendThrowsOnNonIoCausedConnectionExceptions(AsyncSendOperation operation) throws Exception {
+        try (
+                Services senderServices = createMessagingService(senderNode, senderNetworkConfig);
+                Services ignoredReceiverServices = createMessagingService(receiverNode, receiverNetworkConfig)
+        ) {
+            RecipientLeftException cause = new RecipientLeftException("Oops");
+
+            doReturn(OrderingFuture.failedFuture(cause))
+                    .when(senderServices.connectionManager).channel(any(), or(eq(ChannelType.DEFAULT), eq(TEST_CHANNEL)), any());
+
+            Exception ex = assertWillThrow(
+                    operation.send(senderServices.messagingService, testMessage("test"), receiverNode),
+                    Exception.class
             );
 
+            assertThat(ex, is(cause));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(SendByConsistentCoordinateOperation.class)
+    @EnumSource(SendByClusterNodeOperation.class)
+    void sendRetriesOnNonTerminalConnectionExceptions(AsyncSendOperation operation) throws Exception {
+        try (
+                Services senderServices = createMessagingService(senderNode, senderNetworkConfig);
+                Services receiverServices = createMessagingService(receiverNode, receiverNetworkConfig)
+        ) {
+            doReturn(OrderingFuture.failedFuture(new SocketException("Connection reset by peer")))
+                    .doReturn(OrderingFuture.failedFuture(new BrokenHandshakeException()))
+                    .doCallRealMethod()
+                    .when(senderServices.connectionManager).channel(any(), or(eq(ChannelType.DEFAULT), eq(TEST_CHANNEL)), any());
+
+            CompletableFuture<Void> messageDelivered = new CompletableFuture<>();
+
+            echoTestMessagesFromInvokes(receiverServices, messageDelivered);
+
             assertThat(
-                    operation.sendAction.send(senderServices.messagingService, testMessage("test"), receiverWithAnotherId),
+                    operation.send(senderServices.messagingService, testMessage("test"), receiverNode),
                     willCompleteSuccessfully()
             );
             if (operation.notRespondOperation()) {
@@ -678,8 +745,152 @@ class DefaultMessagingServiceTest extends BaseIgniteAbstractTest {
         }
     }
 
-    private static InternalClusterNode copyWithoutName(InternalClusterNode node) {
-        return new ClusterNodeImpl(node.id(), null, node.address());
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-28225: expand this and other tests to also include by-cluster-node send/invoke
+    // modes (when retries for them are added).
+
+    @ParameterizedTest
+    @EnumSource(SendByConsistentCoordinateOperation.class)
+    void sendByConsistentIdRetriesOnTerminalIoConnectionExceptions(SendByConsistentCoordinateOperation operation) throws Exception {
+        assumeThat(operation, is(not(SendByConsistentCoordinateOperation.SEND_BY_ADDRESS)));
+
+        try (
+                Services senderServices = createMessagingService(senderNode, senderNetworkConfig);
+                Services receiverServices = createMessagingService(receiverNode, receiverNetworkConfig)
+        ) {
+            doReturn(OrderingFuture.failedFuture(new ConnectException("Connection refused")))
+                    .doReturn(OrderingFuture.failedFuture(new NoRouteToHostException()))
+                    .doCallRealMethod()
+                    .when(senderServices.connectionManager).channel(any(), or(eq(ChannelType.DEFAULT), eq(TEST_CHANNEL)), any());
+
+            CompletableFuture<Void> messageDelivered = new CompletableFuture<>();
+
+            echoTestMessagesFromInvokes(receiverServices, messageDelivered);
+
+            assertThat(
+                    operation.send(senderServices.messagingService, testMessage("test"), receiverNode),
+                    willCompleteSuccessfully()
+            );
+            if (operation.notRespondOperation()) {
+                assertThat(messageDelivered, willCompleteSuccessfully());
+            }
+        }
+    }
+
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-28225 change this test when retries are added for sends/invokes by cluster node.
+    @ParameterizedTest
+    @EnumSource(SendByClusterNodeOperation.class)
+    void sendByClusterNodeDoesNotRetryOnTerminalIoConnectionExceptions(SendByClusterNodeOperation operation) throws Exception {
+        try (
+                Services senderServices = createMessagingService(senderNode, senderNetworkConfig);
+                Services ignoredReceiverServices = createMessagingService(receiverNode, receiverNetworkConfig)
+        ) {
+            ConnectException cause = new ConnectException("Connection refused");
+            doReturn(OrderingFuture.failedFuture(cause))
+                    .when(senderServices.connectionManager).channel(any(), or(eq(ChannelType.DEFAULT), eq(TEST_CHANNEL)), any());
+
+            assertWillThrow(operation.send(senderServices.messagingService, testMessage("test"), receiverNode), ConnectException.class);
+        }
+    }
+
+    @Test
+    void sendByAddressDoesNotRetryOnTerminalIoConnectionExceptions() throws Exception {
+        try (
+                Services senderServices = createMessagingService(senderNode, senderNetworkConfig);
+                Services ignoredReceiverServices = createMessagingService(receiverNode, receiverNetworkConfig)
+        ) {
+            ConnectException cause = new ConnectException("Connection refused");
+            doReturn(OrderingFuture.failedFuture(cause))
+                    .when(senderServices.connectionManager).channel(any(), or(eq(ChannelType.DEFAULT), eq(TEST_CHANNEL)), any());
+
+            assertWillThrow(
+                    senderServices.messagingService.send(receiverNode.address(), ChannelType.DEFAULT, testMessage("test")),
+                    ConnectException.class
+            );
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(SendByConsistentCoordinateOperation.class)
+    void sendByConsistentIdStopsRetryingOnTerminalIoConnectionExceptionsIfReceiverLeavesTopology(
+            SendByConsistentCoordinateOperation operation
+    ) throws Exception {
+        assumeThat(operation, is(not(SendByConsistentCoordinateOperation.SEND_BY_ADDRESS)));
+
+        AtomicInteger retryCount = new AtomicInteger();
+
+        try (
+                Services senderServices = createMessagingService(senderNode, senderNetworkConfig);
+                Services ignoredReceiverServices = createMessagingService(receiverNode, receiverNetworkConfig)
+        ) {
+            when(topologyService.getByConsistentId(eq(receiverNode.name()))).thenAnswer(invocation -> {
+                // Simulate a node leaving topology after 2 retries.
+                if (retryCount.get() >= 2) {
+                    return null;
+                } else {
+                    return receiverNode;
+                }
+            });
+
+            doAnswer(invocation -> {
+                retryCount.incrementAndGet();
+                return OrderingFuture.failedFuture(new ConnectException("Connection refused"));
+            }).when(senderServices.connectionManager).channel(any(), or(eq(ChannelType.DEFAULT), eq(TEST_CHANNEL)), any());
+
+            assertWillThrow(
+                    operation.send(senderServices.messagingService, testMessage("test"), receiverNode),
+                    RecipientLeftException.class
+            );
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(SendByClusterNodeOperation.class)
+    void sendByClusterNodeChecksForReceiverStalenessOnTerminalIoException(
+            SendByClusterNodeOperation operation
+    ) throws Exception {
+        assumeThat(operation, is(not(SendByConsistentCoordinateOperation.SEND_BY_ADDRESS)));
+
+        AtomicInteger retryCount = new AtomicInteger();
+        InMemoryStaleIds senderSideStaleIdDetector = new InMemoryStaleIds();
+
+        try (
+                Services senderServices = createMessagingService(
+                        senderNode,
+                        senderNetworkConfig,
+                        () -> {},
+                        messageSerializationRegistry,
+                        senderSideStaleIdDetector
+                );
+                Services ignoredReceiverServices = createMessagingService(receiverNode, receiverNetworkConfig)
+        ) {
+            doAnswer(invocation -> {
+                if (retryCount.incrementAndGet() == 1) {
+                    senderSideStaleIdDetector.markAsStale(receiverNode.id());
+                }
+                return OrderingFuture.failedFuture(new ConnectException("Connection refused"));
+            }).when(senderServices.connectionManager).channel(any(), or(eq(ChannelType.DEFAULT), eq(TEST_CHANNEL)), any());
+
+            assertWillThrow(
+                    operation.send(senderServices.messagingService, testMessage("test"), receiverNode),
+                    RecipientLeftException.class
+            );
+        }
+    }
+
+    @Test
+    void invokeStopsRetryingConnectionOnTimeout() throws Exception {
+        try (
+                Services senderServices = createMessagingService(senderNode, senderNetworkConfig);
+                Services receiverServices = createMessagingService(receiverNode, receiverNetworkConfig)
+        ) {
+            doReturn(OrderingFuture.failedFuture(new ConnectException("Connection refused")))
+                    .when(senderServices.connectionManager).channel(any(), or(eq(ChannelType.DEFAULT), eq(TEST_CHANNEL)), any());
+
+            CompletableFuture<Void> messageDelivered = new CompletableFuture<>();
+            echoTestMessagesFromInvokes(receiverServices, messageDelivered);
+
+            assertWillThrow(senderServices.messagingService.invoke(receiverNode.name(), testMessage("test"), 300), TimeoutException.class);
+        }
     }
 
     private static void awaitQuietly(CountDownLatch latch) {
@@ -726,7 +937,7 @@ class DefaultMessagingServiceTest extends BaseIgniteAbstractTest {
 
         assertThat(bootstrapFactory.startAsync(new ComponentContext()), willCompleteSuccessfully());
 
-        ConnectionManager connectionManager = new TestConnectionManager(
+        ConnectionManager connectionManager = spy(new TestConnectionManager(
                 networkConfig,
                 serializationService,
                 node,
@@ -734,7 +945,7 @@ class DefaultMessagingServiceTest extends BaseIgniteAbstractTest {
                 staleIdDetector,
                 clusterIdSupplier,
                 beforeHandshake
-        );
+        ));
 
         DefaultMessagingService messagingService = new DefaultMessagingService(
                 node.name(),
@@ -936,6 +1147,10 @@ class DefaultMessagingServiceTest extends BaseIgniteAbstractTest {
 
     private interface AsyncSendOperation {
         CompletableFuture<?> send(MessagingService service, TestMessage message, InternalClusterNode recipient);
+
+        boolean notRespondOperation();
+
+        boolean isInvoke();
     }
 
     @FunctionalInterface
@@ -961,6 +1176,16 @@ class DefaultMessagingServiceTest extends BaseIgniteAbstractTest {
         public CompletableFuture<?> send(MessagingService service, TestMessage message, InternalClusterNode recipient) {
             return sendAction.send(service, message, recipient);
         }
+
+        @Override
+        public boolean notRespondOperation() {
+            return this != RESPOND_DEFAULT_CHANNEL && this != RESPOND_SPECIFIC_CHANNEL;
+        }
+
+        @Override
+        public boolean isInvoke() {
+            return this == INVOKE_DEFAULT_CHANNEL || this == INVOKE_SPECIFIC_CHANNEL;
+        }
     }
 
     private enum SendByConsistentCoordinateOperation implements AsyncSendOperation {
@@ -982,8 +1207,14 @@ class DefaultMessagingServiceTest extends BaseIgniteAbstractTest {
             return sendAction.send(service, message, recipient);
         }
 
-        private boolean notRespondOperation() {
+        @Override
+        public boolean notRespondOperation() {
             return this != RESPOND_BY_CONSISTENT_ID_DEFAULT_CHANNEL && this != RESPOND_BY_CONSISTENT_ID_SPECIFIC_CHANNEL;
         }
+
+        @Override
+        public boolean isInvoke() {
+            return this == INVOKE_BY_CONSISTENT_ID_DEFAULT_CHANNEL || this == INVOKE_BY_CONSISTENT_ID_SPECIFIC_CHANNEL;
+        }
     }
 }

Reply via email to