This is an automated email from the ASF dual-hosted git repository.

mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 09857d4f06d IGNITE-28222 Fixed race condition during Communication 
Node Client creation (#12885)
09857d4f06d is described below

commit 09857d4f06dbbb1ff8126cc94d4d2fbf28c1a52e
Author: Mikhail Petrov <[email protected]>
AuthorDate: Sat Mar 21 20:21:46 2026 +0300

    IGNITE-28222 Fixed race condition during Communication Node Client creation 
(#12885)
---
 .../tcp/internal/CommunicationWorker.java          |   2 +
 .../tcp/internal/ConnectionClientPool.java         | 361 ++++++++++-----------
 .../CommunicationConnectionPoolMetricsTest.java    |  43 ++-
 ...cpCommunicationSpiHalfOpenedConnectionTest.java |  24 +-
 .../IgniteSpiCommunicationSelfTestSuite.java       |   2 +-
 5 files changed, 195 insertions(+), 237 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationWorker.java
index 6d18247c29c..46ecf4e4d10 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationWorker.java
@@ -205,6 +205,8 @@ public class CommunicationWorker extends GridWorker {
     private void processIdle() {
         cleanupRecovery();
 
+        clientPool.cleanupNodeMetrics();
+
         for (Map.Entry<UUID, GridCommunicationClient[]> e : 
clientPool.entrySet()) {
             UUID nodeId = e.getKey();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
index ba17c3fd5cc..82720bc9774 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
@@ -19,7 +19,9 @@ package org.apache.ignite.spi.communication.tcp.internal;
 
 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.StringJoiner;
 import java.util.UUID;
@@ -27,7 +29,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
@@ -113,7 +114,7 @@ public class ConnectionClientPool {
     private final ConcurrentMap<UUID, GridCommunicationClient[]> clients = 
GridConcurrentFactory.newMap();
 
     /** Metrics for each remote node. */
-    private final Map<UUID, NodeMetrics> metrics;
+    private final Map<UUID, NodeConnectionMetrics> metrics;
 
     /** Config. */
     private final TcpCommunicationConfiguration cfg;
@@ -166,9 +167,6 @@ public class ConnectionClientPool {
     /** */
     private final GridMetricManager metricsMgr;
 
-    /** */
-    private volatile AtomicBoolean asyncMetric;
-
     /**
      * @param cfg Config.
      * @param attrs Attributes.
@@ -210,32 +208,22 @@ public class ConnectionClientPool {
         this.clusterStateProvider = clusterStateProvider;
         this.nioSrvWrapper = nioSrvWrapper;
         this.metricsMgr = metricsMgr;
-
-        this.nodeGetter = new Function<>() {
-            @Override public ClusterNode apply(UUID nodeId) {
-                ClusterNode node = nodeGetter.apply(nodeId);
-
-                if (node == null)
-                    removeNodeMetrics(nodeId);
-
-                return node;
-            }
-        };
+        this.nodeGetter = nodeGetter;
 
         this.handshakeTimeoutExecutorService = 
newSingleThreadScheduledExecutor(
             new IgniteThreadFactory(igniteInstanceName, 
"handshake-timeout-client")
         );
 
-        if (metricsMgr != null) {
-            MetricRegistryImpl mreg = 
metricsMgr.registry(SHARED_METRICS_REGISTRY_NAME);
+        metrics = new ConcurrentHashMap<>(64, 0.75f, Math.max(16, 
Runtime.getRuntime().availableProcessors()));
 
-            mreg.register(METRIC_NAME_POOL_SIZE, () -> 
cfg.connectionsPerNode(), "Maximal connections number to a remote node.");
-            mreg.register(METRIC_NAME_PAIRED_CONNS, () -> 
cfg.usePairedConnections(), "Paired connections flag.");
+        MetricRegistryImpl mreg = 
metricsMgr.registry(SHARED_METRICS_REGISTRY_NAME);
 
-            metrics = new ConcurrentHashMap<>(64, 0.75f, Math.max(16, 
Runtime.getRuntime().availableProcessors()));
-        }
-        else
-            metrics = null;
+        mreg.register(METRIC_NAME_POOL_SIZE, () -> cfg.connectionsPerNode(), 
"Maximal connections number to a remote node.");
+        mreg.register(METRIC_NAME_PAIRED_CONNS, () -> 
cfg.usePairedConnections(), "Paired connections flag.");
+        mreg.register(
+            METRIC_NAME_ASYNC_CONNS,
+            () -> true, // Currently we have only one connection 
implementation that is ASYNC (see GridTcpNioCommunicationClient#async)
+            "Asynchronous flag. If TRUE, connections put data in a queue (with 
some preprocessing) instead of immediate sending.");
     }
 
     /**
@@ -246,7 +234,9 @@ public class ConnectionClientPool {
 
         metricsMgr.remove(SHARED_METRICS_REGISTRY_NAME);
 
-        clients.keySet().forEach(this::removeNodeMetrics);
+        metrics.values().forEach(NodeConnectionMetrics::unregister);
+
+        metrics.clear();
 
         for (GridFutureAdapter<GridCommunicationClient> fut : 
clientFuts.values()) {
             if (fut instanceof ConnectionRequestFuture) {
@@ -267,7 +257,7 @@ public class ConnectionClientPool {
      * @throws IgniteCheckedException Thrown if any exception occurs.
      */
     public GridCommunicationClient reserveClient(ClusterNode node, int 
connIdx) throws IgniteCheckedException {
-        NodeMetrics nodeMetrics = metrics.get(node.id());
+        NodeConnectionMetrics nodeMetrics = metrics.get(node.id());
 
         if (nodeMetrics != null)
             nodeMetrics.acquiringThreadsCnt.incrementAndGet();
@@ -416,11 +406,8 @@ public class ConnectionClientPool {
 
                 assert connIdx == client.connectionIndex() : client;
 
-                if (client.reserve()) {
-                    updateClientAcquiredMetric(client);
-
+                if (client.reserve())
                     return client;
-                }
                 else
                     // Client has just been closed by idle worker. Help it and 
try again.
                     removeNodeClient(nodeId, client);
@@ -432,25 +419,6 @@ public class ConnectionClientPool {
         }
     }
 
-    /** */
-    private void updateClientAcquiredMetric(GridCommunicationClient client) {
-        if (asyncMetric == null) {
-            synchronized (metrics) {
-                if (asyncMetric == null) {
-                    MetricRegistryImpl mreg = 
metricsMgr.registry(SHARED_METRICS_REGISTRY_NAME);
-
-                    // We assume that all the clients have the same async flag.
-                    asyncMetric = new AtomicBoolean(client.async());
-
-                    mreg.register(METRIC_NAME_ASYNC_CONNS, () -> 
asyncMetric.get(), "Asynchronous flag. If TRUE, " +
-                        "connections put data in a queue (with some 
preprocessing) instead of immediate sending.");
-                }
-            }
-        }
-        else
-            assert client.async() == asyncMetric.get();
-    }
-
     /**
      * Handles {@link NodeUnreachableException}. This means that the method 
will try to trigger client itself to open connection.
      * The only possible way of doing this is to use {@link 
TcpCommunicationConfiguration#connectionRequestor()}'s trigger and wait.
@@ -622,135 +590,32 @@ public class ConnectionClientPool {
                 ", client=" + addClient +
                 ", oldClient=" + curClients[connIdx] + ']';
 
-            GridCommunicationClient[] newClients;
+            GridCommunicationClient[] newClients = curClients == null
+                ? new GridCommunicationClient[cfg.connectionsPerNode()]
+                : Arrays.copyOf(curClients, curClients.length);
 
-            if (curClients == null) {
-                newClients = new 
GridCommunicationClient[cfg.connectionsPerNode()];
-                newClients[connIdx] = addClient;
+            newClients[connIdx] = addClient;
 
-                curClients = clients.compute(node.id(), (nodeId0, clients0) -> 
{
-                    if (clients0 == null) {
-                        // Syncs metrics creation on this map.
-                        createNodeMetrics(node);
+            boolean success;
 
-                        return newClients;
-                    }
-
-                    return clients0;
-                });
+            if (curClients == null) {
+                success = clients.putIfAbsent(node.id(), newClients) == null;
 
-                if (curClients != null)
-                    break;
+                if (success)
+                    registerNodeMetrics(node);
             }
-            else {
-                newClients = curClients.clone();
-                newClients[connIdx] = addClient;
+            else
+                success = clients.replace(node.id(), curClients, newClients);
 
+            if (success) {
                 if (log.isDebugEnabled())
-                    log.debug("The node client was replaced [nodeId=" + 
node.id() + ", connIdx=" + connIdx + ", client=" + addClient + "]");
+                    log.debug("A new node client was added [nodeId=" + 
node.id() + ", connIdx=" + connIdx + ", client=" + addClient + "]");
 
-                if (clients.replace(node.id(), curClients, newClients))
-                    break;
+                break;
             }
         }
     }
 
-    /** */
-    private void createNodeMetrics(ClusterNode node) {
-        MetricRegistryImpl mreg = 
metricsMgr.registry(nodeMetricsRegName(node.id()));
-
-        assert !mreg.iterator().hasNext() : "Node connection pools metrics 
aren't empty.";
-
-        mreg.register(METRIC_NAME_CONSIST_ID, () -> 
node.consistentId().toString(), String.class,
-            "Consistent id of the remote node as string.");
-
-        mreg.register(METRIC_NAME_CUR_CNT, () -> 
updatedNodeMetrics(node.id()).connsCnt,
-            "Number of current connections to the remote node.");
-
-        mreg.register(METRIC_NAME_MSG_QUEUE_SIZE, () -> 
updatedNodeMetrics(node.id()).msgsQueueSize,
-            "Overal number of pending messages to the remote node.");
-
-        mreg.register(METRIC_NAME_MAX_NET_IDLE_TIME, () -> 
updatedNodeMetrics(node.id()).maxIdleTime,
-            "Maximal idle time of physical sending or receiving data in 
milliseconds.");
-
-        mreg.register(METRIC_NAME_AVG_LIFE_TIME, () -> 
updatedNodeMetrics(node.id()).avgLifetime,
-            "Average connection lifetime in milliseconds.");
-
-        mreg.register(METRIC_NAME_REMOVED_CNT, () -> 
updatedNodeMetrics(node.id()).removedConnectionsCnt.get(),
-            "Total number of removed connections.");
-
-        mreg.register(METRIC_NAME_ACQUIRING_THREADS_CNT, () -> 
updatedNodeMetrics(node.id()).acquiringThreadsCnt.get(),
-            "Number of threads currently acquiring a connection.");
-    }
-
-    /** */
-    private NodeMetrics updatedNodeMetrics(UUID nodeId) {
-        long nowNanos = System.nanoTime();
-
-        NodeMetrics res = metrics.get(nodeId);
-
-        if (res == null || (nowNanos - res.updateTs > METRICS_UPDATE_THRESHOLD 
&& res.canUpdate())) {
-            GridCommunicationClient[] nodeClients = clients.get(nodeId);
-
-            // Node might already leave the cluster.
-            if (nodeClients != null) {
-                long nowMillis = U.currentTimeMillis();
-
-                res = new NodeMetrics(res);
-
-                long avgLifetime = 0;
-                long maxIdleTime = 0;
-
-                for (GridCommunicationClient nodeClient : nodeClients) {
-                    if (nodeClient == null)
-                        continue;
-
-                    ++res.connsCnt;
-
-                    avgLifetime += nowMillis - nodeClient.creationTime();
-
-                    long nodeIdleTime = nodeClient.getIdleTime();
-
-                    if (nodeIdleTime > maxIdleTime)
-                        maxIdleTime = nodeIdleTime;
-
-                    res.msgsQueueSize += nodeClient.messagesQueueSize();
-                }
-
-                if (res.connsCnt != 0)
-                    res.avgLifetime = avgLifetime / res.connsCnt;
-
-                res.maxIdleTime = maxIdleTime;
-
-                NodeMetrics res0 = res;
-
-                res.updateTs = System.nanoTime();
-
-                // Node might already leave the cluster. Syncs metrics removal 
on the clients map.
-                clients.compute(nodeId, (nodeId0, clients) -> {
-                    if (clients == null)
-                        removeNodeMetrics(nodeId);
-                    else
-                        metrics.put(nodeId, res0);
-
-                    return clients;
-                });
-            }
-            else if (res != null) {
-                removeNodeMetrics(nodeId);
-
-                res = null;
-            }
-        }
-
-        return res == null ? NodeMetrics.EMPTY : res;
-    }
-
-    /** */
-    public static String nodeMetricsRegName(UUID nodeId) {
-        return metricName(SHARED_METRICS_REGISTRY_NAME, nodeId.toString());
-    }
-
     /**
      * @param nodeId Node ID.
      * @param rmvClient Client to remove.
@@ -770,9 +635,9 @@ public class ConnectionClientPool {
             newClients[rmvClient.connectionIndex()] = null;
 
             if (clients.replace(nodeId, curClients, newClients)) {
-                NodeMetrics nodeMetrics = metrics.get(nodeId);
+                NodeConnectionMetrics nodeMetrics = metrics.get(nodeId);
 
-                if (nodeMetrics != null && nodeMetrics != NodeMetrics.EMPTY)
+                if (nodeMetrics != null)
                     nodeMetrics.removedConnectionsCnt.addAndGet(1);
 
                 if (log.isDebugEnabled())
@@ -794,8 +659,6 @@ public class ConnectionClientPool {
         if (log.isDebugEnabled())
             log.debug("The node client connections were closed [nodeId=" + 
nodeId + "]");
 
-        removeNodeMetrics(nodeId);
-
         GridCommunicationClient[] clients = this.clients.remove(nodeId);
         if (nonNull(clients)) {
             for (GridCommunicationClient client : clients)
@@ -818,7 +681,7 @@ public class ConnectionClientPool {
     public void onNodeLeft(UUID nodeId) {
         GridCommunicationClient[] clients0 = clients.remove(nodeId);
 
-        removeNodeMetrics(nodeId);
+        unregisterNodeMetrics(nodeId);
 
         if (clients0 != null) {
             for (GridCommunicationClient client : clients0) {
@@ -835,10 +698,18 @@ public class ConnectionClientPool {
     }
 
     /** */
-    private void removeNodeMetrics(UUID nodeId) {
-        metricsMgr.remove(nodeMetricsRegName(nodeId));
+    public void cleanupNodeMetrics() {
+        Iterator<Map.Entry<UUID, NodeConnectionMetrics>> iter = 
metrics.entrySet().iterator();
 
-        metrics.remove(nodeId);
+        while (iter.hasNext()) {
+            Map.Entry<UUID, NodeConnectionMetrics> entry = iter.next();
+
+            if (nodeGetter.apply(entry.getKey()) == null) {
+                entry.getValue().unregister();
+
+                iter.remove();
+            }
+        }
     }
 
     /**
@@ -909,55 +780,149 @@ public class ConnectionClientPool {
     }
 
     /** */
-    private static final class NodeMetrics {
-        /** Avoids NPEs on metrics getting because nodes leave cluster 
asynchronously. */
-        private static final NodeMetrics EMPTY = new NodeMetrics(null);
+    private void registerNodeMetrics(ClusterNode node) {
+        metrics.computeIfAbsent(node.id(), id -> new NodeConnectionMetrics(node));
+    }
+
+    /** */
+    void unregisterNodeMetrics(UUID nodeId) {
+        NodeConnectionMetrics nodeMetrics = metrics.remove(nodeId);
+
+        if (nodeMetrics != null)
+            nodeMetrics.unregister();
+    }
+
+    /** */
+    private class NodeConnectionMetrics {
+        /** */
+        private final MetricRegistryImpl registry;
 
         /** */
-        private volatile long updateTs = System.nanoTime();
+        private final ClusterNode node;
 
         /** */
-        private volatile boolean updatingFlag;
+        private final AtomicLong removedConnectionsCnt = new AtomicLong();
 
         /** */
-        private int connsCnt;
+        private final AtomicLong maxIdleTimeOut = new AtomicLong();
 
         /** */
-        private int msgsQueueSize;
+        private final AtomicInteger acquiringThreadsCnt = new AtomicInteger();
 
         /** */
-        private long maxIdleTime;
+        NodeConnectionMetrics(ClusterNode node) {
+            this.node = node;
+            registry = createRegistry();
+        }
 
         /** */
-        private long avgLifetime;
+        void unregister() {
+            metricsMgr.remove(registry.name());
+        }
 
         /** */
-        private final AtomicLong removedConnectionsCnt;
+        private int connectionsCount() {
+            GridCommunicationClient[] nodeClients = clients.get(node.id());
+
+            return nodeClients == null ? 0 : 
(int)Arrays.stream(nodeClients).filter(Objects::nonNull).count();
+        }
 
         /** */
-        private final AtomicInteger acquiringThreadsCnt;
+        private int pendingMessagesCount() {
+            GridCommunicationClient[] nodeClients = clients.get(node.id());
+
+            return nodeClients == null
+                ? 0
+                : 
Arrays.stream(nodeClients).filter(Objects::nonNull).mapToInt(GridCommunicationClient::messagesQueueSize).sum();
+        }
 
         /** */
-        private NodeMetrics(@Nullable NodeMetrics prev) {
-            this.removedConnectionsCnt = prev == null ? new AtomicLong() : 
prev.removedConnectionsCnt;
-            this.acquiringThreadsCnt = prev == null ? new AtomicInteger() : 
prev.acquiringThreadsCnt;
-            this.avgLifetime = prev == null ? 0 : prev.avgLifetime;
-            this.maxIdleTime = prev == null ? 0 : prev.maxIdleTime;
+        private int clientTotalReservationCount() {
+            return acquiringThreadsCnt.get();
         }
 
         /** */
-        private boolean canUpdate() {
-            if (updatingFlag)
-                return false;
+        private long averageConnectionLifeTime() {
+            GridCommunicationClient[] nodeClients = clients.get(node.id());
 
-            synchronized (this) {
-                if (updatingFlag)
-                    return false;
+            if (nodeClients == null)
+                return 0;
 
-                updatingFlag = true;
+            long now = U.currentTimeMillis();
+            int connCnt = 0;
 
-                return true;
+            long totalLifetime = 0;
+
+            for (GridCommunicationClient client : nodeClients) {
+                if (client == null)
+                    continue;
+
+                ++connCnt;
+
+                totalLifetime += (now - client.creationTime());
+            }
+
+            return connCnt == 0 ? 0 : totalLifetime / connCnt; // All slots can be null after removals.
+        }
+
+        /** */
+        private long maxConnectionsIdleTime() {
+            GridCommunicationClient[] nodeClients = clients.get(node.id());
+
+            long max = nodeClients == null
+                ? 0
+                : 
Arrays.stream(nodeClients).filter(Objects::nonNull).mapToLong(GridCommunicationClient::getIdleTime).max().orElse(0);
+
+            long cur;
+
+            do {
+                cur = maxIdleTimeOut.get();
             }
+            while (cur < max && !maxIdleTimeOut.compareAndSet(cur, max));
+
+            return maxIdleTimeOut.get();
+        }
+
+        /** */
+        private long removedConnectionsCount() {
+            return removedConnectionsCnt.get();
+        }
+
+        /** */
+        private MetricRegistryImpl createRegistry() {
+            MetricRegistryImpl mreg = 
metricsMgr.registry(metricName(SHARED_METRICS_REGISTRY_NAME, 
node.id().toString()));
+
+            mreg.register(
+                METRIC_NAME_CONSIST_ID,
+                () -> node.consistentId().toString(),
+                String.class,
+                "Consistent id of the remote node as string.");
+            mreg.register(
+                METRIC_NAME_CUR_CNT,
+                this::connectionsCount,
+                "Number of current connections to the remote node.");
+            mreg.register(
+                METRIC_NAME_MSG_QUEUE_SIZE,
+                this::pendingMessagesCount,
+                "Overall number of pending messages to the remote node.");
+            mreg.register(
+                METRIC_NAME_MAX_NET_IDLE_TIME,
+                this::maxConnectionsIdleTime,
+                "Maximal idle time of physical sending or receiving data in 
milliseconds.");
+            mreg.register(
+                METRIC_NAME_AVG_LIFE_TIME,
+                this::averageConnectionLifeTime,
+                "Average connection lifetime in milliseconds.");
+            mreg.register(
+                METRIC_NAME_REMOVED_CNT,
+                this::removedConnectionsCount,
+                "Total number of removed connections.");
+            mreg.register(
+                METRIC_NAME_ACQUIRING_THREADS_CNT,
+                this::clientTotalReservationCount,
+                "Number of threads currently acquiring a connection.");
+
+            return mreg;
         }
     }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
index 5189eacc8a1..b369b2e6f1c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.spi.communication.tcp;
 
 import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -60,6 +59,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static 
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
 import static 
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_ACQUIRING_THREADS_CNT;
 import static 
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_AVG_LIFE_TIME;
 import static 
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_CONSIST_ID;
@@ -67,7 +67,7 @@ import static 
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientP
 import static 
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_MAX_NET_IDLE_TIME;
 import static 
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_MSG_QUEUE_SIZE;
 import static 
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_REMOVED_CNT;
-import static 
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.nodeMetricsRegName;
+import static 
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.SHARED_METRICS_REGISTRY_NAME;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /** Tests metrics of {@link ConnectionClientPool}. */
@@ -153,7 +153,6 @@ public class CommunicationConnectionPoolMetricsTest extends 
GridCommonAbstractTe
 
         Ignite ldr = clientLdr ? cli : srvr;
 
-        GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
         AtomicBoolean runFlag = new AtomicBoolean(true);
         TestMessage msg = new TestMessage();
 
@@ -164,7 +163,7 @@ public class CommunicationConnectionPoolMetricsTest extends 
GridCommonAbstractTe
             if (node == ldr)
                 continue;
 
-            MetricRegistryImpl mreg = 
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+            MetricRegistryImpl mreg = metricsForCommunicationConnection(ldr, 
node);
 
             assertTrue(waitForCondition(
                 () -> {
@@ -187,7 +186,7 @@ public class CommunicationConnectionPoolMetricsTest extends 
GridCommonAbstractTe
             if (node == ldr)
                 continue;
 
-            MetricRegistryImpl mreg = 
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+            MetricRegistryImpl mreg = metricsForCommunicationConnection(ldr, 
node);
 
             assertTrue(waitForCondition(() -> 
mreg.<LongMetric>findMetric(METRIC_NAME_REMOVED_CNT).value() >= connsPerNode,
                 getTestTimeout()));
@@ -206,7 +205,6 @@ public class CommunicationConnectionPoolMetricsTest extends 
GridCommonAbstractTe
 
         Ignite ldr = clientLdr ? cli : srvr;
 
-        GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
         AtomicBoolean runFlag = new AtomicBoolean(true);
         Message msg = new TestMessage();
 
@@ -217,7 +215,7 @@ public class CommunicationConnectionPoolMetricsTest extends 
GridCommonAbstractTe
             if (node == ldr)
                 continue;
 
-            MetricRegistryImpl mreg = 
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+            MetricRegistryImpl mreg = metricsForCommunicationConnection(ldr, 
node);
 
             assertTrue(waitForCondition(
                 () -> {
@@ -241,6 +239,8 @@ public class CommunicationConnectionPoolMetricsTest extends 
GridCommonAbstractTe
     /** */
     @Test
     public void testMetricsBasics() throws Exception {
+        maxConnIdleTimeout = 500;
+
         int preloadCnt = 300;
         int srvrCnt = 3;
 
@@ -250,7 +250,7 @@ public class CommunicationConnectionPoolMetricsTest extends 
GridCommonAbstractTe
         Ignite ldr = clientLdr ? cli : srvr;
 
         GridMetricManager ldrMetricsMgr = ((IgniteEx)ldr).context().metric();
-        MetricRegistryImpl mreg0 = 
ldrMetricsMgr.registry(ConnectionClientPool.SHARED_METRICS_REGISTRY_NAME);
+        MetricRegistryImpl mreg0 = 
ldrMetricsMgr.registry(SHARED_METRICS_REGISTRY_NAME);
 
         assertEquals(connsPerNode, 
mreg0.<IntMetric>findMetric(ConnectionClientPool.METRIC_NAME_POOL_SIZE).value());
         assertEquals(pairedConns, 
mreg0.<BooleanGauge>findMetric(ConnectionClientPool.METRIC_NAME_PAIRED_CONNS).value());
@@ -277,9 +277,7 @@ public class CommunicationConnectionPoolMetricsTest extends 
GridCommonAbstractTe
             if (node == ldr)
                 continue;
 
-            UUID nodeId = node.cluster().localNode().id();
-
-            MetricRegistryImpl mreg = 
ldrMetricsMgr.registry(nodeMetricsRegName(nodeId));
+            MetricRegistryImpl mreg = metricsForCommunicationConnection(ldr, 
node);
 
             // We assume that entire pool was used at least once.
             assertTrue(waitForCondition(() -> connsPerNode == 
mreg.<IntMetric>findMetric(METRIC_NAME_CUR_CNT).value(),
@@ -316,11 +314,9 @@ public class CommunicationConnectionPoolMetricsTest 
extends GridCommonAbstractTe
             if (!node.cluster().localNode().isClient() && --srvrCnt == 0)
                 break;
 
-            UUID nodeId = node.cluster().localNode().id();
-
             // Wait until there is no messages to this node.
             assertTrue(waitForCondition(() -> {
-                MetricRegistryImpl mreg = 
ldrMetricsMgr.registry(nodeMetricsRegName(nodeId));
+                MetricRegistryImpl mreg = 
metricsForCommunicationConnection(ldr, node);
 
                 assert mreg != null;
 
@@ -330,7 +326,7 @@ public class CommunicationConnectionPoolMetricsTest extends 
GridCommonAbstractTe
             assertTrue(G.stop(node.name(), false));
 
             assertTrue(waitForCondition(() -> {
-                MetricRegistryImpl mreg = 
ldrMetricsMgr.registry(nodeMetricsRegName(nodeId));
+                MetricRegistryImpl mreg = 
metricsForCommunicationConnection(ldr, node);
 
                 return mreg == null || !mreg.iterator().hasNext();
             }, getTestTimeout()));
@@ -352,7 +348,6 @@ public class CommunicationConnectionPoolMetricsTest extends 
GridCommonAbstractTe
 
         Ignite ldr = clientLdr ? cli : srvr;
 
-        GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
         AtomicBoolean runFlag = new AtomicBoolean(true);
 
         IgniteInternalFuture<?> monFut = GridTestUtils.runAsync(() -> {
@@ -361,7 +356,7 @@ public class CommunicationConnectionPoolMetricsTest extends 
GridCommonAbstractTe
                     if (node == ldr)
                         continue;
 
-                    MetricRegistryImpl mreg = 
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+                    MetricRegistryImpl mreg = 
metricsForCommunicationConnection(ldr, node);
 
                     IntMetric m = 
mreg.findMetric(METRIC_NAME_ACQUIRING_THREADS_CNT);
 
@@ -392,7 +387,6 @@ public class CommunicationConnectionPoolMetricsTest extends 
GridCommonAbstractTe
 
         Ignite ldr = clientLdr ? client : server;
 
-        GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
         AtomicBoolean runFlag = new AtomicBoolean(true);
         AtomicLong loadCnt = new AtomicLong(preloadCnt);
 
@@ -420,7 +414,7 @@ public class CommunicationConnectionPoolMetricsTest extends 
GridCommonAbstractTe
             if (node == ldr)
                 continue;
 
-            MetricRegistryImpl mreg = 
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+            MetricRegistryImpl mreg = metricsForCommunicationConnection(ldr, 
node);
 
             assertTrue(waitForCondition(
                 () -> 
mreg.<IntMetric>findMetric(METRIC_NAME_MSG_QUEUE_SIZE).value() >= Math.max(10, 
msgQueueLimit),
@@ -490,13 +484,11 @@ public class CommunicationConnectionPoolMetricsTest 
extends GridCommonAbstractTe
         if (!log.isInfoEnabled())
             return;
 
-        GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
-
         for (Ignite node : G.allGrids()) {
             if (node == ldr)
                 continue;
 
-            MetricRegistryImpl mreg = 
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+            MetricRegistryImpl mreg = metricsForCommunicationConnection(ldr, 
node);
 
             StringBuilder b = new StringBuilder()
                 .append("Pool metrics from node 
").append(ldr.cluster().localNode().order())
@@ -542,6 +534,13 @@ public class CommunicationConnectionPoolMetricsTest 
extends GridCommonAbstractTe
         }
     }
 
+    /** */
+    public static MetricRegistryImpl metricsForCommunicationConnection(Ignite 
from, Ignite to) {
+        return ((IgniteEx)from).context()
+            .metric()
+            .registry(metricName(SHARED_METRICS_REGISTRY_NAME, 
((IgniteEx)to).context().localNodeId().toString()));
+    }
+
     /** */
     private static class TestMessage extends GridTestMessage {
         /** */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationSpiHalfOpenedConnectionTest.java
similarity index 88%
rename from 
modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
rename to 
modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationSpiHalfOpenedConnectionTest.java
index ee665ca8a60..b1d9bb5ee25 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationSpiHalfOpenedConnectionTest.java
@@ -15,17 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.spi.communication.tcp;
+package org.apache.ignite.spi.communication.tcp.internal;
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.processors.metric.GridMetricManager;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
 import org.apache.ignite.internal.util.nio.GridNioServerListener;
@@ -33,7 +31,7 @@ import 
org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.communication.CommunicationSpi;
-import org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
@@ -138,8 +136,8 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest 
extends GridCommonAbstr
     private void reconnect(Ignite srcNode, Ignite targetNode, ClusterGroup 
targetGrp) {
         CommunicationSpi commSpi = 
srcNode.configuration().getCommunicationSpi();
 
-        ConcurrentMap<UUID, GridCommunicationClient[]> clients = 
GridTestUtils.getFieldValue(commSpi, "clientPool", "clients");
-        GridMetricManager metricMgr = GridTestUtils.getFieldValue(commSpi, 
"clientPool", "metricsMgr");
+        ConnectionClientPool connPool = GridTestUtils.getFieldValue(commSpi, 
"clientPool");
+        ConcurrentMap<UUID, GridCommunicationClient[]> clients = 
GridTestUtils.getFieldValue(connPool, "clients");
         ConcurrentMap<?, GridNioRecoveryDescriptor> recoveryDescs = 
GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "recoveryDescs");
         ConcurrentMap<?, GridNioRecoveryDescriptor> outRecDescs = 
GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "outRecDescs");
         ConcurrentMap<?, GridNioRecoveryDescriptor> inRecDescs = 
GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "inRecDescs");
@@ -159,20 +157,14 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest 
extends GridCommonAbstr
             desc.release();
         }
 
+        UUID targetNodeId = targetNode.cluster().localNode().id();
+
         // Remove client to avoid calling close(), in that case server
         // will close connection too, but we want to keep the server
         // uninformed and force ping old connection.
-        GridCommunicationClient[] clients0 = 
clients.remove(targetNode.cluster().localNode().id());
-
-        if (metricMgr != null) {
-            Map<UUID, ?> metrics = GridTestUtils.getFieldValue(commSpi, 
"clientPool", "metrics");
-
-            assert metrics != null;
+        GridCommunicationClient[] clients0 = clients.remove(targetNodeId);
 
-            metrics.remove(targetNode.cluster().localNode().id());
-
-            
metricMgr.remove(ConnectionClientPool.nodeMetricsRegName(targetNode.cluster().localNode().id()));
-        }
+        connPool.unregisterNodeMetrics(targetNodeId);
 
         for (GridCommunicationClient commClient : clients0)
             
lsnr.onDisconnected(((GridTcpNioCommunicationClient)commClient).session(), new 
IOException("Test exception"));
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index 1029d060c2b..48d7b6fddca 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -49,13 +49,13 @@ import 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiDropNodesTest;
 import 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFaultyClientSslTest;
 import 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFaultyClientTest;
 import 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFreezingClientTest;
-import 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiHalfOpenedConnectionTest;
 import 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiInverseConnectionLoggingTest;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiMultiJvmTest;
 import 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiNodeLeftLoggingTest;
 import 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiSkipMessageSendTest;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationStatisticsTest;
 import 
org.apache.ignite.spi.communication.tcp.TooManyOpenFilesTcpCommunicationSpiTest;
+import 
org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationSpiHalfOpenedConnectionTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 


Reply via email to