This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 3b76ad01259 IGNITE-27626 Add capability to use MessageSerializer for
custom messages (#12819)
3b76ad01259 is described below
commit 3b76ad0125938d2c968f5f964fcb320077d2c702
Author: Ilya Shishkov <[email protected]>
AuthorDate: Thu Feb 26 18:27:54 2026 +0300
IGNITE-27626 Add capability to use MessageSerializer for custom messages
(#12819)
---
.../discovery/DiscoveryMessageFactory.java | 6 ++++
.../cache/CacheStatisticsModeChangeMessage.java | 35 +++++++++++++++++-----
.../processors/cache/GridCacheProcessor.java | 2 +-
.../messages/TcpDiscoveryCustomEventMessage.java | 23 +++++++++++---
4 files changed, 54 insertions(+), 12 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index 707feeac700..8255abdc2a8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.managers.discovery;
+import
org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessage;
+import
org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessageSerializer;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
@@ -96,6 +98,7 @@ public class DiscoveryMessageFactory implements
MessageFactoryProvider {
factory.register((short)-101, InetSocketAddressMessage::new, new
InetSocketAddressMessageSerializer());
factory.register((short)-100, InetAddressMessage::new, new
InetAddressMessageSerializer());
+ // TcpDiscoveryAbstractMessage
factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new
TcpDiscoveryCheckFailedMessageSerializer());
factory.register((short)1, TcpDiscoveryPingRequest::new, new
TcpDiscoveryPingRequestSerializer());
factory.register((short)2, TcpDiscoveryPingResponse::new, new
TcpDiscoveryPingResponseSerializer());
@@ -120,5 +123,8 @@ public class DiscoveryMessageFactory implements
MessageFactoryProvider {
factory.register((short)21, TcpDiscoveryCustomEventMessage::new, new
TcpDiscoveryCustomEventMessageSerializer());
factory.register((short)22,
TcpDiscoveryServerOnlyCustomEventMessage::new,
new TcpDiscoveryServerOnlyCustomEventMessageSerializer());
+
+ // DiscoveryCustomMessage
+ factory.register((short)500, CacheStatisticsModeChangeMessage::new,
new CacheStatisticsModeChangeMessageSerializer());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java
index 40bcfaf12de..308af19b0e9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java
@@ -20,18 +20,21 @@ package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
/**
* Cache statistics mode change discovery message.
*/
-public class CacheStatisticsModeChangeMessage implements
DiscoveryCustomMessage {
+public class CacheStatisticsModeChangeMessage implements
DiscoveryCustomMessage, Message {
/** */
private static final long serialVersionUID = 0L;
@@ -42,16 +45,27 @@ public class CacheStatisticsModeChangeMessage implements
DiscoveryCustomMessage
private static final byte ENABLED_MASK = 0x02;
/** Custom message ID. */
- private final IgniteUuid id = IgniteUuid.randomUuid();
+ @Order(0)
+ IgniteUuid id;
/** Request id. */
- private final UUID reqId;
+ @Order(1)
+ UUID reqId;
/** Cache names. */
- private final Collection<String> caches;
+ @Order(2)
+ Collection<String> caches;
/** Flags. */
- private final byte flags;
+ @Order(3)
+ byte flags;
+
+ /**
+ * Constructor for {@link DiscoveryMessageFactory}.
+ */
+ public CacheStatisticsModeChangeMessage() {
+ // No-op.
+ }
/**
* Constructor for response.
@@ -59,6 +73,7 @@ public class CacheStatisticsModeChangeMessage implements
DiscoveryCustomMessage
* @param req Request message.
*/
private CacheStatisticsModeChangeMessage(CacheStatisticsModeChangeMessage
req) {
+ id = IgniteUuid.randomUuid();
reqId = req.reqId;
caches = null;
@@ -73,8 +88,9 @@ public class CacheStatisticsModeChangeMessage implements
DiscoveryCustomMessage
*
* @param caches Collection of cache names.
*/
- public CacheStatisticsModeChangeMessage(UUID reqId, Collection<String>
caches, boolean enabled) {
- this.reqId = reqId;
+ public CacheStatisticsModeChangeMessage(Collection<String> caches, boolean
enabled) {
+ id = IgniteUuid.randomUuid();
+ reqId = UUID.randomUUID();
this.caches = Collections.unmodifiableCollection(caches);
byte flags = INITIAL_MSG_MASK;
@@ -138,4 +154,9 @@ public class CacheStatisticsModeChangeMessage implements
DiscoveryCustomMessage
@Override public String toString() {
return S.toString(CacheStatisticsModeChangeMessage.class, this);
}
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 500;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index fe419c8ef30..7ddc2bb8fda 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -5221,7 +5221,7 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
if (globalCaches.isEmpty())
return;
- CacheStatisticsModeChangeMessage msg = new
CacheStatisticsModeChangeMessage(UUID.randomUUID(), globalCaches, enabled);
+ CacheStatisticsModeChangeMessage msg = new
CacheStatisticsModeChangeMessage(globalCaches, enabled);
EnableStatisticsFuture fut = new
EnableStatisticsFuture(msg.requestId());
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index 980d12f1277..242931e2bae 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -47,6 +47,11 @@ public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceabl
@Order(6)
volatile @Nullable byte[] msgBytes;
+ /** {@link Message} representation of original message. */
+ // TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627
+ @Order(7)
+ volatile @Nullable Message serMsg;
+
/**
* Constructor for {@link DiscoveryMessageFactory}.
*/
@@ -72,6 +77,7 @@ public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceabl
super(msg);
msgBytes = msg.msgBytes;
+ serMsg = msg.serMsg;
this.msg = msg.msg;
}
@@ -96,9 +102,13 @@ public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceabl
*/
// TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627
public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException
{
- assert msgBytes == null || msg.isMutable() : "Message bytes are not
null for immutable message: msg =" + msg;
+ if (msg instanceof Message)
+ serMsg = (Message)msg;
+ else {
+ assert msgBytes == null || msg.isMutable() : "Message bytes are
not null for immutable message: msg =" + msg;
- msgBytes = U.marshal(marsh, msg);
+ msgBytes = U.marshal(marsh, msg);
+ }
}
/**
@@ -108,8 +118,13 @@ public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceabl
* @param ldr Class loader.
*/
// TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627
- public void finishUnmarhal(Marshaller marsh, ClassLoader ldr) throws
Throwable {
- if (msg == null) {
+ public void finishUnmarhal(Marshaller marsh, ClassLoader ldr) throws
IgniteCheckedException {
+ if (msg != null)
+ return;
+
+ if (serMsg != null)
+ msg = (DiscoveryCustomMessage)serMsg;
+ else {
try {
msg = U.unmarshal(marsh, msgBytes, ldr);
}