This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 65ad7638e9e IGNITE-27417 : Use MessageSerializer for 
TcpDiscoveryClientMetricsUpdateMessage (#12598)
65ad7638e9e is described below

commit 65ad7638e9e4e944411ecb8460528005e613ff05
Author: Vladimir Steshin <[email protected]>
AuthorDate: Mon Jan 12 11:01:06 2026 +0300

    IGNITE-27417 : Use MessageSerializer for 
TcpDiscoveryClientMetricsUpdateMessage (#12598)
---
 .../discovery/DiscoveryMessageFactory.java         |   6 ++
 .../processors/cluster/NodeMetricsMessage.java     | 104 ++++++++++-----------
 .../ignite/spi/discovery/tcp/ServerImpl.java       |   3 +-
 .../spi/discovery/tcp/TcpDiscoveryIoSession.java   |  49 +++++++---
 .../TcpDiscoveryClientMetricsUpdateMessage.java    |  38 ++++++--
 ...ge.java => TcpDiscoveryNodeMetricsMessage.java} |  47 +++-------
 6 files changed, 143 insertions(+), 104 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index 1c01592fd0d..5f399ed0c46 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -21,6 +21,7 @@ import 
org.apache.ignite.internal.codegen.InetAddressMessageSerializer;
 import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer;
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer;
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
+import 
org.apache.ignite.internal.codegen.TcpDiscoveryClientMetricsUpdateMessageSerializer;
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer;
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer;
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer;
@@ -29,6 +30,7 @@ import 
org.apache.ignite.internal.codegen.TcpDiscoveryDuplicateIdMessageSerializ
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer;
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer;
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer;
+import 
org.apache.ignite.internal.codegen.TcpDiscoveryNodeMetricsMessageSerializer;
 import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
 import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
 import 
org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer;
@@ -38,6 +40,7 @@ import 
org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientMetricsUpdateMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
@@ -46,6 +49,7 @@ import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessa
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
@@ -54,6 +58,7 @@ import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheck
 public class DiscoveryMessageFactory implements MessageFactoryProvider {
     /** {@inheritDoc} */
     @Override public void registerAll(MessageFactory factory) {
+        factory.register((short)-102, TcpDiscoveryNodeMetricsMessage::new, new 
TcpDiscoveryNodeMetricsMessageSerializer());
         factory.register((short)-101, InetSocketAddressMessage::new, new 
InetSocketAddressMessageSerializer());
         factory.register((short)-100, InetAddressMessage::new, new 
InetAddressMessageSerializer());
 
@@ -70,5 +75,6 @@ public class DiscoveryMessageFactory implements 
MessageFactoryProvider {
         factory.register((short)10, TcpDiscoveryHandshakeResponse::new, new 
TcpDiscoveryHandshakeResponseSerializer());
         factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new 
TcpDiscoveryAuthFailedMessageSerializer());
         factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new 
TcpDiscoveryDuplicateIdMessageSerializer());
+        factory.register((short)13, 
TcpDiscoveryClientMetricsUpdateMessage::new, new 
TcpDiscoveryClientMetricsUpdateMessageSerializer());
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java
index f11ea1d48c0..55b51cb041d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java
@@ -137,20 +137,20 @@ public class NodeMetricsMessage implements Message {
     private long curIdleTime = -1;
 
     /** */
-    @Order(value = 25, method = "totalCpus")
-    private int availProcs = -1;
+    @Order(value = 25)
+    private int totalCpus = -1;
 
     /** */
     @Order(value = 26, method = "currentCpuLoad")
-    private double load = -1;
+    private double curCpuLoad = -1;
 
     /** */
     @Order(value = 27, method = "averageCpuLoad")
-    private double avgLoad = -1;
+    private double avgCpuLoad = -1;
 
     /** */
     @Order(value = 28, method = "currentGcCpuLoad")
-    private double gcLoad = -1;
+    private double curGcCpuLoad = -1;
 
     /** */
     @Order(value = 29, method = "heapMemoryInitialized")
@@ -173,15 +173,15 @@ public class NodeMetricsMessage implements Message {
     private long heapTotal = -1;
 
     /** */
-    @Order(value = 34, method = "heapMemoryInitialized")
+    @Order(value = 34, method = "nonHeapMemoryInitialized")
     private long nonHeapInit = -1;
 
     /** */
-    @Order(value = 35, method = "heapMemoryUsed")
+    @Order(value = 35, method = "nonHeapMemoryUsed")
     private long nonHeapUsed = -1;
 
     /** */
-    @Order(value = 36, method = "heapMemoryCommitted")
+    @Order(value = 36, method = "nonHeapMemoryCommitted")
     private long nonHeapCommitted = -1;
 
     /** */
@@ -253,8 +253,8 @@ public class NodeMetricsMessage implements Message {
     private long totalJobsExecTime = -1;
 
     /** */
-    @Order(value = 54)
-    private long currentPmeDuration = -1;
+    @Order(value = 54, method = "currentPmeDuration")
+    private long curPmeDuration = -1;
 
     /** */
     public NodeMetricsMessage() {
@@ -295,10 +295,10 @@ public class NodeMetricsMessage implements Message {
         totalExecTasks = 0;
         totalIdleTime = 0;
         curIdleTime = 0;
-        availProcs = 0;
-        load = 0;
-        avgLoad = 0;
-        gcLoad = 0;
+        totalCpus = 0;
+        curCpuLoad = 0;
+        avgCpuLoad = 0;
+        curGcCpuLoad = 0;
         heapInit = 0;
         heapUsed = 0;
         heapCommitted = 0;
@@ -323,7 +323,7 @@ public class NodeMetricsMessage implements Message {
         outMesQueueSize = 0;
         heapTotal = 0;
         totalNodes = nodes.size();
-        currentPmeDuration = 0;
+        curPmeDuration = 0;
 
         for (ClusterNode node : nodes) {
             ClusterMetrics m = node.metrics();
@@ -399,9 +399,9 @@ public class NodeMetricsMessage implements Message {
             rcvdBytesCnt += m.getReceivedBytesCount();
             outMesQueueSize += m.getOutboundMessagesQueueSize();
 
-            avgLoad += m.getCurrentCpuLoad();
+            avgCpuLoad += m.getCurrentCpuLoad();
 
-            currentPmeDuration = max(currentPmeDuration, 
m.getCurrentPmeDuration());
+            curPmeDuration = max(curPmeDuration, m.getCurrentPmeDuration());
         }
 
         curJobExecTime /= size;
@@ -412,7 +412,7 @@ public class NodeMetricsMessage implements Message {
         avgWaitingJobs /= size;
         avgJobExecTime /= size;
         avgJobWaitTime /= size;
-        avgLoad /= size;
+        avgCpuLoad /= size;
 
         if (!F.isEmpty(nodes)) {
             ClusterMetrics oldestNodeMetrics = oldest(nodes).metrics();
@@ -423,9 +423,9 @@ public class NodeMetricsMessage implements Message {
 
         Map<String, Collection<ClusterNode>> neighborhood = 
U.neighborhood(nodes);
 
-        gcLoad = gcCpus(neighborhood);
-        load = cpus(neighborhood);
-        availProcs = cpuCnt(neighborhood);
+        curGcCpuLoad = currentGcCpuLoad(neighborhood);
+        curCpuLoad = currentCpuLoad(neighborhood);
+        totalCpus = cpuCnt(neighborhood);
     }
 
     /** */
@@ -464,10 +464,10 @@ public class NodeMetricsMessage implements Message {
         curIdleTime = metrics.getCurrentIdleTime();
         totalIdleTime = metrics.getTotalIdleTime();
 
-        availProcs = metrics.getTotalCpus();
-        load = metrics.getCurrentCpuLoad();
-        avgLoad = metrics.getAverageCpuLoad();
-        gcLoad = metrics.getCurrentGcCpuLoad();
+        totalCpus = metrics.getTotalCpus();
+        curCpuLoad = metrics.getCurrentCpuLoad();
+        avgCpuLoad = metrics.getAverageCpuLoad();
+        curGcCpuLoad = metrics.getCurrentGcCpuLoad();
 
         heapInit = metrics.getHeapMemoryInitialized();
         heapUsed = metrics.getHeapMemoryUsed();
@@ -487,7 +487,7 @@ public class NodeMetricsMessage implements Message {
 
         lastDataVer = metrics.getLastDataVersion();
 
-        currentPmeDuration = metrics.getCurrentPmeDuration();
+        curPmeDuration = metrics.getCurrentPmeDuration();
 
         totalNodes = metrics.getTotalNodes();
 
@@ -885,22 +885,22 @@ public class NodeMetricsMessage implements Message {
 
     /** */
     public int totalCpus() {
-        return availProcs;
+        return totalCpus;
     }
 
     /** */
     public double currentCpuLoad() {
-        return load;
+        return curCpuLoad;
     }
 
     /** */
     public double averageCpuLoad() {
-        return avgLoad;
+        return avgCpuLoad;
     }
 
     /** */
     public double currentGcCpuLoad() {
-        return gcLoad;
+        return curGcCpuLoad;
     }
 
     /** */
@@ -1020,43 +1020,43 @@ public class NodeMetricsMessage implements Message {
 
     /** */
     public long currentPmeDuration() {
-        return currentPmeDuration;
+        return curPmeDuration;
     }
 
     /**
      * Sets available processors.
      *
-     * @param availProcs Available processors.
+     * @param totalCpus Available processors.
      */
-    public void totalCpus(int availProcs) {
-        this.availProcs = availProcs;
+    public void totalCpus(int totalCpus) {
+        this.totalCpus = totalCpus;
     }
 
     /**
      * Sets current CPU load.
      *
-     * @param load Current CPU load.
+     * @param curCpuLoad Current CPU load.
      */
-    public void currentCpuLoad(double load) {
-        this.load = load;
+    public void currentCpuLoad(double curCpuLoad) {
+        this.curCpuLoad = curCpuLoad;
     }
 
     /**
      * Sets CPU load average over the metrics history.
      *
-     * @param avgLoad CPU load average.
+     * @param avgCpuLoad CPU load average.
      */
-    public void averageCpuLoad(double avgLoad) {
-        this.avgLoad = avgLoad;
+    public void averageCpuLoad(double avgCpuLoad) {
+        this.avgCpuLoad = avgCpuLoad;
     }
 
     /**
      * Sets current GC load.
      *
-     * @param gcLoad Current GC load.
+     * @param curGcCpuLoad Current GC load.
      */
-    public void currentGcCpuLoad(double gcLoad) {
-        this.gcLoad = gcLoad;
+    public void currentGcCpuLoad(double curGcCpuLoad) {
+        this.curGcCpuLoad = curGcCpuLoad;
     }
 
     /**
@@ -1263,7 +1263,7 @@ public class NodeMetricsMessage implements Message {
      * @param curPmeDuration Execution duration for current partition map 
exchange.
      */
     public void currentPmeDuration(long curPmeDuration) {
-        this.currentPmeDuration = curPmeDuration;
+        this.curPmeDuration = curPmeDuration;
     }
 
     /**
@@ -1308,36 +1308,36 @@ public class NodeMetricsMessage implements Message {
      * @param neighborhood Cluster neighborhood.
      * @return CPU load.
      */
-    private static int cpus(Map<String, Collection<ClusterNode>> neighborhood) 
{
-        int cpus = 0;
+    private static double currentCpuLoad(Map<String, Collection<ClusterNode>> 
neighborhood) {
+        double curCpuLoad = 0.0;
 
         for (Collection<ClusterNode> nodes : neighborhood.values()) {
             ClusterNode first = F.first(nodes);
 
             // Projection can be empty if all nodes in it failed.
             if (first != null)
-                cpus += first.metrics().getCurrentCpuLoad();
+                curCpuLoad += first.metrics().getCurrentCpuLoad();
         }
 
-        return cpus;
+        return curCpuLoad;
     }
 
     /**
      * @param neighborhood Cluster neighborhood.
      * @return GC CPU load.
      */
-    private static int gcCpus(Map<String, Collection<ClusterNode>> 
neighborhood) {
-        int cpus = 0;
+    private static double currentGcCpuLoad(Map<String, 
Collection<ClusterNode>> neighborhood) {
+        double curGcCpuLoad = 0;
 
         for (Collection<ClusterNode> nodes : neighborhood.values()) {
             ClusterNode first = F.first(nodes);
 
             // Projection can be empty if all nodes in it failed.
             if (first != null)
-                cpus += first.metrics().getCurrentGcCpuLoad();
+                curGcCpuLoad += first.metrics().getCurrentGcCpuLoad();
         }
 
-        return cpus;
+        return curGcCpuLoad;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index aef67a57809..44ca6f0093f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -72,6 +72,7 @@ import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.NodeValidationFailedEvent;
 import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.ClusterMetricsSnapshot;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -7580,7 +7581,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             ClientMessageWorker wrk = 
clientMsgWorkers.get(msg.creatorNodeId());
 
             if (wrk != null)
-                wrk.metrics(msg.metrics());
+                wrk.metrics(new ClusterMetricsSnapshot(msg.metricsMessage()));
             else if (log.isDebugEnabled())
                 log.debug("Received client metrics update message from unknown 
client node: " + msg);
         }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
index 5c1946f0eac..edcab473b4c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
@@ -170,13 +170,12 @@ public class TcpDiscoveryIoSession {
             if (MESSAGE_SERIALIZATION != serMode) {
                 detectSslAlert(serMode, in);
 
-                throw new IgniteCheckedException("Received unexpected byte 
while reading discovery message: " + serMode);
+                // IOException type is important for ServerImpl. It may search 
the cause (X.hasCause).
+                // The connection error processing behavior depends on it.
+                throw new IOException("Received unexpected byte while reading 
discovery message: " + serMode);
             }
 
-            byte b0 = (byte)in.read();
-            byte b1 = (byte)in.read();
-
-            Message msg = spi.messageFactory().create(makeMessageType(b0, b1));
+            Message msg = 
spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read()));
 
             msgReader.reset();
             msgReader.setBuffer(msgBuf);
@@ -185,19 +184,47 @@ public class TcpDiscoveryIoSession {
 
             boolean finished;
 
-            do {
-                // Should be cleared before first operation.
-                msgBuf.clear();
+            msgBuf.clear();
 
-                int read = in.read(msgBuf.array(), 0, msgBuf.limit());
+            do {
+                int read = in.read(msgBuf.array(), msgBuf.position(), 
msgBuf.remaining());
 
                 if (read == -1)
                     throw new EOFException("Connection closed before message 
was fully read.");
 
-                msgBuf.limit(read);
+                if (msgBuf.position() > 0) {
+                    msgBuf.limit(msgBuf.position() + read);
+
+                    // We've stored an unprocessed tail before.
+                    msgBuf.rewind();
+                }
+                else
+                    msgBuf.limit(read);
 
                 finished = msgSer.readFrom(msg, msgReader);
-            } while (!finished);
+
+                // We rely on the fact that Discovery only sends next message 
upon receiving a receipt for the previous one.
+                // This behaviour guarantees that we never read a next message 
from the buffer right after the end of
+                // the previous message.
+                assert msgBuf.remaining() == 0 || !finished : "Some data was 
read from the socket but left unprocessed.";
+
+                if (finished)
+                    break;
+
+                // We must keep the unprocessed bytes read from the socket. It 
won't return them again.
+                byte[] unprocessedTail = null;
+
+                if (msgBuf.remaining() > 0) {
+                    unprocessedTail = new byte[msgBuf.remaining()];
+                    msgBuf.get(unprocessedTail, 0, msgBuf.remaining());
+                }
+
+                msgBuf.clear();
+
+                if (unprocessedTail != null)
+                    msgBuf.put(unprocessedTail);
+            }
+            while (true);
 
             return (T)msg;
         }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
index 8092ef3a725..f110c40e8cb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
@@ -19,20 +19,28 @@ package org.apache.ignite.spi.discovery.tcp.messages;
 
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterMetrics;
-import org.apache.ignite.internal.ClusterMetricsSnapshot;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
  * Metrics update message.
  * <p>
  * Client sends his metrics in this message.
  */
-public class TcpDiscoveryClientMetricsUpdateMessage extends 
TcpDiscoveryAbstractMessage {
+public class TcpDiscoveryClientMetricsUpdateMessage extends 
TcpDiscoveryAbstractMessage implements Message {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** */
-    private final byte[] metrics;
+    @Order(value = 5, method = "metricsMessage")
+    private TcpDiscoveryNodeMetricsMessage metricsMsg;
+
+    /** Constructor for {@link DiscoveryMessageFactory}. */
+    public TcpDiscoveryClientMetricsUpdateMessage() {
+        // No-op.
+    }
 
     /**
      * Constructor.
@@ -43,16 +51,30 @@ public class TcpDiscoveryClientMetricsUpdateMessage extends 
TcpDiscoveryAbstract
     public TcpDiscoveryClientMetricsUpdateMessage(UUID creatorNodeId, 
ClusterMetrics metrics) {
         super(creatorNodeId);
 
-        this.metrics = ClusterMetricsSnapshot.serialize(metrics);
+        metricsMsg = new TcpDiscoveryNodeMetricsMessage(metrics);
+    }
+
+    /**
+     * Gets the metrics message.
+     *
+     * @return Metrics holder message.
+     */
+    public TcpDiscoveryNodeMetricsMessage metricsMessage() {
+        return metricsMsg;
     }
 
     /**
-     * Gets metrics map.
+     * Sets the metrics message.
      *
-     * @return Metrics map.
+     * @param metricsMsg Metrics holder message.
      */
-    public ClusterMetrics metrics() {
-        return ClusterMetricsSnapshot.deserialize(metrics, 0);
+    public void metricsMessage(TcpDiscoveryNodeMetricsMessage metricsMsg) {
+        this.metricsMsg = metricsMsg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 13;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
similarity index 51%
copy from 
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
copy to 
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
index 8092ef3a725..0c12acee04a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
@@ -17,51 +17,34 @@
 
 package org.apache.ignite.spi.discovery.tcp.messages;
 
-import java.util.UUID;
 import org.apache.ignite.cluster.ClusterMetrics;
-import org.apache.ignite.internal.ClusterMetricsSnapshot;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
+import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
- * Metrics update message.
- * <p>
- * Client sends his metrics in this message.
+ * We cannot directly reuse {@code NodeMetricsMessage} in Discovery as it is 
registered in the message factory of the Communication
+ * component and thus is unavailable in Discovery. We have to extend 
{@code NodeMetricsMessage} and register this subclass in
+ * the message factory of the Discovery component.
  */
-public class TcpDiscoveryClientMetricsUpdateMessage extends 
TcpDiscoveryAbstractMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private final byte[] metrics;
-
-    /**
-     * Constructor.
-     *
-     * @param creatorNodeId Creator node.
-     * @param metrics Metrics.
-     */
-    public TcpDiscoveryClientMetricsUpdateMessage(UUID creatorNodeId, 
ClusterMetrics metrics) {
-        super(creatorNodeId);
-
-        this.metrics = ClusterMetricsSnapshot.serialize(metrics);
+public class TcpDiscoveryNodeMetricsMessage extends NodeMetricsMessage {
+    /** Constructor for {@link DiscoveryMessageFactory}. */
+    public TcpDiscoveryNodeMetricsMessage() {
+        // No-op.
     }
 
-    /**
-     * Gets metrics map.
-     *
-     * @return Metrics map.
-     */
-    public ClusterMetrics metrics() {
-        return ClusterMetricsSnapshot.deserialize(metrics, 0);
+    /** @param metrics Metrics. */
+    public TcpDiscoveryNodeMetricsMessage(ClusterMetrics metrics) {
+        super(metrics);
     }
 
     /** {@inheritDoc} */
-    @Override public boolean traceLogLevel() {
-        return true;
+    @Override public short directType() {
+        return -102;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(TcpDiscoveryClientMetricsUpdateMessage.class, this, 
"super", super.toString());
+        return S.toString(TcpDiscoveryNodeMetricsMessage.class, this, "super", 
super.toString());
     }
 }

Reply via email to