This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5248d977e92 IGNITE-27853 Use MessageSerializer fo
GridCacheRawVersionedEntry and DataStreamerEntry (#12746)
5248d977e92 is described below
commit 5248d977e926c7b8778a2c9bd1c74c017b57ce68
Author: Aleksandr Chesnokov <[email protected]>
AuthorDate: Mon Mar 16 13:11:12 2026 +0300
IGNITE-27853 Use MessageSerializer fo GridCacheRawVersionedEntry and
DataStreamerEntry (#12746)
---
.../communication/GridIoMessageFactory.java | 6 +-
.../processors/cache/GridCacheAdapter.java | 2 -
.../cache/version/GridCacheRawVersionedEntry.java | 210 +--------------------
.../processors/datastreamer/DataStreamerEntry.java | 65 +------
.../dr/IgniteDrDataStreamerCacheUpdater.java | 2 -
5 files changed, 18 insertions(+), 267 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index eb022f0edb1..a3379034634 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -262,6 +262,7 @@ import
org.apache.ignite.internal.processors.cache.transactions.TxLocksRequestSe
import
org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse;
import
org.apache.ignite.internal.processors.cache.transactions.TxLocksResponseSerializer;
import
org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
+import
org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntrySerializer;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionExSerializer;
@@ -279,6 +280,7 @@ import
org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartRe
import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
import
org.apache.ignite.internal.processors.continuous.GridContinuousMessageSerializer;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
+import
org.apache.ignite.internal.processors.datastreamer.DataStreamerEntrySerializer;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
import
org.apache.ignite.internal.processors.datastreamer.DataStreamerRequestSerializer;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
@@ -470,14 +472,14 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)92, CacheEntryInfoCollection::new, new
CacheEntryInfoCollectionSerializer());
factory.register((short)93, CacheInvokeDirectResult::new, new
CacheInvokeDirectResultSerializer());
factory.register((short)94, IgniteTxKey::new, new
IgniteTxKeySerializer());
- factory.register((short)95, DataStreamerEntry::new);
+ factory.register((short)95, DataStreamerEntry::new, new
DataStreamerEntrySerializer());
factory.register((short)96, CacheContinuousQueryEntry::new, new
CacheContinuousQueryEntrySerializer());
factory.register((short)97, CacheEvictionEntry::new, new
CacheEvictionEntrySerializer());
factory.register((short)98, CacheEntryPredicateAdapter::new, new
CacheEntryPredicateAdapterSerializer());
factory.register((short)100, IgniteTxEntry::new, new
IgniteTxEntrySerializer());
factory.register((short)101, TxEntryValueHolder::new, new
TxEntryValueHolderSerializer());
factory.register((short)102, CacheVersionedValue::new, new
CacheVersionedValueSerializer());
- factory.register((short)103, GridCacheRawVersionedEntry::new);
+ factory.register((short)103, GridCacheRawVersionedEntry::new, new
GridCacheRawVersionedEntrySerializer());
factory.register((short)104, GridCacheVersionEx::new, new
GridCacheVersionExSerializer());
factory.register((short)106, GridQueryCancelRequest::new, new
GridQueryCancelRequestSerializer());
factory.register((short)107, GridQueryFailResponse::new, new
GridQueryFailResponseSerializer());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index bb3ed8c4e82..b13161079eb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -5900,8 +5900,6 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
0,
ver.conflictVersion());
- e.prepareDirectMarshal(ctx.cacheObjectContext());
-
col.add(e);
if (col.size() == ldr.perNodeBufferSize()) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
index ccb93f176dc..88141816abf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
@@ -17,50 +17,29 @@
package org.apache.ignite.internal.processors.cache.version;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
* Raw versioned entry.
*/
-@IgniteCodeGeneratingFail
-public class GridCacheRawVersionedEntry<K, V> extends DataStreamerEntry
implements
- GridCacheVersionedEntry<K, V>, GridCacheVersionable, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Key bytes. */
- @GridDirectTransient
- private byte[] keyBytes;
-
- /** Value bytes. */
- private byte[] valBytes;
-
+public class GridCacheRawVersionedEntry<K, V> extends DataStreamerEntry
implements GridCacheVersionedEntry<K, V>, GridCacheVersionable {
/** TTL. */
- private long ttl;
+ @Order(0)
+ long ttl;
/** Expire time. */
- private long expireTime;
+ @Order(1)
+ long expireTime;
/** Version. */
- private GridCacheVersion ver;
+ @Order(2)
+ GridCacheVersion ver;
/**
* {@code Externalizable} support.
@@ -92,28 +71,6 @@ public class GridCacheRawVersionedEntry<K, V> extends
DataStreamerEntry implemen
this.ver = ver;
}
- /**
- * Constructor used in receiver hub where marshalled key and value are
available and we do not want to
- * unmarshal value.
- *
- * @param keyBytes Key.
- * @param valBytes Value bytes.
- * @param expireTime Expire time.
- * @param ttl TTL.
- * @param ver Version.
- */
- public GridCacheRawVersionedEntry(byte[] keyBytes,
- byte[] valBytes,
- long ttl,
- long expireTime,
- GridCacheVersion ver) {
- this.keyBytes = keyBytes;
- this.valBytes = valBytes;
- this.ttl = ttl;
- this.expireTime = expireTime;
- this.ver = ver;
- }
-
/** {@inheritDoc} */
@Override public K key() {
assert key != null : "Entry is being improperly processed.";
@@ -156,162 +113,13 @@ public class GridCacheRawVersionedEntry<K, V> extends
DataStreamerEntry implemen
return ver;
}
- /**
- * Perform internal unmarshal of this entry. It must be performed after
entry is deserialized and before
- * its restored key/value are needed.
- *
- * @param ctx Context.
- * @param marsh Marshaller.
- * @throws IgniteCheckedException If failed.
- */
- public void unmarshal(CacheObjectContext ctx, Marshaller marsh) throws
IgniteCheckedException {
- unmarshalKey(ctx, marsh);
-
- if (val == null && valBytes != null) {
- val = U.unmarshal(marsh, valBytes, U.resolveClassLoader(null,
ctx.classLoader()));
-
- val.finishUnmarshal(ctx, null);
- }
- }
-
- /**
- * Perform internal key unmarshal of this entry. It must be performed
after entry is deserialized and before
- * its restored key/value are needed.
- *
- * @param ctx Context.
- * @param marsh Marshaller.
- * @throws IgniteCheckedException If failed.
- */
- public void unmarshalKey(CacheObjectContext ctx, Marshaller marsh) throws
IgniteCheckedException {
- if (key == null) {
- assert keyBytes != null;
-
- key = U.unmarshal(marsh, keyBytes, U.resolveClassLoader(null,
ctx.classLoader()));
-
- key.finishUnmarshal(ctx, null);
- }
- }
-
- /**
- * @param ctx Context.
- * @throws IgniteCheckedException If failed.
- */
- public void prepareDirectMarshal(CacheObjectContext ctx) throws
IgniteCheckedException {
- key.prepareMarshal(ctx);
-
- if (val != null)
- val.prepareMarshal(ctx);
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 103;
}
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 2:
- expireTime = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 3:
- ttl = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- valBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- ver = reader.readMessage();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 2:
- if (!writer.writeLong(expireTime))
- return false;
-
- writer.incrementState();
-
- case 3:
- if (!writer.writeLong(ttl))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeByteArray(valBytes))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeMessage(ver))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- assert false;
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
- assert false;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridCacheRawVersionedEntry.class, this,
- "keyBytesLen", keyBytes != null ? keyBytes.length : "n/a",
- "valBytesLen", valBytes != null ? valBytes.length : "n/a",
- "super", super.toString());
+ return S.toString(GridCacheRawVersionedEntry.class, this, "super",
super.toString());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java
index 8a8c54603dc..91f6998b446 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java
@@ -17,28 +17,28 @@
package org.apache.ignite.internal.processors.datastreamer;
-import java.nio.ByteBuffer;
import java.util.Map;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
*
*/
public class DataStreamerEntry implements Map.Entry<KeyCacheObject,
CacheObject>, Message {
/** */
+ @Order(0)
@GridToStringInclude
- protected KeyCacheObject key;
+ public KeyCacheObject key;
/** */
+ @Order(1)
@GridToStringInclude
- protected CacheObject val;
+ public CacheObject val;
/**
*
@@ -95,61 +95,6 @@ public class DataStreamerEntry implements
Map.Entry<KeyCacheObject, CacheObject>
};
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeKeyCacheObject(key))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeCacheObject(val))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- key = reader.readKeyCacheObject();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- val = reader.readCacheObject();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 95;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
index 08c4380fcd1..8f5dc19e20f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
@@ -77,8 +77,6 @@ public class IgniteDrDataStreamerCacheUpdater implements
StreamReceiver<KeyCache
for (Map.Entry<KeyCacheObject, CacheObject> entry0 : col) {
GridCacheRawVersionedEntry entry =
(GridCacheRawVersionedEntry)entry0;
- entry.unmarshal(cacheObjCtx, ctx.marshaller());
-
KeyCacheObject key = entry.getKey();
// Ensure that receiver to not receive special-purpose values
for TTL and expire time.