This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 8acb7deb753 IGNITE-26209 Add metrics to improve node network 
unavailability detection - Fixes #12339.
8acb7deb753 is described below

commit 8acb7deb7535a52e6aa2e6d89fa166f2f953d26b
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Jan 15 17:37:55 2026 +0300

    IGNITE-26209 Add metrics to improve node network unavailability detection - 
Fixes #12339.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../org/apache/ignite/IgniteSystemProperties.java  |   9 +
 .../ignite/internal/util/nio/GridNioServer.java    |   6 +
 .../util/nio/GridSelectorNioSessionImpl.java       |  32 ++++
 .../tcp/internal/GridNioServerWrapper.java         |  42 ++++-
 .../ignite/spi/discovery/tcp/ServerImpl.java       |  19 +++
 .../IgniteSlowClientDetectionSelfTest.java         |  12 +-
 .../metric/OutboundIoMessageQueueSizeTest.java     | 190 +++++++++++++++++++++
 .../ClientReconnectContinuousQueryTest.java        |  22 +--
 .../apache/ignite/testframework/GridTestUtils.java |  16 ++
 .../ignite/testsuites/IgniteCacheTestSuite13.java  |   2 +
 10 files changed, 317 insertions(+), 33 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index e0d75bafded..e77946f9cda 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1830,6 +1830,15 @@ public final class IgniteSystemProperties extends 
IgniteCommonsSystemProperties
         "when value IgniteConfiguration#getLocalHost is ip, for backward 
compatibility")
     public static final String IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES = 
"IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES";
 
+    /**
+     * When set to positive number warning will be produced when outgoing 
message queue size of TCP communication SPI
+     * exceeds provided value.
+     * Default is {@code 0} (do not print warning).
+     */
+    @SystemProperty(value = "When set to positive number warning will be 
produced when outgoing message queue size of " +
+        "TCP communication SPI exeeds provided value. Default is 0 (do not 
print warning).", type = Integer.class)
+    public static final String IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE = 
"IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE";
+
     /**
      * When above zero, prints tx key collisions once per interval.
      * Each transaction besides OPTIMISTIC SERIALIZABLE capture locks on all 
enlisted keys, for some reasons
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 74adb6db642..e1268673c14 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -157,6 +157,12 @@ public class GridNioServer<T> {
     public static final String OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC
         = "Total number of messages waiting to be sent over all connections";
 
+    /** */
+    public static final String MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME = 
"maxOutboundMessagesQueueSize";
+
+    /** */
+    public static final String MAX_MESSAGES_QUEUE_SIZE_METRIC_DESC = "Maximum 
number of messages waiting to be sent";
+
     /** */
     public static final String RECEIVED_BYTES_METRIC_NAME = "receivedBytes";
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 8004bce31ae..d8adbfd84d1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -31,6 +31,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
 import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.processors.metric.impl.MaxValueMetric;
 import org.apache.ignite.internal.processors.tracing.MTC;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -39,6 +40,8 @@ import org.apache.ignite.util.deque.FastSizeDeque;
 import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable.traceName;
+import static 
org.apache.ignite.internal.util.nio.GridNioServer.MAX_MESSAGES_QUEUE_SIZE_METRIC_DESC;
+import static 
org.apache.ignite.internal.util.nio.GridNioServer.MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME;
 import static 
org.apache.ignite.internal.util.nio.GridNioServer.OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC;
 import static 
org.apache.ignite.internal.util.nio.GridNioServer.OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME;
 
@@ -92,6 +95,9 @@ public class GridSelectorNioSessionImpl extends 
GridNioSessionImpl implements Gr
     /** Outbound messages queue size metric. */
     @Nullable private final LongAdderMetric outboundMessagesQueueSizeMetric;
 
+    /** Maximum outbound messages queue size metric. */
+    @Nullable private final MaxValueMetric maxMessagesQueueSizeMetric;
+
     /**
      * Creates session instance.
      *
@@ -149,6 +155,13 @@ public class GridSelectorNioSessionImpl extends 
GridNioSessionImpl implements Gr
             OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME,
             OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC
         );
+
+        maxMessagesQueueSizeMetric = mreg == null ? null : mreg.maxValueMetric(
+            MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME,
+            MAX_MESSAGES_QUEUE_SIZE_METRIC_DESC,
+            60_000,
+            5
+        );
     }
 
     /** {@inheritDoc} */
@@ -317,6 +330,14 @@ public class GridSelectorNioSessionImpl extends 
GridNioSessionImpl implements Gr
         if (outboundMessagesQueueSizeMetric != null)
             outboundMessagesQueueSizeMetric.increment();
 
+        if (maxMessagesQueueSizeMetric != null) {
+            int queueSize = queue.sizex();
+
+            maxMessagesQueueSizeMetric.update(queueSize);
+
+            return queueSize;
+        }
+
         return queue.sizex();
     }
 
@@ -347,6 +368,14 @@ public class GridSelectorNioSessionImpl extends 
GridNioSessionImpl implements Gr
         if (outboundMessagesQueueSizeMetric != null)
             outboundMessagesQueueSizeMetric.increment();
 
+        if (maxMessagesQueueSizeMetric != null) {
+            int queueSize = queue.sizex();
+
+            maxMessagesQueueSizeMetric.update(queueSize);
+
+            return queueSize;
+        }
+
         return queue.sizex();
     }
 
@@ -362,6 +391,9 @@ public class GridSelectorNioSessionImpl extends 
GridNioSessionImpl implements Gr
 
         if (outboundMessagesQueueSizeMetric != null)
             outboundMessagesQueueSizeMetric.add(futs.size());
+
+        if (maxMessagesQueueSizeMetric != null)
+            maxMessagesQueueSizeMetric.update(futs.size());
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
index c88a9ba00a8..fa202b3157a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.net.ssl.SSLEngine;
@@ -133,6 +134,9 @@ public class GridNioServerWrapper {
     /** Default delay between reconnects attempts in case of temporary network 
issues. */
     private static final int DFLT_RECONNECT_DELAY = 50;
 
+    /** Minimum interval (in milliseconds) between high message queue size 
warnings. */
+    private static final long MIN_MSG_QUEUE_SIZE_WARN_FREQUENCY = 30_000L;
+
     /** Channel meta used for establishing channel connections. */
     static final int CHANNEL_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
 
@@ -205,9 +209,13 @@ public class GridNioServerWrapper {
     private volatile ThrowableSupplier<SocketChannel, IOException> 
socketChannelFactory = SocketChannel::open;
 
     /** Enable forcible node kill. */
-    private boolean forcibleNodeKillEnabled = IgniteSystemProperties
+    private final boolean forcibleNodeKillEnabled = IgniteSystemProperties
         .getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
 
+    /** Message queue size to print warning. */
+    private final int msgQueueWarningSize = IgniteSystemProperties.getInteger(
+        IgniteSystemProperties.IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE, 0);
+
     /** NIO server. */
     private GridNioServer<Message> nioSrv;
 
@@ -223,6 +231,9 @@ public class GridNioServerWrapper {
     /** Executor for establishing a connection to a node. */
     private final TcpHandshakeExecutor tcpHandshakeExecutor;
 
+    /** Timestamp of the last high message queue size warning. */
+    private final AtomicLong lastMsqQueueSizeWarningTs = new AtomicLong();
+
     /**
      * @param log Logger.
      * @param cfg Config.
@@ -892,7 +903,9 @@ public class GridNioServerWrapper {
                 boolean clientMode = 
Boolean.TRUE.equals(igniteCfg.isClientMode());
 
                 IgniteBiInClosure<GridNioSession, Integer> queueSizeMonitor =
-                    !clientMode && cfg.slowClientQueueLimit() > 0 ? 
this::checkClientQueueSize : null;
+                    !clientMode && (cfg.slowClientQueueLimit() > 0 || 
msgQueueWarningSize > 0)
+                        ? msgQueueWarningSize > 0 ? this::checkNodeQueueSize : 
this::checkClientQueueSize
+                        : null;
 
                 List<GridNioFilter> filters = new ArrayList<>();
 
@@ -1258,6 +1271,31 @@ public class GridNioServerWrapper {
         }
     }
 
+    /**
+     * Checks node message queue size and produces a warning if message queue 
size exceeds the configured threshold.
+     *
+     * @param ses Node communication session.
+     * @param msgQueueSize Message queue size.
+     */
+    private void checkNodeQueueSize(GridNioSession ses, int msgQueueSize) {
+        if (msgQueueWarningSize > 0 && msgQueueSize > msgQueueWarningSize) {
+            long lastWarnTs = lastMsqQueueSizeWarningTs.get();
+
+            if (U.currentTimeMillis() > lastWarnTs + 
MIN_MSG_QUEUE_SIZE_WARN_FREQUENCY) {
+                if (lastMsqQueueSizeWarningTs.compareAndSet(lastWarnTs, 
U.currentTimeMillis())) {
+                    ConnectionKey id = ses.meta(CONN_IDX_META);
+                    if (id != null) {
+                        log.warning("Outbound message queue size for node 
exceeded configured " +
+                            "messageQueueWarningSize value, it may be caused 
by node failure or a network problems " +
+                            "[node=" + id.nodeId() + ", msqQueueSize=" + 
msgQueueSize + ']');
+                    }
+                }
+            }
+        }
+
+        checkClientQueueSize(ses, msgQueueSize);
+    }
+
     /**
      * @param commWorker New recovery and idle clients handler.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 44ca6f0093f..a8316d554a9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -83,6 +83,8 @@ import 
org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
 import 
org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage;
 import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
+import org.apache.ignite.internal.processors.metric.impl.MaxValueMetric;
 import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.processors.security.SecurityUtils;
 import org.apache.ignite.internal.processors.tracing.Span;
@@ -182,6 +184,7 @@ import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_US
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CERTIFICATES;
 import static 
org.apache.ignite.internal.cluster.DistributedConfigurationUtils.CONN_DISABLED_BY_ADMIN_ERR_MSG;
 import static 
org.apache.ignite.internal.cluster.DistributedConfigurationUtils.newConnectionEnabledProperty;
+import static 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DISCO_METRICS;
 import static 
org.apache.ignite.internal.processors.security.SecurityUtils.authenticateLocalNode;
 import static 
org.apache.ignite.internal.processors.security.SecurityUtils.withSecurityContext;
 import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.node2id;
@@ -257,6 +260,9 @@ class ServerImpl extends TcpDiscoveryImpl {
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private StatisticsPrinter statsPrinter;
 
+    /** Metric for max message queue size. */
+    private MaxValueMetric maxMsgQueueSizeMetric;
+
     /** Failed nodes (but still in topology). */
     private final Map<TcpDiscoveryNode, UUID> failedNodes = new HashMap<>();
 
@@ -492,6 +498,16 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** {@inheritDoc} */
     @Override public void onContextInitialized0(IgniteSpiContext spiCtx) 
throws IgniteSpiException {
         spiCtx.registerPort(tcpSrvr.port, TCP);
+
+        MetricRegistryImpl discoReg = 
(MetricRegistryImpl)spiCtx.getOrCreateMetricRegistry(DISCO_METRICS);
+
+        maxMsgQueueSizeMetric = discoReg.maxValueMetric("MaxMsgQueueSize",
+            "Max message queue size", 60_000L, 5);
+
+        discoReg.register("Next", () -> {
+            TcpDiscoveryNode next = msgWorker != null ? msgWorker.next : null;
+            return next != null ? next.id() : null;
+        }, UUID.class, "Next in the ring node ID");
     }
 
     /** {@inheritDoc} */
@@ -3116,6 +3132,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (log.isDebugEnabled())
                     log.debug("Message has been added to a worker's queue: " + 
msg);
             }
+
+            if (maxMsgQueueSizeMetric != null)
+                maxMsgQueueSizeMetric.update(queue.size());
         }
 
         /** */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
index 4d3a4895f6e..75f19e28c1c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
@@ -30,14 +30,10 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
-import org.apache.ignite.internal.util.nio.GridNioServer;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -154,13 +150,7 @@ public class IgniteSlowClientDetectionSelfTest extends 
GridCommonAbstractTest {
         for (int i = 0; i < 100; i++)
             cache0.put(0, i);
 
-        GridIoManager ioMgr = slowClient.context().io();
-
-        TcpCommunicationSpi commSpi = 
(TcpCommunicationSpi)((Object[])U.field(ioMgr, "spis"))[0];
-
-        GridNioServer nioSrvr = 
((GridNioServerWrapper)GridTestUtils.getFieldValue(commSpi, 
"nioSrvWrapper")).nio();
-
-        GridTestUtils.setFieldValue(nioSrvr, "skipRead", true);
+        GridTestUtils.skipCommNioServerRead(slowClient, true);
 
         // Initiate messages for client.
         for (int i = 0; i < 100; i++)
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java
new file mode 100644
index 00000000000..92cc272d2cc
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metric;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.metric.impl.MaxValueMetric;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.tcp.BlockTcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DISCO_METRICS;
+import static 
org.apache.ignite.internal.util.nio.GridNioServer.MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME;
+import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME;
+
+/**
+ * Test for discovery/communication outbound message queue size metrics.
+ */
+public class OutboundIoMessageQueueSizeTest extends GridCommonAbstractTest {
+    /** */
+    private static final int MSG_LIMIT = 50;
+
+    /** */
+    private final ListeningTestLogger log = new ListeningTestLogger();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDiscoverySpi(new 
BlockTcpDiscoverySpi().setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
+        cfg.setGridLogger(log);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    @WithSystemProperty(key = 
IgniteSystemProperties.IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE, value = "" + 
MSG_LIMIT)
+    public void testCommunicationMsgQueue() throws Exception {
+        IgniteEx srv0 = startGrid(0);
+        IgniteEx srv1 = startGrid(1);
+
+        String logMsg = "Outbound message queue size for node exceeded";
+
+        // Only one log message should be printed due to throttling.
+        LogListener logLsnr = LogListener.matches(logMsg).times(1).build();
+
+        log.registerListener(logLsnr);
+
+        IgniteCache<Object, Object> cache0 = 
srv0.getOrCreateCache(DEFAULT_CACHE_NAME);
+        IgniteCache<Object, Object> cache1 = 
srv1.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        cache1.query(new ContinuousQuery<>().setLocalListener(evt -> {}));
+
+        Integer key = primaryKey(cache0);
+
+        cache0.put(key, 0);
+
+        assertFalse(logLsnr.check());
+
+        MaxValueMetric metric = 
srv0.context().metric().registry(COMMUNICATION_METRICS_GROUP_NAME)
+            .findMetric(MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME);
+
+        assertTrue(metric.value() < MSG_LIMIT);
+
+        GridTestUtils.skipCommNioServerRead(srv1, true);
+
+        // Initiate messages for srv1.
+        // Some messages still may be sent until buffers overflow, so use 
MSG_LIMIT * 2 messages.
+        for (int i = 0; i < MSG_LIMIT * 2; i++)
+            cache0.put(key, new byte[10 * 1024]);
+
+        assertTrue(metric.value() >= MSG_LIMIT);
+
+        assertTrue(logLsnr.check());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDiscoveryMsgQueue() throws Exception {
+        IgniteEx srv0 = startGrids(2);
+
+        MaxValueMetric metric = srv0.context().metric().registry(DISCO_METRICS)
+            .findMetric("MaxMsgQueueSize");
+
+        metric.reset(); // Reset value accumulated before discovery SPI 
startup.
+
+        srv0.context().discovery().sendCustomEvent(new 
DummyCustomDiscoveryMessage(IgniteUuid.randomUuid()));
+
+        // Assume our message can be added to queue concurrently with other 
messages
+        // (for example, with metrics update message).
+        assertTrue(metric.value() < 3);
+
+        BlockTcpDiscoverySpi discoverySpi = 
(BlockTcpDiscoverySpi)srv0.context().config().getDiscoverySpi();
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        discoverySpi.setClosure((node, msg) -> {
+            U.awaitQuiet(latch);
+
+            return null;
+        });
+
+        try {
+            for (int i = 0; i <= MSG_LIMIT; i++)
+                srv0.context().discovery().sendCustomEvent(new 
DummyCustomDiscoveryMessage(IgniteUuid.randomUuid()));
+
+            assertTrue(metric.value() >= MSG_LIMIT);
+        }
+        finally {
+            latch.countDown();
+        }
+    }
+
+    /** */
+    private static class DummyCustomDiscoveryMessage implements 
DiscoveryCustomMessage {
+        /** */
+        private final IgniteUuid id;
+
+        /**
+         * @param id Message id.
+         */
+        DummyCustomDiscoveryMessage(IgniteUuid id) {
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid id() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isMutable() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, 
AffinityTopologyVersion topVer,
+            DiscoCache discoCache) {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
index 079869e812a..f3e788b654f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
@@ -31,12 +31,8 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.managers.communication.GridIoManager;
-import org.apache.ignite.internal.util.nio.GridNioServer;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
@@ -122,13 +118,13 @@ public class ClientReconnectContinuousQueryTest extends 
GridCommonAbstractTest {
 
             assertTrue(updaterReceived.await(10_000, TimeUnit.MILLISECONDS));
 
-            skipRead(client, true);
+            GridTestUtils.skipCommNioServerRead(client, true);
 
             IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new 
Callable<Void>() {
                 @Override public Void call() throws Exception {
                     assertTrue(disconLatch.await(10_000, 
TimeUnit.MILLISECONDS));
 
-                    skipRead(client, false);
+                    GridTestUtils.skipCommNioServerRead(client, false);
 
                     return null;
                 }
@@ -203,18 +199,4 @@ public class ClientReconnectContinuousQueryTest extends 
GridCommonAbstractTest {
         for (int i = 0; i < cnt; i++)
             srvCache.put(0, i);
     }
-
-    /**
-     * @param igniteClient Ignite client.
-     * @param skip Skip.
-     */
-    private void skipRead(IgniteEx igniteClient, boolean skip) {
-        GridIoManager ioMgr = igniteClient.context().io();
-
-        TcpCommunicationSpi commSpi = 
(TcpCommunicationSpi)((Object[])U.field(ioMgr, "spis"))[0];
-
-        GridNioServer nioSrvr = ((GridNioServerWrapper)U.field(commSpi, 
"nioSrvWrapper")).nio();
-
-        GridTestUtils.setFieldValue(nioSrvr, "skipRead", skip);
-    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 1613fdb8fba..62e450ee10c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -112,6 +112,7 @@ import 
org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.lang.RunnableX;
 import org.apache.ignite.internal.util.lang.gridfunc.NoOpClosure;
+import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -122,6 +123,8 @@ import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper;
 import org.apache.ignite.spi.discovery.DiscoveryNotification;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
@@ -2644,4 +2647,17 @@ public final class GridTestUtils {
     public static void suppressException(RunnableX runnableX) {
         runnableX.run();
     }
+
+    /**
+     * @param ignite Ignite instance.
+     * @param skip Skip.
+     */
+    @SuppressWarnings("deprecation")
+    public static void skipCommNioServerRead(IgniteEx ignite, boolean skip) {
+        TcpCommunicationSpi commSpi = 
(TcpCommunicationSpi)(ignite.context().config().getCommunicationSpi());
+
+        GridNioServer<?> nioSrvr = ((GridNioServerWrapper)U.field(commSpi, 
"nioSrvWrapper")).nio();
+
+        setFieldValue(nioSrvr, "skipRead", skip);
+    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
index 8b9b8afe183..c7c4200a009 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.metric.JmxExporterSpiTest;
 import org.apache.ignite.internal.metric.LogExporterSpiTest;
 import org.apache.ignite.internal.metric.MetricsConfigurationTest;
 import org.apache.ignite.internal.metric.MetricsSelfTest;
+import org.apache.ignite.internal.metric.OutboundIoMessageQueueSizeTest;
 import org.apache.ignite.internal.metric.ReadMetricsOnNodeStartupTest;
 import org.apache.ignite.internal.metric.SystemMetricsTest;
 import org.apache.ignite.internal.metric.SystemViewCacheExpiryPolicyTest;
@@ -81,6 +82,7 @@ public class IgniteCacheTestSuite13 {
         GridTestUtils.addTestIfNeeded(suite, IoStatisticsCacheSelfTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IoStatisticsSelfTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
IoStatisticsMetricsLocalMXBeanImplSelfTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
OutboundIoMessageQueueSizeTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, MetricsSelfTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, SystemMetricsTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CustomMetricsTest.class, 
ignoredTests);

Reply via email to