This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new eec735a  IGNITE-12576: nodeId replaced with consistentId in 
TcpCommunicationMetricsListener. (#7310)
eec735a is described below

commit eec735a4ea39439d57a1da89b5cc17bfbff2a74c
Author: Ivan Bessonov <bessonov...@gmail.com>
AuthorDate: Thu Jan 30 11:24:12 2020 +0300

    IGNITE-12576: nodeId replaced with consistentId in 
TcpCommunicationMetricsListener. (#7310)
---
 .../tcp/TcpCommunicationMetricsListener.java       | 111 ++++++++++++---------
 .../spi/communication/tcp/TcpCommunicationSpi.java |  58 +++++++----
 .../TcpCommunicationConnectionCheckFuture.java     |  17 ++--
 ...idTcpCommunicationSpiMultithreadedSelfTest.java |   2 +-
 .../tcp/TcpCommunicationStatisticsTest.java        |  18 ++--
 5 files changed, 122 insertions(+), 84 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
index 504cbb4d..d89fe2c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
@@ -24,6 +24,8 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.metric.GridMetricManager;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
@@ -33,6 +35,7 @@ import org.apache.ignite.spi.metric.LongMetric;
 import org.apache.ignite.spi.metric.Metric;
 import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
 
+import static java.util.stream.Collectors.toMap;
 import static 
org.apache.ignite.internal.processors.metric.impl.MetricUtils.SEPARATOR;
 import static 
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
 import static 
org.apache.ignite.internal.util.nio.GridNioServer.RECEIVED_BYTES_METRIC_DESC;
@@ -40,14 +43,14 @@ import static 
org.apache.ignite.internal.util.nio.GridNioServer.RECEIVED_BYTES_M
 import static 
org.apache.ignite.internal.util.nio.GridNioServer.SENT_BYTES_METRIC_DESC;
 import static 
org.apache.ignite.internal.util.nio.GridNioServer.SENT_BYTES_METRIC_NAME;
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME;
-import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_ID_METRIC_DESC;
-import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME;
+import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC;
+import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME;
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC;
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME;
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_METRIC_DESC;
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_METRIC_NAME;
-import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_ID_METRIC_DESC;
-import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_ID_METRIC_NAME;
+import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC;
+import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME;
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_TYPE_METRIC_DESC;
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_TYPE_METRIC_NAME;
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_METRIC_DESC;
@@ -63,6 +66,9 @@ class TcpCommunicationMetricsListener {
     /** Metrics registry. */
     private final org.apache.ignite.internal.processors.metric.MetricRegistry 
mreg;
 
+    /** Current ignite instance. */
+    private final Ignite ignite;
+
     /** All registered metrics. */
     private final Set<ThreadMetrics> allMetrics = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
 
@@ -81,11 +87,11 @@ class TcpCommunicationMetricsListener {
     /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} 
of {@code rcvdMsgsMetricsByType}. */
     private final Function<Short, LongAdderMetric> 
rcvdMsgsCntByTypeMetricFactory;
 
-    /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} 
of {@code sentMsgsMetricsByNodeId}. */
-    private final Function<UUID, LongAdderMetric> 
sentMsgsCntByNodeIdMetricFactory;
+    /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} 
of {@code #sentMsgsMetricsByConsistentId}. */
+    private final Function<Object, LongAdderMetric> 
sentMsgsCntByConsistentIdMetricFactory;
 
-    /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} 
of {@code rcvdMsgsMetricsByNodeId}. */
-    private final Function<UUID, LongAdderMetric> 
rcvdMsgsCntByNodeIdMetricFactory;
+    /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} 
of {@code #rcvdMsgsMetricsByConsistentId}. */
+    private final Function<Object, LongAdderMetric> 
rcvdMsgsCntByConsistentIdMetricFactory;
 
     /** Sent bytes count metric.*/
     private final LongAdderMetric sentBytesMetric;
@@ -106,8 +112,9 @@ class TcpCommunicationMetricsListener {
     private volatile Map<Short, String> msgTypMap;
 
     /** */
-    public TcpCommunicationMetricsListener(GridMetricManager mmgr) {
+    public TcpCommunicationMetricsListener(GridMetricManager mmgr, Ignite 
ignite) {
         this.mmgr = mmgr;
+        this.ignite = ignite;
 
         mreg = mmgr.registry(COMMUNICATION_METRICS_GROUP_NAME);
 
@@ -120,13 +127,13 @@ class TcpCommunicationMetricsListener {
             RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC
         );
 
-        sentMsgsCntByNodeIdMetricFactory = nodeId ->
-            mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, 
nodeId.toString()))
-                .findMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME);
+        sentMsgsCntByConsistentIdMetricFactory = consistentId ->
+            mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, 
consistentId.toString()))
+                .findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
 
-        rcvdMsgsCntByNodeIdMetricFactory = nodeId ->
-            mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, 
nodeId.toString()))
-                .findMetric(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME);
+        rcvdMsgsCntByConsistentIdMetricFactory = consistentId ->
+            mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, 
consistentId.toString()))
+                
.findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
 
         sentBytesMetric = mreg.longAdderMetric(SENT_BYTES_METRIC_NAME, 
SENT_BYTES_METRIC_DESC);
         rcvdBytesMetric = mreg.longAdderMetric(RECEIVED_BYTES_METRIC_NAME, 
RECEIVED_BYTES_METRIC_DESC);
@@ -139,9 +146,9 @@ class TcpCommunicationMetricsListener {
             if (!mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME + 
SEPARATOR))
                 return;
 
-            
((MetricRegistry)mreg).longAdderMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME, 
SENT_MESSAGES_BY_NODE_ID_METRIC_DESC);
+            
((MetricRegistry)mreg).longAdderMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME,
 SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC);
 
-            
((MetricRegistry)mreg).longAdderMetric(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME,
 RECEIVED_MESSAGES_BY_NODE_ID_METRIC_DESC);
+            
((MetricRegistry)mreg).longAdderMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME,
 RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC);
         });
     }
 
@@ -154,11 +161,11 @@ class TcpCommunicationMetricsListener {
      * Collects statistics for message sent by SPI.
      *
      * @param msg Sent message.
-     * @param nodeId Receiver node id.
+     * @param consistentId Receiver node consistent id.
      */
-    public void onMessageSent(Message msg, UUID nodeId) {
+    public void onMessageSent(Message msg, Object consistentId) {
         assert msg != null;
-        assert nodeId != null;
+        assert consistentId != null;
 
         if (msg instanceof GridIoMessage) {
             msg = ((GridIoMessage) msg).message();
@@ -167,7 +174,7 @@ class TcpCommunicationMetricsListener {
 
             sentMsgsMetric.increment();
 
-            threadMetrics.get().onMessageSent(msg, nodeId);
+            threadMetrics.get().onMessageSent(msg, consistentId);
         }
     }
 
@@ -175,11 +182,11 @@ class TcpCommunicationMetricsListener {
      * Collects statistics for message received by SPI.
      *
      * @param msg Received message.
-     * @param nodeId Sender node id.
+     * @param consistentId Sender node consistent id.
      */
-    public void onMessageReceived(Message msg, UUID nodeId) {
+    public void onMessageReceived(Message msg, Object consistentId) {
         assert msg != null;
-        assert nodeId != null;
+        assert consistentId != null;
 
         if (msg instanceof GridIoMessage) {
             msg = ((GridIoMessage) msg).message();
@@ -188,7 +195,7 @@ class TcpCommunicationMetricsListener {
 
             rcvdMsgsMetric.increment();
 
-            threadMetrics.get().onMessageReceived(msg, nodeId);
+            threadMetrics.get().onMessageReceived(msg, consistentId);
         }
     }
 
@@ -247,7 +254,7 @@ class TcpCommunicationMetricsListener {
      * @return Map containing sender nodes and respective counts.
      */
     public Map<UUID, Long> receivedMessagesByNode() {
-        return 
collectMessagesCountByNodeId(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME);
+        return 
collectMessagesCountByNodeId(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
     }
 
     /**
@@ -265,7 +272,7 @@ class TcpCommunicationMetricsListener {
      * @return Map containing receiver nodes and respective counts.
      */
     public Map<UUID, Long> sentMessagesByNode() {
-        return 
collectMessagesCountByNodeId(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME);
+        return 
collectMessagesCountByNodeId(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
     }
 
     /** */
@@ -296,13 +303,20 @@ class TcpCommunicationMetricsListener {
     protected Map<UUID, Long> collectMessagesCountByNodeId(String metricName) {
         Map<UUID, Long> res = new HashMap<>();
 
+        Map<String, UUID> nodesMapping = 
ignite.cluster().nodes().stream().collect(toMap(
+            node -> node.consistentId().toString(), ClusterNode::id
+        ));
+
         String mregPrefix = COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR;
 
         for (ReadOnlyMetricRegistry mreg : mmgr) {
             if (mreg.name().startsWith(mregPrefix)) {
-                String nodeIdStr = mreg.name().substring(mregPrefix.length());
+                String nodeConsIdStr = 
mreg.name().substring(mregPrefix.length());
+
+                UUID nodeId = nodesMapping.get(nodeConsIdStr);
 
-                UUID nodeId = UUID.fromString(nodeIdStr);
+                if (nodeId == null)
+                    continue;
 
                 res.put(nodeId, 
mreg.<LongMetric>findMetric(metricName).value());
             }
@@ -330,23 +344,26 @@ class TcpCommunicationMetricsListener {
 
         for (ReadOnlyMetricRegistry mreg : mmgr) {
             if (mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME + 
SEPARATOR)) {
-                mreg.findMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME).reset();
+                
mreg.findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME).reset();
 
-                
mreg.findMetric(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME).reset();
+                
mreg.findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME).reset();
             }
         }
     }
 
     /**
-     * @param nodeId Left node id.
+     * @param consistentId Consistent id of the node.
      */
-    public void onNodeLeft(UUID nodeId) {
+    public void onNodeLeft(Object consistentId) {
+        // Tricky part - these maps are not thread-safe. Ideally it's only 
required to delete one entry from each one
+        // of them, but this would lead to syncs in communication worker 
threads. Instead, we just "clean" them so they
+        // will be filled later lazily with the same data.
         for (ThreadMetrics threadMetrics : allMetrics) {
-            threadMetrics.rcvdMsgsMetricsByNodeId = new HashMap<>();
-            threadMetrics.sentMsgsMetricsByNodeId = new HashMap<>();
+            threadMetrics.sentMsgsMetricsByConsistentId = new HashMap<>();
+            threadMetrics.rcvdMsgsMetricsByConsistentId = new HashMap<>();
         }
 
-        mmgr.remove(metricName(COMMUNICATION_METRICS_GROUP_NAME, 
nodeId.toString()));
+        mmgr.remove(metricName(COMMUNICATION_METRICS_GROUP_NAME, 
consistentId.toString()));
     }
 
     /**
@@ -402,38 +419,36 @@ class TcpCommunicationMetricsListener {
         private final Map<Short, LongAdderMetric> rcvdMsgsMetricsByType = new 
HashMap<>();
 
         /**
-         * Sent messages count metrics grouped by message node id.
+         * Sent messages count metrics grouped by message node consistent id.
          */
-        public volatile Map<UUID, LongAdderMetric> sentMsgsMetricsByNodeId = 
new HashMap<>();
+        public volatile Map<Object, LongAdderMetric> 
sentMsgsMetricsByConsistentId = new HashMap<>();
 
         /**
-         * Received messages metrics count grouped by message node id.
+         * Received messages metrics count grouped by message node consistent 
id.
          */
-        public volatile Map<UUID, LongAdderMetric> rcvdMsgsMetricsByNodeId = 
new HashMap<>();
+        public volatile Map<Object, LongAdderMetric> 
rcvdMsgsMetricsByConsistentId = new HashMap<>();
 
 
         /**
          * Collects statistics for message sent by SPI.
-         *
          * @param msg Sent message.
-         * @param nodeId Receiver node id.
+         * @param consistentId Receiver node consistent id.
          */
-        private void onMessageSent(Message msg, UUID nodeId) {
+        private void onMessageSent(Message msg, Object consistentId) {
             sentMsgsMetricsByType.computeIfAbsent(msg.directType(), 
sentMsgsCntByTypeMetricFactory).increment();
 
-            sentMsgsMetricsByNodeId.computeIfAbsent(nodeId, 
sentMsgsCntByNodeIdMetricFactory).increment();
+            sentMsgsMetricsByConsistentId.computeIfAbsent(consistentId, 
sentMsgsCntByConsistentIdMetricFactory).increment();
         }
 
         /**
          * Collects statistics for message received by SPI.
-         *
          * @param msg Received message.
-         * @param nodeId Sender node id.
+         * @param consistentId Sender node consistent id.
          */
-        private void onMessageReceived(Message msg, UUID nodeId) {
+        private void onMessageReceived(Message msg, Object consistentId) {
             rcvdMsgsMetricsByType.computeIfAbsent(msg.directType(), 
rcvdMsgsCntByTypeMetricFactory).increment();
 
-            rcvdMsgsMetricsByNodeId.computeIfAbsent(nodeId, 
rcvdMsgsCntByNodeIdMetricFactory).increment();
+            rcvdMsgsMetricsByConsistentId.computeIfAbsent(consistentId, 
rcvdMsgsCntByConsistentIdMetricFactory).increment();
         }
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1f51879..ce324ca 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -357,6 +357,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
     /** Connection index meta for session. */
     public static final int CONN_IDX_META = 
GridNioSessionMetaKey.nextUniqueKey();
 
+    /** Node consistent id meta for session. */
+    public static final int CONSISTENT_ID_META = 
GridNioSessionMetaKey.nextUniqueKey();
+
     /** Message tracker meta for session. */
     private static final int TRACKER_META = 
GridNioSessionMetaKey.nextUniqueKey();
 
@@ -421,25 +424,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
     public static final String SENT_MESSAGES_BY_TYPE_METRIC_NAME = 
"sentMessagesByType";
 
     /** */
-    public static final String SENT_MESSAGES_BY_TYPE_METRIC_DESC = "Total 
number of messages with given type sent by current node";
+    public static final String SENT_MESSAGES_BY_TYPE_METRIC_DESC =
+        "Total number of messages with given type sent by current node";
 
     /** */
     public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME = 
"receivedMessagesByType";
 
     /** */
-    public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC = "Total 
number of messages with given type received by current node";
+    public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC =
+        "Total number of messages with given type received by current node";
 
     /** */
-    public static final String SENT_MESSAGES_BY_NODE_ID_METRIC_NAME = 
"sentMessagesToNode";
+    public static final String SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME 
= "sentMessagesToNode";
 
     /** */
-    public static final String SENT_MESSAGES_BY_NODE_ID_METRIC_DESC = "Total 
number of messages sent by current node to the given node";
+    public static final String SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC 
=
+        "Total number of messages sent by current node to the given node";
 
     /** */
-    public static final String RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME = 
"receivedMessagesFromNode";
+    public static final String 
RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME = 
"receivedMessagesFromNode";
 
     /** */
-    public static final String RECEIVED_MESSAGES_BY_NODE_ID_METRIC_DESC = 
"Total number of messages received by current node from the given node";
+    public static final String 
RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC =
+        "Total number of messages received by current node from the given 
node";
 
     /** */
     private ConnectGateway connectGate;
@@ -623,6 +630,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                     return;
                 }
 
+                ses.addMeta(CONSISTENT_ID_META, rmtNode.consistentId());
+
                 final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey);
 
                 assert old == null;
@@ -786,13 +795,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
             }
 
             @Override public void onMessageSent(GridNioSession ses, Message 
msg) {
-                ConnectionKey connKey = ses.meta(CONN_IDX_META);
+                Object consistentId = ses.meta(CONSISTENT_ID_META);
 
-                if (connKey != null) {
-                    UUID nodeId = connKey.nodeId();
-
-                    metricsLsnr.onMessageSent(msg, nodeId);
-                }
+                if (consistentId != null)
+                    metricsLsnr.onMessageSent(msg, consistentId);
             }
 
             private void onChannelCreate(
@@ -860,6 +866,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                     }
                 }
                 else {
+                    Object consistentId = ses.meta(CONSISTENT_ID_META);
+
+                    assert consistentId != null;
+
                     if (isChannelConnIdx(connKey.connectionIndex())) {
                         if (ses.meta(CHANNEL_FUT_META) == null)
                             onChannelCreate((GridSelectorNioSessionImpl)ses, 
connKey, msg);
@@ -884,7 +894,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                     }
 
                     if (msg instanceof RecoveryLastReceivedMessage) {
-                        metricsLsnr.onMessageReceived(msg, connKey.nodeId());
+                        metricsLsnr.onMessageReceived(msg, consistentId);
 
                         GridNioRecoveryDescriptor recovery = 
ses.outRecoveryDescriptor();
 
@@ -935,7 +945,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                         }
                     }
 
-                    metricsLsnr.onMessageReceived(msg, connKey.nodeId());
+                    metricsLsnr.onMessageReceived(msg, consistentId);
 
                     IgniteRunnable c;
 
@@ -1393,7 +1403,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
     @MetricManagerResource
     private void injectMetricManager(GridMetricManager mmgr) {
         if (mmgr != null)
-            metricsLsnr = new TcpCommunicationMetricsListener(mmgr);
+            metricsLsnr = new TcpCommunicationMetricsListener(mmgr, ignite);
     }
 
     /**
@@ -2767,12 +2777,13 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter implements Communicati
     }
 
     /**
+     * @param consistentId Consistent id of the node.
      * @param nodeId Left node ID.
      */
-    void onNodeLeft(UUID nodeId) {
+    void onNodeLeft(Object consistentId, UUID nodeId) {
         assert nodeId != null;
 
-        metricsLsnr.onNodeLeft(nodeId);
+        metricsLsnr.onNodeLeft(consistentId);
 
         GridCommunicationClient[] clients0 = clients.remove(nodeId);
 
@@ -3623,6 +3634,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
 
                         recoveryDesc.onHandshake(rcvCnt);
 
+                        meta.put(CONSISTENT_ID_META, node.consistentId());
                         meta.put(CONN_IDX_META, connKey);
                         meta.put(GridNioServer.RECOVERY_DESC_META_KEY, 
recoveryDesc);
 
@@ -4492,7 +4504,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
             assert evt instanceof DiscoveryEvent : evt;
             assert evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED;
 
-            onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
+            ClusterNode node = ((DiscoveryEvent)evt).eventNode();
+
+            onNodeLeft(node.consistentId(), node.id());
         }
 
         /** {@inheritDoc} */
@@ -4698,7 +4712,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                     GridNioRecoveryDescriptor recovery = null;
 
                     if (!usePairedConnections(node) && client instanceof 
GridTcpNioCommunicationClient) {
-                        recovery = recoveryDescs.get(new 
ConnectionKey(node.id(), client.connectionIndex(), -1));
+                        recovery = recoveryDescs.get(new ConnectionKey(
+                            node.id(), client.connectionIndex(), -1)
+                        );
 
                         if (recovery != null && recovery.lastAcknowledged() != 
recovery.received()) {
                             RecoveryLastReceivedMessage msg = new 
RecoveryLastReceivedMessage(recovery.received());
@@ -4724,7 +4740,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
 
                     if (idleTime >= idleConnTimeout) {
                         if (recovery == null && usePairedConnections(node))
-                            recovery = outRecDescs.get(new 
ConnectionKey(node.id(), client.connectionIndex(), -1));
+                            recovery = outRecDescs.get(new ConnectionKey(
+                                node.id(), client.connectionIndex(), -1)
+                            );
 
                         if (recovery != null &&
                             recovery.nodeAlive(getSpiContext().node(nodeId)) &&
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
index 46fdb0b..9caff6a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
@@ -142,14 +142,14 @@ public class TcpCommunicationConnectionCheckFuture 
extends GridFutureAdapter<Bit
                 if (addrs.size() == 1) {
                     SingleAddressConnectFuture fut = new 
SingleAddressConnectFuture(i);
 
-                    fut.init(addrs.iterator().next(), node.id());
+                    fut.init(addrs.iterator().next(), node.consistentId(), 
node.id());
 
                     futs[i] = fut;
                 }
                 else {
                     MultipleAddressesConnectFuture fut = new 
MultipleAddressesConnectFuture(i);
 
-                    fut.init(addrs, node.id());
+                    fut.init(addrs, node.consistentId(), node.id());
 
                     futs[i] = fut;
                 }
@@ -292,9 +292,10 @@ public class TcpCommunicationConnectionCheckFuture extends 
GridFutureAdapter<Bit
 
         /**
          * @param addr Node address.
+         * @param consistentId Consistent id of the node.
          * @param rmtNodeId Id of node to open connection check session with.
          */
-        public void init(InetSocketAddress addr, UUID rmtNodeId) {
+        public void init(InetSocketAddress addr, Object consistentId, UUID 
rmtNodeId) {
             boolean connect;
 
             try {
@@ -317,7 +318,10 @@ public class TcpCommunicationConnectionCheckFuture extends 
GridFutureAdapter<Bit
                 sesMeta = new GridLeanMap<>(3);
 
                 // Set dummy key to identify connection-check outgoing 
connection.
-                sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, new 
ConnectionKey(rmtNodeId, -1, -1, true));
+                ConnectionKey connKey = new ConnectionKey(rmtNodeId, -1, -1, 
true);
+
+                sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, connKey);
+                sesMeta.put(TcpCommunicationSpi.CONSISTENT_ID_META, 
consistentId);
                 sesMeta.put(SES_FUT_META, this);
 
                 nioSrvr.createSession(ch, sesMeta, true, new 
IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
@@ -422,9 +426,10 @@ public class TcpCommunicationConnectionCheckFuture extends 
GridFutureAdapter<Bit
 
         /**
          * @param addrs Node addresses.
+         * @param consistentId Consistent id of the node.
          * @param rmtNodeId Id of node to open connection check session with.
          */
-        void init(Collection<InetSocketAddress> addrs, UUID rmtNodeId) {
+        void init(Collection<InetSocketAddress> addrs, Object consistentId, 
UUID rmtNodeId) {
             SingleAddressConnectFuture[] futs = new 
SingleAddressConnectFuture[addrs.size()];
 
             for (int i = 0; i < addrs.size(); i++) {
@@ -442,7 +447,7 @@ public class TcpCommunicationConnectionCheckFuture extends 
GridFutureAdapter<Bit
             int idx = 0;
 
             for (InetSocketAddress addr : addrs) {
-                futs[idx++].init(addr, rmtNodeId);
+                futs[idx++].init(addr, consistentId, rmtNodeId);
 
                 if (resCnt == Integer.MAX_VALUE)
                     return;
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index cfde86b..a53b43b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -349,7 +349,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest 
extends GridSpiAbstrac
                     while (run.get() && 
!Thread.currentThread().isInterrupted()) {
                         U.sleep(interval * 3 / 2);
 
-                        
((TcpCommunicationSpi)spis.get(from.id())).onNodeLeft(to.id());
+                        
((TcpCommunicationSpi)spis.get(from.id())).onNodeLeft(to.consistentId(), 
to.id());
                     }
                 }
                 catch (IgniteInterruptedCheckedException ignored) {
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
index 399b8a3..99840c8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
@@ -49,8 +49,8 @@ import 
org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME;
-import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME;
-import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_ID_METRIC_NAME;
+import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME;
+import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME;
 
 /**
  * Test for TcpCommunicationSpi statistics.
@@ -141,17 +141,17 @@ public class TcpCommunicationStatisticsTest extends 
GridCommonAbstractTest {
         startGrids(2);
 
         try {
-            UUID node0Id = grid(0).localNode().id();
-            UUID node1Id = grid(1).localNode().id();
+            Object node0consistentId = grid(0).localNode().consistentId();
+            Object node1consistentId = grid(1).localNode().consistentId();
 
             String node0regName = MetricUtils.metricName(
                 COMMUNICATION_METRICS_GROUP_NAME,
-                node0Id.toString()
+                node0consistentId.toString()
             );
 
             String node1regName = MetricUtils.metricName(
                 COMMUNICATION_METRICS_GROUP_NAME,
-                node1Id.toString()
+                node1consistentId.toString()
             );
 
             // Send custom message from node0 to node1.
@@ -205,11 +205,11 @@ public class TcpCommunicationStatisticsTest extends 
GridCommonAbstractTest {
                 MetricRegistry mreg0 = 
grid(0).context().metric().registry(node1regName);
                 MetricRegistry mreg1 = 
grid(1).context().metric().registry(node0regName);
 
-                LongAdderMetric sentMetric = 
mreg0.findMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME);
+                LongAdderMetric sentMetric = 
mreg0.findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
                 assertNotNull(sentMetric);
                 assertEquals(mbean0.getSentMessagesCount(), 
sentMetric.value());
 
-                LongAdderMetric rcvMetric = 
mreg1.findMetric(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME);
+                LongAdderMetric rcvMetric = 
mreg1.findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
                 assertNotNull(rcvMetric);
                 assertEquals(mbean1.getReceivedMessagesCount(), 
rcvMetric.value());
 
@@ -217,7 +217,7 @@ public class TcpCommunicationStatisticsTest extends 
GridCommonAbstractTest {
 
                 mreg0 = grid(0).context().metric().registry(node1regName);
 
-                sentMetric = 
mreg0.findMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME);
+                sentMetric = 
mreg0.findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
                 assertNotNull(sentMetric); // Automatically generated by 
MetricRegistryCreationListener.
                 assertEquals(0, sentMetric.value());
             }

Reply via email to