This is an automated email from the ASF dual-hosted git repository.
mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 09857d4f06d IGNITE-28222 Fixed race condition during Communication
Node Client creation (#12885)
09857d4f06d is described below
commit 09857d4f06dbbb1ff8126cc94d4d2fbf28c1a52e
Author: Mikhail Petrov <[email protected]>
AuthorDate: Sat Mar 21 20:21:46 2026 +0300
IGNITE-28222 Fixed race condition during Communication Node Client creation
(#12885)
---
.../tcp/internal/CommunicationWorker.java | 2 +
.../tcp/internal/ConnectionClientPool.java | 361 ++++++++++-----------
.../CommunicationConnectionPoolMetricsTest.java | 43 ++-
...cpCommunicationSpiHalfOpenedConnectionTest.java | 24 +-
.../IgniteSpiCommunicationSelfTestSuite.java | 2 +-
5 files changed, 195 insertions(+), 237 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationWorker.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationWorker.java
index 6d18247c29c..46ecf4e4d10 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationWorker.java
@@ -205,6 +205,8 @@ public class CommunicationWorker extends GridWorker {
private void processIdle() {
cleanupRecovery();
+ clientPool.cleanupNodeMetrics();
+
for (Map.Entry<UUID, GridCommunicationClient[]> e :
clientPool.entrySet()) {
UUID nodeId = e.getKey();
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
index ba17c3fd5cc..82720bc9774 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
@@ -19,7 +19,9 @@ package org.apache.ignite.spi.communication.tcp.internal;
import java.net.InetSocketAddress;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.UUID;
@@ -27,7 +29,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@@ -113,7 +114,7 @@ public class ConnectionClientPool {
private final ConcurrentMap<UUID, GridCommunicationClient[]> clients =
GridConcurrentFactory.newMap();
/** Metrics for each remote node. */
- private final Map<UUID, NodeMetrics> metrics;
+ private final Map<UUID, NodeConnectionMetrics> metrics;
/** Config. */
private final TcpCommunicationConfiguration cfg;
@@ -166,9 +167,6 @@ public class ConnectionClientPool {
/** */
private final GridMetricManager metricsMgr;
- /** */
- private volatile AtomicBoolean asyncMetric;
-
/**
* @param cfg Config.
* @param attrs Attributes.
@@ -210,32 +208,22 @@ public class ConnectionClientPool {
this.clusterStateProvider = clusterStateProvider;
this.nioSrvWrapper = nioSrvWrapper;
this.metricsMgr = metricsMgr;
-
- this.nodeGetter = new Function<>() {
- @Override public ClusterNode apply(UUID nodeId) {
- ClusterNode node = nodeGetter.apply(nodeId);
-
- if (node == null)
- removeNodeMetrics(nodeId);
-
- return node;
- }
- };
+ this.nodeGetter = nodeGetter;
this.handshakeTimeoutExecutorService =
newSingleThreadScheduledExecutor(
new IgniteThreadFactory(igniteInstanceName,
"handshake-timeout-client")
);
- if (metricsMgr != null) {
- MetricRegistryImpl mreg =
metricsMgr.registry(SHARED_METRICS_REGISTRY_NAME);
+ metrics = new ConcurrentHashMap<>(64, 0.75f, Math.max(16,
Runtime.getRuntime().availableProcessors()));
- mreg.register(METRIC_NAME_POOL_SIZE, () ->
cfg.connectionsPerNode(), "Maximal connections number to a remote node.");
- mreg.register(METRIC_NAME_PAIRED_CONNS, () ->
cfg.usePairedConnections(), "Paired connections flag.");
+ MetricRegistryImpl mreg =
metricsMgr.registry(SHARED_METRICS_REGISTRY_NAME);
- metrics = new ConcurrentHashMap<>(64, 0.75f, Math.max(16,
Runtime.getRuntime().availableProcessors()));
- }
- else
- metrics = null;
+ mreg.register(METRIC_NAME_POOL_SIZE, () -> cfg.connectionsPerNode(),
"Maximal connections number to a remote node.");
+ mreg.register(METRIC_NAME_PAIRED_CONNS, () ->
cfg.usePairedConnections(), "Paired connections flag.");
+ mreg.register(
+ METRIC_NAME_ASYNC_CONNS,
+ () -> true, // Currently we have only one connection
implementation that is ASYNC (see GridTcpNioCommunicationClient#async)
+ "Asynchronous flag. If TRUE, connections put data in a queue (with
some preprocessing) instead of immediate sending.");
}
/**
@@ -246,7 +234,9 @@ public class ConnectionClientPool {
metricsMgr.remove(SHARED_METRICS_REGISTRY_NAME);
- clients.keySet().forEach(this::removeNodeMetrics);
+ metrics.values().forEach(NodeConnectionMetrics::unregister);
+
+ metrics.clear();
for (GridFutureAdapter<GridCommunicationClient> fut :
clientFuts.values()) {
if (fut instanceof ConnectionRequestFuture) {
@@ -267,7 +257,7 @@ public class ConnectionClientPool {
* @throws IgniteCheckedException Thrown if any exception occurs.
*/
public GridCommunicationClient reserveClient(ClusterNode node, int
connIdx) throws IgniteCheckedException {
- NodeMetrics nodeMetrics = metrics.get(node.id());
+ NodeConnectionMetrics nodeMetrics = metrics.get(node.id());
if (nodeMetrics != null)
nodeMetrics.acquiringThreadsCnt.incrementAndGet();
@@ -416,11 +406,8 @@ public class ConnectionClientPool {
assert connIdx == client.connectionIndex() : client;
- if (client.reserve()) {
- updateClientAcquiredMetric(client);
-
+ if (client.reserve())
return client;
- }
else
// Client has just been closed by idle worker. Help it and
try again.
removeNodeClient(nodeId, client);
@@ -432,25 +419,6 @@ public class ConnectionClientPool {
}
}
- /** */
- private void updateClientAcquiredMetric(GridCommunicationClient client) {
- if (asyncMetric == null) {
- synchronized (metrics) {
- if (asyncMetric == null) {
- MetricRegistryImpl mreg =
metricsMgr.registry(SHARED_METRICS_REGISTRY_NAME);
-
- // We assume that all the clients have the same async flag.
- asyncMetric = new AtomicBoolean(client.async());
-
- mreg.register(METRIC_NAME_ASYNC_CONNS, () ->
asyncMetric.get(), "Asynchronous flag. If TRUE, " +
- "connections put data in a queue (with some
preprocessing) instead of immediate sending.");
- }
- }
- }
- else
- assert client.async() == asyncMetric.get();
- }
-
/**
* Handles {@link NodeUnreachableException}. This means that the method
will try to trigger client itself to open connection.
* The only possible way of doing this is to use {@link
TcpCommunicationConfiguration#connectionRequestor()}'s trigger and wait.
@@ -622,135 +590,32 @@ public class ConnectionClientPool {
", client=" + addClient +
", oldClient=" + curClients[connIdx] + ']';
- GridCommunicationClient[] newClients;
+ GridCommunicationClient[] newClients = curClients == null
+ ? new GridCommunicationClient[cfg.connectionsPerNode()]
+ : Arrays.copyOf(curClients, curClients.length);
- if (curClients == null) {
- newClients = new
GridCommunicationClient[cfg.connectionsPerNode()];
- newClients[connIdx] = addClient;
+ newClients[connIdx] = addClient;
- curClients = clients.compute(node.id(), (nodeId0, clients0) ->
{
- if (clients0 == null) {
- // Syncs metrics creation on this map.
- createNodeMetrics(node);
+ boolean success;
- return newClients;
- }
-
- return clients0;
- });
+ if (curClients == null) {
+ success = clients.putIfAbsent(node.id(), newClients) == null;
- if (curClients != null)
- break;
+ if (success)
+ registerNodeMetrics(node);
}
- else {
- newClients = curClients.clone();
- newClients[connIdx] = addClient;
+ else
+ success = clients.replace(node.id(), curClients, newClients);
+ if (success) {
if (log.isDebugEnabled())
- log.debug("The node client was replaced [nodeId=" +
node.id() + ", connIdx=" + connIdx + ", client=" + addClient + "]");
+ log.debug("New node client were added [nodeId=" +
node.id() + ", connIdx=" + connIdx + ", client=" + addClient + "]");
- if (clients.replace(node.id(), curClients, newClients))
- break;
+ break;
}
}
}
- /** */
- private void createNodeMetrics(ClusterNode node) {
- MetricRegistryImpl mreg =
metricsMgr.registry(nodeMetricsRegName(node.id()));
-
- assert !mreg.iterator().hasNext() : "Node connection pools metrics
aren't empty.";
-
- mreg.register(METRIC_NAME_CONSIST_ID, () ->
node.consistentId().toString(), String.class,
- "Consistent id of the remote node as string.");
-
- mreg.register(METRIC_NAME_CUR_CNT, () ->
updatedNodeMetrics(node.id()).connsCnt,
- "Number of current connections to the remote node.");
-
- mreg.register(METRIC_NAME_MSG_QUEUE_SIZE, () ->
updatedNodeMetrics(node.id()).msgsQueueSize,
- "Overal number of pending messages to the remote node.");
-
- mreg.register(METRIC_NAME_MAX_NET_IDLE_TIME, () ->
updatedNodeMetrics(node.id()).maxIdleTime,
- "Maximal idle time of physical sending or receiving data in
milliseconds.");
-
- mreg.register(METRIC_NAME_AVG_LIFE_TIME, () ->
updatedNodeMetrics(node.id()).avgLifetime,
- "Average connection lifetime in milliseconds.");
-
- mreg.register(METRIC_NAME_REMOVED_CNT, () ->
updatedNodeMetrics(node.id()).removedConnectionsCnt.get(),
- "Total number of removed connections.");
-
- mreg.register(METRIC_NAME_ACQUIRING_THREADS_CNT, () ->
updatedNodeMetrics(node.id()).acquiringThreadsCnt.get(),
- "Number of threads currently acquiring a connection.");
- }
-
- /** */
- private NodeMetrics updatedNodeMetrics(UUID nodeId) {
- long nowNanos = System.nanoTime();
-
- NodeMetrics res = metrics.get(nodeId);
-
- if (res == null || (nowNanos - res.updateTs > METRICS_UPDATE_THRESHOLD
&& res.canUpdate())) {
- GridCommunicationClient[] nodeClients = clients.get(nodeId);
-
- // Node might already leave the cluster.
- if (nodeClients != null) {
- long nowMillis = U.currentTimeMillis();
-
- res = new NodeMetrics(res);
-
- long avgLifetime = 0;
- long maxIdleTime = 0;
-
- for (GridCommunicationClient nodeClient : nodeClients) {
- if (nodeClient == null)
- continue;
-
- ++res.connsCnt;
-
- avgLifetime += nowMillis - nodeClient.creationTime();
-
- long nodeIdleTime = nodeClient.getIdleTime();
-
- if (nodeIdleTime > maxIdleTime)
- maxIdleTime = nodeIdleTime;
-
- res.msgsQueueSize += nodeClient.messagesQueueSize();
- }
-
- if (res.connsCnt != 0)
- res.avgLifetime = avgLifetime / res.connsCnt;
-
- res.maxIdleTime = maxIdleTime;
-
- NodeMetrics res0 = res;
-
- res.updateTs = System.nanoTime();
-
- // Node might already leave the cluster. Syncs metrics removal
on the clients map.
- clients.compute(nodeId, (nodeId0, clients) -> {
- if (clients == null)
- removeNodeMetrics(nodeId);
- else
- metrics.put(nodeId, res0);
-
- return clients;
- });
- }
- else if (res != null) {
- removeNodeMetrics(nodeId);
-
- res = null;
- }
- }
-
- return res == null ? NodeMetrics.EMPTY : res;
- }
-
- /** */
- public static String nodeMetricsRegName(UUID nodeId) {
- return metricName(SHARED_METRICS_REGISTRY_NAME, nodeId.toString());
- }
-
/**
* @param nodeId Node ID.
* @param rmvClient Client to remove.
@@ -770,9 +635,9 @@ public class ConnectionClientPool {
newClients[rmvClient.connectionIndex()] = null;
if (clients.replace(nodeId, curClients, newClients)) {
- NodeMetrics nodeMetrics = metrics.get(nodeId);
+ NodeConnectionMetrics nodeMetrics = metrics.get(nodeId);
- if (nodeMetrics != null && nodeMetrics != NodeMetrics.EMPTY)
+ if (nodeMetrics != null)
nodeMetrics.removedConnectionsCnt.addAndGet(1);
if (log.isDebugEnabled())
@@ -794,8 +659,6 @@ public class ConnectionClientPool {
if (log.isDebugEnabled())
log.debug("The node client connections were closed [nodeId=" +
nodeId + "]");
- removeNodeMetrics(nodeId);
-
GridCommunicationClient[] clients = this.clients.remove(nodeId);
if (nonNull(clients)) {
for (GridCommunicationClient client : clients)
@@ -818,7 +681,7 @@ public class ConnectionClientPool {
public void onNodeLeft(UUID nodeId) {
GridCommunicationClient[] clients0 = clients.remove(nodeId);
- removeNodeMetrics(nodeId);
+ unregisterNodeMetrics(nodeId);
if (clients0 != null) {
for (GridCommunicationClient client : clients0) {
@@ -835,10 +698,18 @@ public class ConnectionClientPool {
}
/** */
- private void removeNodeMetrics(UUID nodeId) {
- metricsMgr.remove(nodeMetricsRegName(nodeId));
+ public void cleanupNodeMetrics() {
+ Iterator<Map.Entry<UUID, NodeConnectionMetrics>> iter =
metrics.entrySet().iterator();
- metrics.remove(nodeId);
+ while (iter.hasNext()) {
+ Map.Entry<UUID, NodeConnectionMetrics> entry = iter.next();
+
+ if (nodeGetter.apply(entry.getKey()) == null) {
+ entry.getValue().unregister();
+
+ iter.remove();
+ }
+ }
}
/**
@@ -909,55 +780,149 @@ public class ConnectionClientPool {
}
/** */
- private static final class NodeMetrics {
- /** Avoids NPEs on metrics getting because nodes leave cluster
asynchronously. */
- private static final NodeMetrics EMPTY = new NodeMetrics(null);
+ private void registerNodeMetrics(ClusterNode node) {
+ metrics.put(node.id(), new NodeConnectionMetrics(node));
+ }
+
+ /** */
+ void unregisterNodeMetrics(UUID nodeId) {
+ NodeConnectionMetrics nodeMetrics = metrics.remove(nodeId);
+
+ if (nodeMetrics != null)
+ nodeMetrics.unregister();
+ }
+
+ /** */
+ private class NodeConnectionMetrics {
+ /** */
+ private final MetricRegistryImpl registry;
/** */
- private volatile long updateTs = System.nanoTime();
+ private final ClusterNode node;
/** */
- private volatile boolean updatingFlag;
+ private final AtomicLong removedConnectionsCnt = new AtomicLong();
/** */
- private int connsCnt;
+ private final AtomicLong maxIdleTimeOut = new AtomicLong();
/** */
- private int msgsQueueSize;
+ private final AtomicInteger acquiringThreadsCnt = new AtomicInteger();
/** */
- private long maxIdleTime;
+ NodeConnectionMetrics(ClusterNode node) {
+ this.node = node;
+ registry = createRegistry();
+ }
/** */
- private long avgLifetime;
+ void unregister() {
+ metricsMgr.remove(registry.name());
+ }
/** */
- private final AtomicLong removedConnectionsCnt;
+ private int connectionsCount() {
+ GridCommunicationClient[] nodeClients = clients.get(node.id());
+
+ return nodeClients == null ? 0 :
(int)Arrays.stream(nodeClients).filter(Objects::nonNull).count();
+ }
/** */
- private final AtomicInteger acquiringThreadsCnt;
+ private int pendingMessagesCount() {
+ GridCommunicationClient[] nodeClients = clients.get(node.id());
+
+ return nodeClients == null
+ ? 0
+ :
Arrays.stream(nodeClients).filter(Objects::nonNull).mapToInt(GridCommunicationClient::messagesQueueSize).sum();
+ }
/** */
- private NodeMetrics(@Nullable NodeMetrics prev) {
- this.removedConnectionsCnt = prev == null ? new AtomicLong() :
prev.removedConnectionsCnt;
- this.acquiringThreadsCnt = prev == null ? new AtomicInteger() :
prev.acquiringThreadsCnt;
- this.avgLifetime = prev == null ? 0 : prev.avgLifetime;
- this.maxIdleTime = prev == null ? 0 : prev.maxIdleTime;
+ private int clientTotalReservationCount() {
+ return acquiringThreadsCnt.get();
}
/** */
- private boolean canUpdate() {
- if (updatingFlag)
- return false;
+ private long averageConnectionLifeTime() {
+ GridCommunicationClient[] nodeClients = clients.get(node.id());
- synchronized (this) {
- if (updatingFlag)
- return false;
+ if (nodeClients == null)
+ return 0;
- updatingFlag = true;
+ long now = U.currentTimeMillis();
+ int connCnt = 0;
- return true;
+ long totalLifetime = 0;
+
+ for (GridCommunicationClient client : nodeClients) {
+ if (client == null)
+ continue;
+
+ ++connCnt;
+
+ totalLifetime += (now - client.creationTime());
+ }
+ // Every slot may be null after removeNodeClient() cleared them; report 0 instead of dividing by zero.
+ return connCnt == 0 ? 0 : totalLifetime / connCnt;
+ }
+
+ /** */
+ private long maxConnectionsIdleTime() {
+ GridCommunicationClient[] nodeClients = clients.get(node.id());
+
+ long max = nodeClients == null
+ ? 0
+ :
Arrays.stream(nodeClients).filter(Objects::nonNull).mapToLong(GridCommunicationClient::getIdleTime).max().orElse(0);
+
+ long cur;
+
+ do {
+ cur = maxIdleTimeOut.get();
}
+ while (cur < max && !maxIdleTimeOut.compareAndSet(cur, max));
+
+ return maxIdleTimeOut.get();
+ }
+
+ /** */
+ private long removedConnectionsCount() {
+ return removedConnectionsCnt.get();
+ }
+
+ /** */
+ private MetricRegistryImpl createRegistry() {
+ MetricRegistryImpl mreg =
metricsMgr.registry(metricName(SHARED_METRICS_REGISTRY_NAME,
node.id().toString()));
+
+ mreg.register(
+ METRIC_NAME_CONSIST_ID,
+ () -> node.consistentId().toString(),
+ String.class,
+ "Consistent id of the remote node as string.");
+ mreg.register(
+ METRIC_NAME_CUR_CNT,
+ this::connectionsCount,
+ "Number of current connections to the remote node.");
+ mreg.register(
+ METRIC_NAME_MSG_QUEUE_SIZE,
+ this::pendingMessagesCount,
+ "Overall number of pending messages to the remote node.");
+ mreg.register(
+ METRIC_NAME_MAX_NET_IDLE_TIME,
+ this::maxConnectionsIdleTime,
+ "Maximal idle time of physical sending or receiving data in
milliseconds.");
+ mreg.register(
+ METRIC_NAME_AVG_LIFE_TIME,
+ this::averageConnectionLifeTime,
+ "Average connection lifetime in milliseconds.");
+ mreg.register(
+ METRIC_NAME_REMOVED_CNT,
+ this::removedConnectionsCount,
+ "Total number of removed connections.");
+ mreg.register(
+ METRIC_NAME_ACQUIRING_THREADS_CNT,
+ this::clientTotalReservationCount,
+ "Number of threads currently acquiring a connection.");
+
+ return mreg;
}
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
index 5189eacc8a1..b369b2e6f1c 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.spi.communication.tcp;
import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -60,6 +59,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import static
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_ACQUIRING_THREADS_CNT;
import static
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_AVG_LIFE_TIME;
import static
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_CONSIST_ID;
@@ -67,7 +67,7 @@ import static
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientP
import static
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_MAX_NET_IDLE_TIME;
import static
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_MSG_QUEUE_SIZE;
import static
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_REMOVED_CNT;
-import static
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.nodeMetricsRegName;
+import static
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.SHARED_METRICS_REGISTRY_NAME;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/** Tests metrics of {@link ConnectionClientPool}. */
@@ -153,7 +153,6 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
Ignite ldr = clientLdr ? cli : srvr;
- GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
AtomicBoolean runFlag = new AtomicBoolean(true);
TestMessage msg = new TestMessage();
@@ -164,7 +163,7 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
if (node == ldr)
continue;
- MetricRegistryImpl mreg =
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+ MetricRegistryImpl mreg = metricsForCommunicationConnection(ldr,
node);
assertTrue(waitForCondition(
() -> {
@@ -187,7 +186,7 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
if (node == ldr)
continue;
- MetricRegistryImpl mreg =
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+ MetricRegistryImpl mreg = metricsForCommunicationConnection(ldr,
node);
assertTrue(waitForCondition(() ->
mreg.<LongMetric>findMetric(METRIC_NAME_REMOVED_CNT).value() >= connsPerNode,
getTestTimeout()));
@@ -206,7 +205,6 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
Ignite ldr = clientLdr ? cli : srvr;
- GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
AtomicBoolean runFlag = new AtomicBoolean(true);
Message msg = new TestMessage();
@@ -217,7 +215,7 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
if (node == ldr)
continue;
- MetricRegistryImpl mreg =
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+ MetricRegistryImpl mreg = metricsForCommunicationConnection(ldr,
node);
assertTrue(waitForCondition(
() -> {
@@ -241,6 +239,8 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
/** */
@Test
public void testMetricsBasics() throws Exception {
+ maxConnIdleTimeout = 500;
+
int preloadCnt = 300;
int srvrCnt = 3;
@@ -250,7 +250,7 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
Ignite ldr = clientLdr ? cli : srvr;
GridMetricManager ldrMetricsMgr = ((IgniteEx)ldr).context().metric();
- MetricRegistryImpl mreg0 =
ldrMetricsMgr.registry(ConnectionClientPool.SHARED_METRICS_REGISTRY_NAME);
+ MetricRegistryImpl mreg0 =
ldrMetricsMgr.registry(SHARED_METRICS_REGISTRY_NAME);
assertEquals(connsPerNode,
mreg0.<IntMetric>findMetric(ConnectionClientPool.METRIC_NAME_POOL_SIZE).value());
assertEquals(pairedConns,
mreg0.<BooleanGauge>findMetric(ConnectionClientPool.METRIC_NAME_PAIRED_CONNS).value());
@@ -277,9 +277,7 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
if (node == ldr)
continue;
- UUID nodeId = node.cluster().localNode().id();
-
- MetricRegistryImpl mreg =
ldrMetricsMgr.registry(nodeMetricsRegName(nodeId));
+ MetricRegistryImpl mreg = metricsForCommunicationConnection(ldr,
node);
// We assume that entire pool was used at least once.
assertTrue(waitForCondition(() -> connsPerNode ==
mreg.<IntMetric>findMetric(METRIC_NAME_CUR_CNT).value(),
@@ -316,11 +314,9 @@ public class CommunicationConnectionPoolMetricsTest
extends GridCommonAbstractTe
if (!node.cluster().localNode().isClient() && --srvrCnt == 0)
break;
- UUID nodeId = node.cluster().localNode().id();
-
// Wait until there is no messages to this node.
assertTrue(waitForCondition(() -> {
- MetricRegistryImpl mreg =
ldrMetricsMgr.registry(nodeMetricsRegName(nodeId));
+ MetricRegistryImpl mreg =
metricsForCommunicationConnection(ldr, node);
assert mreg != null;
@@ -330,7 +326,7 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
assertTrue(G.stop(node.name(), false));
assertTrue(waitForCondition(() -> {
- MetricRegistryImpl mreg =
ldrMetricsMgr.registry(nodeMetricsRegName(nodeId));
+ MetricRegistryImpl mreg =
metricsForCommunicationConnection(ldr, node);
return mreg == null || !mreg.iterator().hasNext();
}, getTestTimeout()));
@@ -352,7 +348,6 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
Ignite ldr = clientLdr ? cli : srvr;
- GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
AtomicBoolean runFlag = new AtomicBoolean(true);
IgniteInternalFuture<?> monFut = GridTestUtils.runAsync(() -> {
@@ -361,7 +356,7 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
if (node == ldr)
continue;
- MetricRegistryImpl mreg =
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+ MetricRegistryImpl mreg =
metricsForCommunicationConnection(ldr, node);
IntMetric m =
mreg.findMetric(METRIC_NAME_ACQUIRING_THREADS_CNT);
@@ -392,7 +387,6 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
Ignite ldr = clientLdr ? client : server;
- GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
AtomicBoolean runFlag = new AtomicBoolean(true);
AtomicLong loadCnt = new AtomicLong(preloadCnt);
@@ -420,7 +414,7 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
if (node == ldr)
continue;
- MetricRegistryImpl mreg =
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+ MetricRegistryImpl mreg = metricsForCommunicationConnection(ldr,
node);
assertTrue(waitForCondition(
() ->
mreg.<IntMetric>findMetric(METRIC_NAME_MSG_QUEUE_SIZE).value() >= Math.max(10,
msgQueueLimit),
@@ -490,13 +484,11 @@ public class CommunicationConnectionPoolMetricsTest
extends GridCommonAbstractTe
if (!log.isInfoEnabled())
return;
- GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
-
for (Ignite node : G.allGrids()) {
if (node == ldr)
continue;
- MetricRegistryImpl mreg =
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+ MetricRegistryImpl mreg = metricsForCommunicationConnection(ldr,
node);
StringBuilder b = new StringBuilder()
.append("Pool metrics from node
").append(ldr.cluster().localNode().order())
@@ -542,6 +534,13 @@ public class CommunicationConnectionPoolMetricsTest
extends GridCommonAbstractTe
}
}
+ /** */
+ public static MetricRegistryImpl metricsForCommunicationConnection(Ignite
from, Ignite to) {
+ return ((IgniteEx)from).context()
+ .metric()
+ .registry(metricName(SHARED_METRICS_REGISTRY_NAME,
((IgniteEx)to).context().localNodeId().toString()));
+ }
+
/** */
private static class TestMessage extends GridTestMessage {
/** */
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationSpiHalfOpenedConnectionTest.java
similarity index 88%
rename from
modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
rename to
modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationSpiHalfOpenedConnectionTest.java
index ee665ca8a60..b1d9bb5ee25 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationSpiHalfOpenedConnectionTest.java
@@ -15,17 +15,15 @@
* limitations under the License.
*/
-package org.apache.ignite.spi.communication.tcp;
+package org.apache.ignite.spi.communication.tcp.internal;
import java.io.IOException;
import java.util.Iterator;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServerListener;
@@ -33,7 +31,7 @@ import
org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.communication.CommunicationSpi;
-import org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -138,8 +136,8 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest
extends GridCommonAbstr
private void reconnect(Ignite srcNode, Ignite targetNode, ClusterGroup
targetGrp) {
CommunicationSpi commSpi =
srcNode.configuration().getCommunicationSpi();
- ConcurrentMap<UUID, GridCommunicationClient[]> clients =
GridTestUtils.getFieldValue(commSpi, "clientPool", "clients");
- GridMetricManager metricMgr = GridTestUtils.getFieldValue(commSpi,
"clientPool", "metricsMgr");
+ ConnectionClientPool connPool = GridTestUtils.getFieldValue(commSpi,
"clientPool");
+ ConcurrentMap<UUID, GridCommunicationClient[]> clients =
GridTestUtils.getFieldValue(connPool, "clients");
ConcurrentMap<?, GridNioRecoveryDescriptor> recoveryDescs =
GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "recoveryDescs");
ConcurrentMap<?, GridNioRecoveryDescriptor> outRecDescs =
GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "outRecDescs");
ConcurrentMap<?, GridNioRecoveryDescriptor> inRecDescs =
GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "inRecDescs");
@@ -159,20 +157,14 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest
extends GridCommonAbstr
desc.release();
}
+ UUID targetNodeId = targetNode.cluster().localNode().id();
+
// Remove client to avoid calling close(), in that case server
// will close connection too, but we want to keep the server
// uninformed and force ping old connection.
- GridCommunicationClient[] clients0 =
clients.remove(targetNode.cluster().localNode().id());
-
- if (metricMgr != null) {
- Map<UUID, ?> metrics = GridTestUtils.getFieldValue(commSpi,
"clientPool", "metrics");
-
- assert metrics != null;
+ GridCommunicationClient[] clients0 = clients.remove(targetNodeId);
- metrics.remove(targetNode.cluster().localNode().id());
-
-
metricMgr.remove(ConnectionClientPool.nodeMetricsRegName(targetNode.cluster().localNode().id()));
- }
+ connPool.unregisterNodeMetrics(targetNodeId);
for (GridCommunicationClient commClient : clients0)
lsnr.onDisconnected(((GridTcpNioCommunicationClient)commClient).session(), new
IOException("Test exception"));
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index 1029d060c2b..48d7b6fddca 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -49,13 +49,13 @@ import
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiDropNodesTest;
import
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFaultyClientSslTest;
import
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFaultyClientTest;
import
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFreezingClientTest;
-import
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiHalfOpenedConnectionTest;
import
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiInverseConnectionLoggingTest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiMultiJvmTest;
import
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiNodeLeftLoggingTest;
import
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiSkipMessageSendTest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationStatisticsTest;
import
org.apache.ignite.spi.communication.tcp.TooManyOpenFilesTcpCommunicationSpiTest;
+import
org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationSpiHalfOpenedConnectionTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;