This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 65ad7638e9e IGNITE-27417 : Use MessageSerializer for
TcpDiscoveryClientMetricsUpdateMessage (#12598)
65ad7638e9e is described below
commit 65ad7638e9e4e944411ecb8460528005e613ff05
Author: Vladimir Steshin <[email protected]>
AuthorDate: Mon Jan 12 11:01:06 2026 +0300
IGNITE-27417 : Use MessageSerializer for
TcpDiscoveryClientMetricsUpdateMessage (#12598)
---
.../discovery/DiscoveryMessageFactory.java | 6 ++
.../processors/cluster/NodeMetricsMessage.java | 104 ++++++++++-----------
.../ignite/spi/discovery/tcp/ServerImpl.java | 3 +-
.../spi/discovery/tcp/TcpDiscoveryIoSession.java | 49 +++++++---
.../TcpDiscoveryClientMetricsUpdateMessage.java | 38 ++++++--
...ge.java => TcpDiscoveryNodeMetricsMessage.java} | 47 +++-------
6 files changed, 143 insertions(+), 104 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index 1c01592fd0d..5f399ed0c46 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -21,6 +21,7 @@ import
org.apache.ignite.internal.codegen.InetAddressMessageSerializer;
import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer;
import
org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer;
import
org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
+import
org.apache.ignite.internal.codegen.TcpDiscoveryClientMetricsUpdateMessageSerializer;
import
org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer;
import
org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer;
import
org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer;
@@ -29,6 +30,7 @@ import
org.apache.ignite.internal.codegen.TcpDiscoveryDuplicateIdMessageSerializ
import
org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer;
import
org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer;
import
org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer;
+import
org.apache.ignite.internal.codegen.TcpDiscoveryNodeMetricsMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
import
org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer;
@@ -38,6 +40,7 @@ import
org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientMetricsUpdateMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
@@ -46,6 +49,7 @@ import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessa
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
@@ -54,6 +58,7 @@ import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheck
public class DiscoveryMessageFactory implements MessageFactoryProvider {
/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
+ factory.register((short)-102, TcpDiscoveryNodeMetricsMessage::new, new
TcpDiscoveryNodeMetricsMessageSerializer());
factory.register((short)-101, InetSocketAddressMessage::new, new
InetSocketAddressMessageSerializer());
factory.register((short)-100, InetAddressMessage::new, new
InetAddressMessageSerializer());
@@ -70,5 +75,6 @@ public class DiscoveryMessageFactory implements
MessageFactoryProvider {
factory.register((short)10, TcpDiscoveryHandshakeResponse::new, new
TcpDiscoveryHandshakeResponseSerializer());
factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new
TcpDiscoveryAuthFailedMessageSerializer());
factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new
TcpDiscoveryDuplicateIdMessageSerializer());
+ factory.register((short)13,
TcpDiscoveryClientMetricsUpdateMessage::new, new
TcpDiscoveryClientMetricsUpdateMessageSerializer());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java
index f11ea1d48c0..55b51cb041d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java
@@ -137,20 +137,20 @@ public class NodeMetricsMessage implements Message {
private long curIdleTime = -1;
/** */
- @Order(value = 25, method = "totalCpus")
- private int availProcs = -1;
+ @Order(value = 25)
+ private int totalCpus = -1;
/** */
@Order(value = 26, method = "currentCpuLoad")
- private double load = -1;
+ private double curCpuLoad = -1;
/** */
@Order(value = 27, method = "averageCpuLoad")
- private double avgLoad = -1;
+ private double avgCpuLoad = -1;
/** */
@Order(value = 28, method = "currentGcCpuLoad")
- private double gcLoad = -1;
+ private double curGcCpuLoad = -1;
/** */
@Order(value = 29, method = "heapMemoryInitialized")
@@ -173,15 +173,15 @@ public class NodeMetricsMessage implements Message {
private long heapTotal = -1;
/** */
- @Order(value = 34, method = "heapMemoryInitialized")
+ @Order(value = 34, method = "nonHeapMemoryInitialized")
private long nonHeapInit = -1;
/** */
- @Order(value = 35, method = "heapMemoryUsed")
+ @Order(value = 35, method = "nonHeapMemoryUsed")
private long nonHeapUsed = -1;
/** */
- @Order(value = 36, method = "heapMemoryCommitted")
+ @Order(value = 36, method = "nonHeapMemoryCommitted")
private long nonHeapCommitted = -1;
/** */
@@ -253,8 +253,8 @@ public class NodeMetricsMessage implements Message {
private long totalJobsExecTime = -1;
/** */
- @Order(value = 54)
- private long currentPmeDuration = -1;
+ @Order(value = 54, method = "currentPmeDuration")
+ private long curPmeDuration = -1;
/** */
public NodeMetricsMessage() {
@@ -295,10 +295,10 @@ public class NodeMetricsMessage implements Message {
totalExecTasks = 0;
totalIdleTime = 0;
curIdleTime = 0;
- availProcs = 0;
- load = 0;
- avgLoad = 0;
- gcLoad = 0;
+ totalCpus = 0;
+ curCpuLoad = 0;
+ avgCpuLoad = 0;
+ curGcCpuLoad = 0;
heapInit = 0;
heapUsed = 0;
heapCommitted = 0;
@@ -323,7 +323,7 @@ public class NodeMetricsMessage implements Message {
outMesQueueSize = 0;
heapTotal = 0;
totalNodes = nodes.size();
- currentPmeDuration = 0;
+ curPmeDuration = 0;
for (ClusterNode node : nodes) {
ClusterMetrics m = node.metrics();
@@ -399,9 +399,9 @@ public class NodeMetricsMessage implements Message {
rcvdBytesCnt += m.getReceivedBytesCount();
outMesQueueSize += m.getOutboundMessagesQueueSize();
- avgLoad += m.getCurrentCpuLoad();
+ avgCpuLoad += m.getCurrentCpuLoad();
- currentPmeDuration = max(currentPmeDuration,
m.getCurrentPmeDuration());
+ curPmeDuration = max(curPmeDuration, m.getCurrentPmeDuration());
}
curJobExecTime /= size;
@@ -412,7 +412,7 @@ public class NodeMetricsMessage implements Message {
avgWaitingJobs /= size;
avgJobExecTime /= size;
avgJobWaitTime /= size;
- avgLoad /= size;
+ avgCpuLoad /= size;
if (!F.isEmpty(nodes)) {
ClusterMetrics oldestNodeMetrics = oldest(nodes).metrics();
@@ -423,9 +423,9 @@ public class NodeMetricsMessage implements Message {
Map<String, Collection<ClusterNode>> neighborhood =
U.neighborhood(nodes);
- gcLoad = gcCpus(neighborhood);
- load = cpus(neighborhood);
- availProcs = cpuCnt(neighborhood);
+ curGcCpuLoad = currentGcCpuLoad(neighborhood);
+ curCpuLoad = currentCpuLoad(neighborhood);
+ totalCpus = cpuCnt(neighborhood);
}
/** */
@@ -464,10 +464,10 @@ public class NodeMetricsMessage implements Message {
curIdleTime = metrics.getCurrentIdleTime();
totalIdleTime = metrics.getTotalIdleTime();
- availProcs = metrics.getTotalCpus();
- load = metrics.getCurrentCpuLoad();
- avgLoad = metrics.getAverageCpuLoad();
- gcLoad = metrics.getCurrentGcCpuLoad();
+ totalCpus = metrics.getTotalCpus();
+ curCpuLoad = metrics.getCurrentCpuLoad();
+ avgCpuLoad = metrics.getAverageCpuLoad();
+ curGcCpuLoad = metrics.getCurrentGcCpuLoad();
heapInit = metrics.getHeapMemoryInitialized();
heapUsed = metrics.getHeapMemoryUsed();
@@ -487,7 +487,7 @@ public class NodeMetricsMessage implements Message {
lastDataVer = metrics.getLastDataVersion();
- currentPmeDuration = metrics.getCurrentPmeDuration();
+ curPmeDuration = metrics.getCurrentPmeDuration();
totalNodes = metrics.getTotalNodes();
@@ -885,22 +885,22 @@ public class NodeMetricsMessage implements Message {
/** */
public int totalCpus() {
- return availProcs;
+ return totalCpus;
}
/** */
public double currentCpuLoad() {
- return load;
+ return curCpuLoad;
}
/** */
public double averageCpuLoad() {
- return avgLoad;
+ return avgCpuLoad;
}
/** */
public double currentGcCpuLoad() {
- return gcLoad;
+ return curGcCpuLoad;
}
/** */
@@ -1020,43 +1020,43 @@ public class NodeMetricsMessage implements Message {
/** */
public long currentPmeDuration() {
- return currentPmeDuration;
+ return curPmeDuration;
}
/**
* Sets available processors.
*
- * @param availProcs Available processors.
+ * @param totalCpus Available processors.
*/
- public void totalCpus(int availProcs) {
- this.availProcs = availProcs;
+ public void totalCpus(int totalCpus) {
+ this.totalCpus = totalCpus;
}
/**
* Sets current CPU load.
*
- * @param load Current CPU load.
+ * @param curCpuLoad Current CPU load.
*/
- public void currentCpuLoad(double load) {
- this.load = load;
+ public void currentCpuLoad(double curCpuLoad) {
+ this.curCpuLoad = curCpuLoad;
}
/**
* Sets CPU load average over the metrics history.
*
- * @param avgLoad CPU load average.
+ * @param avgCpuLoad CPU load average.
*/
- public void averageCpuLoad(double avgLoad) {
- this.avgLoad = avgLoad;
+ public void averageCpuLoad(double avgCpuLoad) {
+ this.avgCpuLoad = avgCpuLoad;
}
/**
* Sets current GC load.
*
- * @param gcLoad Current GC load.
+ * @param curGcCpuLoad Current GC load.
*/
- public void currentGcCpuLoad(double gcLoad) {
- this.gcLoad = gcLoad;
+ public void currentGcCpuLoad(double curGcCpuLoad) {
+ this.curGcCpuLoad = curGcCpuLoad;
}
/**
@@ -1263,7 +1263,7 @@ public class NodeMetricsMessage implements Message {
* @param curPmeDuration Execution duration for current partition map
exchange.
*/
public void currentPmeDuration(long curPmeDuration) {
- this.currentPmeDuration = curPmeDuration;
+ this.curPmeDuration = curPmeDuration;
}
/**
@@ -1308,36 +1308,36 @@ public class NodeMetricsMessage implements Message {
* @param neighborhood Cluster neighborhood.
* @return CPU load.
*/
- private static int cpus(Map<String, Collection<ClusterNode>> neighborhood)
{
- int cpus = 0;
+ private static double currentCpuLoad(Map<String, Collection<ClusterNode>>
neighborhood) {
+ double curCpuLoad = 0.0;
for (Collection<ClusterNode> nodes : neighborhood.values()) {
ClusterNode first = F.first(nodes);
// Projection can be empty if all nodes in it failed.
if (first != null)
- cpus += first.metrics().getCurrentCpuLoad();
+ curCpuLoad += first.metrics().getCurrentCpuLoad();
}
- return cpus;
+ return curCpuLoad;
}
/**
* @param neighborhood Cluster neighborhood.
* @return GC CPU load.
*/
- private static int gcCpus(Map<String, Collection<ClusterNode>>
neighborhood) {
- int cpus = 0;
+ private static double currentGcCpuLoad(Map<String,
Collection<ClusterNode>> neighborhood) {
+ double curGcCpuLoad = 0;
for (Collection<ClusterNode> nodes : neighborhood.values()) {
ClusterNode first = F.first(nodes);
// Projection can be empty if all nodes in it failed.
if (first != null)
- cpus += first.metrics().getCurrentGcCpuLoad();
+ curGcCpuLoad += first.metrics().getCurrentGcCpuLoad();
}
- return cpus;
+ return curGcCpuLoad;
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index aef67a57809..44ca6f0093f 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -72,6 +72,7 @@ import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.NodeValidationFailedEvent;
import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.ClusterMetricsSnapshot;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -7580,7 +7581,7 @@ class ServerImpl extends TcpDiscoveryImpl {
ClientMessageWorker wrk =
clientMsgWorkers.get(msg.creatorNodeId());
if (wrk != null)
- wrk.metrics(msg.metrics());
+ wrk.metrics(new ClusterMetricsSnapshot(msg.metricsMessage()));
else if (log.isDebugEnabled())
log.debug("Received client metrics update message from unknown
client node: " + msg);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
index 5c1946f0eac..edcab473b4c 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
@@ -170,13 +170,12 @@ public class TcpDiscoveryIoSession {
if (MESSAGE_SERIALIZATION != serMode) {
detectSslAlert(serMode, in);
- throw new IgniteCheckedException("Received unexpected byte
while reading discovery message: " + serMode);
+ // IOException type is important for ServerImpl. It may search
the cause (X.hasCause).
+ // The connection error processing behavior depends on it.
+ throw new IOException("Received unexpected byte while reading
discovery message: " + serMode);
}
- byte b0 = (byte)in.read();
- byte b1 = (byte)in.read();
-
- Message msg = spi.messageFactory().create(makeMessageType(b0, b1));
+ Message msg =
spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read()));
msgReader.reset();
msgReader.setBuffer(msgBuf);
@@ -185,19 +184,47 @@ public class TcpDiscoveryIoSession {
boolean finished;
- do {
- // Should be cleared before first operation.
- msgBuf.clear();
+ msgBuf.clear();
- int read = in.read(msgBuf.array(), 0, msgBuf.limit());
+ do {
+ int read = in.read(msgBuf.array(), msgBuf.position(),
msgBuf.remaining());
if (read == -1)
throw new EOFException("Connection closed before message
was fully read.");
- msgBuf.limit(read);
+ if (msgBuf.position() > 0) {
+ msgBuf.limit(msgBuf.position() + read);
+
+ // We've stored an unprocessed tail before.
+ msgBuf.rewind();
+ }
+ else
+ msgBuf.limit(read);
finished = msgSer.readFrom(msg, msgReader);
- } while (!finished);
+
+ // We rely on the fact that Discovery only sends the next message
upon receiving a receipt for the previous one.
+ // This behaviour guarantees that we never read the next message
from the buffer right after the end of
+ // the previous message.
+ assert msgBuf.remaining() == 0 || !finished : "Some data was
read from the socket but left unprocessed.";
+
+ if (finished)
+ break;
+
+ // We must keep the unprocessed bytes read from the socket. It
won't return them again.
+ byte[] unprocessedTail = null;
+
+ if (msgBuf.remaining() > 0) {
+ unprocessedTail = new byte[msgBuf.remaining()];
+ msgBuf.get(unprocessedTail, 0, msgBuf.remaining());
+ }
+
+ msgBuf.clear();
+
+ if (unprocessedTail != null)
+ msgBuf.put(unprocessedTail);
+ }
+ while (true);
return (T)msg;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
index 8092ef3a725..f110c40e8cb 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
@@ -19,20 +19,28 @@ package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterMetrics;
-import org.apache.ignite.internal.ClusterMetricsSnapshot;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
* Metrics update message.
* <p>
* Client sends his metrics in this message.
*/
-public class TcpDiscoveryClientMetricsUpdateMessage extends
TcpDiscoveryAbstractMessage {
+public class TcpDiscoveryClientMetricsUpdateMessage extends
TcpDiscoveryAbstractMessage implements Message {
/** */
private static final long serialVersionUID = 0L;
/** */
- private final byte[] metrics;
+ @Order(value = 5, method = "metricsMessage")
+ private TcpDiscoveryNodeMetricsMessage metricsMsg;
+
+ /** Constructor for {@link DiscoveryMessageFactory}. */
+ public TcpDiscoveryClientMetricsUpdateMessage() {
+ // No-op.
+ }
/**
* Constructor.
@@ -43,16 +51,30 @@ public class TcpDiscoveryClientMetricsUpdateMessage extends
TcpDiscoveryAbstract
public TcpDiscoveryClientMetricsUpdateMessage(UUID creatorNodeId,
ClusterMetrics metrics) {
super(creatorNodeId);
- this.metrics = ClusterMetricsSnapshot.serialize(metrics);
+ metricsMsg = new TcpDiscoveryNodeMetricsMessage(metrics);
+ }
+
+ /**
+ * Gets the metrics message.
+ *
+ * @return Metrics holder message.
+ */
+ public TcpDiscoveryNodeMetricsMessage metricsMessage() {
+ return metricsMsg;
}
/**
- * Gets metrics map.
+ * Sets the metrics message.
*
- * @return Metrics map.
+ * @param metricsMsg Metrics holder message.
*/
- public ClusterMetrics metrics() {
- return ClusterMetricsSnapshot.deserialize(metrics, 0);
+ public void metricsMessage(TcpDiscoveryNodeMetricsMessage metricsMsg) {
+ this.metricsMsg = metricsMsg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 13;
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
similarity index 51%
copy from
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
copy to
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
index 8092ef3a725..0c12acee04a 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
@@ -17,51 +17,34 @@
package org.apache.ignite.spi.discovery.tcp.messages;
-import java.util.UUID;
import org.apache.ignite.cluster.ClusterMetrics;
-import org.apache.ignite.internal.ClusterMetricsSnapshot;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
+import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
/**
- * Metrics update message.
- * <p>
- * Client sends his metrics in this message.
+ * We cannot directly reuse `NodeMetricsMessage` in Discovery as it is
registered in a message factory of Communication
+ * component and thus is unavailable in Discovery. We have to extend
`NodeMetricsMessage` and register this subclass in
+ * message factory of Discovery component.
*/
-public class TcpDiscoveryClientMetricsUpdateMessage extends
TcpDiscoveryAbstractMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final byte[] metrics;
-
- /**
- * Constructor.
- *
- * @param creatorNodeId Creator node.
- * @param metrics Metrics.
- */
- public TcpDiscoveryClientMetricsUpdateMessage(UUID creatorNodeId,
ClusterMetrics metrics) {
- super(creatorNodeId);
-
- this.metrics = ClusterMetricsSnapshot.serialize(metrics);
+public class TcpDiscoveryNodeMetricsMessage extends NodeMetricsMessage {
+ /** Constructor for {@link DiscoveryMessageFactory}. */
+ public TcpDiscoveryNodeMetricsMessage() {
+ // No-op.
}
- /**
- * Gets metrics map.
- *
- * @return Metrics map.
- */
- public ClusterMetrics metrics() {
- return ClusterMetricsSnapshot.deserialize(metrics, 0);
+ /** @param metrics Metrics. */
+ public TcpDiscoveryNodeMetricsMessage(ClusterMetrics metrics) {
+ super(metrics);
}
/** {@inheritDoc} */
- @Override public boolean traceLogLevel() {
- return true;
+ @Override public short directType() {
+ return -102;
}
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(TcpDiscoveryClientMetricsUpdateMessage.class, this,
"super", super.toString());
+ return S.toString(TcpDiscoveryNodeMetricsMessage.class, this, "super",
super.toString());
}
}