This is an automated email from the ASF dual-hosted git repository. nizhikov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new eec735a IGNITE-12576: nodeId replaced with consistentId in TcpCommunicationMetricsListener. (#7310) eec735a is described below commit eec735a4ea39439d57a1da89b5cc17bfbff2a74c Author: Ivan Bessonov <bessonov...@gmail.com> AuthorDate: Thu Jan 30 11:24:12 2020 +0300 IGNITE-12576: nodeId replaced with consistentId in TcpCommunicationMetricsListener. (#7310) --- .../tcp/TcpCommunicationMetricsListener.java | 111 ++++++++++++--------- .../spi/communication/tcp/TcpCommunicationSpi.java | 58 +++++++---- .../TcpCommunicationConnectionCheckFuture.java | 17 ++-- ...idTcpCommunicationSpiMultithreadedSelfTest.java | 2 +- .../tcp/TcpCommunicationStatisticsTest.java | 18 ++-- 5 files changed, 122 insertions(+), 84 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java index 504cbb4d..d89fe2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java @@ -24,6 +24,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.metric.GridMetricManager; import org.apache.ignite.internal.processors.metric.MetricRegistry; @@ -33,6 +35,7 @@ import org.apache.ignite.spi.metric.LongMetric; import org.apache.ignite.spi.metric.Metric; import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry; +import static java.util.stream.Collectors.toMap; import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.SEPARATOR; import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName; import static org.apache.ignite.internal.util.nio.GridNioServer.RECEIVED_BYTES_METRIC_DESC; @@ -40,14 +43,14 @@ import static org.apache.ignite.internal.util.nio.GridNioServer.RECEIVED_BYTES_M import static org.apache.ignite.internal.util.nio.GridNioServer.SENT_BYTES_METRIC_DESC; import static org.apache.ignite.internal.util.nio.GridNioServer.SENT_BYTES_METRIC_NAME; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME; -import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_ID_METRIC_DESC; -import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_METRIC_DESC; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_METRIC_NAME; -import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_ID_METRIC_DESC; -import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_ID_METRIC_NAME; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_TYPE_METRIC_DESC; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_TYPE_METRIC_NAME; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_METRIC_DESC; @@ -63,6 +66,9 @@ class TcpCommunicationMetricsListener { /** Metrics registry. */ private final org.apache.ignite.internal.processors.metric.MetricRegistry mreg; + /** Current ignite instance. */ + private final Ignite ignite; + /** All registered metrics. */ private final Set<ThreadMetrics> allMetrics = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -81,11 +87,11 @@ class TcpCommunicationMetricsListener { /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code rcvdMsgsMetricsByType}. */ private final Function<Short, LongAdderMetric> rcvdMsgsCntByTypeMetricFactory; - /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code sentMsgsMetricsByNodeId}. */ - private final Function<UUID, LongAdderMetric> sentMsgsCntByNodeIdMetricFactory; + /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code #sentMsgsMetricsByConsistentId}. */ + private final Function<Object, LongAdderMetric> sentMsgsCntByConsistentIdMetricFactory; - /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code rcvdMsgsMetricsByNodeId}. */ - private final Function<UUID, LongAdderMetric> rcvdMsgsCntByNodeIdMetricFactory; + /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code #rcvdMsgsMetricsByConsistentId}. */ + private final Function<Object, LongAdderMetric> rcvdMsgsCntByConsistentIdMetricFactory; /** Sent bytes count metric.*/ private final LongAdderMetric sentBytesMetric; @@ -106,8 +112,9 @@ class TcpCommunicationMetricsListener { private volatile Map<Short, String> msgTypMap; /** */ - public TcpCommunicationMetricsListener(GridMetricManager mmgr) { + public TcpCommunicationMetricsListener(GridMetricManager mmgr, Ignite ignite) { this.mmgr = mmgr; + this.ignite = ignite; mreg = mmgr.registry(COMMUNICATION_METRICS_GROUP_NAME); @@ -120,13 +127,13 @@ class TcpCommunicationMetricsListener { RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC ); - sentMsgsCntByNodeIdMetricFactory = nodeId -> - mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, nodeId.toString())) - .findMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME); + sentMsgsCntByConsistentIdMetricFactory = consistentId -> + mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString())) + .findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME); - rcvdMsgsCntByNodeIdMetricFactory = nodeId -> - mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, nodeId.toString())) - .findMetric(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME); + rcvdMsgsCntByConsistentIdMetricFactory = consistentId -> + mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString())) + .findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME); sentBytesMetric = mreg.longAdderMetric(SENT_BYTES_METRIC_NAME, SENT_BYTES_METRIC_DESC); rcvdBytesMetric = mreg.longAdderMetric(RECEIVED_BYTES_METRIC_NAME, RECEIVED_BYTES_METRIC_DESC); @@ -139,9 +146,9 @@ class TcpCommunicationMetricsListener { if (!mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR)) return; - ((MetricRegistry)mreg).longAdderMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME, SENT_MESSAGES_BY_NODE_ID_METRIC_DESC); + ((MetricRegistry)mreg).longAdderMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME, SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC); - ((MetricRegistry)mreg).longAdderMetric(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME, RECEIVED_MESSAGES_BY_NODE_ID_METRIC_DESC); + ((MetricRegistry)mreg).longAdderMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME, RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC); }); } @@ -154,11 +161,11 @@ class TcpCommunicationMetricsListener { * Collects statistics for message sent by SPI. * * @param msg Sent message. - * @param nodeId Receiver node id. + * @param consistentId Receiver node consistent id. */ - public void onMessageSent(Message msg, UUID nodeId) { + public void onMessageSent(Message msg, Object consistentId) { assert msg != null; - assert nodeId != null; + assert consistentId != null; if (msg instanceof GridIoMessage) { msg = ((GridIoMessage) msg).message(); @@ -167,7 +174,7 @@ class TcpCommunicationMetricsListener { sentMsgsMetric.increment(); - threadMetrics.get().onMessageSent(msg, nodeId); + threadMetrics.get().onMessageSent(msg, consistentId); } } @@ -175,11 +182,11 @@ class TcpCommunicationMetricsListener { * Collects statistics for message received by SPI. * * @param msg Received message. - * @param nodeId Sender node id. + * @param consistentId Sender node consistent id. */ - public void onMessageReceived(Message msg, UUID nodeId) { + public void onMessageReceived(Message msg, Object consistentId) { assert msg != null; - assert nodeId != null; + assert consistentId != null; if (msg instanceof GridIoMessage) { msg = ((GridIoMessage) msg).message(); @@ -188,7 +195,7 @@ class TcpCommunicationMetricsListener { rcvdMsgsMetric.increment(); - threadMetrics.get().onMessageReceived(msg, nodeId); + threadMetrics.get().onMessageReceived(msg, consistentId); } } @@ -247,7 +254,7 @@ class TcpCommunicationMetricsListener { * @return Map containing sender nodes and respective counts. */ public Map<UUID, Long> receivedMessagesByNode() { - return collectMessagesCountByNodeId(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME); + return collectMessagesCountByNodeId(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME); } /** @@ -265,7 +272,7 @@ class TcpCommunicationMetricsListener { * @return Map containing receiver nodes and respective counts. */ public Map<UUID, Long> sentMessagesByNode() { - return collectMessagesCountByNodeId(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME); + return collectMessagesCountByNodeId(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME); } /** */ @@ -296,13 +303,20 @@ class TcpCommunicationMetricsListener { protected Map<UUID, Long> collectMessagesCountByNodeId(String metricName) { Map<UUID, Long> res = new HashMap<>(); + Map<String, UUID> nodesMapping = ignite.cluster().nodes().stream().collect(toMap( + node -> node.consistentId().toString(), ClusterNode::id + )); + String mregPrefix = COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR; for (ReadOnlyMetricRegistry mreg : mmgr) { if (mreg.name().startsWith(mregPrefix)) { - String nodeIdStr = mreg.name().substring(mregPrefix.length()); + String nodeConsIdStr = mreg.name().substring(mregPrefix.length()); + + UUID nodeId = nodesMapping.get(nodeConsIdStr); - UUID nodeId = UUID.fromString(nodeIdStr); + if (nodeId == null) + continue; res.put(nodeId, mreg.<LongMetric>findMetric(metricName).value()); } @@ -330,23 +344,26 @@ class TcpCommunicationMetricsListener { for (ReadOnlyMetricRegistry mreg : mmgr) { if (mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR)) { - mreg.findMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME).reset(); + mreg.findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME).reset(); - mreg.findMetric(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME).reset(); + mreg.findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME).reset(); } } } /** - * @param nodeId Left node id. + * @param consistentId Consistent id of the node. */ - public void onNodeLeft(UUID nodeId) { + public void onNodeLeft(Object consistentId) { + // Tricky part - these maps are not thread-safe. Ideally it's only required to delete one entry from each one + // of them, but this would lead to syncs in communication worker threads. Instead, we just "clean" them so they + // will be filled later lazily with the same data. for (ThreadMetrics threadMetrics : allMetrics) { - threadMetrics.rcvdMsgsMetricsByNodeId = new HashMap<>(); - threadMetrics.sentMsgsMetricsByNodeId = new HashMap<>(); + threadMetrics.sentMsgsMetricsByConsistentId = new HashMap<>(); + threadMetrics.rcvdMsgsMetricsByConsistentId = new HashMap<>(); } - mmgr.remove(metricName(COMMUNICATION_METRICS_GROUP_NAME, nodeId.toString())); + mmgr.remove(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString())); } /** @@ -402,38 +419,36 @@ class TcpCommunicationMetricsListener { private final Map<Short, LongAdderMetric> rcvdMsgsMetricsByType = new HashMap<>(); /** - * Sent messages count metrics grouped by message node id. + * Sent messages count metrics grouped by message node consistent id. */ - public volatile Map<UUID, LongAdderMetric> sentMsgsMetricsByNodeId = new HashMap<>(); + public volatile Map<Object, LongAdderMetric> sentMsgsMetricsByConsistentId = new HashMap<>(); /** - * Received messages metrics count grouped by message node id. + * Received messages metrics count grouped by message node consistent id. */ - public volatile Map<UUID, LongAdderMetric> rcvdMsgsMetricsByNodeId = new HashMap<>(); + public volatile Map<Object, LongAdderMetric> rcvdMsgsMetricsByConsistentId = new HashMap<>(); /** * Collects statistics for message sent by SPI. - * * @param msg Sent message. - * @param nodeId Receiver node id. + * @param consistentId Receiver node consistent id. */ - private void onMessageSent(Message msg, UUID nodeId) { + private void onMessageSent(Message msg, Object consistentId) { sentMsgsMetricsByType.computeIfAbsent(msg.directType(), sentMsgsCntByTypeMetricFactory).increment(); - sentMsgsMetricsByNodeId.computeIfAbsent(nodeId, sentMsgsCntByNodeIdMetricFactory).increment(); + sentMsgsMetricsByConsistentId.computeIfAbsent(consistentId, sentMsgsCntByConsistentIdMetricFactory).increment(); } /** * Collects statistics for message received by SPI. - * * @param msg Received message. - * @param nodeId Sender node id. + * @param consistentId Sender node consistent id. */ - private void onMessageReceived(Message msg, UUID nodeId) { + private void onMessageReceived(Message msg, Object consistentId) { rcvdMsgsMetricsByType.computeIfAbsent(msg.directType(), rcvdMsgsCntByTypeMetricFactory).increment(); - rcvdMsgsMetricsByNodeId.computeIfAbsent(nodeId, rcvdMsgsCntByNodeIdMetricFactory).increment(); + rcvdMsgsMetricsByConsistentId.computeIfAbsent(consistentId, rcvdMsgsCntByConsistentIdMetricFactory).increment(); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 1f51879..ce324ca 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -357,6 +357,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** Connection index meta for session. */ public static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey(); + /** Node consistent id meta for session. */ + public static final int CONSISTENT_ID_META = GridNioSessionMetaKey.nextUniqueKey(); + /** Message tracker meta for session. */ private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey(); @@ -421,25 +424,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati public static final String SENT_MESSAGES_BY_TYPE_METRIC_NAME = "sentMessagesByType"; /** */ - public static final String SENT_MESSAGES_BY_TYPE_METRIC_DESC = "Total number of messages with given type sent by current node"; + public static final String SENT_MESSAGES_BY_TYPE_METRIC_DESC = + "Total number of messages with given type sent by current node"; /** */ public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME = "receivedMessagesByType"; /** */ - public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC = "Total number of messages with given type received by current node"; + public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC = + "Total number of messages with given type received by current node"; /** */ - public static final String SENT_MESSAGES_BY_NODE_ID_METRIC_NAME = "sentMessagesToNode"; + public static final String SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME = "sentMessagesToNode"; /** */ - public static final String SENT_MESSAGES_BY_NODE_ID_METRIC_DESC = "Total number of messages sent by current node to the given node"; + public static final String SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC = + "Total number of messages sent by current node to the given node"; /** */ - public static final String RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME = "receivedMessagesFromNode"; + public static final String RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME = "receivedMessagesFromNode"; /** */ - public static final String RECEIVED_MESSAGES_BY_NODE_ID_METRIC_DESC = "Total number of messages received by current node from the given node"; + public static final String RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC = + "Total number of messages received by current node from the given node"; /** */ private ConnectGateway connectGate; @@ -623,6 +630,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati return; } + ses.addMeta(CONSISTENT_ID_META, rmtNode.consistentId()); + final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey); assert old == null; @@ -786,13 +795,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } @Override public void onMessageSent(GridNioSession ses, Message msg) { - ConnectionKey connKey = ses.meta(CONN_IDX_META); + Object consistentId = ses.meta(CONSISTENT_ID_META); - if (connKey != null) { - UUID nodeId = connKey.nodeId(); - - metricsLsnr.onMessageSent(msg, nodeId); - } + if (consistentId != null) + metricsLsnr.onMessageSent(msg, consistentId); } private void onChannelCreate( @@ -860,6 +866,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } } else { + Object consistentId = ses.meta(CONSISTENT_ID_META); + + assert consistentId != null; + if (isChannelConnIdx(connKey.connectionIndex())) { if (ses.meta(CHANNEL_FUT_META) == null) onChannelCreate((GridSelectorNioSessionImpl)ses, connKey, msg); @@ -884,7 +894,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } if (msg instanceof RecoveryLastReceivedMessage) { - metricsLsnr.onMessageReceived(msg, connKey.nodeId()); + metricsLsnr.onMessageReceived(msg, consistentId); GridNioRecoveryDescriptor recovery = ses.outRecoveryDescriptor(); @@ -935,7 +945,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } } - metricsLsnr.onMessageReceived(msg, connKey.nodeId()); + metricsLsnr.onMessageReceived(msg, consistentId); IgniteRunnable c; @@ -1393,7 +1403,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati @MetricManagerResource private void injectMetricManager(GridMetricManager mmgr) { if (mmgr != null) - metricsLsnr = new TcpCommunicationMetricsListener(mmgr); + metricsLsnr = new TcpCommunicationMetricsListener(mmgr, ignite); } /** @@ -2767,12 +2777,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } /** + * @param consistentId Consistent id of the node. * @param nodeId Left node ID. */ - void onNodeLeft(UUID nodeId) { + void onNodeLeft(Object consistentId, UUID nodeId) { assert nodeId != null; - metricsLsnr.onNodeLeft(nodeId); + metricsLsnr.onNodeLeft(consistentId); GridCommunicationClient[] clients0 = clients.remove(nodeId); @@ -3623,6 +3634,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati recoveryDesc.onHandshake(rcvCnt); + meta.put(CONSISTENT_ID_META, node.consistentId()); meta.put(CONN_IDX_META, connKey); meta.put(GridNioServer.RECOVERY_DESC_META_KEY, recoveryDesc); @@ -4492,7 +4504,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati assert evt instanceof DiscoveryEvent : evt; assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; - onNodeLeft(((DiscoveryEvent)evt).eventNode().id()); + ClusterNode node = ((DiscoveryEvent)evt).eventNode(); + + onNodeLeft(node.consistentId(), node.id()); } /** {@inheritDoc} */ @@ -4698,7 +4712,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati GridNioRecoveryDescriptor recovery = null; if (!usePairedConnections(node) && client instanceof GridTcpNioCommunicationClient) { - recovery = recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1)); + recovery = recoveryDescs.get(new ConnectionKey( + node.id(), client.connectionIndex(), -1) + ); if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); @@ -4724,7 +4740,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (idleTime >= idleConnTimeout) { if (recovery == null && usePairedConnections(node)) - recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1)); + recovery = outRecDescs.get(new ConnectionKey( + node.id(), client.connectionIndex(), -1) + ); if (recovery != null && recovery.nodeAlive(getSpiContext().node(nodeId)) && diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java index 46fdb0b..9caff6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java @@ -142,14 +142,14 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit if (addrs.size() == 1) { SingleAddressConnectFuture fut = new SingleAddressConnectFuture(i); - fut.init(addrs.iterator().next(), node.id()); + fut.init(addrs.iterator().next(), node.consistentId(), node.id()); futs[i] = fut; } else { MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i); - fut.init(addrs, node.id()); + fut.init(addrs, node.consistentId(), node.id()); futs[i] = fut; } @@ -292,9 +292,10 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit /** * @param addr Node address. + * @param consistentId Consistent if of the node. * @param rmtNodeId Id of node to open connection check session with. */ - public void init(InetSocketAddress addr, UUID rmtNodeId) { + public void init(InetSocketAddress addr, Object consistentId, UUID rmtNodeId) { boolean connect; try { @@ -317,7 +318,10 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit sesMeta = new GridLeanMap<>(3); // Set dummy key to identify connection-check outgoing connection. - sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, new ConnectionKey(rmtNodeId, -1, -1, true)); + ConnectionKey connKey = new ConnectionKey(rmtNodeId, -1, -1, true); + + sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, connKey); + sesMeta.put(TcpCommunicationSpi.CONSISTENT_ID_META, consistentId); sesMeta.put(SES_FUT_META, this); nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() { @@ -422,9 +426,10 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit /** * @param addrs Node addresses. + * @param consistentId Consistent if of the node. * @param rmtNodeId Id of node to open connection check session with. */ - void init(Collection<InetSocketAddress> addrs, UUID rmtNodeId) { + void init(Collection<InetSocketAddress> addrs, Object consistentId, UUID rmtNodeId) { SingleAddressConnectFuture[] futs = new SingleAddressConnectFuture[addrs.size()]; for (int i = 0; i < addrs.size(); i++) { @@ -442,7 +447,7 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit int idx = 0; for (InetSocketAddress addr : addrs) { - futs[idx++].init(addr, rmtNodeId); + futs[idx++].init(addr, consistentId, rmtNodeId); if (resCnt == Integer.MAX_VALUE) return; diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java index cfde86b..a53b43b 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java @@ -349,7 +349,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac while (run.get() && !Thread.currentThread().isInterrupted()) { U.sleep(interval * 3 / 2); - ((TcpCommunicationSpi)spis.get(from.id())).onNodeLeft(to.id()); + ((TcpCommunicationSpi)spis.get(from.id())).onNodeLeft(to.consistentId(), to.id()); } } catch (IgniteInterruptedCheckedException ignored) { diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java index 399b8a3..99840c8 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java @@ -49,8 +49,8 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME; -import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME; -import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_ID_METRIC_NAME; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME; /** * Test for TcpCommunicationSpi statistics. @@ -141,17 +141,17 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest { startGrids(2); try { - UUID node0Id = grid(0).localNode().id(); - UUID node1Id = grid(1).localNode().id(); + Object node0consistentId = grid(0).localNode().consistentId(); + Object node1consistentId = grid(1).localNode().consistentId(); String node0regName = MetricUtils.metricName( COMMUNICATION_METRICS_GROUP_NAME, - node0Id.toString() + node0consistentId.toString() ); String node1regName = MetricUtils.metricName( COMMUNICATION_METRICS_GROUP_NAME, - node1Id.toString() + node1consistentId.toString() ); // Send custom message from node0 to node1. @@ -205,11 +205,11 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest { MetricRegistry mreg0 = grid(0).context().metric().registry(node1regName); MetricRegistry mreg1 = grid(1).context().metric().registry(node0regName); - LongAdderMetric sentMetric = mreg0.findMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME); + LongAdderMetric sentMetric = mreg0.findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME); assertNotNull(sentMetric); assertEquals(mbean0.getSentMessagesCount(), sentMetric.value()); - LongAdderMetric rcvMetric = mreg1.findMetric(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME); + LongAdderMetric rcvMetric = mreg1.findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME); assertNotNull(rcvMetric); assertEquals(mbean1.getReceivedMessagesCount(), rcvMetric.value()); @@ -217,7 +217,7 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest { mreg0 = grid(0).context().metric().registry(node1regName); - sentMetric = mreg0.findMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME); + sentMetric = mreg0.findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME); assertNotNull(sentMetric); // Automatically generated by MetricRegistryCreationListener. assertEquals(0, sentMetric.value()); }