This is an automated email from the ASF dual-hosted git repository.
anton-vinogradov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 2254b522c93 IGNITE-28058 Revise custom serialization in
DistributedMetaStorage* messages (#12997)
2254b522c93 is described below
commit 2254b522c933fc9f7b0e6eb2f9992b13b2169c63
Author: Dmitry Werner <[email protected]>
AuthorDate: Fri Apr 24 12:47:55 2026 +0500
IGNITE-28058 Revise custom serialization in DistributedMetaStorage*
messages (#12997)
---
.../DistributedMetaStorageCasMessage.java | 39 +++++++++++++++++-----
.../persistence/DistributedMetaStorageImpl.java | 34 ++++++++++---------
.../DistributedMetaStorageUpdateMessage.java | 31 ++++++++++++-----
.../persistence/DistributedMetaStorageUtil.java | 5 ---
4 files changed, 71 insertions(+), 38 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java
index 62c49f0f07b..d40661bbadc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java
@@ -17,18 +17,25 @@
package org.apache.ignite.internal.processors.metastorage.persistence;
+import java.io.Serializable;
import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.jetbrains.annotations.Nullable;
/** */
public class DistributedMetaStorageCasMessage extends
DistributedMetaStorageUpdateMessage {
- /** TODO: revise the external serialization
https://issues.apache.org/jira/browse/IGNITE-28058. */
+ /** */
+ private @Nullable Serializable expVal;
+
+ /** */
@Order(0)
- byte[] expectedVal;
+ byte[] expValBytes;
/** */
@Order(1)
@@ -40,16 +47,16 @@ public class DistributedMetaStorageCasMessage extends
DistributedMetaStorageUpda
}
/** */
- public DistributedMetaStorageCasMessage(UUID reqId, String key, byte[]
expValBytes, byte[] valBytes) {
- super(reqId, key, valBytes);
+ public DistributedMetaStorageCasMessage(UUID reqId, String key, @Nullable
Serializable expVal, @Nullable Serializable val) {
+ super(reqId, key, val);
- expectedVal = expValBytes;
+ this.expVal = expVal;
matches = true;
}
/** */
- public byte[] expectedValue() {
- return expectedVal;
+ public Serializable expectedValue() {
+ return expVal;
}
/** */
@@ -64,7 +71,23 @@ public class DistributedMetaStorageCasMessage extends
DistributedMetaStorageUpda
/** {@inheritDoc} */
@Override @Nullable public DiscoveryCustomMessage ackMessage() {
- return new DistributedMetaStorageCasAckMessage(requestId(), matches);
+ return new DistributedMetaStorageCasAckMessage(reqId, matches);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
+ super.prepareMarshal(marsh);
+
+ if (expVal != null && expValBytes == null)
+ expValBytes = U.marshal(marsh, expVal);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(Marshaller marsh) throws
IgniteCheckedException {
+ super.finishUnmarshal(marsh);
+
+ if (expValBytes != null && expVal == null)
+ expVal = U.unmarshal(marsh, expValBytes, U.gridClassLoader());
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
index c9c0d6c49f9..2e24715108b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -84,7 +83,6 @@ import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersi
import static
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageHistoryItem.EMPTY_ARRAY;
import static
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemPrefix;
import static
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemVer;
-import static
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.marshal;
import static
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.unmarshal;
import static
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageVersion.INITIAL_VERSION;
import static
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
@@ -479,7 +477,7 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
@Override public void write(@NotNull String key, @NotNull Serializable
val) throws IgniteCheckedException {
assert val != null : key;
- startWrite(key, marshal(marshaller, val)).get();
+ startWrite(key, val).get();
}
/** {@inheritDoc} */
@@ -489,7 +487,7 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
) throws IgniteCheckedException {
assert val != null : key;
- return startWrite(key, marshal(marshaller, val));
+ return startWrite(key, val);
}
/** {@inheritDoc} */
@@ -521,7 +519,7 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
) throws IgniteCheckedException {
assert newVal != null : key;
- return startCas(key, marshal(marshaller, expVal), marshal(marshaller,
newVal));
+ return startCas(key, expVal, newVal);
}
/** {@inheritDoc} */
@@ -531,7 +529,7 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
) throws IgniteCheckedException {
assert expVal != null : key;
- return startCas(key, marshal(marshaller, expVal), null).get();
+ return startCas(key, expVal, null).get();
}
/** {@inheritDoc} */
@@ -1046,10 +1044,10 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
* for operation to be completed.
*
* @param key The key.
- * @param valBytes Value bytes to write. Null if value needs to be removed.
+ * @param val Value to write. Null if value needs to be removed.
* @throws IgniteCheckedException If there was an error while sending
discovery message.
*/
- private GridFutureAdapter<?> startWrite(String key, byte[] valBytes)
throws IgniteCheckedException {
+ private GridFutureAdapter<?> startWrite(String key, @Nullable Serializable
val) throws IgniteCheckedException {
UUID reqId = UUID.randomUUID();
GridFutureAdapter<?> fut = prepareWriteFuture(reqId);
@@ -1057,7 +1055,9 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
if (fut.isDone())
return fut;
- DiscoveryCustomMessage msg = new
DistributedMetaStorageUpdateMessage(reqId, key, valBytes);
+ DistributedMetaStorageUpdateMessage msg = new
DistributedMetaStorageUpdateMessage(reqId, key, val);
+
+ msg.prepareMarshal(marshaller);
ctx.discovery().sendCustomEvent(msg);
@@ -1065,9 +1065,9 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
}
/**
- * Basically the same as {@link #startWrite(String, byte[])} but for CAS
operations.
+ * Basically the same as {@link #startWrite(String, Serializable)} but for
CAS operations.
*/
- private GridFutureAdapter<Boolean> startCas(String key, byte[]
expValBytes, byte[] newValBytes)
+ private GridFutureAdapter<Boolean> startCas(String key, @Nullable
Serializable expVal, @Nullable Serializable newVal)
throws IgniteCheckedException {
UUID reqId = UUID.randomUUID();
@@ -1076,7 +1076,9 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
if (fut.isDone())
return fut;
- DiscoveryCustomMessage msg = new
DistributedMetaStorageCasMessage(reqId, key, expValBytes, newValBytes);
+ DistributedMetaStorageCasMessage msg = new
DistributedMetaStorageCasMessage(reqId, key, expVal, newVal);
+
+ msg.prepareMarshal(marshaller);
ctx.discovery().sendCustomEvent(msg);
@@ -1134,7 +1136,7 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
if (msg instanceof DistributedMetaStorageCasMessage)
completeCas((DistributedMetaStorageCasMessage)msg);
else
- completeWrite(new DistributedMetaStorageHistoryItem(msg.key(),
msg.value()));
+ completeWrite(new DistributedMetaStorageHistoryItem(msg.key(),
msg.valueBytes()));
}
catch (IgniteInterruptedCheckedException e) {
throw U.convertException(e);
@@ -1318,16 +1320,16 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
Serializable oldVal = bridge.read(msg.key());
- Serializable expVal = unmarshal(marshaller, msg.expectedValue());
+ msg.finishUnmarshal(marshaller);
- if (!Objects.deepEquals(oldVal, expVal)) {
+ if (!Objects.deepEquals(oldVal, msg.expectedValue())) {
msg.setMatches(false);
// Do nothing if expected value doesn't match with the actual one.
return;
}
- completeWrite(new DistributedMetaStorageHistoryItem(msg.key(),
msg.value()));
+ completeWrite(new DistributedMetaStorageHistoryItem(msg.key(),
msg.valueBytes()));
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java
index af4d7256bdb..3b1f2f02c9a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java
@@ -17,12 +17,16 @@
package org.apache.ignite.internal.processors.metastorage.persistence;
+import java.io.Serializable;
import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.jetbrains.annotations.Nullable;
@@ -42,7 +46,10 @@ public class DistributedMetaStorageUpdateMessage implements
DiscoveryCustomMessa
@Order(2)
String key;
- /** TODO: revise the external serialization
https://issues.apache.org/jira/browse/IGNITE-28058. */
+ /** */
+ private @Nullable Serializable val;
+
+ /** */
@Order(3)
byte[] valBytes;
@@ -52,12 +59,12 @@ public class DistributedMetaStorageUpdateMessage implements
DiscoveryCustomMessa
}
/** */
- public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[]
valBytes) {
+ public DistributedMetaStorageUpdateMessage(UUID reqId, String key,
@Nullable Serializable val) {
id = IgniteUuid.randomUuid();
this.reqId = reqId;
this.key = key;
- this.valBytes = valBytes;
+ this.val = val;
}
/** {@inheritDoc} */
@@ -65,18 +72,13 @@ public class DistributedMetaStorageUpdateMessage implements
DiscoveryCustomMessa
return id;
}
- /** */
- public UUID requestId() {
- return reqId;
- }
-
/** */
public String key() {
return key;
}
/** */
- public byte[] value() {
+ public byte[] valueBytes() {
return valBytes;
}
@@ -90,6 +92,17 @@ public class DistributedMetaStorageUpdateMessage implements
DiscoveryCustomMessa
return true;
}
+ /** @param marsh Marshaller. */
+ public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException
{
+ if (val != null && valBytes == null)
+ valBytes = U.marshal(marsh, val);
+ }
+
+ /** @param marsh Marshaller. */
+ public void finishUnmarshal(Marshaller marsh) throws
IgniteCheckedException {
+ // No-op.
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DistributedMetaStorageUpdateMessage.class, this);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUtil.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUtil.java
index 309b7a09ddc..3a4c800899f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUtil.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUtil.java
@@ -59,11 +59,6 @@ final class DistributedMetaStorageUtil {
*/
private static final String CLEANUP_GUARD_KEY = "clean";
- /** */
- public static byte[] marshal(JdkMarshaller marshaller, Serializable val)
throws IgniteCheckedException {
- return val == null ? null : marshaller.marshal(val);
- }
-
/** */
public static Serializable unmarshal(JdkMarshaller marshaller, byte[]
valBytes) throws IgniteCheckedException {
return valBytes == null ? null : marshaller.unmarshal(valBytes,
U.gridClassLoader());