This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new c7ea5c8bd5 IGNITE-18071 Add client-side heartbeat timeout (#1448) c7ea5c8bd5 is described below commit c7ea5c8bd5811316e5facb54cd8d72f501681137 Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Wed Dec 14 21:36:00 2022 +0300 IGNITE-18071 Add client-side heartbeat timeout (#1448) * Add `IgniteClientConfiguration.heartbeatTimeout`. * Close connection when there is no response from server to heartbeat message within the specified timeout. * Minor fixes - add nullable annotations, improve logger handling. * Add test for `connectTimeout`. * Fix Netty buffer leak in case of connect timeout. --- .idea/inspectionProfiles/Project_Default.xml | 2 +- .../org/apache/ignite/client/IgniteClient.java | 27 ++++++++++++-- .../ignite/client/IgniteClientConfiguration.java | 18 +++++++-- .../apache/ignite/internal/client/ClientUtils.java | 20 ++++++++++ .../client/IgniteClientConfigurationImpl.java | 16 +++++++- .../ignite/internal/client/ReliableChannel.java | 4 +- .../ignite/internal/client/TcpClientChannel.java | 43 +++++++++++++++++++--- .../ignite/internal/client/TcpIgniteClient.java | 10 +---- .../apache/ignite/client/AbstractClientTest.java | 2 +- .../apache/ignite/client/ClientComputeTest.java | 6 +-- .../org/apache/ignite/client/ConnectionTest.java | 23 +++++++++++- .../org/apache/ignite/client/HeartbeatTest.java | 28 ++++++++++++++ .../org/apache/ignite/client/MultiClusterTest.java | 6 +-- .../org/apache/ignite/client/RetryPolicyTest.java | 4 +- .../ignite/client/TestClientHandlerModule.java | 35 ++++++++++++++++++ .../java/org/apache/ignite/client/TestServer.java | 15 ++++++-- 16 files changed, 220 insertions(+), 39 deletions(-) diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml index a77fb48328..2b5ac76e01 100644 --- a/.idea/inspectionProfiles/Project_Default.xml +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -1185,4 +1185,4 @@ 
<option name="ADD_NONJAVA_TO_ENTRIES" value="true" /> </inspection_tool> </profile> -</component> \ No newline at end of file +</component> diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java index 9f5b5c123d..5580f2f331 100644 --- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java +++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java @@ -19,6 +19,7 @@ package org.apache.ignite.client; import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_CONNECT_TIMEOUT; import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_HEARTBEAT_INTERVAL; +import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_HEARTBEAT_TIMEOUT; import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_THROTTLING_PERIOD; import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_THROTTLING_RETRIES; import static org.apache.ignite.internal.client.ClientUtils.sync; @@ -34,6 +35,7 @@ import org.apache.ignite.internal.client.IgniteClientConfigurationImpl; import org.apache.ignite.internal.client.TcpIgniteClient; import org.apache.ignite.lang.LoggerFactory; import org.apache.ignite.network.ClusterNode; +import org.jetbrains.annotations.Nullable; /** * Ignite client entry point. @@ -86,10 +88,14 @@ public interface IgniteClient extends Ignite { /** Heartbeat interval. */ private long heartbeatInterval = DFLT_HEARTBEAT_INTERVAL; + /** Heartbeat timeout. */ + private long heartbeatTimeout = DFLT_HEARTBEAT_TIMEOUT; + /** Retry policy. */ - private RetryPolicy retryPolicy = new RetryReadPolicy(); + private @Nullable RetryPolicy retryPolicy = new RetryReadPolicy(); - private LoggerFactory loggerFactory; + /** Logger factory. */ + private @Nullable LoggerFactory loggerFactory; /** * Sets the addresses of Ignite server nodes within a cluster. 
An address can be an IP address or a hostname, with or without port. @@ -116,7 +122,7 @@ public interface IgniteClient extends Ignite { * @param retryPolicy Retry policy. * @return This instance. */ - public Builder retryPolicy(RetryPolicy retryPolicy) { + public Builder retryPolicy(@Nullable RetryPolicy retryPolicy) { this.retryPolicy = retryPolicy; return this; @@ -240,6 +246,20 @@ public interface IgniteClient extends Ignite { return this; } + /** + * Sets the heartbeat message timeout, in milliseconds. Default is <code>5000</code>. + * + * <p>When a server does not respond to a heartbeat within the specified timeout, client will close the connection. + * + * @param heartbeatTimeout Heartbeat timeout. + * @return This instance. + */ + public Builder heartbeatTimeout(long heartbeatTimeout) { + this.heartbeatTimeout = heartbeatTimeout; + + return this; + } + /** * Builds the client. * @@ -263,6 +283,7 @@ public interface IgniteClient extends Ignite { reconnectThrottlingRetries, asyncContinuationExecutor, heartbeatInterval, + heartbeatTimeout, retryPolicy, loggerFactory ); diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java index 5fefcbfcd7..d957492bb1 100644 --- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java +++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java @@ -36,12 +36,12 @@ public interface IgniteClientConfiguration { /** Default socket connect timeout, in milliseconds. */ int DFLT_CONNECT_TIMEOUT = 5000; + /** Default heartbeat timeout, in milliseconds. */ + int DFLT_HEARTBEAT_TIMEOUT = 5000; + /** Default heartbeat interval, in milliseconds. */ int DFLT_HEARTBEAT_INTERVAL = 30_000; - /** Default operation retry limit. */ - int DFLT_RETRY_LIMIT = 5; - /** Default reconnect throttling period. 
*/ long DFLT_RECONNECT_THROTTLING_PERIOD = 30_000L; @@ -121,6 +121,18 @@ public interface IgniteClientConfiguration { */ public long heartbeatInterval(); + /** + * Gets the heartbeat message timeout, in milliseconds. Default is <code>5000</code>. + * + * <p>When a server does not respond to a heartbeat within the specified timeout, client will close the connection. + * + * <p>When thin client connection is idle (no operations are performed), heartbeat messages are sent periodically + * to keep the connection alive and detect potential half-open state. + * + * @return Heartbeat timeout. + */ + public long heartbeatTimeout(); + /** * Returns the logger factory. This factory will be used to create a logger instance when needed. * diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java index fea4e78e20..3e0ba81d86 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java @@ -20,8 +20,12 @@ package org.apache.ignite.internal.client; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.ignite.client.ClientOperationType; +import org.apache.ignite.client.IgniteClientConfiguration; import org.apache.ignite.internal.client.proto.ClientOp; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.lang.IgniteException; +import org.apache.ignite.lang.LoggerFactory; /** * Client utilities. @@ -178,4 +182,20 @@ public class ClientUtils { throw new UnsupportedOperationException("Invalid op code: " + opCode); } } + + /** + * Gets a logger for the given class. + * + * @param cls Class. + * @return Logger. 
+ */ + public static <T> IgniteLogger logger(IgniteClientConfiguration cfg, Class<T> cls) { + var loggerFactory = cfg.loggerFactory() == null + ? (LoggerFactory) System::getLogger + : cfg.loggerFactory(); + + return loggerFactory == null + ? Loggers.voidLogger() + : Loggers.forClass(cls, loggerFactory); + } } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java index 842c53c8e2..2fa413b2c2 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java @@ -49,6 +49,9 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur /** Heartbeat interval. */ private final long heartbeatInterval; + /** Heartbeat timeout. */ + private final long heartbeatTimeout; + /** Retry policy. */ private final RetryPolicy retryPolicy; @@ -64,6 +67,7 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur * @param reconnectThrottlingRetries Reconnect throttling retries. * @param asyncContinuationExecutor Async continuation executor. * @param heartbeatInterval Heartbeat message interval. + * @param heartbeatTimeout Heartbeat message timeout. * @param retryPolicy Retry policy. * @param loggerFactory Logger factory which will be used to create a logger instance for this this particular client when needed. 
*/ @@ -75,8 +79,9 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur int reconnectThrottlingRetries, Executor asyncContinuationExecutor, long heartbeatInterval, - RetryPolicy retryPolicy, - LoggerFactory loggerFactory + long heartbeatTimeout, + @Nullable RetryPolicy retryPolicy, + @Nullable LoggerFactory loggerFactory ) { this.addressFinder = addressFinder; @@ -88,6 +93,7 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur this.reconnectThrottlingRetries = reconnectThrottlingRetries; this.asyncContinuationExecutor = asyncContinuationExecutor; this.heartbeatInterval = heartbeatInterval; + this.heartbeatTimeout = heartbeatTimeout; this.retryPolicy = retryPolicy; this.loggerFactory = loggerFactory; } @@ -134,6 +140,12 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur return heartbeatInterval; } + /** {@inheritDoc} */ + @Override + public long heartbeatTimeout() { + return heartbeatTimeout; + } + /** {@inheritDoc} */ @Override public @Nullable LoggerFactory loggerFactory() { diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java index d739ba0598..04d80f52a0 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java @@ -114,10 +114,10 @@ public final class ReliableChannel implements AutoCloseable { * @param clientCfg Client config. 
*/ ReliableChannel(BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory, - IgniteClientConfiguration clientCfg, IgniteLogger log) { + IgniteClientConfiguration clientCfg) { this.clientCfg = Objects.requireNonNull(clientCfg, "clientCfg"); this.chFactory = Objects.requireNonNull(chFactory, "chFactory"); - this.log = Objects.requireNonNull(log, "log"); + this.log = ClientUtils.logger(clientCfg, ReliableChannel.class); connMgr = new NettyClientConnectionMultiplexer(); connMgr.start(clientCfg); diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java index 4ca527d49b..6f7bde9e2c 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java @@ -38,6 +38,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -53,6 +54,7 @@ import org.apache.ignite.internal.client.proto.ClientOp; import org.apache.ignite.internal.client.proto.ProtocolVersion; import org.apache.ignite.internal.client.proto.ResponseFlags; import org.apache.ignite.internal.client.proto.ServerMessageType; +import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.NetworkAddress; @@ -97,9 +99,15 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon /** Connect timeout in milliseconds. */ private final long connectTimeout; + /** Heartbeat timeout in milliseconds. 
*/ + private final long heartbeatTimeout; + /** Heartbeat timer. */ private final Timer heartbeatTimer; + /** Logger. */ + private final IgniteLogger log; + /** Last send operation timestamp. */ private volatile long lastSendMillis; @@ -112,11 +120,14 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon TcpClientChannel(ClientChannelConfiguration cfg, ClientConnectionMultiplexer connMgr) { validateConfiguration(cfg); + log = ClientUtils.logger(cfg.clientConfiguration(), TcpClientChannel.class); + asyncContinuationExecutor = cfg.clientConfiguration().asyncContinuationExecutor() == null ? ForkJoinPool.commonPool() : cfg.clientConfiguration().asyncContinuationExecutor(); connectTimeout = cfg.clientConfiguration().connectTimeout(); + heartbeatTimeout = cfg.clientConfiguration().heartbeatTimeout(); sock = connMgr.open(cfg.getAddress(), this, this); @@ -136,7 +147,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon /** * Close the channel with cause. */ - private void close(Exception cause) { + private void close(@Nullable Exception cause) { if (closed.compareAndSet(false, true)) { // Disconnect can happen before we initialize the timer. var timer = heartbeatTimer; @@ -312,7 +323,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon * @param unpacker Unpacker. * @return Exception. */ - private IgniteException readError(ClientMessageUnpacker unpacker) { + private static IgniteException readError(ClientMessageUnpacker unpacker) { var traceId = unpacker.unpackUuid(); var code = unpacker.unpackInt(); var errClassName = unpacker.unpackString(); @@ -379,8 +390,14 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon try { handshakeReq(ver); - var res = connectTimeout > 0 ? fut.get(connectTimeout, TimeUnit.MILLISECONDS) : fut.get(); - handshakeRes(res, ver); + // handshakeRes must be called even in case of timeout to release the buffer. 
+ var resFut = fut.thenAccept(res -> handshakeRes(res, ver)); + + if (connectTimeout > 0) { + resFut.get(connectTimeout, TimeUnit.MILLISECONDS); + } else { + resFut.get(); + } } catch (Throwable e) { throw IgniteException.wrap(e); } @@ -498,7 +515,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon private final long interval; /** Constructor. */ - public HeartbeatTask(long interval) { + HeartbeatTask(long interval) { this.interval = interval; } @@ -506,7 +523,21 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon @Override public void run() { try { if (System.currentTimeMillis() - lastSendMillis > interval) { - serviceAsync(ClientOp.HEARTBEAT, null, null); + var fut = serviceAsync(ClientOp.HEARTBEAT, null, null); + + if (connectTimeout > 0) { + fut + .orTimeout(heartbeatTimeout, TimeUnit.MILLISECONDS) + .exceptionally(e -> { + if (e instanceof TimeoutException) { + log.warn("Heartbeat timeout, closing the channel"); + + close((TimeoutException) e); + } + + return null; + }); + } } } catch (Throwable ignored) { // Ignore failed heartbeats. 
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java index 2ccf7506af..efc3227dde 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java @@ -34,8 +34,6 @@ import org.apache.ignite.internal.client.sql.ClientSql; import org.apache.ignite.internal.client.table.ClientTables; import org.apache.ignite.internal.client.tx.ClientTransactions; import org.apache.ignite.internal.jdbc.proto.ClientMessage; -import org.apache.ignite.internal.logger.Loggers; -import org.apache.ignite.lang.LoggerFactory; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.sql.IgniteSql; @@ -88,13 +86,7 @@ public class TcpIgniteClient implements IgniteClient { this.cfg = cfg; - var loggerFactory = cfg.loggerFactory() == null - ? 
(LoggerFactory) System::getLogger - : cfg.loggerFactory(); - - var log = Loggers.forClass(TcpIgniteClient.class, loggerFactory); - - ch = new ReliableChannel(chFactory, cfg, log); + ch = new ReliableChannel(chFactory, cfg); tables = new ClientTables(ch); transactions = new ClientTransactions(ch); compute = new ClientCompute(ch, tables); diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java index 422e56993b..efb03a47f8 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java @@ -136,7 +136,7 @@ public abstract class AbstractClientTest { Ignite ignite, String nodeName ) { - return new TestServer(port, portRange, idleTimeout, ignite, null, nodeName, clusterId); + return new TestServer(port, portRange, idleTimeout, ignite, null, null, nodeName, clusterId); } /** diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java index 22c23493fa..db8db0311d 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java @@ -176,9 +176,9 @@ public class ClientComputeTest { var clusterId = UUID.randomUUID(); - server1 = new TestServer(10900, 10, 0, ignite, shouldDropConnection, "s1", clusterId); - server2 = new TestServer(10910, 10, 0, ignite, shouldDropConnection, "s2", clusterId); - server3 = new TestServer(10920, 10, 0, ignite, shouldDropConnection, "s3", clusterId); + server1 = new TestServer(10900, 10, 0, ignite, shouldDropConnection, null, "s1", clusterId); + server2 = new TestServer(10910, 10, 0, ignite, shouldDropConnection, null, "s2", clusterId); + server3 = new TestServer(10920, 10, 0, ignite, shouldDropConnection, null, 
"s3", clusterId); } private Set<ClusterNode> getClusterNodes(String... names) { diff --git a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java index a6ee3677cf..7c6f03aba8 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java @@ -17,11 +17,17 @@ package org.apache.ignite.client; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.endsWith; import static org.junit.jupiter.api.Assertions.assertThrows; +import java.util.UUID; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import org.apache.ignite.client.IgniteClient.Builder; +import org.apache.ignite.client.fakes.FakeIgnite; import org.apache.ignite.lang.IgniteException; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -70,7 +76,22 @@ public class ConnectionTest extends AbstractClientTest { testConnection("[::1]:" + serverPort); } - private void testConnection(String... addrs) throws Exception { + @SuppressWarnings("ThrowableNotThrown") + @Test + public void testNoResponseFromServerWithinConnectTimeoutThrowsException() throws Exception { + Function<Integer, Integer> responseDelay = x -> 500; + + try (var srv = new TestServer(10800, 10, 300, new FakeIgnite(), x -> false, responseDelay, null, UUID.randomUUID())) { + Builder builder = IgniteClient.builder() + .addresses("127.0.0.1:" + srv.port()) + .retryPolicy(new RetryLimitPolicy().retryLimit(1)) + .connectTimeout(50); + + assertThrowsWithCause(builder::build, TimeoutException.class); + } + } + + private static void testConnection(String... 
addrs) throws Exception { AbstractClientTest.startClient(addrs).close(); } } diff --git a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java index c0cfe67c16..8a797702a9 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java @@ -20,8 +20,11 @@ package org.apache.ignite.client; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import java.util.UUID; +import java.util.function.Function; import org.apache.ignite.client.IgniteClient.Builder; import org.apache.ignite.client.fakes.FakeIgnite; +import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.junit.jupiter.api.Test; /** @@ -75,4 +78,29 @@ public class HeartbeatTest { assertEquals("Negative delay.", ex.getMessage()); } } + + @Test + public void testHeartbeatTimeoutClosesConnection() throws Exception { + Function<Integer, Integer> responseDelayFunc = requestCount -> requestCount > 1 ? 
500 : 0; + var loggerFactory = new TestLoggerFactory("client"); + + try (var srv = new TestServer(10800, 10, 300, new FakeIgnite(), x -> false, responseDelayFunc, null, UUID.randomUUID())) { + int srvPort = srv.port(); + + Builder builder = IgniteClient.builder() + .addresses("127.0.0.1:" + srvPort) + .retryPolicy(new RetryLimitPolicy().retryLimit(1)) + .heartbeatTimeout(30) + .reconnectThrottlingPeriod(5000) + .reconnectThrottlingRetries(0) + .heartbeatInterval(50) + .loggerFactory(loggerFactory); + + try (var ignored = builder.build()) { + IgniteTestUtils.waitForCondition( + () -> loggerFactory.logger.entries().stream().anyMatch(x -> x.contains("Heartbeat timeout, closing the channel")), + 3000); + } + } + } } diff --git a/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java b/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java index 9af2164593..7955e58739 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java @@ -49,8 +49,8 @@ public class MultiClusterTest { @BeforeEach void setUp() { - server1 = new TestServer(10900, 10, 0, new FakeIgnite(), null, "s1", clusterId1); - server2 = new TestServer(10900, 10, 0, new FakeIgnite(), null, "s2", clusterId2); + server1 = new TestServer(10900, 10, 0, new FakeIgnite(), null, null, "s1", clusterId1); + server2 = new TestServer(10900, 10, 0, new FakeIgnite(), null, null, "s2", clusterId2); } @AfterEach @@ -91,7 +91,7 @@ public class MultiClusterTest { client.tables().tables(); server1.close(); - server1 = new TestServer(10900, 10, 0, new FakeIgnite(), null, "s1", clusterId2); + server1 = new TestServer(10900, 10, 0, new FakeIgnite(), null, null, "s1", clusterId2); IgniteClientConnectionException ex = (IgniteClientConnectionException) assertThrowsWithCause( () -> client.tables().tables(), IgniteClientConnectionException.class, "Cluster ID mismatch"); diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java index 35b7ee1297..cbcde03218 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java @@ -216,7 +216,7 @@ public class RetryPolicyTest { @Test public void testRetryReadPolicyAllOperationsSupported() throws IllegalAccessException { var plc = new RetryReadPolicy(); - var cfg = new IgniteClientConfigurationImpl(null, null, 0, 0, 0, null, 0, null, null); + var cfg = new IgniteClientConfigurationImpl(null, null, 0, 0, 0, null, 0, 0, null, null); for (var op : ClientOperationType.values()) { var ctx = new RetryPolicyContextImpl(cfg, op, 0, null); @@ -265,6 +265,6 @@ public class RetryPolicyTest { FakeIgnite ign = new FakeIgnite(); ((FakeIgniteTables) ign.tables()).createTable("t"); - server = new TestServer(10900, 10, 0, ign, shouldDropConnection, null, UUID.randomUUID()); + server = new TestServer(10900, 10, 0, ign, shouldDropConnection, null, null, UUID.randomUUID()); } } diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java index bc23b933bc..6995ecd021 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java +++ b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java @@ -60,6 +60,9 @@ public class TestClientHandlerModule implements IgniteComponent { /** Connection drop condition. */ private final Function<Integer, Boolean> shouldDropConnection; + /** Server response delay function. */ + private final Function<Integer, Integer> responseDelay; + /** Cluster service. 
*/ private final ClusterService clusterService; @@ -82,6 +85,7 @@ public class TestClientHandlerModule implements IgniteComponent { * @param registry Configuration registry. * @param bootstrapFactory Bootstrap factory. * @param shouldDropConnection Connection drop condition. + * @param responseDelay Response delay, in milliseconds. * @param clusterService Cluster service. * @param compute Compute. * @param clusterId Cluster id. @@ -91,6 +95,7 @@ public class TestClientHandlerModule implements IgniteComponent { ConfigurationRegistry registry, NettyBootstrapFactory bootstrapFactory, Function<Integer, Boolean> shouldDropConnection, + @Nullable Function<Integer, Integer> responseDelay, ClusterService clusterService, IgniteCompute compute, UUID clusterId) { @@ -102,6 +107,7 @@ public class TestClientHandlerModule implements IgniteComponent { this.registry = registry; this.bootstrapFactory = bootstrapFactory; this.shouldDropConnection = shouldDropConnection; + this.responseDelay = responseDelay; this.clusterService = clusterService; this.compute = compute; this.clusterId = clusterId; @@ -166,6 +172,7 @@ public class TestClientHandlerModule implements IgniteComponent { ch.pipeline().addLast( new ClientMessageDecoder(), new ConnectionDropHandler(requestCounter, shouldDropConnection), + new ResponseDelayHandler(responseDelay), new ClientInboundMessageHandler( (IgniteTablesInternal) ignite.tables(), ignite.transactions(), @@ -230,4 +237,32 @@ public class TestClientHandlerModule implements IgniteComponent { } } } + + private static class ResponseDelayHandler extends ChannelInboundHandlerAdapter { + /** Delay. */ + private final Function<Integer, Integer> delay; + + /** Counter. */ + private final AtomicInteger cnt = new AtomicInteger(); + + /** + * Constructor. + * + * @param delay Delay. 
+ */ + private ResponseDelayHandler(@Nullable Function<Integer, Integer> delay) { + this.delay = delay; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + var delayMs = delay == null ? 0 : delay.apply(cnt.incrementAndGet()); + + if (delayMs > 0) { + Thread.sleep(delayMs); + } + + super.channelRead(ctx, msg); + } + } } diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java index df1e9d8045..432c777224 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java +++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java @@ -74,7 +74,7 @@ public class TestServer implements AutoCloseable { long idleTimeout, Ignite ignite ) { - this(port, portRange, idleTimeout, ignite, null, null, UUID.randomUUID()); + this(port, portRange, idleTimeout, ignite, null, null, null, UUID.randomUUID()); } /** @@ -91,7 +91,8 @@ public class TestServer implements AutoCloseable { long idleTimeout, Ignite ignite, @Nullable Function<Integer, Boolean> shouldDropConnection, - String nodeName, + @Nullable Function<Integer, Integer> responseDelay, + @Nullable String nodeName, UUID clusterId ) { cfg = new ConfigurationRegistry( @@ -131,7 +132,15 @@ public class TestServer implements AutoCloseable { compute.executeColocated(anyString(), any(), anyString(), any())).thenReturn(CompletableFuture.completedFuture(nodeName)); module = shouldDropConnection != null - ? new TestClientHandlerModule(ignite, cfg, bootstrapFactory, shouldDropConnection, clusterService, compute, clusterId) + ? new TestClientHandlerModule( + ignite, + cfg, + bootstrapFactory, + shouldDropConnection, + responseDelay, + clusterService, + compute, + clusterId) : new ClientHandlerModule( ((FakeIgnite) ignite).queryEngine(), (IgniteTablesInternal) ignite.tables(),