This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 38188bb3e78 IGNITE-27542 : Use MessageSerializer for
TcpDiscoveryMetricsUpdateMessage (#12620)
38188bb3e78 is described below
commit 38188bb3e7867c695c58f27f9d00a3af81ba78cc
Author: Vladimir Steshin <[email protected]>
AuthorDate: Wed Jan 21 12:13:01 2026 +0300
IGNITE-27542 : Use MessageSerializer for TcpDiscoveryMetricsUpdateMessage
(#12620)
---
.../discovery/DiscoveryMessageFactory.java | 13 +
.../processors/cluster/ClusterNodeMetrics.java | 6 +-
.../processors/cluster/NodeFullMetricsMessage.java | 28 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 4 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 46 +--
.../ignite/spi/discovery/tcp/TcpDiscoveryImpl.java | 48 ++-
.../spi/discovery/tcp/TcpDiscoveryIoSession.java | 188 ++++++++++--
...e.java => TcpDiscoveryCacheMetricsMessage.java} | 24 +-
... => TcpDiscoveryClientNodesMetricsMessage.java} | 35 ++-
.../messages/TcpDiscoveryMetricsUpdateMessage.java | 322 ++++++++-------------
...ava => TcpDiscoveryNodeFullMetricsMessage.java} | 22 +-
.../messages/TcpDiscoveryNodeMetricsMessage.java | 12 +-
.../cache/CacheMetricsCacheSizeTest.java | 55 +++-
13 files changed, 451 insertions(+), 352 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index 5f399ed0c46..a1c3c9d9f76 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -20,8 +20,10 @@ package org.apache.ignite.internal.managers.discovery;
import org.apache.ignite.internal.codegen.InetAddressMessageSerializer;
import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer;
import
org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer;
+import
org.apache.ignite.internal.codegen.TcpDiscoveryCacheMetricsMessageSerializer;
import
org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
import
org.apache.ignite.internal.codegen.TcpDiscoveryClientMetricsUpdateMessageSerializer;
+import
org.apache.ignite.internal.codegen.TcpDiscoveryClientNodesMetricsMessageSerializer;
import
org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer;
import
org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer;
import
org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer;
@@ -30,6 +32,8 @@ import
org.apache.ignite.internal.codegen.TcpDiscoveryDuplicateIdMessageSerializ
import
org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer;
import
org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer;
import
org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer;
+import
org.apache.ignite.internal.codegen.TcpDiscoveryMetricsUpdateMessageSerializer;
+import
org.apache.ignite.internal.codegen.TcpDiscoveryNodeFullMetricsMessageSerializer;
import
org.apache.ignite.internal.codegen.TcpDiscoveryNodeMetricsMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
@@ -39,8 +43,10 @@ import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCacheMetricsMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientMetricsUpdateMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientNodesMetricsMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
@@ -49,6 +55,8 @@ import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessa
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
@@ -58,6 +66,10 @@ import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheck
public class DiscoveryMessageFactory implements MessageFactoryProvider {
/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
+ factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new,
+ new TcpDiscoveryNodeFullMetricsMessageSerializer());
+ factory.register((short)-104,
TcpDiscoveryClientNodesMetricsMessage::new, new
TcpDiscoveryClientNodesMetricsMessageSerializer());
+ factory.register((short)-103, TcpDiscoveryCacheMetricsMessage::new,
new TcpDiscoveryCacheMetricsMessageSerializer());
factory.register((short)-102, TcpDiscoveryNodeMetricsMessage::new, new
TcpDiscoveryNodeMetricsMessageSerializer());
factory.register((short)-101, InetSocketAddressMessage::new, new
InetSocketAddressMessageSerializer());
factory.register((short)-100, InetAddressMessage::new, new
InetAddressMessageSerializer());
@@ -76,5 +88,6 @@ public class DiscoveryMessageFactory implements
MessageFactoryProvider {
factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new
TcpDiscoveryAuthFailedMessageSerializer());
factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new
TcpDiscoveryDuplicateIdMessageSerializer());
factory.register((short)13,
TcpDiscoveryClientMetricsUpdateMessage::new, new
TcpDiscoveryClientMetricsUpdateMessageSerializer());
+ factory.register((short)14, TcpDiscoveryMetricsUpdateMessage::new, new
TcpDiscoveryMetricsUpdateMessageSerializer());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
index a1ac2dc3c8c..7bba244fc7f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
@@ -46,11 +46,11 @@ class ClusterNodeMetrics {
/** */
public ClusterNodeMetrics(NodeFullMetricsMessage msg) {
- nodeMetrics = new ClusterMetricsSnapshot(msg.nodeMetricsMsg());
+ nodeMetrics = new ClusterMetricsSnapshot(msg.nodeMetricsMessage());
- cacheMetrics = new HashMap<>(msg.cachesMetrics().size(), 1.0f);
+ cacheMetrics = new HashMap<>(msg.cachesMetricsMessages().size(), 1.0f);
- msg.cachesMetrics().entrySet().forEach(e ->
cacheMetrics.put(e.getKey(), new CacheMetricsSnapshot(e.getValue())));
+ msg.cachesMetricsMessages().forEach((key, value) ->
cacheMetrics.put(key, new CacheMetricsSnapshot(value)));
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java
index 2974ab93351..0ca3ee6800f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java
@@ -17,59 +17,59 @@
package org.apache.ignite.internal.processors.cluster;
-import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
/** Node compound metrics message. */
-public final class NodeFullMetricsMessage implements Message {
+public class NodeFullMetricsMessage implements Message {
/** */
public static final short TYPE_CODE = 138;
/** Node metrics wrapper message. */
- @Order(0)
+ @Order(value = 0, method = "nodeMetricsMessage")
private NodeMetricsMessage nodeMetricsMsg;
/** Cache metrics wrapper message. */
- @Order(1)
- private Map<Integer, CacheMetricsMessage> cachesMetrics;
+ @Order(value = 1, method = "cachesMetricsMessages")
+ private Map<Integer, CacheMetricsMessage> cachesMetricsMsgs;
/** Empty constructor for {@link GridIoMessageFactory}. */
public NodeFullMetricsMessage() {
-
+ // No-op.
}
/** */
public NodeFullMetricsMessage(ClusterMetrics nodeMetrics, Map<Integer,
CacheMetrics> cacheMetrics) {
nodeMetricsMsg = new NodeMetricsMessage(nodeMetrics);
- cachesMetrics = new HashMap<>(cacheMetrics.size(), 1.0f);
+ cachesMetricsMsgs = U.newHashMap(cacheMetrics.size());
- cacheMetrics.forEach((key, value) -> cachesMetrics.put(key, new
CacheMetricsMessage(value)));
+ cacheMetrics.forEach((key, value) -> cachesMetricsMsgs.put(key, new
CacheMetricsMessage(value)));
}
/** */
- public Map<Integer, CacheMetricsMessage> cachesMetrics() {
- return cachesMetrics;
+ public Map<Integer, CacheMetricsMessage> cachesMetricsMessages() {
+ return cachesMetricsMsgs;
}
/** */
- public void cachesMetrics(Map<Integer, CacheMetricsMessage>
cacheMetricsMsg) {
- cachesMetrics = cacheMetricsMsg;
+ public void cachesMetricsMessages(Map<Integer, CacheMetricsMessage>
cacheMetricsMsg) {
+ cachesMetricsMsgs = cacheMetricsMsg;
}
/** */
- public NodeMetricsMessage nodeMetricsMsg() {
+ public NodeMetricsMessage nodeMetricsMessage() {
return nodeMetricsMsg;
}
/** */
- public void nodeMetricsMsg(NodeMetricsMessage nodeMetricsMsg) {
+ public void nodeMetricsMessage(NodeMetricsMessage nodeMetricsMsg) {
this.nodeMetricsMsg = nodeMetricsMsg;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 5ecbe893fb8..238fdd13eab 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -2526,8 +2526,8 @@ class ClientImpl extends TcpDiscoveryImpl {
log.debug("Received metrics response: " + msg);
}
else {
- if (msg.hasMetrics())
- processMsgCacheMetrics(msg, System.nanoTime());
+ if (!F.isEmpty(msg.serversFullMetricsMessages()))
+ processCacheMetricsMessage(msg, System.nanoTime());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index a8316d554a9..ea8dd5f5c2d 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2220,15 +2220,6 @@ class ServerImpl extends TcpDiscoveryImpl {
!(msg instanceof TcpDiscoveryConnectionCheckMessage);
}
- /**
- * @param msg Message.
- * @param nodeId Node ID.
- */
- private static void removeMetrics(TcpDiscoveryMetricsUpdateMessage msg,
UUID nodeId) {
- msg.removeMetrics(nodeId);
- msg.removeCacheMetrics(nodeId);
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ServerImpl.class, this);
@@ -3399,10 +3390,11 @@ class ServerImpl extends TcpDiscoveryImpl {
for (ClientMessageWorker clientMsgWorker :
clientMsgWorkers.values()) {
if (msgBytes == null) {
try {
- msgBytes = U.marshal(spi.marshaller(), msg);
+ msgBytes =
clientMsgWorker.ses.serializeMessage(msg);
}
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to marshal message: " + msg,
e);
+ catch (IgniteCheckedException | IOException e) {
+ U.error(log, "Failed to serialize message to a
client: " + msg + ", recepient " +
+ "client id: " + clientMsgWorker.clientNodeId,
e);
break;
}
@@ -3418,6 +3410,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (clientMsgWorker.clientNodeId.equals(node.id())) {
try {
+ // TODO:
https://issues.apache.org/jira/browse/IGNITE-27556 refactor serialization.
msg0 = U.unmarshal(spi.marshaller(), msgBytes,
U.resolveClassLoader(spi.ignite().configuration()));
@@ -6052,30 +6045,32 @@ class ServerImpl extends TcpDiscoveryImpl {
long tsNanos = System.nanoTime();
- if (spiStateCopy() == CONNECTED && msg.hasMetrics())
- processMsgCacheMetrics(msg, tsNanos);
+ if (spiStateCopy() == CONNECTED &&
!F.isEmpty(msg.serversFullMetricsMessages()))
+ processCacheMetricsMessage(msg, tsNanos);
if (sendMessageToRemotes(msg)) {
if (laps == 0 && spiStateCopy() == CONNECTED) {
// Message is on its first ring or just created on
coordinator.
- msg.setMetrics(locNodeId, spi.metricsProvider.metrics());
- msg.setCacheMetrics(locNodeId,
spi.metricsProvider.cacheMetrics());
+ msg.addServerMetrics(locNodeId,
spi.metricsProvider.metrics());
+ msg.addServerCacheMetrics(locNodeId,
spi.metricsProvider.cacheMetrics());
for (Map.Entry<UUID, ClientMessageWorker> e :
clientMsgWorkers.entrySet()) {
UUID nodeId = e.getKey();
ClusterMetrics metrics = e.getValue().metrics();
if (metrics != null)
- msg.setClientMetrics(locNodeId, nodeId, metrics);
+ msg.addClientMetrics(locNodeId, nodeId, metrics);
msg.addClientNodeId(nodeId);
}
}
else {
// Message is on its second ring.
- removeMetrics(msg, locNodeId);
+ msg.removeServerMetrics(locNodeId);
- Collection<UUID> clientNodeIds = msg.clientNodeIds();
+ Collection<UUID> clientNodeIds =
F.isEmpty(msg.clientNodeIds())
+ ? Collections.emptySet()
+ : msg.clientNodeIds();
for (TcpDiscoveryNode clientNode : ring.clientNodes()) {
if (clientNode.visible()) {
@@ -8436,7 +8431,8 @@ class ServerImpl extends TcpDiscoveryImpl {
private int passedLaps(TcpDiscoveryMetricsUpdateMessage msg) {
UUID locNodeId = getLocalNodeId();
- boolean hasLocMetrics = hasMetrics(msg, locNodeId);
+ boolean hasLocMetrics =
!F.isEmpty(msg.serversFullMetricsMessages())
+ && msg.serversFullMetricsMessages().get(locNodeId) != null;
if (locNodeId.equals(msg.creatorNodeId()) && !hasLocMetrics &&
msg.senderNodeId() != null)
return 2;
@@ -8445,15 +8441,5 @@ class ServerImpl extends TcpDiscoveryImpl {
else
return 1;
}
-
- /**
- * @param msg Metrics update message to check.
- * @param nodeId Node ID for which the check should be performed.
- * @return {@code True} is the message contains metrics of the node
with the provided ID.
- * {@code False} otherwise.
- */
- private boolean hasMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID
nodeId) {
- return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId);
- }
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index f35f4114795..e115a3cca03 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -35,10 +35,14 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.ClusterMetricsSnapshot;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheMetricsSnapshot;
+import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage;
+import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
import org.apache.ignite.internal.processors.tracing.NoopTracing;
import org.apache.ignite.internal.processors.tracing.Tracing;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -48,7 +52,9 @@ import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientNodesMetricsMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
import org.jetbrains.annotations.Nullable;
@@ -415,27 +421,45 @@ abstract class TcpDiscoveryImpl {
}
/** */
- public void processMsgCacheMetrics(TcpDiscoveryMetricsUpdateMessage msg,
long tsNanos) {
- for (Map.Entry<UUID, TcpDiscoveryMetricsUpdateMessage.MetricsSet> e :
msg.metrics().entrySet()) {
- UUID nodeId = e.getKey();
+ public void processCacheMetricsMessage(TcpDiscoveryMetricsUpdateMessage
msg, long tsNanos) {
+ for (Map.Entry<UUID, TcpDiscoveryNodeFullMetricsMessage> e :
msg.serversFullMetricsMessages().entrySet()) {
+ UUID srvrId = e.getKey();
+ Map<Integer, CacheMetricsMessage> cacheMetricsMsgs =
e.getValue().cachesMetricsMessages();
+ NodeMetricsMessage srvrMetricsMsg =
e.getValue().nodeMetricsMessage();
- TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet =
e.getValue();
+ assert srvrMetricsMsg != null;
- Map<Integer, CacheMetrics> cacheMetrics =
msg.hasCacheMetrics(nodeId) ?
- msg.cacheMetrics().get(nodeId) : Collections.emptyMap();
+ Map<Integer, CacheMetrics> cacheMetrics;
- if (endTimeMetricsSizeProcessWait <= U.currentTimeMillis()
- && cacheMetrics.size() >= METRICS_QNT_WARN) {
+ if (!F.isEmpty(cacheMetricsMsgs)) {
+ cacheMetrics = U.newHashMap(cacheMetricsMsgs.size());
+
+ cacheMetricsMsgs.forEach((cacheId, cacheMetricsMsg) ->
+ cacheMetrics.put(cacheId, new
CacheMetricsSnapshot(cacheMetricsMsg)));
+ }
+ else
+ cacheMetrics = Collections.emptyMap();
+
+ if (endTimeMetricsSizeProcessWait <= U.currentTimeMillis() &&
cacheMetrics.size() >= METRICS_QNT_WARN) {
log.warning("The Discovery message has metrics for " +
cacheMetrics.size() + " caches.\n" +
"To prevent Discovery blocking use
-DIGNITE_DISCOVERY_DISABLE_CACHE_METRICS_UPDATE=true option.");
endTimeMetricsSizeProcessWait = U.currentTimeMillis() +
LOG_WARN_MSG_TIMEOUT;
}
- updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tsNanos);
+ updateMetrics(srvrId, new ClusterMetricsSnapshot(srvrMetricsMsg),
cacheMetrics, tsNanos);
+
+ TcpDiscoveryClientNodesMetricsMessage clientsMetricsMsg =
F.isEmpty(msg.connectedClientsMetricsMessages())
+ ? null
+ : msg.connectedClientsMetricsMessages().get(srvrId);
+
+ if (clientsMetricsMsg == null)
+ continue;
+
+ assert clientsMetricsMsg.nodesMetricsMessages() != null;
- for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
- updateMetrics(t.get1(), t.get2(), cacheMetrics, tsNanos);
+ clientsMetricsMsg.nodesMetricsMessages().forEach((clientId,
clientNodeMetricsMsg) ->
+ updateMetrics(clientId, new
ClusterMetricsSnapshot(clientNodeMetricsMsg), cacheMetrics, tsNanos));
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
index edcab473b4c..fa8d71e40f1 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi.discovery.tcp;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
@@ -39,6 +40,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType;
@@ -87,7 +89,7 @@ public class TcpDiscoveryIoSession {
private final OutputStream out;
/** Buffered socket input stream. */
- private final InputStream in;
+ private final CompositeInputStream in;
/** Intermediate buffer for serializing discovery messages. */
private final ByteBuffer msgBuf;
@@ -115,7 +117,7 @@ public class TcpDiscoveryIoSession {
int rcvBufSize = sock.getReceiveBufferSize() > 0 ?
sock.getReceiveBufferSize() : DFLT_SOCK_BUFFER_SIZE;
out = new BufferedOutputStream(sock.getOutputStream(),
sendBufSize);
- in = new BufferedInputStream(sock.getInputStream(), rcvBufSize);
+ in = new CompositeInputStream(new
BufferedInputStream(sock.getInputStream(), rcvBufSize));
}
catch (IOException e) {
throw new IgniteException(e);
@@ -184,47 +186,31 @@ public class TcpDiscoveryIoSession {
boolean finished;
- msgBuf.clear();
-
do {
+ msgBuf.clear();
+
int read = in.read(msgBuf.array(), msgBuf.position(),
msgBuf.remaining());
if (read == -1)
throw new EOFException("Connection closed before message
was fully read.");
- if (msgBuf.position() > 0) {
- msgBuf.limit(msgBuf.position() + read);
-
- // We've stored an unprocessed tail before.
- msgBuf.rewind();
- }
- else
- msgBuf.limit(read);
+ msgBuf.limit(read);
finished = msgSer.readFrom(msg, msgReader);
- // We rely on the fact that Discovery only sends next message
upon receiving a receipt for the previous one.
+ // Server Discovery only sends next message to next Server
upon receiving a receipt for the previous one.
// This behaviour guarantees that we never read a next message
from the buffer right after the end of
- // the previous message.
- assert msgBuf.remaining() == 0 || !finished : "Some data was
read from the socket but left unprocessed.";
-
- if (finished)
- break;
+ // the previous message. But it is not guaranteed with Client
Discovery where messages aren't acknowledged.
+ // Thus, we have to keep the unprocessed bytes read from the socket. The socket won't return them again.
+ if (msgBuf.hasRemaining()) {
+ byte[] unprocessedReadTail = new byte[msgBuf.remaining()];
- // We must keep the uprocessed bytes read from the socket. It
won't return them again.
- byte[] unprocessedTail = null;
+ msgBuf.get(unprocessedReadTail, 0, msgBuf.remaining());
- if (msgBuf.remaining() > 0) {
- unprocessedTail = new byte[msgBuf.remaining()];
- msgBuf.get(unprocessedTail, 0, msgBuf.remaining());
+ in.attachByteArray(unprocessedReadTail);
}
-
- msgBuf.clear();
-
- if (unprocessedTail != null)
- msgBuf.put(unprocessedTail);
}
- while (true);
+ while (!finished);
return (T)msg;
}
@@ -320,4 +306,148 @@ public class TcpDiscoveryIoSession {
if (hex.matches("15....00"))
throw new StreamCorruptedException("invalid stream header: " +
hex);
}
+
+ /**
+  * Input stream implementation that combines a byte array and a regular {@link InputStream}: bytes of the
+  * attached array are returned first, then reading proceeds with the wrapped stream.
+  * Supports only the basic read methods; mark/reset, skip, transferTo, readAllBytes and readNBytes(int) throw
+  * {@link UnsupportedOperationException}.
+  */
+ private static class CompositeInputStream extends BufferedInputStream {
+     /** Prefix data input stream to read before the original input stream. {@code null} when nothing is attached. */
+     @Nullable private ByteArrayInputStream attachedBytesIs;
+
+     /** @param srcIs Original input stream to read when {@link #attachedBytesIs} is empty. */
+     private CompositeInputStream(InputStream srcIs) {
+         super(srcIs);
+     }
+
+     /**
+      * Attaches bytes that must be returned before any further data of the original stream.
+      * A previously attached prefix is expected to be fully consumed by this moment.
+      *
+      * @param prefixData Prefix data to read before the original input stream.
+      */
+     private void attachByteArray(byte[] prefixData) {
+         assert prefixBytesLeft() == 0;
+
+         attachedBytesIs = new ByteArrayInputStream(prefixData);
+     }
+
+     /** {@inheritDoc} */
+     @Override public int read() throws IOException {
+         if (prefixBytesLeft() > 0) {
+             int res = attachedBytesIs.read();
+
+             checkPrefixBufferExhausted();
+
+             return res;
+         }
+
+         return super.read();
+     }
+
+     /** {@inheritDoc} */
+     @Override public int read(@NotNull byte[] b, int off, int len) throws IOException {
+         int len0 = readPrefixBuffer(b, off, len);
+
+         assert len0 <= len;
+
+         // Prefix bytes are already in hand: return them right away. Falling through to the original stream
+         // here could block waiting for new data, and its end-of-stream marker (-1) must never be added
+         // to the number of prefix bytes already copied.
+         if (len0 > 0 || len0 == len)
+             return len0;
+
+         return super.read(b, off, len);
+     }
+
+     /** {@inheritDoc} */
+     @Override public int read(@NotNull byte[] b) throws IOException {
+         return read(b, 0, b.length);
+     }
+
+     /** {@inheritDoc} */
+     @Override public int readNBytes(byte[] b, int off, int len) throws IOException {
+         int len0 = readPrefixBuffer(b, off, len);
+
+         // Bytes taken from the prefix count towards the total number of bytes read.
+         return len0 + super.readNBytes(b, off + len0, len - len0);
+     }
+
+     /** {@inheritDoc} */
+     @Override public int available() throws IOException {
+         int srcAvailable = super.available();
+         int prefixLeft = prefixBytesLeft();
+
+         // Original input stream may return Integer#MAX_VALUE: saturate instead of overflowing.
+         if (srcAvailable > Integer.MAX_VALUE - prefixLeft)
+             return Integer.MAX_VALUE;
+
+         return srcAvailable + prefixLeft;
+     }
+
+     /** {@inheritDoc} */
+     @Override public void close() throws IOException {
+         if (attachedBytesIs != null) {
+             attachedBytesIs.close();
+
+             attachedBytesIs = null;
+         }
+
+         super.close();
+     }
+
+     /**
+      * Copies up to {@code len} bytes of the attached prefix into {@code b} and drops the prefix once it is drained.
+      *
+      * @param b Destination buffer.
+      * @param off Offset in the destination buffer.
+      * @param len Maximum number of bytes to copy.
+      * @return Number of bytes copied from the prefix, {@code 0} if no prefix bytes are left.
+      */
+     private int readPrefixBuffer(byte[] b, int off, int len) {
+         int res = 0;
+
+         int prefixBytesLeft = prefixBytesLeft();
+
+         if (prefixBytesLeft > 0) {
+             if (len > b.length - off)
+                 len = b.length - off;
+
+             res = attachedBytesIs.read(b, off, Math.min(len, prefixBytesLeft));
+
+             checkPrefixBufferExhausted();
+         }
+
+         return res;
+     }
+
+     /** @return Number of attached prefix bytes that have not been read yet. */
+     private int prefixBytesLeft() {
+         return attachedBytesIs == null ? 0 : attachedBytesIs.available();
+     }
+
+     /** Releases the attached prefix stream as soon as all its bytes have been read. */
+     private void checkPrefixBufferExhausted() {
+         if (attachedBytesIs != null && attachedBytesIs.available() == 0)
+             attachedBytesIs = null;
+     }
+
+     /** {@inheritDoc} */
+     @Override public void mark(int readlimit) {
+         throw new UnsupportedOperationException("mark() is not supported.");
+     }
+
+     /** {@inheritDoc} */
+     @Override public boolean markSupported() {
+         return false;
+     }
+
+     /** {@inheritDoc} */
+     @Override public void reset() {
+         throw new UnsupportedOperationException("reset() is not supported.");
+     }
+
+     /** {@inheritDoc} */
+     @Override public long skip(long n) {
+         throw new UnsupportedOperationException("skip() is not supported.");
+     }
+
+     /** {@inheritDoc} */
+     @Override public long transferTo(OutputStream out) {
+         throw new UnsupportedOperationException("transferTo() is not supported.");
+     }
+
+     /** {@inheritDoc} */
+     @Override public @NotNull byte[] readAllBytes() {
+         throw new UnsupportedOperationException("readAllBytes() is not supported.");
+     }
+
+     /** {@inheritDoc} */
+     @Override public @NotNull byte[] readNBytes(int len) {
+         throw new UnsupportedOperationException("readNBytes() is not supported.");
+     }
+ }
}
+
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCacheMetricsMessage.java
similarity index 60%
copy from
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
copy to
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCacheMetricsMessage.java
index 0c12acee04a..3fbfc2faa71 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCacheMetricsMessage.java
@@ -17,34 +17,34 @@
package org.apache.ignite.spi.discovery.tcp.messages;
-import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
-import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
+import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
/**
- * We cannot directly reuse `NodeMetricsMessage` in Discovery as it is
registered in a message factory of Communication
- * component and thus is unavailable in Discovery. We have to extend
`NodeMetricsMessage` and register this subclass in
- * message factory of Discovery component.
+ * We cannot directly reuse {@link CacheMetricsMessage} in Discovery as it is
registered in a message factory of
+ * Communication component and thus is unavailable in Discovery. We have to
extend {@link CacheMetricsMessage} and
+ * register this subclass in message factory of Discovery component.
*/
-public class TcpDiscoveryNodeMetricsMessage extends NodeMetricsMessage {
+public class TcpDiscoveryCacheMetricsMessage extends CacheMetricsMessage {
/** Constructor for {@link DiscoveryMessageFactory}. */
- public TcpDiscoveryNodeMetricsMessage() {
+ public TcpDiscoveryCacheMetricsMessage() {
// No-op.
}
- /** @param metrics Metrics. */
- public TcpDiscoveryNodeMetricsMessage(ClusterMetrics metrics) {
- super(metrics);
+ /** @param cacheMetricsMsg Cache metric message. */
+ public TcpDiscoveryCacheMetricsMessage(CacheMetrics cacheMetricsMsg) {
+ super(cacheMetricsMsg);
}
/** {@inheritDoc} */
@Override public short directType() {
- return -102;
+ return -103;
}
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(TcpDiscoveryNodeMetricsMessage.class, this, "super",
super.toString());
+ return S.toString(TcpDiscoveryCacheMetricsMessage.class, this,
"super", super.toString());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientNodesMetricsMessage.java
similarity index 53%
copy from
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
copy to
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientNodesMetricsMessage.java
index 0c12acee04a..c09a7698b80 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientNodesMetricsMessage.java
@@ -17,34 +17,41 @@
package org.apache.ignite.spi.discovery.tcp.messages;
-import org.apache.ignite.cluster.ClusterMetrics;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
-import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** Holds map of thick client or server metrics messages per node id. */
+public class TcpDiscoveryClientNodesMetricsMessage implements Message {
+ /** Map of nodes metrics messages per node id. */
+ @Order(value = 0, method = "nodesMetricsMessages")
+ private Map<UUID, TcpDiscoveryNodeMetricsMessage> nodesMetricsMsgs;
-/**
- * We cannot directly reuse `NodeMetricsMessage` in Discovery as it is
registered in a message factory of Communication
- * component and thus is unavailable in Discovery. We have to extend
`NodeMetricsMessage` and register this subclass in
- * message factory of Discovery component.
- */
-public class TcpDiscoveryNodeMetricsMessage extends NodeMetricsMessage {
/** Constructor for {@link DiscoveryMessageFactory}. */
- public TcpDiscoveryNodeMetricsMessage() {
+ public TcpDiscoveryClientNodesMetricsMessage() {
// No-op.
}
- /** @param metrics Metrics. */
- public TcpDiscoveryNodeMetricsMessage(ClusterMetrics metrics) {
- super(metrics);
+ /** @return Map of nodes metrics messages per node id. */
+ public Map<UUID, TcpDiscoveryNodeMetricsMessage> nodesMetricsMessages() {
+ return nodesMetricsMsgs;
+ }
+
+ /** @param nodesMetricsMsgs Map of nodes metrics messages per node id. */
+ public void nodesMetricsMessages(Map<UUID, TcpDiscoveryNodeMetricsMessage>
nodesMetricsMsgs) {
+ this.nodesMetricsMsgs = nodesMetricsMsgs;
}
/** {@inheritDoc} */
@Override public short directType() {
- return -102;
+ return -104;
}
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(TcpDiscoveryNodeMetricsMessage.class, this, "super",
super.toString());
+ return S.toString(TcpDiscoveryClientNodesMetricsMessage.class, this,
"super", super.toString());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java
index e6835320fa5..8b2ccfaf31f 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java
@@ -17,25 +17,21 @@
package org.apache.ignite.spi.discovery.tcp.messages;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
-import org.apache.ignite.internal.ClusterMetricsSnapshot;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
+import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
/**
* Metrics update message.
@@ -52,20 +48,28 @@ import org.apache.ignite.internal.util.typedef.internal.U;
* second pass).
*/
@TcpDiscoveryRedirectToClient
-public class TcpDiscoveryMetricsUpdateMessage extends
TcpDiscoveryAbstractMessage {
+public class TcpDiscoveryMetricsUpdateMessage extends
TcpDiscoveryAbstractMessage implements Message {
/** */
private static final long serialVersionUID = 0L;
- /** Map to store nodes metrics. */
+ /** Connected clients metrics: server id -> client id -> clients metrics.
*/
@GridToStringExclude
- private final Map<UUID, MetricsSet> metrics = new HashMap<>();
+ @Order(value = 5, method = "connectedClientsMetricsMessages")
+ private Map<UUID, TcpDiscoveryClientNodesMetricsMessage>
connectedClientsMetricsMsgs;
+
+ /** Servers full metrics: server id -> server metrics + metrics of
server's caches. */
+ @GridToStringExclude
+ @Order(value = 6, method = "serversFullMetricsMessages")
+ private @Nullable Map<UUID, TcpDiscoveryNodeFullMetricsMessage>
serversFullMetricsMsgs;
/** Client node IDs. */
- private final Collection<UUID> clientNodeIds = new HashSet<>();
+ @Order(value = 7)
+ private @Nullable Set<UUID> clientNodeIds;
- /** Cahce metrics by node. */
- @GridToStringExclude
- private final Map<UUID, Map<Integer, CacheMetrics>> cacheMetrics = new
HashMap<>();
+ /** Constructor for {@link DiscoveryMessageFactory}. */
+ public TcpDiscoveryMetricsUpdateMessage() {
+ // No-op.
+ }
/**
* Constructor.
@@ -79,130 +83,136 @@ public class TcpDiscoveryMetricsUpdateMessage extends
TcpDiscoveryAbstractMessag
/**
* Sets metrics for particular node.
*
- * @param nodeId Node ID.
- * @param metrics Node metrics.
+ * @param srvrId Server ID.
+ * @param newMetrics New server metrics to add.
*/
- public void setMetrics(UUID nodeId, ClusterMetrics metrics) {
- assert nodeId != null;
- assert metrics != null;
- assert !this.metrics.containsKey(nodeId);
+ public void addServerMetrics(UUID srvrId, ClusterMetrics newMetrics) {
+ assert srvrId != null;
+ assert newMetrics != null;
+
+ if (serversFullMetricsMsgs == null)
+ serversFullMetricsMsgs = new HashMap<>();
+
+ assert !serversFullMetricsMsgs.containsKey(srvrId);
+
+ serversFullMetricsMsgs.compute(srvrId, (srvrId0, srvrFullMetrics) -> {
+ if (srvrFullMetrics == null)
+ srvrFullMetrics = new TcpDiscoveryNodeFullMetricsMessage();
+
+ srvrFullMetrics.nodeMetricsMessage(new
TcpDiscoveryNodeMetricsMessage(newMetrics));
- this.metrics.put(nodeId, new MetricsSet(metrics));
+ return srvrFullMetrics;
+ });
}
/**
* Sets cache metrics for particular node.
*
- * @param nodeId Node ID.
- * @param metrics Node cache metrics.
+ * @param srvrId Server ID.
+ * @param newCachesMetrics New server's caches metrics to add.
*/
- public void setCacheMetrics(UUID nodeId, Map<Integer, CacheMetrics>
metrics) {
- assert nodeId != null;
- assert metrics != null;
- assert !this.cacheMetrics.containsKey(nodeId);
+ public void addServerCacheMetrics(UUID srvrId, Map<Integer, CacheMetrics>
newCachesMetrics) {
+ assert srvrId != null;
+ assert newCachesMetrics != null;
- if (!F.isEmpty(metrics))
- this.cacheMetrics.put(nodeId, metrics);
+ if (serversFullMetricsMsgs == null)
+ serversFullMetricsMsgs = new HashMap<>();
+
+ assert serversFullMetricsMsgs.containsKey(srvrId) &&
serversFullMetricsMsgs.get(srvrId).cachesMetricsMessages() == null;
+
+ serversFullMetricsMsgs.compute(srvrId, (srvrId0, srvrFullMetrics) -> {
+ if (srvrFullMetrics == null)
+ srvrFullMetrics = new TcpDiscoveryNodeFullMetricsMessage();
+
+ Map<Integer, CacheMetricsMessage> newCachesMsgsMap =
U.newHashMap(newCachesMetrics.size());
+
+ newCachesMetrics.forEach((cacheId, cacheMetrics) ->
+ newCachesMsgsMap.put(cacheId, new
TcpDiscoveryCacheMetricsMessage(cacheMetrics)));
+
+ srvrFullMetrics.cachesMetricsMessages(newCachesMsgsMap);
+
+ return srvrFullMetrics;
+ });
}
/**
- * Sets metrics for a client node.
+ * Adds metrics for a connected client node.
*
- * @param nodeId Server node ID.
- * @param clientNodeId Client node ID.
- * @param metrics Node metrics.
+ * @param srvrId Server node ID.
+ * @param clientNodeId Connected client node ID.
+ * @param clientMetrics Client metrics.
*/
- public void setClientMetrics(UUID nodeId, UUID clientNodeId,
ClusterMetrics metrics) {
- assert nodeId != null;
+ public void addClientMetrics(UUID srvrId, UUID clientNodeId,
ClusterMetrics clientMetrics) {
+ assert srvrId != null;
assert clientNodeId != null;
- assert metrics != null;
- assert this.metrics.containsKey(nodeId);
+ assert clientMetrics != null;
- this.metrics.get(nodeId).addClientMetrics(clientNodeId, metrics);
- }
+ if (connectedClientsMetricsMsgs == null)
+ connectedClientsMetricsMsgs = new HashMap<>();
- /**
- * Removes metrics for particular node from the message.
- *
- * @param nodeId Node ID.
- */
- public void removeMetrics(UUID nodeId) {
- assert nodeId != null;
+ assert !connectedClientsMetricsMsgs.containsKey(srvrId)
+ ||
connectedClientsMetricsMsgs.get(srvrId).nodesMetricsMessages().get(clientNodeId)
== null;
- metrics.remove(nodeId);
- }
+ connectedClientsMetricsMsgs.compute(srvrId, (srvrId0,
clientsMetricsMsg) -> {
+ if (clientsMetricsMsg == null) {
+ clientsMetricsMsg = new
TcpDiscoveryClientNodesMetricsMessage();
+ clientsMetricsMsg.nodesMetricsMessages(new HashMap<>());
+ }
- /**
- * Removes cache metrics for particular node from the message.
- *
- * @param nodeId Node ID.
- */
- public void removeCacheMetrics(UUID nodeId) {
- assert nodeId != null;
+ clientsMetricsMsg.nodesMetricsMessages().put(clientNodeId, new
TcpDiscoveryNodeMetricsMessage(clientMetrics));
- cacheMetrics.remove(nodeId);
+ return clientsMetricsMsg;
+ });
}
/**
- * Gets metrics map.
+ * Removes metrics for particular server node from the message.
*
- * @return Metrics map.
+ * @param srvrId Server ID.
*/
- public Map<UUID, MetricsSet> metrics() {
- return metrics;
- }
+ public void removeServerMetrics(UUID srvrId) {
+ assert srvrId != null;
+ assert serversFullMetricsMsgs != null;
- /**
- * Gets cache metrics map.
- *
- * @return Cache metrics map.
- */
- public Map<UUID, Map<Integer, CacheMetrics>> cacheMetrics() {
- return cacheMetrics;
+ serversFullMetricsMsgs.remove(srvrId);
}
- /**
- * @return {@code True} if this message contains metrics.
- */
- public boolean hasMetrics() {
- return !metrics.isEmpty();
+ /** @return Map of server full metrics messages. */
+ public Map<UUID, TcpDiscoveryNodeFullMetricsMessage>
serversFullMetricsMessages() {
+ return serversFullMetricsMsgs;
}
- /**
- * @return {@code True} this message contains cache metrics.
- */
- public boolean hasCacheMetrics() {
- return !cacheMetrics.isEmpty();
+ /** @param serversFullMetricsMsgs Map of server full metrics messages. */
+ public void serversFullMetricsMessages(Map<UUID,
TcpDiscoveryNodeFullMetricsMessage> serversFullMetricsMsgs) {
+ this.serversFullMetricsMsgs = serversFullMetricsMsgs;
}
- /**
- * @param nodeId Node ID.
- * @return {@code True} if this message contains metrics.
- */
- public boolean hasMetrics(UUID nodeId) {
- assert nodeId != null;
+ /** @return Map of connected clients metrics messages per server id. */
+ public @Nullable Map<UUID, TcpDiscoveryClientNodesMetricsMessage>
connectedClientsMetricsMessages() {
+ return connectedClientsMetricsMsgs;
+ }
- return metrics.get(nodeId) != null;
+ /** @param connectedClientsMetricsMsgs Map of connected clients metrics messages per server id. */
+ public void connectedClientsMetricsMessages(Map<UUID,
TcpDiscoveryClientNodesMetricsMessage> connectedClientsMetricsMsgs) {
+ this.connectedClientsMetricsMsgs = connectedClientsMetricsMsgs;
}
/**
- * @param nodeId Node ID.
+ * Gets client node IDs.
*
- * @return {@code True} if this message contains cache metrics for
particular node.
+ * @return Client node IDs.
*/
- public boolean hasCacheMetrics(UUID nodeId) {
- assert nodeId != null;
-
- return cacheMetrics.get(nodeId) != null;
+ public @Nullable Set<UUID> clientNodeIds() {
+ return clientNodeIds;
}
/**
- * Gets client node IDs for particular node.
+ * Sets client node IDs.
*
- * @return Client node IDs.
+ * @param clientNodeIds Client node IDs.
*/
- public Collection<UUID> clientNodeIds() {
- return clientNodeIds;
+ public void clientNodeIds(@Nullable Set<UUID> clientNodeIds) {
+ this.clientNodeIds = clientNodeIds;
}
/**
@@ -211,6 +221,9 @@ public class TcpDiscoveryMetricsUpdateMessage extends
TcpDiscoveryAbstractMessag
* @param clientNodeId Client node ID.
*/
public void addClientNodeId(UUID clientNodeId) {
+ if (clientNodeIds == null)
+ clientNodeIds = new HashSet<>();
+
clientNodeIds.add(clientNodeId);
}
@@ -220,115 +233,12 @@ public class TcpDiscoveryMetricsUpdateMessage extends
TcpDiscoveryAbstractMessag
}
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(TcpDiscoveryMetricsUpdateMessage.class, this,
"super", super.toString());
+ @Override public short directType() {
+ return 14;
}
- /**
- * @param nodeId Node ID.
- * @param metrics Metrics.
- * @return Serialized metrics.
- */
- private static byte[] serializeMetrics(UUID nodeId, ClusterMetrics
metrics) {
- assert nodeId != null;
- assert metrics != null;
-
- byte[] buf = new byte[16 + ClusterMetricsSnapshot.METRICS_SIZE];
-
- U.longToBytes(nodeId.getMostSignificantBits(), buf, 0);
- U.longToBytes(nodeId.getLeastSignificantBits(), buf, 8);
-
- ClusterMetricsSnapshot.serialize(buf, 16, metrics);
-
- return buf;
- }
-
- /**
- */
- @SuppressWarnings("PublicInnerClass")
- public static class MetricsSet implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Metrics. */
- private byte[] metrics;
-
- /** Client metrics. */
- private Collection<byte[]> clientMetrics;
-
- /**
- */
- public MetricsSet() {
- // No-op.
- }
-
- /**
- * @param metrics Metrics.
- */
- public MetricsSet(ClusterMetrics metrics) {
- assert metrics != null;
-
- this.metrics = ClusterMetricsSnapshot.serialize(metrics);
- }
-
- /**
- * @return Deserialized metrics.
- */
- public ClusterMetrics metrics() {
- return ClusterMetricsSnapshot.deserialize(metrics, 0);
- }
-
- /**
- * @return Client metrics.
- */
- public Collection<T2<UUID, ClusterMetrics>> clientMetrics() {
- return F.viewReadOnly(clientMetrics, new C1<byte[], T2<UUID,
ClusterMetrics>>() {
- @Override public T2<UUID, ClusterMetrics> apply(byte[] bytes) {
- UUID nodeId = new UUID(U.bytesToLong(bytes, 0),
U.bytesToLong(bytes, 8));
-
- return new T2<>(nodeId,
ClusterMetricsSnapshot.deserialize(bytes, 16));
- }
- });
- }
-
- /**
- * @param nodeId Client node ID.
- * @param metrics Client metrics.
- */
- private void addClientMetrics(UUID nodeId, ClusterMetrics metrics) {
- assert nodeId != null;
- assert metrics != null;
-
- if (clientMetrics == null)
- clientMetrics = new ArrayList<>();
-
- clientMetrics.add(serializeMetrics(nodeId, metrics));
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws
IOException {
- U.writeByteArray(out, metrics);
-
- out.writeInt(clientMetrics != null ? clientMetrics.size() : -1);
-
- if (clientMetrics != null) {
- for (byte[] arr : clientMetrics)
- U.writeByteArray(out, arr);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
- metrics = U.readByteArray(in);
-
- int clientMetricsSize = in.readInt();
-
- if (clientMetricsSize >= 0) {
- clientMetrics = new ArrayList<>(clientMetricsSize);
-
- for (int i = 0; i < clientMetricsSize; i++)
- clientMetrics.add(U.readByteArray(in));
- }
- }
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryMetricsUpdateMessage.class, this,
"super", super.toString());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java
similarity index 61%
copy from
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
copy to
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java
index 0c12acee04a..7d121816da4 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java
@@ -17,34 +17,28 @@
package org.apache.ignite.spi.discovery.tcp.messages;
-import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
-import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
+import org.apache.ignite.internal.processors.cluster.NodeFullMetricsMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
/**
- * We cannot directly reuse `NodeMetricsMessage` in Discovery as it is
registered in a message factory of Communication
- * component and thus is unavailable in Discovery. We have to extend
`NodeMetricsMessage` and register this subclass in
- * message factory of Discovery component.
+ * We cannot directly reuse {@link NodeFullMetricsMessage} in Discovery as it
is registered in a message factory of
+ * Communication component and thus is unavailable in Discovery. We have to
extend {@link NodeFullMetricsMessage} and
+ * register this subclass in message factory of Discovery component.
*/
-public class TcpDiscoveryNodeMetricsMessage extends NodeMetricsMessage {
+public class TcpDiscoveryNodeFullMetricsMessage extends NodeFullMetricsMessage
{
/** Constructor for {@link DiscoveryMessageFactory}. */
- public TcpDiscoveryNodeMetricsMessage() {
+ public TcpDiscoveryNodeFullMetricsMessage() {
// No-op.
}
- /** @param metrics Metrics. */
- public TcpDiscoveryNodeMetricsMessage(ClusterMetrics metrics) {
- super(metrics);
- }
-
/** {@inheritDoc} */
@Override public short directType() {
- return -102;
+ return -105;
}
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(TcpDiscoveryNodeMetricsMessage.class, this, "super",
super.toString());
+ return S.toString(TcpDiscoveryNodeFullMetricsMessage.class, this,
"super", super.toString());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
index 0c12acee04a..f62b3df0872 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java
@@ -23,9 +23,9 @@ import
org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
/**
- * We cannot directly reuse `NodeMetricsMessage` in Discovery as it is
registered in a message factory of Communication
- * component and thus is unavailable in Discovery. We have to extend
`NodeMetricsMessage` and register this subclass in
- * message factory of Discovery component.
+ * We cannot directly reuse {@link NodeMetricsMessage} in Discovery as it is
registered in a message factory of
+ * Communication component and thus is unavailable in Discovery. We have to
extend {@link NodeMetricsMessage} and
+ * register this subclass in message factory of Discovery component.
*/
public class TcpDiscoveryNodeMetricsMessage extends NodeMetricsMessage {
/** Constructor for {@link DiscoveryMessageFactory}. */
@@ -33,9 +33,9 @@ public class TcpDiscoveryNodeMetricsMessage extends
NodeMetricsMessage {
// No-op.
}
- /** @param metrics Metrics. */
- public TcpDiscoveryNodeMetricsMessage(ClusterMetrics metrics) {
- super(metrics);
+ /** @param nodeMetrics Node metrics. */
+ public TcpDiscoveryNodeMetricsMessage(ClusterMetrics nodeMetrics) {
+ super(nodeMetrics);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java
index d21204e99e1..77616ae9e8d 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.processors.cache;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMetrics;
@@ -27,7 +29,13 @@ import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.internal.ClusterMetricsSnapshot;
+import org.apache.ignite.internal.direct.DirectMessageReader;
+import org.apache.ignite.internal.direct.DirectMessageWriter;
+import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -92,23 +100,50 @@ public class CacheMetricsCacheSizeTest extends
GridCommonAbstractTest {
TcpDiscoveryMetricsUpdateMessage msg = new
TcpDiscoveryMetricsUpdateMessage(UUID.randomUUID());
- msg.setCacheMetrics(UUID.randomUUID(), cacheMetrics);
+ UUID srvrId = UUID.randomUUID();
- Marshaller marshaller = marshaller(grid(0));
+ msg.addServerMetrics(srvrId, new ClusterMetricsSnapshot());
+ msg.addServerCacheMetrics(srvrId, cacheMetrics);
- byte[] buf = marshaller.marshal(msg);
+ MessageFactory msgFactory =
((TcpDiscoverySpi)grid(0).context().discovery().getInjectedDiscoverySpi()).messageFactory();
+ MessageSerializer msgSerializer =
msgFactory.serializer(msg.directType());
- Object readObj = marshaller.unmarshal(buf,
getClass().getClassLoader());
+ // The writer first emits the message type header, which the reader does not
consume because the message type is already known.
+ // We have to skip this header when reading the message back.
+ AtomicInteger initHdrSize = new AtomicInteger();
- assertTrue(readObj instanceof TcpDiscoveryMetricsUpdateMessage);
+ DirectMessageWriter msgWritter = new DirectMessageWriter(msgFactory) {
+ @Override public void onHeaderWritten() {
+ super.onHeaderWritten();
- TcpDiscoveryMetricsUpdateMessage msg2 =
(TcpDiscoveryMetricsUpdateMessage)readObj;
+ initHdrSize.compareAndSet(0, getBuffer().position());
+ }
+ };
- Map<Integer, CacheMetrics> cacheMetrics2 =
msg2.cacheMetrics().values().iterator().next();
+ // 2kb should be enough for an empty message even if it is a
relatively large metrics message.
+ msgWritter.setBuffer(ByteBuffer.allocate(2048));
- CacheMetrics cacheMetric2 = cacheMetrics2.values().iterator().next();
+ assertTrue(msgSerializer.writeTo(msg, msgWritter));
- assertEquals("TcpDiscoveryMetricsUpdateMessage serialization error,
cacheSize is different", size, cacheMetric2.getCacheSize());
+ assertTrue(msgWritter.getBuffer().hasRemaining());
+
+ DirectMessageReader msgReader = new DirectMessageReader(msgFactory,
null);
+ msgReader.setBuffer(msgWritter.getBuffer());
+
+ msgWritter.getBuffer().rewind();
+ msgWritter.getBuffer().position(initHdrSize.get());
+
+ TcpDiscoveryMetricsUpdateMessage msg2 = new
TcpDiscoveryMetricsUpdateMessage();
+
+ assertTrue(msgSerializer.readFrom(msg2, msgReader));
+
+ Map<Integer, CacheMetricsMessage> cacheMetrics2 =
msg2.serversFullMetricsMessages().values().iterator().next()
+ .cachesMetricsMessages();
+
+ CacheMetrics cacheMetric2 = new
CacheMetricsSnapshot(cacheMetrics2.values().iterator().next());
+
+ assertEquals("TcpDiscoveryMetricsUpdateMessage serialization error,
cacheSize is different", size,
+ cacheMetric2.getCacheSize());
IgniteCache cacheNode1 = grid(1).cache(DEFAULT_CACHE_NAME);