This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 35346f2bbb3 IGNITE-28501 Retry channel open attempt in most cases (#7964)
35346f2bbb3 is described below
commit 35346f2bbb3e7f60d74719cb57379d722fd1f32d
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Apr 14 11:57:56 2026 +0400
IGNITE-28501 Retry channel open attempt in most cases (#7964)
* Retry on non-terminal I/O exceptions in all send/invoke modes
* Retry on terminal I/O exceptions (like 'Connection refused') in send/invoke-by-name (but only until recipient falls off the physical topology)
* Take invocation timeout into consideration
---
.../network/scalecube/ItNodeRestartsTest.java | 132 +++++++--
.../internal/network/DefaultMessagingService.java | 302 +++++++++++++++++++--
.../handshake/BrokenHandshakeException.java | 29 ++
.../internal/network/netty/HandshakeHandler.java | 6 +-
.../network/DefaultMessagingServiceTest.java | 259 +++++++++++++++++-
5 files changed, 664 insertions(+), 64 deletions(-)
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
index 80370612f3f..d0a2f0d467e 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
@@ -38,25 +38,34 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.network.ChannelType;
import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NodeFinder;
import org.apache.ignite.internal.network.RecipientLeftException;
import org.apache.ignite.internal.network.StaticNodeFinder;
-import org.apache.ignite.internal.network.handshake.HandshakeException;
+import org.apache.ignite.internal.network.UnresolvableConsistentIdException;
+import org.apache.ignite.internal.network.handshake.BrokenHandshakeException;
import org.apache.ignite.internal.network.messages.TestMessage;
+import org.apache.ignite.internal.network.messages.TestMessageTypes;
import org.apache.ignite.internal.network.messages.TestMessagesFactory;
import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
/**
* Tests if a topology size is correct after some nodes are restarted in quick
succession.
@@ -70,6 +79,13 @@ class ItNodeRestartsTest {
/** Created {@link ClusterService}s. Needed for resource management. */
private List<ClusterService> services;
+ private TestInfo testInfo;
+
+ @BeforeEach
+ void setUp(TestInfo testInfo) {
+ this.testInfo = testInfo;
+ }
+
/** Tear down method. */
@AfterEach
void tearDown() {
@@ -80,7 +96,7 @@ class ItNodeRestartsTest {
* Tests that restarting nodes get discovered in an established topology.
*/
@Test
- public void testRestarts(TestInfo testInfo) {
+ void testRestarts() {
final int initPort = 3344;
List<NetworkAddress> addresses = findLocalAddresses(initPort, initPort + 5);
@@ -140,9 +156,19 @@ class ItNodeRestartsTest {
assertThat(clusterService.startAsync(new ComponentContext()),
willCompleteSuccessfully());
+ echoOnInvokeRequests(clusterService);
+
return clusterService;
}
+ private static void echoOnInvokeRequests(ClusterService clusterService) {
+
clusterService.messagingService().addMessageHandler(TestMessageTypes.class,
(message, sender, correlationId) -> {
+ if (message instanceof TestMessage && correlationId != null) {
+ clusterService.messagingService().respond(sender, message,
correlationId);
+ }
+ });
+ }
+
/**
* Blocks until the given topology reaches {@code expected} amount of
members.
*
@@ -173,8 +199,9 @@ class ItNodeRestartsTest {
/**
* Tests that message sends only end with expected exceptions during
recipient node restart.
*/
- @Test
- public void testRestartDuringSends(TestInfo testInfo) {
+ @ParameterizedTest
+ @EnumSource(SendOperation.class)
+ void testRestartDuringSends(SendOperation operation) {
final int initPort = 3344;
List<NetworkAddress> addresses = findLocalAddresses(initPort, initPort + 2);
@@ -191,45 +218,37 @@ class ItNodeRestartsTest {
}
ClusterService sender = services.get(0);
- ClusterService receiver = services.get(1);
AtomicBoolean sending = new AtomicBoolean(true);
- CompletableFuture<Void> sendingFuture = runAsync(() -> {
- InternalClusterNode receiverNode = receiver.staticLocalNode();
+ int receiverIndex = 1;
+ AtomicReference<InternalClusterNode> receiverNodeRef = new
AtomicReference<>(services.get(receiverIndex).staticLocalNode());
+ CompletableFuture<Void> sendingFuture = runAsync(() -> {
while (sending.get()) {
+ InternalClusterNode receiverNode = receiverNodeRef.get();
+
TestMessage message = new
TestMessagesFactory().testMessage().build();
- CompletableFuture<Void> future =
sender.messagingService().send(receiverNode, message);
+ CompletableFuture<Void> future =
operation.send(sender.messagingService(), receiverNode, message);
try {
future.get(10, SECONDS);
} catch (InterruptedException | TimeoutException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-21364 - remove everything except
RecipientLeftException.
- if (!hasCause(e, RecipientLeftException.class,
ConnectException.class, SocketException.class, IOException.class)) {
- if (!hasCause(e, "Channel has been closed before
handshake has finished", HandshakeException.class)) {
- fail("Not an expected exception", e);
- }
+ if (!operation.isAllowed(e)) {
+ fail("Not an expected exception", e);
}
}
-
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
}
});
- int receiverIndex = 1;
-
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 100; i++) {
stopClusterService(services.get(receiverIndex));
ClusterService restartedReceiver = startNetwork(testInfo,
addresses.get(receiverIndex), nodeFinder);
services.set(receiverIndex, restartedReceiver);
+ receiverNodeRef.set(restartedReceiver.staticLocalNode());
}
sending.set(false);
@@ -240,4 +259,73 @@ class ItNodeRestartsTest {
private static void stopClusterService(ClusterService clusterService) {
assertThat(stopAsync(new ComponentContext(), clusterService),
willCompleteSuccessfully());
}
+
+ private enum SendOperation {
+ SEND_BY_NODE {
+ @Override
+ CompletableFuture<Void> send(MessagingService senderService,
InternalClusterNode receiverNode, NetworkMessage message) {
+ return senderService.send(receiverNode, message);
+ }
+
+ @Override
+ boolean isAllowed(ExecutionException ex) {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-21364 -
remove everything except RecipientLeftException.
+ return hasCause(
+ ex,
+ RecipientLeftException.class,
+ ConnectException.class,
+ SocketException.class,
+ IOException.class,
+ BrokenHandshakeException.class
+ );
+ }
+ },
+ INVOKE_BY_NODE {
+ @Override
+ CompletableFuture<Void> send(MessagingService senderService,
InternalClusterNode receiverNode, NetworkMessage message) {
+ return senderService.invoke(receiverNode, message,
1000).thenApply(unused -> null);
+ }
+
+ @Override
+ boolean isAllowed(ExecutionException ex) {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-21364 -
remove everything except RecipientLeftException
+ // and TimeoutException.
+ return hasCause(
+ ex,
+ RecipientLeftException.class,
+ TimeoutException.class,
+ ConnectException.class,
+ SocketException.class,
+ IOException.class,
+ BrokenHandshakeException.class
+ );
+ }
+ },
+ SEND_BY_NAME {
+ @Override
+ CompletableFuture<Void> send(MessagingService senderService,
InternalClusterNode receiverNode, NetworkMessage message) {
+ return senderService.send(receiverNode.name(),
ChannelType.DEFAULT, message);
+ }
+
+ @Override
+ boolean isAllowed(ExecutionException ex) {
+ return hasCause(ex, RecipientLeftException.class,
UnresolvableConsistentIdException.class);
+ }
+ },
+ INVOKE_BY_NAME {
+ @Override
+ CompletableFuture<Void> send(MessagingService senderService,
InternalClusterNode receiverNode, NetworkMessage message) {
+ return senderService.invoke(receiverNode.name(), message,
1000).thenApply(unused -> null);
+ }
+
+ @Override
+ boolean isAllowed(ExecutionException ex) {
+ return hasCause(ex, RecipientLeftException.class,
UnresolvableConsistentIdException.class, TimeoutException.class);
+ }
+ };
+
+ abstract CompletableFuture<Void> send(MessagingService senderService,
InternalClusterNode receiverNode, NetworkMessage message);
+
+ abstract boolean isAllowed(ExecutionException ex);
+ }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
index 3f47e6c0f3d..b93112a54ff 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
@@ -17,7 +17,10 @@
package org.apache.ignite.internal.network;
+import static java.util.concurrent.CompletableFuture.delayedExecutor;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.function.Function.identity;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.LONG_HANDLING_LOGGING_ENABLED;
import static
org.apache.ignite.internal.metrics.sources.ThreadPoolMetricSource.THREAD_POOLS_METRICS_SOURCE_NAME;
@@ -37,7 +40,9 @@ import static
org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.io.IOException;
+import java.net.ConnectException;
import java.net.InetSocketAddress;
+import java.net.NoRouteToHostException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -48,6 +53,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
import org.apache.ignite.internal.failure.FailureContext;
@@ -60,6 +66,7 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.network.handshake.BrokenHandshakeException;
import org.apache.ignite.internal.network.handshake.CriticalHandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
@@ -85,6 +92,14 @@ import org.jetbrains.annotations.TestOnly;
public class DefaultMessagingService extends AbstractMessagingService {
private static final IgniteLogger LOG =
Loggers.forClass(DefaultMessagingService.class);
+ private static final long NO_TIMEOUT = -1;
+
+ private static final TerminalIoErrorRetryStrategy NEVER_RETRY = new
NeverRetryStrategy();
+
+ private static final TerminalIoErrorRetryStrategy BY_EPHEMERAL_ID = new
ByEphemeralIdRetryStrategy();
+
+ private static final long CONNECTION_RETRY_DELAY_MS = 100;
+
private final boolean longHandlingLoggingEnabled =
IgniteSystemProperties.getBoolean(LONG_HANDLING_LOGGING_ENABLED, false);
/** Network messages factory. */
@@ -145,6 +160,10 @@ public class DefaultMessagingService extends
AbstractMessagingService {
*/
private final Map<UUID, RecipientInetAddress> recipientInetAddrByNodeId =
new ConcurrentHashMap<>();
+ private final Executor connectionRetryExecutor;
+
+ private final TerminalIoErrorRetryStrategy byConsistentId;
+
/**
* Constructor.
*
@@ -211,6 +230,10 @@ public class DefaultMessagingService extends
AbstractMessagingService {
requestsMap,
failureProcessor
);
+
+ connectionRetryExecutor = delayedExecutor(CONNECTION_RETRY_DELAY_MS,
MILLISECONDS, outboundExecutor);
+
+ byConsistentId = new ByConsistentIdRetryStrategy(topologyService);
}
@Override
@@ -220,7 +243,7 @@ public class DefaultMessagingService extends
AbstractMessagingService {
@Override
public CompletableFuture<Void> send(InternalClusterNode recipient,
ChannelType channelType, NetworkMessage msg) {
- return send0(recipient, channelType, msg, null, true);
+ return send0(recipient, channelType, msg, null, true, BY_EPHEMERAL_ID);
}
@Override
@@ -235,7 +258,7 @@ public class DefaultMessagingService extends
AbstractMessagingService {
);
}
- return send0(recipient, channelType, msg, null, false);
+ return send0(recipient, channelType, msg, null, false, byConsistentId);
}
@Override
@@ -247,12 +270,12 @@ public class DefaultMessagingService extends
AbstractMessagingService {
recipient = new ClusterNodeImpl(null, null,
recipientNetworkAddress);
}
- return send0(recipient, channelType, msg, null, false);
+ return send0(recipient, channelType, msg, null, false, NEVER_RETRY);
}
@Override
public CompletableFuture<Void> respond(InternalClusterNode recipient,
ChannelType type, NetworkMessage msg, long correlationId) {
- return send0(recipient, type, msg, correlationId, true);
+ return send0(recipient, type, msg, correlationId, true,
BY_EPHEMERAL_ID);
}
@Override
@@ -267,12 +290,12 @@ public class DefaultMessagingService extends
AbstractMessagingService {
);
}
- return send0(recipient, type, msg, correlationId, false);
+ return send0(recipient, type, msg, correlationId, false,
byConsistentId);
}
@Override
public CompletableFuture<NetworkMessage> invoke(InternalClusterNode
recipient, ChannelType type, NetworkMessage msg, long timeout) {
- return invoke0(recipient, type, msg, timeout, true);
+ return invoke0(recipient, type, msg, timeout, true, BY_EPHEMERAL_ID);
}
/** {@inheritDoc} */
@@ -288,7 +311,7 @@ public class DefaultMessagingService extends
AbstractMessagingService {
);
}
- return invoke0(recipient, type, msg, timeout, false);
+ return invoke0(recipient, type, msg, timeout, false, byConsistentId);
}
/**
@@ -299,6 +322,7 @@ public class DefaultMessagingService extends
AbstractMessagingService {
* @param correlationId Correlation id. Not null iff the message is a
response to a {@link #invoke} request.
* @param strictIdCheck Whether {@link RecipientLeftException} is to be
thrown if the node at the other side of the channel
* actually has ID different from the ID in the recipient object (that
is, that the recipient has been restarted).
+ * @param retryStrategy Retry strategy.
* @return Future of the send operation.
*/
private CompletableFuture<Void> send0(
@@ -306,7 +330,8 @@ public class DefaultMessagingService extends
AbstractMessagingService {
ChannelType type,
NetworkMessage msg,
@Nullable Long correlationId,
- boolean strictIdCheck
+ boolean strictIdCheck,
+ TerminalIoErrorRetryStrategy retryStrategy
) {
if (connectionManager.isStopped()) {
return failedFuture(new NodeStoppingException());
@@ -343,7 +368,7 @@ public class DefaultMessagingService extends
AbstractMessagingService {
NetworkMessage message = correlationId != null ?
responseFromMessage(msg, correlationId) : msg;
- return sendViaNetwork(recipient.id(), type, recipientAddress, message,
strictIdCheck);
+ return sendViaNetwork(recipient.id(), type, recipientAddress, message,
strictIdCheck, retryStrategy, recipient.name(), NO_TIMEOUT);
}
private <U> CompletableFuture<U>
recipientIsStaleFuture(InternalClusterNode recipient) {
@@ -368,6 +393,7 @@ public class DefaultMessagingService extends
AbstractMessagingService {
* @param timeout Invocation timeout.
* @param strictIdCheck Whether {@link RecipientLeftException} is to be
thrown if the node at the other side of the channel
* actually has ID different from the ID in the recipient object (that
is, that the recipient has been restarted).
+ * @param retryStrategy Retry strategy.
* @return A future holding the response or error if the expected response
was not received.
*/
private CompletableFuture<NetworkMessage> invoke0(
@@ -375,7 +401,8 @@ public class DefaultMessagingService extends
AbstractMessagingService {
ChannelType type,
NetworkMessage msg,
long timeout,
- boolean strictIdCheck
+ boolean strictIdCheck,
+ TerminalIoErrorRetryStrategy retryStrategy
) {
if (connectionManager.isStopped()) {
return failedFuture(new NodeStoppingException());
@@ -391,14 +418,17 @@ public class DefaultMessagingService extends
AbstractMessagingService {
// TODO: IGNITE-18493 - remove/move this
if (shouldDropMessage) {
- return new CompletableFuture<NetworkMessage>().orTimeout(10,
TimeUnit.MILLISECONDS);
+ return new CompletableFuture<NetworkMessage>().orTimeout(10,
MILLISECONDS);
}
long correlationId = createCorrelationId();
CompletableFuture<NetworkMessage> responseFuture = new
CompletableFuture<>();
- requestsMap.put(correlationId, new TimeoutObjectImpl(timeout > 0 ?
coarseCurrentTimeMillis() + timeout : 0, responseFuture, msg));
+ requestsMap.put(
+ correlationId,
+ new TimeoutObjectImpl(timeoutDefined(timeout) ?
coarseCurrentTimeMillis() + timeout : 0, responseFuture, msg)
+ );
InetSocketAddress recipientAddress =
resolveRecipientAddress(recipient);
@@ -414,31 +444,42 @@ public class DefaultMessagingService extends
AbstractMessagingService {
InvokeRequest message = requestFromMessage(msg, correlationId);
- return sendViaNetwork(recipient.id(), type, recipientAddress, message,
strictIdCheck)
+ return sendViaNetwork(recipient.id(), type, recipientAddress, message,
strictIdCheck, retryStrategy, recipient.name(), timeout)
.thenCompose(unused -> responseFuture);
}
+ private static boolean timeoutDefined(long timeout) {
+ return timeout > 0;
+ }
+
/**
* Sends network object.
*
- * @param nodeId Target node ID.
+ * @param nodeId Target node ID (only {@code null} if sending by address).
* @param type Channel type for send.
* @param addr Target address.
* @param message Message.
* @param strictIdCheck Whether {@link RecipientLeftException} is to be
thrown if the node at the other side of the channel
* actually has ID different from the ID in the recipient object (that
is, that the recipient has been restarted).
+ * @param retryStrategy Retry strategy.
+ * @param consistentId Consistent ID of the recipient (if known).
+ * @param channelTimeoutMillis Timeout for channel creation in
milliseconds. If -1, then no timeout.
* @return Future of the send operation.
*/
private CompletableFuture<Void> sendViaNetwork(
- UUID nodeId,
+ @Nullable UUID nodeId,
ChannelType type,
InetSocketAddress addr,
NetworkMessage message,
- boolean strictIdCheck
+ boolean strictIdCheck,
+ TerminalIoErrorRetryStrategy retryStrategy,
+ @Nullable String consistentId,
+ long channelTimeoutMillis
) {
if (isInNetworkThread()) {
- return CompletableFuture.supplyAsync(() -> sendViaNetwork(nodeId,
type, addr, message, strictIdCheck), outboundExecutor)
- .thenCompose(identity());
+ return supplyAsync(() -> {
+ return sendViaNetwork(nodeId, type, addr, message,
strictIdCheck, retryStrategy, consistentId, channelTimeoutMillis);
+ }, outboundExecutor).thenCompose(identity());
}
List<ClassDescriptorMessage> descriptors;
@@ -449,8 +490,15 @@ public class DefaultMessagingService extends
AbstractMessagingService {
return failedFuture(new IgniteException(INTERNAL_ERR, "Failed to
marshal message: " + e.getMessage(), e));
}
- // TODO IGNITE-28225 Retry channel creation in case of network issues.
- OrderingFuture<NettySender> channelFuture =
connectionManager.channel(nodeId, type, addr);
+ OrderingFuture<NettySender> channelFuture = openChannelWithRetries(
+ nodeId,
+ type,
+ addr,
+ strictIdCheck,
+ retryStrategy,
+ consistentId,
+ channelTimeoutMillis
+ );
channelFuture.whenComplete((sender, ex) -> maybeLogHandshakeError(ex,
nodeId, type, addr));
@@ -465,12 +513,159 @@ public class DefaultMessagingService extends
AbstractMessagingService {
return sender.send(
new OutNetworkObject(message, descriptors),
- () -> triggerChannelCreation(nodeId, type, addr)
+ () -> triggerChannelCreation(
+ nodeId,
+ type,
+ addr,
+ strictIdCheck,
+ retryStrategy,
+ consistentId,
+ channelTimeoutMillis
+ )
);
});
}
- private void maybeLogHandshakeError(Throwable ex, UUID nodeId, ChannelType
type, InetSocketAddress addr) {
+ private OrderingFuture<NettySender> openChannelWithRetries(
+ @Nullable UUID nodeId,
+ ChannelType type,
+ InetSocketAddress addr,
+ boolean strictIdCheck,
+ TerminalIoErrorRetryStrategy retryStrategy,
+ @Nullable String consistentId,
+ long timeoutMillis
+ ) {
+ Long deadlineMillis = timeoutMillis == NO_TIMEOUT ||
!timeoutDefined(timeoutMillis) ? null
+ : coarseCurrentTimeMillis() + timeoutMillis;
+ if (deadlineMillis != null && deadlineMillis < 0) {
+ deadlineMillis = Long.MAX_VALUE;
+ }
+
+ return openChannelWithRetriesInternal(nodeId, type, addr,
strictIdCheck, retryStrategy, consistentId, deadlineMillis, 0);
+ }
+
+ private OrderingFuture<NettySender> openChannelWithRetriesInternal(
+ @Nullable UUID nodeId,
+ ChannelType type,
+ InetSocketAddress addr,
+ boolean strictIdCheck,
+ TerminalIoErrorRetryStrategy retryStrategy,
+ @Nullable String consistentId,
+ @Nullable Long deadlineMillis,
+ int attemptOrdinal
+ ) {
+ if (attemptOrdinal >= 1000) {
+ return OrderingFuture.failedFuture(new IllegalStateException(
+ "Too many channel creation attempts [attempts=" +
attemptOrdinal + ", nodeId=" + nodeId + "]."
+ ));
+ }
+
+ return connectionManager.channel(nodeId, type, addr)
+ .handle((res, ex) -> {
+ if (ex == null) {
+ return OrderingFuture.completedFuture(res);
+ }
+
+ // Only I/O exceptions and BrokenHandshakeException are
retriable.
+ if (!causedByIoException(ex) && !hasCause(ex,
BrokenHandshakeException.class)) {
+ return OrderingFuture.<NettySender>failedFuture(ex);
+ }
+
+ if (strictIdCheck && nodeId != null &&
staleIdDetector.isIdStale(nodeId)) {
+ metrics.incrementMessageRecipientNotFound();
+
+ return OrderingFuture.<NettySender>failedFuture(
+ new RecipientLeftException("Recipient is stale
[id=" + nodeId + "]")
+ );
+ }
+
+ // We always retry 'temporary I/O failures' (those that
are not ConnectException or NoRouteToHostException).
+ boolean temporaryFailure = !recipientIsNotThere(ex);
+
+ if (!temporaryFailure &&
!retryStrategy.stillRetriable(nodeId, consistentId)) {
+ Throwable exToReturn =
retryStrategy.retriesExhaustedMeansRecipientLeft()
+ ? new RecipientLeftException("Recipient left
[id=" + nodeId + "].") : ex;
+ return
OrderingFuture.<NettySender>failedFuture(exToReturn);
+ }
+
+ if (deadlineReached(deadlineMillis)) {
+ return OrderingFuture.<NettySender>failedFuture(
+ new TimeoutException("Channel creation timed
out [id=" + nodeId + "].")
+ );
+ }
+
+ return attemptToOpenChannelAfterDelay(
+ nodeId,
+ type,
+ addr,
+ strictIdCheck,
+ retryStrategy,
+ consistentId,
+ deadlineMillis,
+ attemptOrdinal + 1
+ );
+ })
+ .thenCompose(identity());
+ }
+
+ private static boolean causedByIoException(Throwable ex) {
+ return hasCause(ex, IOException.class);
+ }
+
+ private static boolean recipientIsNotThere(Throwable ex) {
+ return hasCause(ex, ConnectException.class,
NoRouteToHostException.class);
+ }
+
+ private static boolean deadlineReached(@Nullable Long deadlineMillis) {
+ return deadlineMillis != null && coarseCurrentTimeMillis() >=
deadlineMillis;
+ }
+
+ private OrderingFuture<NettySender> attemptToOpenChannelAfterDelay(
+ @Nullable UUID nodeId,
+ ChannelType type,
+ InetSocketAddress addr,
+ boolean strictIdCheck,
+ TerminalIoErrorRetryStrategy retryStrategy,
+ @Nullable String consistentId,
+ @Nullable Long deadlineMillis,
+ int attemptOrdinal
+ ) {
+ OrderingFuture<NettySender> nextAttemptFuture = new OrderingFuture<>();
+
+ // Formally, we break ordering guarantees here, but it's not a problem
as we are only concerned about order of messages,
+ // and the corresponding callbacks will be added to a future we return
from here, so ordering of message sends will be maintained.
+
+ connectionRetryExecutor.execute(() -> {
+ try {
+ openChannelWithRetriesInternal(
+ nodeId,
+ type,
+ addr,
+ strictIdCheck,
+ retryStrategy,
+ consistentId,
+ deadlineMillis,
+ attemptOrdinal
+ ).whenComplete((nextRes, nextEx) -> {
+ if (nextEx != null) {
+ nextAttemptFuture.completeExceptionally(nextEx);
+ } else {
+ nextAttemptFuture.complete(nextRes);
+ }
+ });
+ } catch (Throwable e) {
+ nextAttemptFuture.completeExceptionally(e);
+
+ if (e instanceof Error) {
+ throw e;
+ }
+ }
+ });
+
+ return nextAttemptFuture;
+ }
+
+ private void maybeLogHandshakeError(Throwable ex, @Nullable UUID nodeId,
ChannelType type, InetSocketAddress addr) {
if (ex != null) {
if (hasCause(ex, CriticalHandshakeException.class)) {
LOG.error(
@@ -487,8 +682,16 @@ public class DefaultMessagingService extends
AbstractMessagingService {
}
}
- private void triggerChannelCreation(UUID nodeId, ChannelType type,
InetSocketAddress addr) {
- connectionManager.channel(nodeId, type, addr);
+ private void triggerChannelCreation(
+ @Nullable UUID nodeId,
+ ChannelType type,
+ InetSocketAddress addr,
+ boolean strictIdCheck,
+ TerminalIoErrorRetryStrategy retryStrategy,
+ @Nullable String consistentId,
+ long timeoutMillis
+ ) {
+ openChannelWithRetries(nodeId, type, addr, strictIdCheck,
retryStrategy, consistentId, timeoutMillis);
}
private List<ClassDescriptorMessage> prepareMarshal(NetworkMessage msg)
throws Exception {
@@ -918,4 +1121,55 @@ public class DefaultMessagingService extends
AbstractMessagingService {
return address.address();
}
+
+ private interface TerminalIoErrorRetryStrategy {
+ boolean stillRetriable(@Nullable UUID id, @Nullable String
consistentId);
+
+ boolean retriesExhaustedMeansRecipientLeft();
+ }
+
+ private static class ByEphemeralIdRetryStrategy implements
TerminalIoErrorRetryStrategy {
+ @Override
+ public boolean stillRetriable(@Nullable UUID id, @Nullable String
consistentId) {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-28225 - make
retriable.
+ return false;
+ }
+
+ @Override
+ public boolean retriesExhaustedMeansRecipientLeft() {
+ return false;
+ }
+ }
+
+ private static class ByConsistentIdRetryStrategy implements
TerminalIoErrorRetryStrategy {
+ private final TopologyService topologyService;
+
+ private ByConsistentIdRetryStrategy(TopologyService topologyService) {
+ this.topologyService = topologyService;
+ }
+
+ @Override
+ public boolean stillRetriable(@Nullable UUID id, @Nullable String
consistentId) {
+ assert consistentId != null;
+
+ return topologyService.getByConsistentId(consistentId) != null;
+ }
+
+ @Override
+ public boolean retriesExhaustedMeansRecipientLeft() {
+ return true;
+ }
+ }
+
+ private static class NeverRetryStrategy implements
TerminalIoErrorRetryStrategy {
+ @Override
+ public boolean stillRetriable(@Nullable UUID id, @Nullable String
consistentId) {
+ return false;
+ }
+
+ @Override
+ public boolean retriesExhaustedMeansRecipientLeft() {
+ return false;
+ }
+ }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/BrokenHandshakeException.java b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/BrokenHandshakeException.java
new file mode 100644
index 00000000000..005ba0e8b8e
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/BrokenHandshakeException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.handshake;
+
+/**
+ * Exception that notifies of handshake that failed because a channel was
closed or some internal error happened.
+ */
+public class BrokenHandshakeException extends HandshakeException {
+ private static final long serialVersionUID = 0L;
+
+ public BrokenHandshakeException() {
+ super("Channel has been closed before handshake has finished or
handshake has failed");
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
index 5097c44e460..3a62bd9cc13 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
@@ -25,7 +25,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.network.handshake.HandshakeException;
+import org.apache.ignite.internal.network.handshake.BrokenHandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import
org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
@@ -111,9 +111,7 @@ public class HandshakeHandler extends
ChannelInboundHandlerAdapter {
public void channelInactive(ChannelHandlerContext ctx) {
// If this method is called that means channel has been closed before
handshake has finished or handshake
// has failed.
- manager.localHandshakeFuture().completeExceptionally(
- new HandshakeException("Channel has been closed before
handshake has finished or handshake has failed")
- );
+ manager.localHandshakeFuture().completeExceptionally(new
BrokenHandshakeException());
ctx.fireChannelInactive();
}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
index d2f77ca0525..82414f9a343 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
@@ -38,14 +38,22 @@ import static org.junit.Assume.assumeThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.AdditionalMatchers.or;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.NoRouteToHostException;
+import java.net.SocketException;
import java.net.UnknownHostException;
import java.time.Instant;
import java.util.List;
@@ -67,9 +75,11 @@ import
org.apache.ignite.internal.configuration.testframework.ConfigurationExten
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.NoOpFailureManager;
+import org.apache.ignite.internal.future.OrderingFuture;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
+import org.apache.ignite.internal.network.handshake.BrokenHandshakeException;
import org.apache.ignite.internal.network.messages.AllTypesMessageImpl;
import org.apache.ignite.internal.network.messages.InstantContainer;
import org.apache.ignite.internal.network.messages.MessageWithInstant;
@@ -655,21 +665,78 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
) {
CompletableFuture<Void> messageDelivered = new
CompletableFuture<>();
- receiverServices.messagingService.addMessageHandler(
- TestMessageTypes.class,
- (message, sender, correlationId) -> {
- if (message instanceof TestMessage) {
- messageDelivered.complete(null);
+ echoTestMessagesFromInvokes(receiverServices, messageDelivered);
+
+ assertThat(
+ operation.sendAction.send(senderServices.messagingService,
testMessage("test"), receiverWithAnotherId),
+ willCompleteSuccessfully()
+ );
+ if (operation.notRespondOperation()) {
+ assertThat(messageDelivered, willCompleteSuccessfully());
+ }
+ }
+ }
+
+ private static InternalClusterNode copyWithoutName(InternalClusterNode
node) {
+ return new ClusterNodeImpl(node.id(), null, node.address());
+ }
+
+ private static void echoTestMessagesFromInvokes(Services receiverServices,
CompletableFuture<Void> messageDelivered) {
+ receiverServices.messagingService.addMessageHandler(
+ TestMessageTypes.class,
+ (message, sender, correlationId) -> {
+ if (message instanceof TestMessage) {
+ messageDelivered.complete(null);
- if (correlationId != null) {
-
receiverServices.messagingService.respond(sender, message, correlationId);
- }
+ if (correlationId != null) {
+ receiverServices.messagingService.respond(sender,
message, correlationId);
}
}
+ }
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(SendByConsistentCoordinateOperation.class)
+ @EnumSource(SendByClusterNodeOperation.class)
+ void sendThrowsOnNonIoCausedConnectionExceptions(AsyncSendOperation
operation) throws Exception {
+ try (
+ Services senderServices = createMessagingService(senderNode,
senderNetworkConfig);
+ Services ignoredReceiverServices =
createMessagingService(receiverNode, receiverNetworkConfig)
+ ) {
+ RecipientLeftException cause = new RecipientLeftException("Oops");
+
+ doReturn(OrderingFuture.failedFuture(cause))
+ .when(senderServices.connectionManager).channel(any(),
or(eq(ChannelType.DEFAULT), eq(TEST_CHANNEL)), any());
+
+ Exception ex = assertWillThrow(
+ operation.send(senderServices.messagingService,
testMessage("test"), receiverNode),
+ Exception.class
);
+ assertThat(ex, is(cause));
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(SendByConsistentCoordinateOperation.class)
+ @EnumSource(SendByClusterNodeOperation.class)
+ void sendRetriesOnNonTerminalConnectionExceptions(AsyncSendOperation
operation) throws Exception {
+ try (
+ Services senderServices = createMessagingService(senderNode,
senderNetworkConfig);
+ Services receiverServices =
createMessagingService(receiverNode, receiverNetworkConfig)
+ ) {
+ doReturn(OrderingFuture.failedFuture(new
SocketException("Connection reset by peer")))
+ .doReturn(OrderingFuture.failedFuture(new
BrokenHandshakeException()))
+ .doCallRealMethod()
+ .when(senderServices.connectionManager).channel(any(),
or(eq(ChannelType.DEFAULT), eq(TEST_CHANNEL)), any());
+
+ CompletableFuture<Void> messageDelivered = new
CompletableFuture<>();
+
+ echoTestMessagesFromInvokes(receiverServices, messageDelivered);
+
assertThat(
- operation.sendAction.send(senderServices.messagingService,
testMessage("test"), receiverWithAnotherId),
+ operation.send(senderServices.messagingService,
testMessage("test"), receiverNode),
willCompleteSuccessfully()
);
if (operation.notRespondOperation()) {
@@ -678,8 +745,152 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
}
}
- private static InternalClusterNode copyWithoutName(InternalClusterNode
node) {
- return new ClusterNodeImpl(node.id(), null, node.address());
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-28225: expand this
and other tests to also include by-cluster-node send/invoke
+ // modes (when retries for them are added).
+
+ @ParameterizedTest
+ @EnumSource(SendByConsistentCoordinateOperation.class)
+ void
sendByConsistentIdRetriesOnTerminalIoConnectionExceptions(SendByConsistentCoordinateOperation
operation) throws Exception {
+ assumeThat(operation,
is(not(SendByConsistentCoordinateOperation.SEND_BY_ADDRESS)));
+
+ try (
+ Services senderServices = createMessagingService(senderNode,
senderNetworkConfig);
+ Services receiverServices =
createMessagingService(receiverNode, receiverNetworkConfig)
+ ) {
+ doReturn(OrderingFuture.failedFuture(new
ConnectException("Connection refused")))
+ .doReturn(OrderingFuture.failedFuture(new
NoRouteToHostException()))
+ .doCallRealMethod()
+ .when(senderServices.connectionManager).channel(any(),
or(eq(ChannelType.DEFAULT), eq(TEST_CHANNEL)), any());
+
+ CompletableFuture<Void> messageDelivered = new
CompletableFuture<>();
+
+ echoTestMessagesFromInvokes(receiverServices, messageDelivered);
+
+ assertThat(
+ operation.send(senderServices.messagingService,
testMessage("test"), receiverNode),
+ willCompleteSuccessfully()
+ );
+ if (operation.notRespondOperation()) {
+ assertThat(messageDelivered, willCompleteSuccessfully());
+ }
+ }
+ }
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-28225 change this
test when retries are added for sends/invokes by cluster node.
+ @ParameterizedTest
+ @EnumSource(SendByClusterNodeOperation.class)
+ void
sendByClusterNodeDoesNotRetryOnTerminalIoConnectionExceptions(SendByClusterNodeOperation
operation) throws Exception {
+ try (
+ Services senderServices = createMessagingService(senderNode,
senderNetworkConfig);
+ Services ignoredReceiverServices =
createMessagingService(receiverNode, receiverNetworkConfig)
+ ) {
+ ConnectException cause = new ConnectException("Connection
refused");
+ doReturn(OrderingFuture.failedFuture(cause))
+ .when(senderServices.connectionManager).channel(any(),
or(eq(ChannelType.DEFAULT), eq(TEST_CHANNEL)), any());
+
+ assertWillThrow(operation.send(senderServices.messagingService,
testMessage("test"), receiverNode), ConnectException.class);
+ }
+ }
+
+ @Test
+ void sendByAddressDoesNotRetryOnTerminalIoConnectionExceptions() throws
Exception {
+ try (
+ Services senderServices = createMessagingService(senderNode,
senderNetworkConfig);
+ Services ignoredReceiverServices =
createMessagingService(receiverNode, receiverNetworkConfig)
+ ) {
+ ConnectException cause = new ConnectException("Connection
refused");
+ doReturn(OrderingFuture.failedFuture(cause))
+ .when(senderServices.connectionManager).channel(any(),
or(eq(ChannelType.DEFAULT), eq(TEST_CHANNEL)), any());
+
+ assertWillThrow(
+
senderServices.messagingService.send(receiverNode.address(),
ChannelType.DEFAULT, testMessage("test")),
+ ConnectException.class
+ );
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(SendByConsistentCoordinateOperation.class)
+ void
sendByConsistentIdStopsRetryingOnTerminalIoConnectionExceptionsIfReceiverLeavesTopology(
+ SendByConsistentCoordinateOperation operation
+ ) throws Exception {
+ assumeThat(operation,
is(not(SendByConsistentCoordinateOperation.SEND_BY_ADDRESS)));
+
+ AtomicInteger retryCount = new AtomicInteger();
+
+ try (
+ Services senderServices = createMessagingService(senderNode,
senderNetworkConfig);
+ Services ignoredReceiverServices =
createMessagingService(receiverNode, receiverNetworkConfig)
+ ) {
+
when(topologyService.getByConsistentId(eq(receiverNode.name()))).thenAnswer(invocation
-> {
+ // Simulate the receiver leaving the topology after 2 connection attempts.
+ if (retryCount.get() >= 2) {
+ return null;
+ } else {
+ return receiverNode;
+ }
+ });
+
+ doAnswer(invocation -> {
+ retryCount.incrementAndGet();
+ return OrderingFuture.failedFuture(new
ConnectException("Connection refused"));
+ }).when(senderServices.connectionManager).channel(any(),
or(eq(ChannelType.DEFAULT), eq(TEST_CHANNEL)), any());
+
+ assertWillThrow(
+ operation.send(senderServices.messagingService,
testMessage("test"), receiverNode),
+ RecipientLeftException.class
+ );
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(SendByClusterNodeOperation.class)
+ void sendByClusterNodeChecksForReceiverStalenessOnTerminalIoException(
+ SendByClusterNodeOperation operation
+ ) throws Exception {
+ assumeThat(operation,
is(not(SendByConsistentCoordinateOperation.SEND_BY_ADDRESS)));
+
+ AtomicInteger retryCount = new AtomicInteger();
+ InMemoryStaleIds senderSideStaleIdDetector = new InMemoryStaleIds();
+
+ try (
+ Services senderServices = createMessagingService(
+ senderNode,
+ senderNetworkConfig,
+ () -> {},
+ messageSerializationRegistry,
+ senderSideStaleIdDetector
+ );
+ Services ignoredReceiverServices =
createMessagingService(receiverNode, receiverNetworkConfig)
+ ) {
+ doAnswer(invocation -> {
+ if (retryCount.incrementAndGet() == 1) {
+ senderSideStaleIdDetector.markAsStale(receiverNode.id());
+ }
+ return OrderingFuture.failedFuture(new
ConnectException("Connection refused"));
+ }).when(senderServices.connectionManager).channel(any(),
or(eq(ChannelType.DEFAULT), eq(TEST_CHANNEL)), any());
+
+ assertWillThrow(
+ operation.send(senderServices.messagingService,
testMessage("test"), receiverNode),
+ RecipientLeftException.class
+ );
+ }
+ }
+
+ @Test
+ void invokeStopsRetryingConnectionOnTimeout() throws Exception {
+ try (
+ Services senderServices = createMessagingService(senderNode,
senderNetworkConfig);
+ Services receiverServices =
createMessagingService(receiverNode, receiverNetworkConfig)
+ ) {
+ doReturn(OrderingFuture.failedFuture(new
ConnectException("Connection refused")))
+ .when(senderServices.connectionManager).channel(any(),
or(eq(ChannelType.DEFAULT), eq(TEST_CHANNEL)), any());
+
+ CompletableFuture<Void> messageDelivered = new
CompletableFuture<>();
+ echoTestMessagesFromInvokes(receiverServices, messageDelivered);
+
+
assertWillThrow(senderServices.messagingService.invoke(receiverNode.name(),
testMessage("test"), 300), TimeoutException.class);
+ }
}
private static void awaitQuietly(CountDownLatch latch) {
@@ -726,7 +937,7 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
assertThat(bootstrapFactory.startAsync(new ComponentContext()),
willCompleteSuccessfully());
- ConnectionManager connectionManager = new TestConnectionManager(
+ ConnectionManager connectionManager = spy(new TestConnectionManager(
networkConfig,
serializationService,
node,
@@ -734,7 +945,7 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
staleIdDetector,
clusterIdSupplier,
beforeHandshake
- );
+ ));
DefaultMessagingService messagingService = new DefaultMessagingService(
node.name(),
@@ -936,6 +1147,10 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
private interface AsyncSendOperation {
CompletableFuture<?> send(MessagingService service, TestMessage
message, InternalClusterNode recipient);
+
+ boolean notRespondOperation();
+
+ boolean isInvoke();
}
@FunctionalInterface
@@ -961,6 +1176,16 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
public CompletableFuture<?> send(MessagingService service, TestMessage
message, InternalClusterNode recipient) {
return sendAction.send(service, message, recipient);
}
+
+ @Override
+ public boolean notRespondOperation() {
+ return this != RESPOND_DEFAULT_CHANNEL && this !=
RESPOND_SPECIFIC_CHANNEL;
+ }
+
+ @Override
+ public boolean isInvoke() {
+ return this == INVOKE_DEFAULT_CHANNEL || this ==
INVOKE_SPECIFIC_CHANNEL;
+ }
}
private enum SendByConsistentCoordinateOperation implements
AsyncSendOperation {
@@ -982,8 +1207,14 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
return sendAction.send(service, message, recipient);
}
- private boolean notRespondOperation() {
+ @Override
+ public boolean notRespondOperation() {
return this != RESPOND_BY_CONSISTENT_ID_DEFAULT_CHANNEL && this !=
RESPOND_BY_CONSISTENT_ID_SPECIFIC_CHANNEL;
}
+
+ @Override
+ public boolean isInvoke() {
+ return this == INVOKE_BY_CONSISTENT_ID_DEFAULT_CHANNEL || this ==
INVOKE_BY_CONSISTENT_ID_SPECIFIC_CHANNEL;
+ }
}
}