This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 38188bb3e78 IGNITE-27542 : Use MessageSerializer for 
TcpDiscoveryMetricsUpdateMessage (#12620)
38188bb3e78 is described below

commit 38188bb3e7867c695c58f27f9d00a3af81ba78cc
Author: Vladimir Steshin <[email protected]>
AuthorDate: Wed Jan 21 12:13:01 2026 +0300

    IGNITE-27542 : Use MessageSerializer for TcpDiscoveryMetricsUpdateMessage 
(#12620)
---
 .../discovery/DiscoveryMessageFactory.java         |  13 +
 .../processors/cluster/ClusterNodeMetrics.java     |   6 +-
 .../processors/cluster/NodeFullMetricsMessage.java |  28 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java       |   4 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java       |  46 +--
 .../ignite/spi/discovery/tcp/TcpDiscoveryImpl.java |  48 ++-
 .../spi/discovery/tcp/TcpDiscoveryIoSession.java   | 188 ++++++++++--
 ...e.java => TcpDiscoveryCacheMetricsMessage.java} |  24 +-
 ... => TcpDiscoveryClientNodesMetricsMessage.java} |  35 ++-
 .../messages/TcpDiscoveryMetricsUpdateMessage.java | 322 ++++++++-------------
 ...ava => TcpDiscoveryNodeFullMetricsMessage.java} |  22 +-
 .../messages/TcpDiscoveryNodeMetricsMessage.java   |  12 +-
 .../cache/CacheMetricsCacheSizeTest.java           |  55 +++-
 13 files changed, 451 insertions(+), 352 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index 5f399ed0c46..a1c3c9d9f76 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -20,8 +20,10 @@ package org.apache.ignite.internal.managers.discovery;
 import org.apache.ignite.internal.codegen.InetAddressMessageSerializer;
 import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer;
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer;
+import 
org.apache.ignite.internal.codegen.TcpDiscoveryCacheMetricsMessageSerializer;
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryClientMetricsUpdateMessageSerializer;
+import 
org.apache.ignite.internal.codegen.TcpDiscoveryClientNodesMetricsMessageSerializer;
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer;
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer;
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer;
@@ -30,6 +32,8 @@ import 
org.apache.ignite.internal.codegen.TcpDiscoveryDuplicateIdMessageSerializ
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer;
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer;
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer;
+import 
org.apache.ignite.internal.codegen.TcpDiscoveryMetricsUpdateMessageSerializer;
+import 
org.apache.ignite.internal.codegen.TcpDiscoveryNodeFullMetricsMessageSerializer;
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryNodeMetricsMessageSerializer;
 import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
 import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
@@ -39,8 +43,10 @@ import 
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCacheMetricsMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientMetricsUpdateMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientNodesMetricsMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
@@ -49,6 +55,8 @@ import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessa
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
@@ -58,6 +66,10 @@ import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheck
 public class DiscoveryMessageFactory implements MessageFactoryProvider {
     /** {@inheritDoc} */
     @Override public void registerAll(MessageFactory factory) {
+        factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new,
+            new TcpDiscoveryNodeFullMetricsMessageSerializer());
+        factory.register((short)-104, 
TcpDiscoveryClientNodesMetricsMessage::new, new 
TcpDiscoveryClientNodesMetricsMessageSerializer());
+        factory.register((short)-103, TcpDiscoveryCacheMetricsMessage::new, 
new TcpDiscoveryCacheMetricsMessageSerializer());
         factory.register((short)-102, TcpDiscoveryNodeMetricsMessage::new, new 
TcpDiscoveryNodeMetricsMessageSerializer());
         factory.register((short)-101, InetSocketAddressMessage::new, new 
InetSocketAddressMessageSerializer());
         factory.register((short)-100, InetAddressMessage::new, new 
InetAddressMessageSerializer());
@@ -76,5 +88,6 @@ public class DiscoveryMessageFactory implements 
MessageFactoryProvider {
         factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new 
TcpDiscoveryAuthFailedMessageSerializer());
         factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new 
TcpDiscoveryDuplicateIdMessageSerializer());
         factory.register((short)13, 
TcpDiscoveryClientMetricsUpdateMessage::new, new 
TcpDiscoveryClientMetricsUpdateMessageSerializer());
+        factory.register((short)14, TcpDiscoveryMetricsUpdateMessage::new, new 
TcpDiscoveryMetricsUpdateMessageSerializer());
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
index a1ac2dc3c8c..7bba244fc7f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
@@ -46,11 +46,11 @@ class ClusterNodeMetrics {
 
     /** */
     public ClusterNodeMetrics(NodeFullMetricsMessage msg) {
-        nodeMetrics = new ClusterMetricsSnapshot(msg.nodeMetricsMsg());
+        nodeMetrics = new ClusterMetricsSnapshot(msg.nodeMetricsMessage());
 
-        cacheMetrics = new HashMap<>(msg.cachesMetrics().size(), 1.0f);
+        cacheMetrics = new HashMap<>(msg.cachesMetricsMessages().size(), 1.0f);
 
-        msg.cachesMetrics().entrySet().forEach(e -> 
cacheMetrics.put(e.getKey(), new CacheMetricsSnapshot(e.getValue())));
+        msg.cachesMetricsMessages().forEach((key, value) -> 
cacheMetrics.put(key, new CacheMetricsSnapshot(value)));
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java
index 2974ab93351..0ca3ee6800f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java
@@ -17,59 +17,59 @@
 
 package org.apache.ignite.internal.processors.cluster;
 
-import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
 
 /** Node compound metrics message. */
-public final class NodeFullMetricsMessage implements Message {
+public class NodeFullMetricsMessage implements Message {
     /** */
     public static final short TYPE_CODE = 138;
 
     /** Node metrics wrapper message. */
-    @Order(0)
+    @Order(value = 0, method = "nodeMetricsMessage")
     private NodeMetricsMessage nodeMetricsMsg;
 
     /** Cache metrics wrapper message. */
-    @Order(1)
-    private Map<Integer, CacheMetricsMessage> cachesMetrics;
+    @Order(value = 1, method = "cachesMetricsMessages")
+    private Map<Integer, CacheMetricsMessage> cachesMetricsMsgs;
 
     /** Empty constructor for {@link GridIoMessageFactory}. */
     public NodeFullMetricsMessage() {
-
+        // No-op.
     }
 
     /** */
     public NodeFullMetricsMessage(ClusterMetrics nodeMetrics, Map<Integer, 
CacheMetrics> cacheMetrics) {
         nodeMetricsMsg = new NodeMetricsMessage(nodeMetrics);
 
-        cachesMetrics = new HashMap<>(cacheMetrics.size(), 1.0f);
+        cachesMetricsMsgs = U.newHashMap(cacheMetrics.size());
 
-        cacheMetrics.forEach((key, value) -> cachesMetrics.put(key, new 
CacheMetricsMessage(value)));
+        cacheMetrics.forEach((key, value) -> cachesMetricsMsgs.put(key, new 
CacheMetricsMessage(value)));
     }
 
     /** */
-    public Map<Integer, CacheMetricsMessage> cachesMetrics() {
-        return cachesMetrics;
+    public Map<Integer, CacheMetricsMessage> cachesMetricsMessages() {
+        return cachesMetricsMsgs;
     }
 
     /** */
-    public void cachesMetrics(Map<Integer, CacheMetricsMessage> 
cacheMetricsMsg) {
-        cachesMetrics = cacheMetricsMsg;
+    public void cachesMetricsMessages(Map<Integer, CacheMetricsMessage> 
cacheMetricsMsg) {
+        cachesMetricsMsgs = cacheMetricsMsg;
     }
 
     /** */
-    public NodeMetricsMessage nodeMetricsMsg() {
+    public NodeMetricsMessage nodeMetricsMessage() {
         return nodeMetricsMsg;
     }
 
     /** */
-    public void nodeMetricsMsg(NodeMetricsMessage nodeMetricsMsg) {
+    public void nodeMetricsMessage(NodeMetricsMessage nodeMetricsMsg) {
         this.nodeMetricsMsg = nodeMetricsMsg;
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 5ecbe893fb8..238fdd13eab 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -2526,8 +2526,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                     log.debug("Received metrics response: " + msg);
             }
             else {
-                if (msg.hasMetrics())
-                    processMsgCacheMetrics(msg, System.nanoTime());
+                if (!F.isEmpty(msg.serversFullMetricsMessages()))
+                    processCacheMetricsMessage(msg, System.nanoTime());
             }
         }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index a8316d554a9..ea8dd5f5c2d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2220,15 +2220,6 @@ class ServerImpl extends TcpDiscoveryImpl {
             !(msg instanceof TcpDiscoveryConnectionCheckMessage);
     }
 
-    /**
-     * @param msg Message.
-     * @param nodeId Node ID.
-     */
-    private static void removeMetrics(TcpDiscoveryMetricsUpdateMessage msg, 
UUID nodeId) {
-        msg.removeMetrics(nodeId);
-        msg.removeCacheMetrics(nodeId);
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ServerImpl.class, this);
@@ -3399,10 +3390,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                 for (ClientMessageWorker clientMsgWorker : 
clientMsgWorkers.values()) {
                     if (msgBytes == null) {
                         try {
-                            msgBytes = U.marshal(spi.marshaller(), msg);
+                            msgBytes = 
clientMsgWorker.ses.serializeMessage(msg);
                         }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to marshal message: " + msg, 
e);
+                        catch (IgniteCheckedException | IOException e) {
+                            U.error(log, "Failed to serialize message to a 
client: " + msg + ", recepient " +
+                                "client id: " + clientMsgWorker.clientNodeId, 
e);
 
                             break;
                         }
@@ -3418,6 +3410,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         if (clientMsgWorker.clientNodeId.equals(node.id())) {
                             try {
+                                // TODO: 
https://issues.apache.org/jira/browse/IGNITE-27556 refactor serialization.
                                 msg0 = U.unmarshal(spi.marshaller(), msgBytes,
                                     
U.resolveClassLoader(spi.ignite().configuration()));
 
@@ -6052,30 +6045,32 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             long tsNanos = System.nanoTime();
 
-            if (spiStateCopy() == CONNECTED && msg.hasMetrics())
-                processMsgCacheMetrics(msg, tsNanos);
+            if (spiStateCopy() == CONNECTED && 
!F.isEmpty(msg.serversFullMetricsMessages()))
+                processCacheMetricsMessage(msg, tsNanos);
 
             if (sendMessageToRemotes(msg)) {
                 if (laps == 0 && spiStateCopy() == CONNECTED) {
                     // Message is on its first ring or just created on 
coordinator.
-                    msg.setMetrics(locNodeId, spi.metricsProvider.metrics());
-                    msg.setCacheMetrics(locNodeId, 
spi.metricsProvider.cacheMetrics());
+                    msg.addServerMetrics(locNodeId, 
spi.metricsProvider.metrics());
+                    msg.addServerCacheMetrics(locNodeId, 
spi.metricsProvider.cacheMetrics());
 
                     for (Map.Entry<UUID, ClientMessageWorker> e : 
clientMsgWorkers.entrySet()) {
                         UUID nodeId = e.getKey();
                         ClusterMetrics metrics = e.getValue().metrics();
 
                         if (metrics != null)
-                            msg.setClientMetrics(locNodeId, nodeId, metrics);
+                            msg.addClientMetrics(locNodeId, nodeId, metrics);
 
                         msg.addClientNodeId(nodeId);
                     }
                 }
                 else {
                     // Message is on its second ring.
-                    removeMetrics(msg, locNodeId);
+                    msg.removeServerMetrics(locNodeId);
 
-                    Collection<UUID> clientNodeIds = msg.clientNodeIds();
+                    Collection<UUID> clientNodeIds = 
F.isEmpty(msg.clientNodeIds())
+                        ? Collections.emptySet()
+                        : msg.clientNodeIds();
 
                     for (TcpDiscoveryNode clientNode : ring.clientNodes()) {
                         if (clientNode.visible()) {
@@ -8436,7 +8431,8 @@ class ServerImpl extends TcpDiscoveryImpl {
         private int passedLaps(TcpDiscoveryMetricsUpdateMessage msg) {
             UUID locNodeId = getLocalNodeId();
 
-            boolean hasLocMetrics = hasMetrics(msg, locNodeId);
+            boolean hasLocMetrics = 
!F.isEmpty(msg.serversFullMetricsMessages())
+                && msg.serversFullMetricsMessages().get(locNodeId) != null;
 
             if (locNodeId.equals(msg.creatorNodeId()) && !hasLocMetrics && 
msg.senderNodeId() != null)
                 return 2;
@@ -8445,15 +8441,5 @@ class ServerImpl extends TcpDiscoveryImpl {
             else
                 return 1;
         }
-
-        /**
-         * @param msg Metrics update message to check.
-         * @param nodeId Node ID for which the check should be performed.
-         * @return {@code True} is the message contains metrics of the node 
with the provided ID.
-         * {@code False} otherwise.
-         */
-        private boolean hasMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID 
nodeId) {
-            return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId);
-        }
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index f35f4114795..e115a3cca03 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -35,10 +35,14 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.ClusterMetricsSnapshot;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheMetricsSnapshot;
+import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage;
+import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
 import org.apache.ignite.internal.processors.tracing.NoopTracing;
 import org.apache.ignite.internal.processors.tracing.Tracing;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -48,7 +52,9 @@ import org.apache.ignite.spi.IgniteSpiThread;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientNodesMetricsMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
 import org.jetbrains.annotations.Nullable;
 
@@ -415,27 +421,45 @@ abstract class TcpDiscoveryImpl {
     }
 
     /** */
-    public void processMsgCacheMetrics(TcpDiscoveryMetricsUpdateMessage msg, 
long tsNanos) {
-        for (Map.Entry<UUID, TcpDiscoveryMetricsUpdateMessage.MetricsSet> e : 
msg.metrics().entrySet()) {
-            UUID nodeId = e.getKey();
+    public void processCacheMetricsMessage(TcpDiscoveryMetricsUpdateMessage 
msg, long tsNanos) {
+        for (Map.Entry<UUID, TcpDiscoveryNodeFullMetricsMessage> e : 
msg.serversFullMetricsMessages().entrySet()) {
+            UUID srvrId = e.getKey();
+            Map<Integer, CacheMetricsMessage> cacheMetricsMsgs = 
e.getValue().cachesMetricsMessages();
+            NodeMetricsMessage srvrMetricsMsg = 
e.getValue().nodeMetricsMessage();
 
-            TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = 
e.getValue();
+            assert srvrMetricsMsg != null;
 
-            Map<Integer, CacheMetrics> cacheMetrics = 
msg.hasCacheMetrics(nodeId) ?
-                msg.cacheMetrics().get(nodeId) : Collections.emptyMap();
+            Map<Integer, CacheMetrics> cacheMetrics;
 
-            if (endTimeMetricsSizeProcessWait <= U.currentTimeMillis()
-                && cacheMetrics.size() >= METRICS_QNT_WARN) {
+            if (!F.isEmpty(cacheMetricsMsgs)) {
+                cacheMetrics = U.newHashMap(cacheMetricsMsgs.size());
+
+                cacheMetricsMsgs.forEach((cacheId, cacheMetricsMsg) ->
+                    cacheMetrics.put(cacheId, new 
CacheMetricsSnapshot(cacheMetricsMsg)));
+            }
+            else
+                cacheMetrics = Collections.emptyMap();
+
+            if (endTimeMetricsSizeProcessWait <= U.currentTimeMillis() && 
cacheMetrics.size() >= METRICS_QNT_WARN) {
                 log.warning("The Discovery message has metrics for " + 
cacheMetrics.size() + " caches.\n" +
                     "To prevent Discovery blocking use 
-DIGNITE_DISCOVERY_DISABLE_CACHE_METRICS_UPDATE=true option.");
 
                 endTimeMetricsSizeProcessWait = U.currentTimeMillis() + 
LOG_WARN_MSG_TIMEOUT;
             }
 
-            updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tsNanos);
+            updateMetrics(srvrId, new ClusterMetricsSnapshot(srvrMetricsMsg), 
cacheMetrics, tsNanos);
+
+            TcpDiscoveryClientNodesMetricsMessage clientsMetricsMsg = 
F.isEmpty(msg.connectedClientsMetricsMessages())
+                ? null
+                : msg.connectedClientsMetricsMessages().get(srvrId);
+
+            if (clientsMetricsMsg == null)
+                continue;
+
+            assert clientsMetricsMsg.nodesMetricsMessages() != null;
 
-            for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
-                updateMetrics(t.get1(), t.get2(), cacheMetrics, tsNanos);
+            clientsMetricsMsg.nodesMetricsMessages().forEach((clientId, 
clientNodeMetricsMsg) ->
+                updateMetrics(clientId, new 
ClusterMetricsSnapshot(clientNodeMetricsMsg), cacheMetrics, tsNanos));
         }
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
index edcab473b4c..fa8d71e40f1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
@@ -39,6 +40,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType;
@@ -87,7 +89,7 @@ public class TcpDiscoveryIoSession {
     private final OutputStream out;
 
     /** Buffered socket input stream. */
-    private final InputStream in;
+    private final CompositeInputStream in;
 
     /** Intermediate buffer for serializing discovery messages. */
     private final ByteBuffer msgBuf;
@@ -115,7 +117,7 @@ public class TcpDiscoveryIoSession {
             int rcvBufSize = sock.getReceiveBufferSize() > 0 ? 
sock.getReceiveBufferSize() : DFLT_SOCK_BUFFER_SIZE;
 
             out = new BufferedOutputStream(sock.getOutputStream(), 
sendBufSize);
-            in = new BufferedInputStream(sock.getInputStream(), rcvBufSize);
+            in = new CompositeInputStream(new 
BufferedInputStream(sock.getInputStream(), rcvBufSize));
         }
         catch (IOException e) {
             throw new IgniteException(e);
@@ -184,47 +186,31 @@ public class TcpDiscoveryIoSession {
 
             boolean finished;
 
-            msgBuf.clear();
-
             do {
+                msgBuf.clear();
+
                 int read = in.read(msgBuf.array(), msgBuf.position(), 
msgBuf.remaining());
 
                 if (read == -1)
                     throw new EOFException("Connection closed before message 
was fully read.");
 
-                if (msgBuf.position() > 0) {
-                    msgBuf.limit(msgBuf.position() + read);
-
-                    // We've stored an unprocessed tail before.
-                    msgBuf.rewind();
-                }
-                else
-                    msgBuf.limit(read);
+                msgBuf.limit(read);
 
                 finished = msgSer.readFrom(msg, msgReader);
 
-                // We rely on the fact that Discovery only sends next message 
upon receiving a receipt for the previous one.
+                // Server Discovery only sends next message to next Server 
upon receiving a receipt for the previous one.
                 // This behaviour guarantees that we never read a next message 
from the buffer right after the end of
-                // the previous message.
-                assert msgBuf.remaining() == 0 || !finished : "Some data was 
read from the socket but left unprocessed.";
-
-                if (finished)
-                    break;
+                // the previous message. But it is not guaranteed with Client 
Discovery where messages aren't acknowledged.
+                // Thus, we have to keep the uprocessed bytes read from the 
socket. It won't return them again.
+                if (msgBuf.hasRemaining()) {
+                    byte[] unprocessedReadTail = new byte[msgBuf.remaining()];
 
-                // We must keep the uprocessed bytes read from the socket. It 
won't return them again.
-                byte[] unprocessedTail = null;
+                    msgBuf.get(unprocessedReadTail, 0, msgBuf.remaining());
 
-                if (msgBuf.remaining() > 0) {
-                    unprocessedTail = new byte[msgBuf.remaining()];
-                    msgBuf.get(unprocessedTail, 0, msgBuf.remaining());
+                    in.attachByteArray(unprocessedReadTail);
                 }
-
-                msgBuf.clear();
-
-                if (unprocessedTail != null)
-                    msgBuf.put(unprocessedTail);
             }
-            while (true);
+            while (!finished);
 
             return (T)msg;
         }
@@ -320,4 +306,148 @@ public class TcpDiscoveryIoSession {
         if (hex.matches("15....00"))
             throw new StreamCorruptedException("invalid stream header: " + 
hex);
     }
+
+    /**
+     * Input stream implementation that combines a byte array and a regular 
InputStream allowing to read bytes
+     * from the array first and then proceed with reading from InputStream.
+     * Supports only basic read methods.
+     */
+    private static class CompositeInputStream extends BufferedInputStream {
+        /** Prefix data input stream to read before the original input stream. 
*/
+        @Nullable private ByteArrayInputStream attachedBytesIs;
+
+        /** @param srcIs Original input stream to read when {@link 
#attachedBytesIs} is empty. */
+        private CompositeInputStream(InputStream srcIs) {
+            super(srcIs);
+        }
+
+        /** @param prefixData Prefix data to read before the original input 
stream. */
+        private void attachByteArray(byte[] prefixData) {
+            assert prefixBytesLeft() == 0;
+
+            attachedBytesIs = new ByteArrayInputStream(prefixData);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int read() throws IOException {
+            if (prefixBytesLeft() > 0) {
+                int res = attachedBytesIs.read();
+
+                checkPrefixBufferExhausted();
+
+                return res;
+            }
+
+            return super.read();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int read(@NotNull byte[] b, int off, int len) throws 
IOException {
+            int len0 = readPrefixBuffer(b, off, len);
+
+            assert len0 <= len;
+
+            if (len0 == len)
+                return len0;
+
+            return len0 + super.read(b, off + len0, len - len0);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int read(@NotNull byte[] b) throws IOException {
+            return read(b, 0, b.length);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int readNBytes(byte[] b, int off, int len) throws 
IOException {
+            int len0 = readPrefixBuffer(b, off, len);
+
+            return super.readNBytes(b, off + len0, len - len0);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int available() throws IOException {
+            // Original input stream may return Integer#MAX_VALUE.
+            if (super.available() > Integer.MAX_VALUE - prefixBytesLeft())
+                return super.available();
+
+            return super.available() + prefixBytesLeft();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws IOException {
+            if (attachedBytesIs != null) {
+                attachedBytesIs.close();
+
+                attachedBytesIs = null;
+            }
+
+            super.close();
+        }
+
+        /** */
+        private int readPrefixBuffer(byte[] b, int off, int len) {
+            int res = 0;
+
+            int prefixBytesLeft = prefixBytesLeft();
+
+            if (prefixBytesLeft > 0) {
+                if (len > b.length - off)
+                    len = b.length - off;
+
+                res = attachedBytesIs.read(b, off, Math.min(len, 
prefixBytesLeft));
+
+                checkPrefixBufferExhausted();
+            }
+
+            return res;
+        }
+
+        /** */
+        private int prefixBytesLeft() {
+            return attachedBytesIs == null ? 0 : attachedBytesIs.available();
+        }
+
+        /** */
+        private void checkPrefixBufferExhausted() {
+            if (attachedBytesIs != null && attachedBytesIs.available() == 0)
+                attachedBytesIs = null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void mark(int readlimit) {
+            throw new UnsupportedOperationException("mark() is not 
supported.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean markSupported() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            throw new UnsupportedOperationException("reset() is not 
supported.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public long skip(long n) {
+            throw new UnsupportedOperationException("skip() is not 
supported.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public long transferTo(OutputStream out) {
+            throw new UnsupportedOperationException("transferTo() is not 
supported.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public @NotNull byte[] readAllBytes() {
+            throw new UnsupportedOperationException("readAllBytes() is not 
supported.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public @NotNull byte[] readNBytes(int len) {
+            throw new UnsupportedOperationException("readNBytes() is not 
supported.");
+        }
+    }
 }
+
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCacheMetricsMessage.java
similarity index 60%
copy from 
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
copy to 
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCacheMetricsMessage.java
index 0c12acee04a..3fbfc2faa71 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCacheMetricsMessage.java
@@ -17,34 +17,34 @@
 
 package org.apache.ignite.spi.discovery.tcp.messages;
 
-import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
-import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
+import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
- * We cannot directly reuse `NodeMetricsMessage` in Discovery as it is 
registered in a message factory of Communication
- * component and thus is unavailable in Discovery. We have to extend 
`NodeMetricsMessage` and register this subclass in
- * message factory of Discovery component.
+ * We cannot directly reuse {@link CacheMetricsMessage} in Discovery as it is 
registered in a message factory of
+ * Communication component and thus is unavailable in Discovery. We have to 
extend {@link CacheMetricsMessage} and
+ * register this subclass in message factory of Discovery component.
  */
-public class TcpDiscoveryNodeMetricsMessage extends NodeMetricsMessage {
+public class TcpDiscoveryCacheMetricsMessage extends CacheMetricsMessage {
     /** Constructor for {@link DiscoveryMessageFactory}. */
-    public TcpDiscoveryNodeMetricsMessage() {
+    public TcpDiscoveryCacheMetricsMessage() {
         // No-op.
     }
 
-    /** @param metrics Metrics. */
-    public TcpDiscoveryNodeMetricsMessage(ClusterMetrics metrics) {
-        super(metrics);
+    /** @param cacheMetricsMsg Cache metric message. */
+    public TcpDiscoveryCacheMetricsMessage(CacheMetrics cacheMetricsMsg) {
+        super(cacheMetricsMsg);
     }
 
     /** {@inheritDoc} */
     @Override public short directType() {
-        return -102;
+        return -103;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(TcpDiscoveryNodeMetricsMessage.class, this, "super", 
super.toString());
+        return S.toString(TcpDiscoveryCacheMetricsMessage.class, this, 
"super", super.toString());
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientNodesMetricsMessage.java
similarity index 53%
copy from 
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
copy to 
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientNodesMetricsMessage.java
index 0c12acee04a..c09a7698b80 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientNodesMetricsMessage.java
@@ -17,34 +17,41 @@
 
 package org.apache.ignite.spi.discovery.tcp.messages;
 
-import org.apache.ignite.cluster.ClusterMetrics;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
-import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** Holds map of thick client or server metrics messages per node id. */
+public class TcpDiscoveryClientNodesMetricsMessage implements Message {
+    /** Map of nodes metrics messages per node id. */
+    @Order(value = 0, method = "nodesMetricsMessages")
+    private Map<UUID, TcpDiscoveryNodeMetricsMessage> nodesMetricsMsgs;
 
-/**
- * We cannot directly reuse `NodeMetricsMessage` in Discovery as it is 
registered in a message factory of Communication
- * component and thus is unavailable in Discovery. We have to extend 
`NodeMetricsMessage` and register this subclass in
- * message factory of Discovery component.
- */
-public class TcpDiscoveryNodeMetricsMessage extends NodeMetricsMessage {
     /** Constructor for {@link DiscoveryMessageFactory}. */
-    public TcpDiscoveryNodeMetricsMessage() {
+    public TcpDiscoveryClientNodesMetricsMessage() {
         // No-op.
     }
 
-    /** @param metrics Metrics. */
-    public TcpDiscoveryNodeMetricsMessage(ClusterMetrics metrics) {
-        super(metrics);
+    /** @return Map of nodes metrics messages per node id. */
+    public Map<UUID, TcpDiscoveryNodeMetricsMessage> nodesMetricsMessages() {
+        return nodesMetricsMsgs;
+    }
+
+    /** @param nodesMetricsMsgs Map of nodes metrics messages per node id. */
+    public void nodesMetricsMessages(Map<UUID, TcpDiscoveryNodeMetricsMessage> 
nodesMetricsMsgs) {
+        this.nodesMetricsMsgs = nodesMetricsMsgs;
     }
 
     /** {@inheritDoc} */
     @Override public short directType() {
-        return -102;
+        return -104;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(TcpDiscoveryNodeMetricsMessage.class, this, "super", 
super.toString());
+        return S.toString(TcpDiscoveryClientNodesMetricsMessage.class, this, 
"super", super.toString());
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java
index e6835320fa5..8b2ccfaf31f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java
@@ -17,25 +17,21 @@
 
 package org.apache.ignite.spi.discovery.tcp.messages;
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cluster.ClusterMetrics;
-import org.apache.ignite.internal.ClusterMetricsSnapshot;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
+import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Metrics update message.
@@ -52,20 +48,28 @@ import org.apache.ignite.internal.util.typedef.internal.U;
  * second pass).
  */
 @TcpDiscoveryRedirectToClient
-public class TcpDiscoveryMetricsUpdateMessage extends 
TcpDiscoveryAbstractMessage {
+public class TcpDiscoveryMetricsUpdateMessage extends 
TcpDiscoveryAbstractMessage implements Message {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Map to store nodes metrics. */
+    /** Connected clients metrics: server id -> client id -> clients metrics. 
*/
     @GridToStringExclude
-    private final Map<UUID, MetricsSet> metrics = new HashMap<>();
+    @Order(value = 5, method = "connectedClientsMetricsMessages")
+    private Map<UUID, TcpDiscoveryClientNodesMetricsMessage> 
connectedClientsMetricsMsgs;
+
+    /** Servers full metrics: server id -> server metrics + metrics of 
server's caches. */
+    @GridToStringExclude
+    @Order(value = 6, method = "serversFullMetricsMessages")
+    private @Nullable Map<UUID, TcpDiscoveryNodeFullMetricsMessage> 
serversFullMetricsMsgs;
 
     /** Client node IDs. */
-    private final Collection<UUID> clientNodeIds = new HashSet<>();
+    @Order(value = 7)
+    private @Nullable Set<UUID> clientNodeIds;
 
-    /** Cahce metrics by node. */
-    @GridToStringExclude
-    private final Map<UUID, Map<Integer, CacheMetrics>> cacheMetrics = new 
HashMap<>();
+    /** Constructor for {@link DiscoveryMessageFactory}. */
+    public TcpDiscoveryMetricsUpdateMessage() {
+        // No-op.
+    }
 
     /**
      * Constructor.
@@ -79,130 +83,136 @@ public class TcpDiscoveryMetricsUpdateMessage extends 
TcpDiscoveryAbstractMessag
     /**
      * Sets metrics for particular node.
      *
-     * @param nodeId Node ID.
-     * @param metrics Node metrics.
+     * @param srvrId Server ID.
+     * @param newMetrics New server metrics to add.
      */
-    public void setMetrics(UUID nodeId, ClusterMetrics metrics) {
-        assert nodeId != null;
-        assert metrics != null;
-        assert !this.metrics.containsKey(nodeId);
+    public void addServerMetrics(UUID srvrId, ClusterMetrics newMetrics) {
+        assert srvrId != null;
+        assert newMetrics != null;
+
+        if (serversFullMetricsMsgs == null)
+            serversFullMetricsMsgs = new HashMap<>();
+
+        assert !serversFullMetricsMsgs.containsKey(srvrId);
+
+        serversFullMetricsMsgs.compute(srvrId, (srvrId0, srvrFullMetrics) -> {
+            if (srvrFullMetrics == null)
+                srvrFullMetrics = new TcpDiscoveryNodeFullMetricsMessage();
+
+            srvrFullMetrics.nodeMetricsMessage(new 
TcpDiscoveryNodeMetricsMessage(newMetrics));
 
-        this.metrics.put(nodeId, new MetricsSet(metrics));
+            return srvrFullMetrics;
+        });
     }
 
     /**
      * Sets cache metrics for particular node.
      *
-     * @param nodeId Node ID.
-     * @param metrics Node cache metrics.
+     * @param srvrId Server ID.
+     * @param newCachesMetrics News server's caches metrics to add.
      */
-    public void setCacheMetrics(UUID nodeId, Map<Integer, CacheMetrics> 
metrics) {
-        assert nodeId != null;
-        assert metrics != null;
-        assert !this.cacheMetrics.containsKey(nodeId);
+    public void addServerCacheMetrics(UUID srvrId, Map<Integer, CacheMetrics> 
newCachesMetrics) {
+        assert srvrId != null;
+        assert newCachesMetrics != null;
 
-        if (!F.isEmpty(metrics))
-            this.cacheMetrics.put(nodeId, metrics);
+        if (serversFullMetricsMsgs == null)
+            serversFullMetricsMsgs = new HashMap<>();
+
+        assert serversFullMetricsMsgs.containsKey(srvrId) && 
serversFullMetricsMsgs.get(srvrId).cachesMetricsMessages() == null;
+
+        serversFullMetricsMsgs.compute(srvrId, (srvrId0, srvrFullMetrics) -> {
+            if (srvrFullMetrics == null)
+                srvrFullMetrics = new TcpDiscoveryNodeFullMetricsMessage();
+
+            Map<Integer, CacheMetricsMessage> newCachesMsgsMap = 
U.newHashMap(newCachesMetrics.size());
+
+            newCachesMetrics.forEach((cacheId, cacheMetrics) ->
+                newCachesMsgsMap.put(cacheId, new 
TcpDiscoveryCacheMetricsMessage(cacheMetrics)));
+
+            srvrFullMetrics.cachesMetricsMessages(newCachesMsgsMap);
+
+            return srvrFullMetrics;
+        });
     }
 
     /**
-     * Sets metrics for a client node.
+     * Adds metrics for a connected client node.
      *
-     * @param nodeId Server node ID.
-     * @param clientNodeId Client node ID.
-     * @param metrics Node metrics.
+     * @param srvrId Server node ID.
+     * @param clientNodeId Connected client node ID.
+     * @param clientMetrics Client metrics.
      */
-    public void setClientMetrics(UUID nodeId, UUID clientNodeId, 
ClusterMetrics metrics) {
-        assert nodeId != null;
+    public void addClientMetrics(UUID srvrId, UUID clientNodeId, 
ClusterMetrics clientMetrics) {
+        assert srvrId != null;
         assert clientNodeId != null;
-        assert metrics != null;
-        assert this.metrics.containsKey(nodeId);
+        assert clientMetrics != null;
 
-        this.metrics.get(nodeId).addClientMetrics(clientNodeId, metrics);
-    }
+        if (connectedClientsMetricsMsgs == null)
+            connectedClientsMetricsMsgs = new HashMap<>();
 
-    /**
-     * Removes metrics for particular node from the message.
-     *
-     * @param nodeId Node ID.
-     */
-    public void removeMetrics(UUID nodeId) {
-        assert nodeId != null;
+        assert !connectedClientsMetricsMsgs.containsKey(srvrId)
+            || 
connectedClientsMetricsMsgs.get(srvrId).nodesMetricsMessages().get(clientNodeId)
 == null;
 
-        metrics.remove(nodeId);
-    }
+        connectedClientsMetricsMsgs.compute(srvrId, (srvrId0, 
clientsMetricsMsg) -> {
+            if (clientsMetricsMsg == null) {
+                clientsMetricsMsg = new 
TcpDiscoveryClientNodesMetricsMessage();
+                clientsMetricsMsg.nodesMetricsMessages(new HashMap<>());
+            }
 
-    /**
-     * Removes cache metrics for particular node from the message.
-     *
-     * @param nodeId Node ID.
-     */
-    public void removeCacheMetrics(UUID nodeId) {
-        assert nodeId != null;
+            clientsMetricsMsg.nodesMetricsMessages().put(clientNodeId, new 
TcpDiscoveryNodeMetricsMessage(clientMetrics));
 
-        cacheMetrics.remove(nodeId);
+            return clientsMetricsMsg;
+        });
     }
 
     /**
-     * Gets metrics map.
+     * Removes metrics for particular server node from the message.
      *
-     * @return Metrics map.
+     * @param srvrId Server ID.
      */
-    public Map<UUID, MetricsSet> metrics() {
-        return metrics;
-    }
+    public void removeServerMetrics(UUID srvrId) {
+        assert srvrId != null;
+        assert serversFullMetricsMsgs != null;
 
-    /**
-     * Gets cache metrics map.
-     *
-     * @return Cache metrics map.
-     */
-    public Map<UUID, Map<Integer, CacheMetrics>> cacheMetrics() {
-        return cacheMetrics;
+        serversFullMetricsMsgs.remove(srvrId);
     }
 
-    /**
-     * @return {@code True} if this message contains metrics.
-     */
-    public boolean hasMetrics() {
-        return !metrics.isEmpty();
+    /** @return Map of server full metrics messages. */
+    public Map<UUID, TcpDiscoveryNodeFullMetricsMessage> 
serversFullMetricsMessages() {
+        return serversFullMetricsMsgs;
     }
 
-    /**
-     * @return {@code True} this message contains cache metrics.
-     */
-    public boolean hasCacheMetrics() {
-        return !cacheMetrics.isEmpty();
+    /** @param serversFullMetricsMsgs Map of server full metrics messages. */
+    public void serversFullMetricsMessages(Map<UUID, 
TcpDiscoveryNodeFullMetricsMessage> serversFullMetricsMsgs) {
+        this.serversFullMetricsMsgs = serversFullMetricsMsgs;
     }
 
-    /**
-     * @param nodeId Node ID.
-     * @return {@code True} if this message contains metrics.
-     */
-    public boolean hasMetrics(UUID nodeId) {
-        assert nodeId != null;
+    /** @return Map of nodes metrics messages. */
+    public @Nullable Map<UUID, TcpDiscoveryClientNodesMetricsMessage> 
connectedClientsMetricsMessages() {
+        return connectedClientsMetricsMsgs;
+    }
 
-        return metrics.get(nodeId) != null;
+    /** @param connectedClientsMetricsMsgs Map of nodes metrics messages. */
+    public void connectedClientsMetricsMessages(Map<UUID, 
TcpDiscoveryClientNodesMetricsMessage> connectedClientsMetricsMsgs) {
+        this.connectedClientsMetricsMsgs = connectedClientsMetricsMsgs;
     }
 
     /**
-     * @param nodeId Node ID.
+     * Gets client node IDs.
      *
-     * @return {@code True} if this message contains cache metrics for 
particular node.
+     * @return Client node IDs.
      */
-    public boolean hasCacheMetrics(UUID nodeId) {
-        assert nodeId != null;
-
-        return cacheMetrics.get(nodeId) != null;
+    public @Nullable Set<UUID> clientNodeIds() {
+        return clientNodeIds;
     }
 
     /**
-     * Gets client node IDs for  particular node.
+     * Sets client node IDs.
      *
-     * @return Client node IDs.
+     * @param clientNodeIds Client node IDs.
      */
-    public Collection<UUID> clientNodeIds() {
-        return clientNodeIds;
+    public void clientNodeIds(@Nullable Set<UUID> clientNodeIds) {
+        this.clientNodeIds = clientNodeIds;
     }
 
     /**
@@ -211,6 +221,9 @@ public class TcpDiscoveryMetricsUpdateMessage extends 
TcpDiscoveryAbstractMessag
      * @param clientNodeId Client node ID.
      */
     public void addClientNodeId(UUID clientNodeId) {
+        if (clientNodeIds == null)
+            clientNodeIds = new HashSet<>();
+
         clientNodeIds.add(clientNodeId);
     }
 
@@ -220,115 +233,12 @@ public class TcpDiscoveryMetricsUpdateMessage extends 
TcpDiscoveryAbstractMessag
     }
 
     /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(TcpDiscoveryMetricsUpdateMessage.class, this, 
"super", super.toString());
+    @Override public short directType() {
+        return 14;
     }
 
-    /**
-     * @param nodeId Node ID.
-     * @param metrics Metrics.
-     * @return Serialized metrics.
-     */
-    private static byte[] serializeMetrics(UUID nodeId, ClusterMetrics 
metrics) {
-        assert nodeId != null;
-        assert metrics != null;
-
-        byte[] buf = new byte[16 + ClusterMetricsSnapshot.METRICS_SIZE];
-
-        U.longToBytes(nodeId.getMostSignificantBits(), buf, 0);
-        U.longToBytes(nodeId.getLeastSignificantBits(), buf, 8);
-
-        ClusterMetricsSnapshot.serialize(buf, 16, metrics);
-
-        return buf;
-    }
-
-    /**
-     */
-    @SuppressWarnings("PublicInnerClass")
-    public static class MetricsSet implements Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Metrics. */
-        private byte[] metrics;
-
-        /** Client metrics. */
-        private Collection<byte[]> clientMetrics;
-
-        /**
-         */
-        public MetricsSet() {
-            // No-op.
-        }
-
-        /**
-         * @param metrics Metrics.
-         */
-        public MetricsSet(ClusterMetrics metrics) {
-            assert metrics != null;
-
-            this.metrics = ClusterMetricsSnapshot.serialize(metrics);
-        }
-
-        /**
-         * @return Deserialized metrics.
-         */
-        public ClusterMetrics metrics() {
-            return ClusterMetricsSnapshot.deserialize(metrics, 0);
-        }
-
-        /**
-         * @return Client metrics.
-         */
-        public Collection<T2<UUID, ClusterMetrics>> clientMetrics() {
-            return F.viewReadOnly(clientMetrics, new C1<byte[], T2<UUID, 
ClusterMetrics>>() {
-                @Override public T2<UUID, ClusterMetrics> apply(byte[] bytes) {
-                    UUID nodeId = new UUID(U.bytesToLong(bytes, 0), 
U.bytesToLong(bytes, 8));
-
-                    return new T2<>(nodeId, 
ClusterMetricsSnapshot.deserialize(bytes, 16));
-                }
-            });
-        }
-
-        /**
-         * @param nodeId Client node ID.
-         * @param metrics Client metrics.
-         */
-        private void addClientMetrics(UUID nodeId, ClusterMetrics metrics) {
-            assert nodeId != null;
-            assert metrics != null;
-
-            if (clientMetrics == null)
-                clientMetrics = new ArrayList<>();
-
-            clientMetrics.add(serializeMetrics(nodeId, metrics));
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            U.writeByteArray(out, metrics);
-
-            out.writeInt(clientMetrics != null ? clientMetrics.size() : -1);
-
-            if (clientMetrics != null) {
-                for (byte[] arr : clientMetrics)
-                    U.writeByteArray(out, arr);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            metrics = U.readByteArray(in);
-
-            int clientMetricsSize = in.readInt();
-
-            if (clientMetricsSize >= 0) {
-                clientMetrics = new ArrayList<>(clientMetricsSize);
-
-                for (int i = 0; i < clientMetricsSize; i++)
-                    clientMetrics.add(U.readByteArray(in));
-            }
-        }
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryMetricsUpdateMessage.class, this, 
"super", super.toString());
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java
similarity index 61%
copy from 
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
copy to 
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java
index 0c12acee04a..7d121816da4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java
@@ -17,34 +17,28 @@
 
 package org.apache.ignite.spi.discovery.tcp.messages;
 
-import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
-import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
+import org.apache.ignite.internal.processors.cluster.NodeFullMetricsMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
- * We cannot directly reuse `NodeMetricsMessage` in Discovery as it is 
registered in a message factory of Communication
- * component and thus is unavailable in Discovery. We have to extend 
`NodeMetricsMessage` and register this subclass in
- * message factory of Discovery component.
+ * We cannot directly reuse {@link NodeFullMetricsMessage} in Discovery as it 
is registered in a message factory of
+ * Communication component and thus is unavailable in Discovery. We have to 
extend {@link NodeFullMetricsMessage} and
+ * register this subclass in message factory of Discovery component.
  */
-public class TcpDiscoveryNodeMetricsMessage extends NodeMetricsMessage {
+public class TcpDiscoveryNodeFullMetricsMessage extends NodeFullMetricsMessage 
{
     /** Constructor for {@link DiscoveryMessageFactory}. */
-    public TcpDiscoveryNodeMetricsMessage() {
+    public TcpDiscoveryNodeFullMetricsMessage() {
         // No-op.
     }
 
-    /** @param metrics Metrics. */
-    public TcpDiscoveryNodeMetricsMessage(ClusterMetrics metrics) {
-        super(metrics);
-    }
-
     /** {@inheritDoc} */
     @Override public short directType() {
-        return -102;
+        return -105;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(TcpDiscoveryNodeMetricsMessage.class, this, "super", 
super.toString());
+        return S.toString(TcpDiscoveryNodeFullMetricsMessage.class, this, 
"super", super.toString());
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
index 0c12acee04a..f62b3df0872 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
@@ -23,9 +23,9 @@ import 
org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
- * We cannot directly reuse `NodeMetricsMessage` in Discovery as it is 
registered in a message factory of Communication
- * component and thus is unavailable in Discovery. We have to extend 
`NodeMetricsMessage` and register this subclass in
- * message factory of Discovery component.
+ * We cannot directly reuse {@link NodeMetricsMessage} in Discovery as it is 
registered in a message factory of
+ * Communication component and thus is unavailable in Discovery. We have to 
extend {@link NodeMetricsMessage} and
+ * register this subclass in message factory of Discovery component.
  */
 public class TcpDiscoveryNodeMetricsMessage extends NodeMetricsMessage {
     /** Constructor for {@link DiscoveryMessageFactory}. */
@@ -33,9 +33,9 @@ public class TcpDiscoveryNodeMetricsMessage extends 
NodeMetricsMessage {
         // No-op.
     }
 
-    /** @param metrics Metrics. */
-    public TcpDiscoveryNodeMetricsMessage(ClusterMetrics metrics) {
-        super(metrics);
+    /** @param nodeMetrics Node metrics. */
+    public TcpDiscoveryNodeMetricsMessage(ClusterMetrics nodeMetrics) {
+        super(nodeMetrics);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java
index d21204e99e1..77616ae9e8d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMetrics;
@@ -27,7 +29,13 @@ import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.internal.ClusterMetricsSnapshot;
+import org.apache.ignite.internal.direct.DirectMessageReader;
+import org.apache.ignite.internal.direct.DirectMessageWriter;
+import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
@@ -92,23 +100,50 @@ public class CacheMetricsCacheSizeTest extends 
GridCommonAbstractTest {
 
         TcpDiscoveryMetricsUpdateMessage msg = new 
TcpDiscoveryMetricsUpdateMessage(UUID.randomUUID());
 
-        msg.setCacheMetrics(UUID.randomUUID(), cacheMetrics);
+        UUID srvrId = UUID.randomUUID();
 
-        Marshaller marshaller = marshaller(grid(0));
+        msg.addServerMetrics(srvrId, new ClusterMetricsSnapshot());
+        msg.addServerCacheMetrics(srvrId, cacheMetrics);
 
-        byte[] buf = marshaller.marshal(msg);
+        MessageFactory msgFactory = 
((TcpDiscoverySpi)grid(0).context().discovery().getInjectedDiscoverySpi()).messageFactory();
+        MessageSerializer msgSerializer = 
msgFactory.serializer(msg.directType());
 
-        Object readObj = marshaller.unmarshal(buf, 
getClass().getClassLoader());
+        // First time we write initial message type which is not read by the 
reader because the message type is known.
+        // We have to skip this header at the further message reading.
+        AtomicInteger initHdrSize = new AtomicInteger();
 
-        assertTrue(readObj instanceof TcpDiscoveryMetricsUpdateMessage);
+        DirectMessageWriter msgWritter = new DirectMessageWriter(msgFactory) {
+            @Override public void onHeaderWritten() {
+                super.onHeaderWritten();
 
-        TcpDiscoveryMetricsUpdateMessage msg2 = 
(TcpDiscoveryMetricsUpdateMessage)readObj;
+                initHdrSize.compareAndSet(0, getBuffer().position());
+            }
+        };
 
-        Map<Integer, CacheMetrics> cacheMetrics2 = 
msg2.cacheMetrics().values().iterator().next();
+        // 2kb should be enough for an empty message even if it is a 
relatively large metrics message.
+        msgWritter.setBuffer(ByteBuffer.allocate(2048));
 
-        CacheMetrics cacheMetric2 = cacheMetrics2.values().iterator().next();
+        assertTrue(msgSerializer.writeTo(msg, msgWritter));
 
-        assertEquals("TcpDiscoveryMetricsUpdateMessage serialization error, 
cacheSize is different", size, cacheMetric2.getCacheSize());
+        assertTrue(msgWritter.getBuffer().hasRemaining());
+
+        DirectMessageReader msgReader = new DirectMessageReader(msgFactory, 
null);
+        msgReader.setBuffer(msgWritter.getBuffer());
+
+        msgWritter.getBuffer().rewind();
+        msgWritter.getBuffer().position(initHdrSize.get());
+
+        TcpDiscoveryMetricsUpdateMessage msg2 = new 
TcpDiscoveryMetricsUpdateMessage();
+
+        assertTrue(msgSerializer.readFrom(msg2, msgReader));
+
+        Map<Integer, CacheMetricsMessage> cacheMetrics2 = 
msg2.serversFullMetricsMessages().values().iterator().next()
+            .cachesMetricsMessages();
+
+        CacheMetrics cacheMetric2 = new 
CacheMetricsSnapshot(cacheMetrics2.values().iterator().next());
+
+        assertEquals("TcpDiscoveryMetricsUpdateMessage serialization error, 
cacheSize is different", size,
+            cacheMetric2.getCacheSize());
 
         IgniteCache cacheNode1 = grid(1).cache(DEFAULT_CACHE_NAME);
 

Reply via email to