This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 614974f433f IGNITE-27900 Use message serializer for CacheStatisticsClearMessage, ChangeGlobalStateMessage, DynamicCacheChangeBatch, ClientCacheChangeDummyDiscoveryMessage (#12780)
614974f433f is described below
commit 614974f433fd7fafcee0095ddc4c3540e9e42ce9
Author: Alexey Abashev <[email protected]>
AuthorDate: Tue Mar 24 17:33:31 2026 +0300
IGNITE-27900 Use message serializer for CacheStatisticsClearMessage, ChangeGlobalStateMessage, DynamicCacheChangeBatch, ClientCacheChangeDummyDiscoveryMessage (#12780)
---
.../discovery/DiscoveryMessageFactory.java | 15 +++++
.../cache/CacheStatisticsClearMessage.java | 45 +++++++++++----
.../ClientCacheChangeDummyDiscoveryMessage.java | 41 +++++++++++--
.../processors/cache/DynamicCacheChangeBatch.java | 49 +++++++++++++---
.../cluster/ChangeGlobalStateMessage.java | 67 ++++++++++++++++++----
5 files changed, 184 insertions(+), 33 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index e30853967bb..fe0a323461a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -33,10 +33,16 @@ import
org.apache.ignite.internal.processors.authentication.UserProposedMessageS
import org.apache.ignite.internal.processors.authentication.UserSerializer;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
import
org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessageSerializer;
+import org.apache.ignite.internal.processors.cache.CacheStatisticsClearMessage;
+import
org.apache.ignite.internal.processors.cache.CacheStatisticsClearMessageSerializer;
import
org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessage;
import
org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessageSerializer;
import
org.apache.ignite.internal.processors.cache.ClientCacheChangeDiscoveryMessage;
import
org.apache.ignite.internal.processors.cache.ClientCacheChangeDiscoveryMessageSerializer;
+import
org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessage;
+import
org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessageMarshallableSerializer;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import
org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatchMarshallableSerializer;
import
org.apache.ignite.internal.processors.cache.TxTimeoutOnPartitionMapExchangeChangeMessage;
import
org.apache.ignite.internal.processors.cache.TxTimeoutOnPartitionMapExchangeChangeMessageSerializer;
import org.apache.ignite.internal.processors.cache.WalStateFinishMessage;
@@ -85,6 +91,8 @@ import
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionSerializer;
import
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessageSerializer;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessageMarshallableSerializer;
import
org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage;
import
org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessageSerializer;
import
org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
@@ -330,5 +338,12 @@ public class DiscoveryMessageFactory implements
MessageFactoryProvider {
factory.register((short)529, SnapshotCheckHandlersNodeResponse::new,
new SnapshotCheckHandlersNodeResponseSerializer());
factory.register((short)530,
SnapshotPartitionsVerifyHandlerResponse::new,
new
SnapshotPartitionsVerifyHandlerResponseMarshallableSerializer(marsh, clsLdr));
+ factory.register((short)531, CacheStatisticsClearMessage::new, new
CacheStatisticsClearMessageSerializer());
+ factory.register((short)532, ChangeGlobalStateMessage::new,
+ new ChangeGlobalStateMessageMarshallableSerializer(marsh, clsLdr));
+ factory.register((short)533,
ClientCacheChangeDummyDiscoveryMessage::new,
+ new
ClientCacheChangeDummyDiscoveryMessageMarshallableSerializer(marsh, clsLdr));
+ factory.register((short)534, DynamicCacheChangeBatch::new,
+ new DynamicCacheChangeBatchMarshallableSerializer(marsh, clsLdr));
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsClearMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsClearMessage.java
index f360a6acc70..a30edd8fc27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsClearMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsClearMessage.java
@@ -19,15 +19,17 @@ package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
/**
* Cache statistics clear discovery message.
*/
-public class CacheStatisticsClearMessage implements DiscoveryCustomMessage {
+public class CacheStatisticsClearMessage implements DiscoveryCustomMessage,
Message {
/** */
private static final long serialVersionUID = 0L;
@@ -35,16 +37,27 @@ public class CacheStatisticsClearMessage implements
DiscoveryCustomMessage {
private static final byte INITIAL_MSG_MASK = 0x01;
/** Custom message ID. */
- private final IgniteUuid id = IgniteUuid.randomUuid();
+ @Order(0)
+ IgniteUuid id;
/** Request id. */
- private final UUID reqId;
+ @Order(1)
+ UUID reqId;
/** Cache names. */
- private final Collection<String> caches;
+ @Order(2)
+ Collection<String> caches;
/** Flags. */
- private final byte flags;
+ @Order(3)
+ byte flags;
+
+ /**
+ * Default constructor.
+ */
+ public CacheStatisticsClearMessage() {
+ // No-op.
+ }
/**
* Constructor for request.
@@ -54,7 +67,9 @@ public class CacheStatisticsClearMessage implements
DiscoveryCustomMessage {
public CacheStatisticsClearMessage(UUID reqId, Collection<String> caches) {
this.reqId = reqId;
this.caches = caches;
- this.flags = INITIAL_MSG_MASK;
+
+ id = IgniteUuid.randomUuid();
+ flags = INITIAL_MSG_MASK;
}
/**
@@ -63,14 +78,15 @@ public class CacheStatisticsClearMessage implements
DiscoveryCustomMessage {
* @param msg Request message.
*/
private CacheStatisticsClearMessage(CacheStatisticsClearMessage msg) {
- this.reqId = msg.reqId;
- this.caches = null;
- this.flags = 0;
+ id = IgniteUuid.randomUuid();
+ reqId = msg.reqId;
+ caches = null;
+ flags = 0;
}
/** {@inheritDoc} */
@Override public IgniteUuid id() {
- return this.id;
+ return id;
}
/** {@inheritDoc} */
@@ -78,11 +94,16 @@ public class CacheStatisticsClearMessage implements
DiscoveryCustomMessage {
return initial() ? new CacheStatisticsClearMessage(this) : null;
}
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 531;
+ }
+
/**
* @return Cache names.
*/
public Collection<String> caches() {
- return this.caches;
+ return caches;
}
/**
@@ -96,7 +117,7 @@ public class CacheStatisticsClearMessage implements
DiscoveryCustomMessage {
* @return Request id.
*/
public UUID requestId() {
- return this.reqId;
+ return reqId;
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
index 5a31a26a8ba..3317a44166b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
@@ -20,30 +20,46 @@ package org.apache.ignite.internal.processors.cache;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
import org.jetbrains.annotations.Nullable;
/**
* Dummy discovery message which is not really sent via ring, it is just added
in local discovery worker queue.
*/
public class ClientCacheChangeDummyDiscoveryMessage extends
AbstractCachePartitionExchangeWorkerTask
- implements DiscoveryCustomMessage {
+ implements DiscoveryCustomMessage, MarshallableMessage {
/** */
private static final long serialVersionUID = 0L;
/** */
- private final UUID reqId;
+ @Order(0)
+ UUID reqId;
/** */
- private final Map<String, DynamicCacheChangeRequest> startReqs;
+ Map<String, DynamicCacheChangeRequest> startReqs;
+
+ /** JDK Serialized version of startReqs. */
+ @Order(1)
+ byte[] startRequestsBytes;
/** */
@GridToStringInclude
- private final Set<String> cachesToClose;
+ @Order(2)
+ Set<String> cachesToClose;
+
+ /** */
+ public ClientCacheChangeDummyDiscoveryMessage() {
+ super(null);
+ }
/**
* @param secCtx Security context in which current task must be executed.
@@ -103,6 +119,23 @@ public class ClientCacheChangeDummyDiscoveryMessage
extends AbstractCachePartiti
throw new UnsupportedOperationException();
}
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
+ if (startReqs != null)
+ startRequestsBytes = U.marshal(marsh, startReqs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
+ if (startRequestsBytes != null)
+ startReqs = U.unmarshal(marsh, startRequestsBytes, clsLdr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 533;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ClientCacheChangeDummyDiscoveryMessage.class, this,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 2e70e956f07..a0f913e3bc0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
@@ -28,35 +30,50 @@ import
org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
import org.jetbrains.annotations.Nullable;
/**
* Cache change batch.
*/
-public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
+public class DynamicCacheChangeBatch implements DiscoveryCustomMessage,
MarshallableMessage {
/** */
private static final long serialVersionUID = 0L;
/** Discovery custom message ID. */
- private IgniteUuid id = IgniteUuid.randomUuid();
+ @Order(0)
+ IgniteUuid id;
/** Change requests. */
@GridToStringInclude
- private Collection<DynamicCacheChangeRequest> reqs;
+ Collection<DynamicCacheChangeRequest> reqs;
+
+ /** JDK Serialized version of reqs. */
+ @Order(1)
+ byte[] requestsBytes;
/** Cache updates to be executed on exchange. */
- private transient ExchangeActions exchangeActions;
+ private ExchangeActions exchangeActions;
/** */
- private boolean startCaches;
+ @Order(2)
+ boolean startCaches;
/** Restarting caches. */
- private Set<String> restartingCaches;
+ @Order(3)
+ Set<String> restartingCaches;
/** Affinity (cache related) services updates to be processed on services
deployment process. */
@GridToStringExclude
- @Nullable private transient ServiceDeploymentActions
serviceDeploymentActions;
+ @Nullable private ServiceDeploymentActions serviceDeploymentActions;
+
+ /** */
+ public DynamicCacheChangeBatch() {
+ // No-op.
+ }
/**
* @param reqs Requests.
@@ -64,6 +81,7 @@ public class DynamicCacheChangeBatch implements
DiscoveryCustomMessage {
public DynamicCacheChangeBatch(Collection<DynamicCacheChangeRequest> reqs)
{
assert !F.isEmpty(reqs) : reqs;
+ id = IgniteUuid.randomUuid();
this.reqs = reqs;
}
@@ -157,6 +175,23 @@ public class DynamicCacheChangeBatch implements
DiscoveryCustomMessage {
this.startCaches = startCaches;
}
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
+ if (reqs != null)
+ requestsBytes = U.marshal(marsh, reqs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
+ if (requestsBytes != null)
+ reqs = U.unmarshal(marsh, requestsBytes, clsLdr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 534;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheChangeBatch.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
index be4e628b18c..d5763cbba05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
@@ -19,7 +19,9 @@ package org.apache.ignite.internal.processors.cluster;
import java.util.List;
import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
@@ -29,47 +31,68 @@ import
org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.service.ServiceDeploymentActions;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
import org.jetbrains.annotations.Nullable;
/**
* Message represent request for change cluster global state.
*/
-public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
+public class ChangeGlobalStateMessage implements DiscoveryCustomMessage,
MarshallableMessage {
/** */
private static final long serialVersionUID = 0L;
/** Custom message ID. */
- private IgniteUuid id = IgniteUuid.randomUuid();
+ @Order(0)
+ IgniteUuid id;
/** Request ID */
- private UUID reqId;
+ @Order(1)
+ UUID reqId;
/** Initiator node ID. */
- private UUID initiatingNodeId;
+ @Order(2)
+ UUID initiatingNodeId;
/** Cluster state */
- private ClusterState state;
+ @Order(3)
+ ClusterState state;
/** Configurations read from persistent store. */
private List<StoredCacheData> storedCfgs;
+ /** JDK Serialized version of storedCfgs. */
+ @Order(4)
+ byte[] storedCfgsBytes;
+
/** */
@Nullable private BaselineTopology baselineTopology;
+ /** JDK Serialized version of baselineTopology. */
+ @Order(5)
+ byte[] baselineTopologyBytes;
+
/** */
- private boolean forceChangeBaselineTopology;
+ @Order(6)
+ boolean forceChangeBaselineTopology;
/** */
@GridToStringExclude
- private transient ExchangeActions exchangeActions;
+ private ExchangeActions exchangeActions;
/** Services deployment actions to be processed on services deployment
process. */
@GridToStringExclude
- @Nullable private transient ServiceDeploymentActions
serviceDeploymentActions;
+ @Nullable private ServiceDeploymentActions serviceDeploymentActions;
/** If {@code true}, cluster deactivation will be forced. */
- private boolean forceDeactivation;
+ @Order(7)
+ boolean forceDeactivation;
+
+ /** No-arg constructor for deserialization. */
+ public ChangeGlobalStateMessage() {
+ }
/**
* @param reqId State change request ID.
@@ -94,6 +117,7 @@ public class ChangeGlobalStateMessage implements
DiscoveryCustomMessage {
assert reqId != null;
assert initiatingNodeId != null;
+ id = IgniteUuid.randomUuid();
this.reqId = reqId;
this.initiatingNodeId = initiatingNodeId;
this.storedCfgs = storedCfgs;
@@ -104,7 +128,7 @@ public class ChangeGlobalStateMessage implements
DiscoveryCustomMessage {
}
/**
- * @return Configurations read from persistent store..
+ * @return Configurations read from persistent store.
*/
@Nullable public List<StoredCacheData> storedCacheConfigurations() {
return storedCfgs;
@@ -211,6 +235,29 @@ public class ChangeGlobalStateMessage implements
DiscoveryCustomMessage {
return reqId;
}
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
+ if (storedCfgs != null)
+ storedCfgsBytes = U.marshal(marsh, storedCfgs);
+
+ if (baselineTopology != null)
+ baselineTopologyBytes = U.marshal(marsh, baselineTopology);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
+ if (storedCfgsBytes != null)
+ storedCfgs = U.unmarshal(marsh, storedCfgsBytes, clsLdr);
+
+ if (baselineTopologyBytes != null)
+ baselineTopology = U.unmarshal(marsh, baselineTopologyBytes,
clsLdr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 532;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ChangeGlobalStateMessage.class, this);