This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8acb7deb753 IGNITE-26209 Add metrics to improve node network unavailability detection - Fixes #12339.
8acb7deb753 is described below
commit 8acb7deb7535a52e6aa2e6d89fa166f2f953d26b
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Jan 15 17:37:55 2026 +0300
IGNITE-26209 Add metrics to improve node network unavailability detection - Fixes #12339.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../org/apache/ignite/IgniteSystemProperties.java | 9 +
.../ignite/internal/util/nio/GridNioServer.java | 6 +
.../util/nio/GridSelectorNioSessionImpl.java | 32 ++++
.../tcp/internal/GridNioServerWrapper.java | 42 ++++-
.../ignite/spi/discovery/tcp/ServerImpl.java | 19 +++
.../IgniteSlowClientDetectionSelfTest.java | 12 +-
.../metric/OutboundIoMessageQueueSizeTest.java | 190 +++++++++++++++++++++
.../ClientReconnectContinuousQueryTest.java | 22 +--
.../apache/ignite/testframework/GridTestUtils.java | 16 ++
.../ignite/testsuites/IgniteCacheTestSuite13.java | 2 +
10 files changed, 317 insertions(+), 33 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index e0d75bafded..e77946f9cda 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1830,6 +1830,15 @@ public final class IgniteSystemProperties extends
IgniteCommonsSystemProperties
"when value IgniteConfiguration#getLocalHost is ip, for backward
compatibility")
public static final String IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES =
"IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES";
+    /**
+     * When set to a positive number, a warning is logged when the outgoing message queue size of a
+     * TCP communication SPI connection exceeds the provided value. Warnings are rate-limited.
+     * The check is performed on server nodes only.
+     * Default is {@code 0} (do not print warning).
+     */
+    @SystemProperty(value = "When set to a positive number, a warning will be produced when the outgoing " +
+        "message queue size of TCP communication SPI exceeds the provided value (server nodes only). " +
+        "Default is 0 (do not print warning).", type = Integer.class)
+    public static final String IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE = "IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE";
+
/**
* When above zero, prints tx key collisions once per interval.
* Each transaction besides OPTIMISTIC SERIALIZABLE capture locks on all
enlisted keys, for some reasons
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 74adb6db642..e1268673c14 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -157,6 +157,12 @@ public class GridNioServer<T> {
public static final String OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC
= "Total number of messages waiting to be sent over all connections";
+ /** Name of the metric holding the maximum observed outbound message queue size. */
+ public static final String MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME =
"maxOutboundMessagesQueueSize";
+
+ /** Description of the {@link #MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME} metric. */
+ public static final String MAX_MESSAGES_QUEUE_SIZE_METRIC_DESC = "Maximum
number of messages waiting to be sent";
+
/** */
public static final String RECEIVED_BYTES_METRIC_NAME = "receivedBytes";
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 8004bce31ae..d8adbfd84d1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -31,6 +31,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.processors.metric.impl.MaxValueMetric;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -39,6 +40,8 @@ import org.apache.ignite.util.deque.FastSizeDeque;
import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable.traceName;
+import static
org.apache.ignite.internal.util.nio.GridNioServer.MAX_MESSAGES_QUEUE_SIZE_METRIC_DESC;
+import static
org.apache.ignite.internal.util.nio.GridNioServer.MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME;
import static
org.apache.ignite.internal.util.nio.GridNioServer.OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC;
import static
org.apache.ignite.internal.util.nio.GridNioServer.OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME;
@@ -92,6 +95,9 @@ public class GridSelectorNioSessionImpl extends
GridNioSessionImpl implements Gr
/** Outbound messages queue size metric. */
@Nullable private final LongAdderMetric outboundMessagesQueueSizeMetric;
+ /** Maximum outbound messages queue size metric, {@code null} when the session is created without a metric registry. */
+ @Nullable private final MaxValueMetric maxMessagesQueueSizeMetric;
+
/**
* Creates session instance.
*
@@ -149,6 +155,13 @@ public class GridSelectorNioSessionImpl extends
GridNioSessionImpl implements Gr
OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME,
OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC
);
+
+ maxMessagesQueueSizeMetric = mreg == null ? null : mreg.maxValueMetric(
+ MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME,
+ MAX_MESSAGES_QUEUE_SIZE_METRIC_DESC,
+ 60_000,
+ 5
+ );
}
/** {@inheritDoc} */
@@ -317,6 +330,14 @@ public class GridSelectorNioSessionImpl extends
GridNioSessionImpl implements Gr
if (outboundMessagesQueueSizeMetric != null)
outboundMessagesQueueSizeMetric.increment();
+ if (maxMessagesQueueSizeMetric != null) {
+ int queueSize = queue.sizex();
+
+ maxMessagesQueueSizeMetric.update(queueSize);
+
+ return queueSize;
+ }
+
return queue.sizex();
}
@@ -347,6 +368,14 @@ public class GridSelectorNioSessionImpl extends
GridNioSessionImpl implements Gr
if (outboundMessagesQueueSizeMetric != null)
outboundMessagesQueueSizeMetric.increment();
+ if (maxMessagesQueueSizeMetric != null) {
+ int queueSize = queue.sizex();
+
+ maxMessagesQueueSizeMetric.update(queueSize);
+
+ return queueSize;
+ }
+
return queue.sizex();
}
@@ -362,6 +391,9 @@ public class GridSelectorNioSessionImpl extends
GridNioSessionImpl implements Gr
if (outboundMessagesQueueSizeMetric != null)
outboundMessagesQueueSizeMetric.add(futs.size());
+
+ if (maxMessagesQueueSizeMetric != null)
+ maxMessagesQueueSizeMetric.update(futs.size());
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
index c88a9ba00a8..fa202b3157a 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.net.ssl.SSLEngine;
@@ -133,6 +134,9 @@ public class GridNioServerWrapper {
/** Default delay between reconnects attempts in case of temporary network
issues. */
private static final int DFLT_RECONNECT_DELAY = 50;
+ /**
+ * Minimum interval (in milliseconds) between two consecutive high message queue size warnings.
+ * NOTE(review): despite the "FREQUENCY" suffix this value is a period, not a rate.
+ */
+ private static final long MIN_MSG_QUEUE_SIZE_WARN_FREQUENCY = 30_000L;
+
/** Channel meta used for establishing channel connections. */
static final int CHANNEL_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
@@ -205,9 +209,13 @@ public class GridNioServerWrapper {
private volatile ThrowableSupplier<SocketChannel, IOException>
socketChannelFactory = SocketChannel::open;
/** Enable forcible node kill. */
- private boolean forcibleNodeKillEnabled = IgniteSystemProperties
+ private final boolean forcibleNodeKillEnabled = IgniteSystemProperties
.getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+ /**
+ * Outbound message queue size above which a warning is logged, read from
+ * {@link IgniteSystemProperties#IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE}; a non-positive value disables the warning.
+ */
+ private final int msgQueueWarningSize = IgniteSystemProperties.getInteger(
+ IgniteSystemProperties.IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE, 0);
+
/** NIO server. */
private GridNioServer<Message> nioSrv;
@@ -223,6 +231,9 @@ public class GridNioServerWrapper {
/** Executor for establishing a connection to a node. */
private final TcpHandshakeExecutor tcpHandshakeExecutor;
+ /**
+ * Timestamp (in milliseconds) of the last high message queue size warning. Updated with compare-and-set so that
+ * concurrent callers print at most one warning per throttling interval.
+ * NOTE(review): "Msq" in the field name is a typo for "Msg".
+ */
+ private final AtomicLong lastMsqQueueSizeWarningTs = new AtomicLong();
+
/**
* @param log Logger.
* @param cfg Config.
@@ -892,7 +903,9 @@ public class GridNioServerWrapper {
boolean clientMode =
Boolean.TRUE.equals(igniteCfg.isClientMode());
IgniteBiInClosure<GridNioSession, Integer> queueSizeMonitor =
- !clientMode && cfg.slowClientQueueLimit() > 0 ?
this::checkClientQueueSize : null;
+ !clientMode && (cfg.slowClientQueueLimit() > 0 ||
msgQueueWarningSize > 0)
+ ? msgQueueWarningSize > 0 ? this::checkNodeQueueSize :
this::checkClientQueueSize
+ : null;
List<GridNioFilter> filters = new ArrayList<>();
@@ -1258,6 +1271,31 @@ public class GridNioServerWrapper {
}
}
+    /**
+     * Checks the outbound message queue size of a node communication session and logs a warning if it exceeds
+     * {@link IgniteSystemProperties#IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE}. Warnings are rate-limited to at most one
+     * per {@link #MIN_MSG_QUEUE_SIZE_WARN_FREQUENCY} milliseconds, shared by all sessions of this wrapper.
+     * Afterwards delegates to {@link #checkClientQueueSize(GridNioSession, int)}.
+     *
+     * @param ses Node communication session.
+     * @param msgQueueSize Message queue size.
+     */
+    private void checkNodeQueueSize(GridNioSession ses, int msgQueueSize) {
+        if (msgQueueWarningSize > 0 && msgQueueSize > msgQueueWarningSize) {
+            ConnectionKey id = ses.meta(CONN_IDX_META);
+
+            // Resolve the connection key before touching the throttle: a session that cannot be reported must not
+            // consume the throttling window and thereby suppress a warning for another session.
+            if (id != null) {
+                long now = U.currentTimeMillis();
+                long lastWarnTs = lastMsqQueueSizeWarningTs.get();
+
+                // CAS guarantees that only one of the concurrently racing threads prints the warning.
+                if (now > lastWarnTs + MIN_MSG_QUEUE_SIZE_WARN_FREQUENCY &&
+                    lastMsqQueueSizeWarningTs.compareAndSet(lastWarnTs, now)) {
+                    log.warning("Outbound message queue size for node exceeded configured " +
+                        IgniteSystemProperties.IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE + " value, it may be caused " +
+                        "by node failure or network problems [node=" + id.nodeId() +
+                        ", msgQueueSize=" + msgQueueSize + ", warnSize=" + msgQueueWarningSize + ']');
+                }
+            }
+        }
+
+        checkClientQueueSize(ses, msgQueueSize);
+    }
+
/**
* @param commWorker New recovery and idle clients handler.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 44ca6f0093f..a8316d554a9 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -83,6 +83,8 @@ import
org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import
org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage;
import
org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
+import org.apache.ignite.internal.processors.metric.impl.MaxValueMetric;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.security.SecurityUtils;
import org.apache.ignite.internal.processors.tracing.Span;
@@ -182,6 +184,7 @@ import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_US
import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CERTIFICATES;
import static
org.apache.ignite.internal.cluster.DistributedConfigurationUtils.CONN_DISABLED_BY_ADMIN_ERR_MSG;
import static
org.apache.ignite.internal.cluster.DistributedConfigurationUtils.newConnectionEnabledProperty;
+import static
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DISCO_METRICS;
import static
org.apache.ignite.internal.processors.security.SecurityUtils.authenticateLocalNode;
import static
org.apache.ignite.internal.processors.security.SecurityUtils.withSecurityContext;
import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.node2id;
@@ -257,6 +260,9 @@ class ServerImpl extends TcpDiscoveryImpl {
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private StatisticsPrinter statsPrinter;
+ /**
+ * Metric for max message queue size. Created in {@code onContextInitialized0()}, {@code null} before that,
+ * so readers must null-check it.
+ * NOTE(review): the field is not volatile but is read by the code that enqueues worker messages; confirm that
+ * possibly missing early updates on other threads is acceptable.
+ */
+ private MaxValueMetric maxMsgQueueSizeMetric;
+
/** Failed nodes (but still in topology). */
private final Map<TcpDiscoveryNode, UUID> failedNodes = new HashMap<>();
@@ -492,6 +498,16 @@ class ServerImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override public void onContextInitialized0(IgniteSpiContext spiCtx)
throws IgniteSpiException {
spiCtx.registerPort(tcpSrvr.port, TCP);
+
+        // Discovery SPI metrics are registered in the DISCO_METRICS registry.
+        MetricRegistryImpl discoReg = (MetricRegistryImpl)spiCtx.getOrCreateMetricRegistry(DISCO_METRICS);
+
+        // Maximum size of the message worker queue, updated whenever a message is added to it.
+        // NOTE(review): 60_000L and 5 are presumably the time window (ms) and bucket count of the max-value metric.
+        maxMsgQueueSizeMetric = discoReg.maxValueMetric("MaxMsgQueueSize", "Max message queue size", 60_000L, 5);
+
+        // ID of the next node in the ring, or null if it is not known (yet).
+        discoReg.register("Next", () -> {
+            // Read the worker field once, so that the null check and the dereference see the same instance.
+            RingMessageWorker worker = msgWorker;
+
+            TcpDiscoveryNode next = worker != null ? worker.next : null;
+
+            return next != null ? next.id() : null;
+        }, UUID.class, "Next in the ring node ID");
}
/** {@inheritDoc} */
@@ -3116,6 +3132,9 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Message has been added to a worker's queue: " +
msg);
}
+
+ if (maxMsgQueueSizeMetric != null)
+ maxMsgQueueSizeMetric.update(queue.size());
}
/** */
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
index 4d3a4895f6e..75f19e28c1c 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
@@ -30,14 +30,10 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
-import org.apache.ignite.internal.util.nio.GridNioServer;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -154,13 +150,7 @@ public class IgniteSlowClientDetectionSelfTest extends
GridCommonAbstractTest {
for (int i = 0; i < 100; i++)
cache0.put(0, i);
- GridIoManager ioMgr = slowClient.context().io();
-
- TcpCommunicationSpi commSpi =
(TcpCommunicationSpi)((Object[])U.field(ioMgr, "spis"))[0];
-
- GridNioServer nioSrvr =
((GridNioServerWrapper)GridTestUtils.getFieldValue(commSpi,
"nioSrvWrapper")).nio();
-
- GridTestUtils.setFieldValue(nioSrvr, "skipRead", true);
+ GridTestUtils.skipCommNioServerRead(slowClient, true);
// Initiate messages for client.
for (int i = 0; i < 100; i++)
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java
new file mode 100644
index 00000000000..92cc272d2cc
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metric;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.metric.impl.MaxValueMetric;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.tcp.BlockTcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DISCO_METRICS;
+import static
org.apache.ignite.internal.util.nio.GridNioServer.MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME;
+import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME;
+
+/**
+ * Test for discovery/communication outbound message queue size metrics.
+ */
+public class OutboundIoMessageQueueSizeTest extends GridCommonAbstractTest {
+    /** Outbound message queue size threshold used by the tests. */
+    private static final int MSG_LIMIT = 50;
+
+    /**
+     * Logger installed on the started nodes to catch the queue size warning. It does not hide the base class
+     * {@code log} field, and it echoes to that logger so node output stays visible when a test fails.
+     */
+    private final ListeningTestLogger listeningLog = new ListeningTestLogger(log);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        // BlockTcpDiscoverySpi is used by testDiscoveryMsgQueue to stall discovery message processing.
+        cfg.setDiscoverySpi(new BlockTcpDiscoverySpi()
+            .setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
+
+        cfg.setGridLogger(listeningLog);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Checks the max outbound message queue size metric of the communication SPI and the rate-limited warning
+     * printed when the queue size exceeds {@link IgniteSystemProperties#IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE}.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @WithSystemProperty(key = IgniteSystemProperties.IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE, value = "" + MSG_LIMIT)
+    public void testCommunicationMsgQueue() throws Exception {
+        IgniteEx srv0 = startGrid(0);
+        IgniteEx srv1 = startGrid(1);
+
+        String logMsg = "Outbound message queue size for node exceeded";
+
+        // Only one message to log should be printed due to throttling.
+        LogListener logLsnr = LogListener.matches(logMsg).times(1).build();
+
+        listeningLog.registerListener(logLsnr);
+
+        IgniteCache<Object, Object> cache0 = srv0.getOrCreateCache(DEFAULT_CACHE_NAME);
+        IgniteCache<Object, Object> cache1 = srv1.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        // Continuous query listener on srv1, so that updates of srv0 primary keys generate messages to srv1.
+        cache1.query(new ContinuousQuery<>().setLocalListener(evt -> {}));
+
+        Integer key = primaryKey(cache0);
+
+        cache0.put(key, 0);
+
+        assertFalse(logLsnr.check());
+
+        MaxValueMetric metric = srv0.context().metric().registry(COMMUNICATION_METRICS_GROUP_NAME)
+            .findMetric(MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME);
+
+        assertNotNull(metric);
+
+        assertTrue("Unexpected max queue size: " + metric.value(), metric.value() < MSG_LIMIT);
+
+        GridTestUtils.skipCommNioServerRead(srv1, true);
+
+        // Initiate messages for srv1.
+        // Some messages still may be sent until buffers overflow, so use MSG_LIMIT * 2 messages.
+        for (int i = 0; i < MSG_LIMIT * 2; i++)
+            cache0.put(key, new byte[10 * 1024]);
+
+        assertTrue("Unexpected max queue size: " + metric.value(), metric.value() >= MSG_LIMIT);
+
+        assertTrue(logLsnr.check());
+    }
+
+    /**
+     * Checks the max message queue size metric of the discovery SPI.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDiscoveryMsgQueue() throws Exception {
+        IgniteEx srv0 = startGrids(2);
+
+        MaxValueMetric metric = srv0.context().metric().registry(DISCO_METRICS)
+            .findMetric("MaxMsgQueueSize");
+
+        assertNotNull(metric);
+
+        metric.reset(); // Reset value accumulated before discovery SPI startup.
+
+        srv0.context().discovery().sendCustomEvent(new DummyCustomDiscoveryMessage(IgniteUuid.randomUuid()));
+
+        // Assume our message can be added to queue concurrently with other messages
+        // (for example, with metrics update message).
+        assertTrue("Unexpected max queue size: " + metric.value(), metric.value() < 3);
+
+        BlockTcpDiscoverySpi discoverySpi = (BlockTcpDiscoverySpi)srv0.context().config().getDiscoverySpi();
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        // Stall discovery message processing until the latch is released, so that messages pile up in the queue.
+        discoverySpi.setClosure((node, msg) -> {
+            U.awaitQuiet(latch);
+
+            return null;
+        });
+
+        try {
+            // MSG_LIMIT + 1 messages: the first one may be taken from the queue and blocked in the closure.
+            for (int i = 0; i <= MSG_LIMIT; i++)
+                srv0.context().discovery().sendCustomEvent(new DummyCustomDiscoveryMessage(IgniteUuid.randomUuid()));
+
+            assertTrue("Unexpected max queue size: " + metric.value(), metric.value() >= MSG_LIMIT);
+        }
+        finally {
+            latch.countDown();
+        }
+    }
+
+    /** Minimal custom discovery message used to fill the discovery message queue. */
+    private static class DummyCustomDiscoveryMessage implements DiscoveryCustomMessage {
+        /** */
+        private final IgniteUuid id;
+
+        /**
+         * @param id Message id.
+         */
+        DummyCustomDiscoveryMessage(IgniteUuid id) {
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid id() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isMutable() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer,
+            DiscoCache discoCache) {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
index 079869e812a..f3e788b654f 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
@@ -31,12 +31,8 @@ import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.managers.communication.GridIoManager;
-import org.apache.ignite.internal.util.nio.GridNioServer;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -122,13 +118,13 @@ public class ClientReconnectContinuousQueryTest extends
GridCommonAbstractTest {
assertTrue(updaterReceived.await(10_000, TimeUnit.MILLISECONDS));
- skipRead(client, true);
+ GridTestUtils.skipCommNioServerRead(client, true);
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new
Callable<Void>() {
@Override public Void call() throws Exception {
assertTrue(disconLatch.await(10_000,
TimeUnit.MILLISECONDS));
- skipRead(client, false);
+ GridTestUtils.skipCommNioServerRead(client, false);
return null;
}
@@ -203,18 +199,4 @@ public class ClientReconnectContinuousQueryTest extends
GridCommonAbstractTest {
for (int i = 0; i < cnt; i++)
srvCache.put(0, i);
}
-
- /**
- * @param igniteClient Ignite client.
- * @param skip Skip.
- */
- private void skipRead(IgniteEx igniteClient, boolean skip) {
- GridIoManager ioMgr = igniteClient.context().io();
-
- TcpCommunicationSpi commSpi =
(TcpCommunicationSpi)((Object[])U.field(ioMgr, "spis"))[0];
-
- GridNioServer nioSrvr = ((GridNioServerWrapper)U.field(commSpi,
"nioSrvWrapper")).nio();
-
- GridTestUtils.setFieldValue(nioSrvr, "skipRead", skip);
- }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 1613fdb8fba..62e450ee10c 100644
---
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -112,6 +112,7 @@ import
org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.lang.RunnableX;
import org.apache.ignite.internal.util.lang.gridfunc.NoOpClosure;
+import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T2;
@@ -122,6 +123,8 @@ import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper;
import org.apache.ignite.spi.discovery.DiscoveryNotification;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
@@ -2644,4 +2647,17 @@ public final class GridTestUtils {
public static void suppressException(RunnableX runnableX) {
runnableX.run();
}
+
+    /**
+     * Sets the {@code skipRead} flag of the NIO server that backs the node's TCP communication SPI.
+     * The configured communication SPI must be a {@link TcpCommunicationSpi}, otherwise a
+     * {@link ClassCastException} is thrown.
+     *
+     * @param ignite Ignite instance whose communication NIO server is modified.
+     * @param skip {@code true} to stop reading from communication sockets, {@code false} to resume reading.
+     */
+    @SuppressWarnings("deprecation")
+    public static void skipCommNioServerRead(IgniteEx ignite, boolean skip) {
+        TcpCommunicationSpi spi = (TcpCommunicationSpi)ignite.context().config().getCommunicationSpi();
+
+        GridNioServerWrapper wrapper = U.field(spi, "nioSrvWrapper");
+
+        GridNioServer<?> srv = wrapper.nio();
+
+        setFieldValue(srv, "skipRead", skip);
+    }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
index 8b9b8afe183..c7c4200a009 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.metric.JmxExporterSpiTest;
import org.apache.ignite.internal.metric.LogExporterSpiTest;
import org.apache.ignite.internal.metric.MetricsConfigurationTest;
import org.apache.ignite.internal.metric.MetricsSelfTest;
+import org.apache.ignite.internal.metric.OutboundIoMessageQueueSizeTest;
import org.apache.ignite.internal.metric.ReadMetricsOnNodeStartupTest;
import org.apache.ignite.internal.metric.SystemMetricsTest;
import org.apache.ignite.internal.metric.SystemViewCacheExpiryPolicyTest;
@@ -81,6 +82,7 @@ public class IgniteCacheTestSuite13 {
GridTestUtils.addTestIfNeeded(suite, IoStatisticsCacheSelfTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IoStatisticsSelfTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IoStatisticsMetricsLocalMXBeanImplSelfTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
OutboundIoMessageQueueSizeTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, MetricsSelfTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite, SystemMetricsTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CustomMetricsTest.class,
ignoredTests);