This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e29c66b7854 IGNITE-26774 Use MessageSerializer for
GridDhtAtomicUpdateRequest (#12516)
e29c66b7854 is described below
commit e29c66b78543d38375e44f4c8136a2ed48792025
Author: Didar Shayarov <[email protected]>
AuthorDate: Thu Jan 15 13:11:13 2026 +0300
IGNITE-26774 Use MessageSerializer for GridDhtAtomicUpdateRequest (#12516)
---
.../communication/GridIoMessageFactory.java | 3 +-
.../atomic/GridDhtAtomicAbstractUpdateRequest.java | 138 -----
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 577 ++++++++++-----------
3 files changed, 266 insertions(+), 452 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 190e61faea4..6dc2d02dd0a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -68,6 +68,7 @@ import
org.apache.ignite.internal.codegen.GridDhtAffinityAssignmentRequestSerial
import
org.apache.ignite.internal.codegen.GridDhtAtomicDeferredUpdateResponseSerializer;
import org.apache.ignite.internal.codegen.GridDhtAtomicNearResponseSerializer;
import
org.apache.ignite.internal.codegen.GridDhtAtomicSingleUpdateRequestSerializer;
+import org.apache.ignite.internal.codegen.GridDhtAtomicUpdateRequestSerializer;
import
org.apache.ignite.internal.codegen.GridDhtAtomicUpdateResponseSerializer;
import org.apache.ignite.internal.codegen.GridDhtForceKeysRequestSerializer;
import org.apache.ignite.internal.codegen.GridDhtForceKeysResponseSerializer;
@@ -387,7 +388,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)35, GridDhtTxPrepareResponse::new, new
GridDhtTxPrepareResponseSerializer());
factory.register((short)36, GridDhtUnlockRequest::new, new
GridDhtUnlockRequestSerializer());
factory.register((short)37, GridDhtAtomicDeferredUpdateResponse::new,
new GridDhtAtomicDeferredUpdateResponseSerializer());
- factory.register((short)38, GridDhtAtomicUpdateRequest::new);
+ factory.register((short)38, GridDhtAtomicUpdateRequest::new, new
GridDhtAtomicUpdateRequestSerializer());
factory.register((short)39, GridDhtAtomicUpdateResponse::new, new
GridDhtAtomicUpdateResponseSerializer());
factory.register((short)40, GridNearAtomicFullUpdateRequest::new);
factory.register((short)41, GridNearAtomicUpdateResponse::new, new
GridNearAtomicUpdateResponseSerializer());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index 49fc5c2af21..e9e09cea3c6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -17,11 +17,9 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.nio.ByteBuffer;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -32,8 +30,6 @@ import
org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -82,11 +78,9 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
protected int taskNameHash;
/** Node ID. */
- @GridDirectTransient
protected UUID nodeId;
/** On response flag. Access should be synced on future. */
- @GridDirectTransient
private boolean onRes;
/** */
@@ -498,138 +492,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
return (flags & mask) != 0;
}
- // TODO: remove after IGNITE-26774
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 4:
- if (!writer.writeByte(flags))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeLong(futId))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeLong(nearFutId))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeUuid(nearNodeId))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeInt(taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeAffinityTopologyVersion(topVer))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeMessage(writeVer))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 4:
- flags = reader.readByte();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- futId = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- nearFutId = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- nearNodeId = reader.readUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- taskNameHash = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- topVer = reader.readAffinityTopologyVersion();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- writeVer = reader.readMessage();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
StringBuilder flagsStr = new StringBuilder();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 28415c85490..265e13ab382 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -17,14 +17,12 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -37,9 +35,6 @@ import
org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -49,77 +44,81 @@ import org.jetbrains.annotations.Nullable;
public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateRequest {
/** Keys to update. */
@GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
+ @Order(11)
private List<KeyCacheObject> keys;
/** Values to update. */
@GridToStringInclude
- @GridDirectCollection(CacheObject.class)
+ @Order(value = 12, method = "values")
private List<CacheObject> vals;
/** Previous values. */
@GridToStringInclude
- @GridDirectCollection(CacheObject.class)
+ @Order(value = 13, method = "previousValues")
private List<CacheObject> prevVals;
/** Conflict versions. */
- @GridDirectCollection(GridCacheVersion.class)
+ @Order(value = 14, method = "conflictVersions")
private List<GridCacheVersion> conflictVers;
/** TTLs. */
+ @Order(15)
private GridLongList ttls;
/** Conflict expire time. */
+ @Order(16)
private GridLongList conflictExpireTimes;
/** Near TTLs. */
+ @Order(17)
private GridLongList nearTtls;
/** Near expire times. */
+ @Order(18)
private GridLongList nearExpireTimes;
/** Near cache keys to update. */
@GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
+ @Order(19)
private List<KeyCacheObject> nearKeys;
/** Values to update. */
@GridToStringInclude
- @GridDirectCollection(CacheObject.class)
+ @Order(value = 20, method = "nearValues")
private List<CacheObject> nearVals;
/** Obsolete near values. */
@GridToStringInclude
- @GridDirectCollection(Integer.class)
+ @Order(21)
private List<Integer> obsoleteIndexes;
/** Force transform backups flag. */
+ @Order(22)
private boolean forceTransformBackups;
/** Entry processors. */
- @GridDirectTransient
private List<EntryProcessor<Object, Object, Object>> entryProcessors;
/** Entry processors bytes. */
- @GridDirectCollection(byte[].class)
+ @Order(23)
private List<byte[]> entryProcessorsBytes;
/** Near entry processors. */
- @GridDirectTransient
private List<EntryProcessor<Object, Object, Object>> nearEntryProcessors;
/** Near entry processors bytes. */
- @GridDirectCollection(byte[].class)
+ @Order(24)
private List<byte[]> nearEntryProcessorsBytes;
/** Optional arguments for entry processor. */
- @GridDirectTransient
private Object[] invokeArgs;
/** Entry processor arguments bytes. */
- private byte[][] invokeArgsBytes;
+ @Order(value = 25, method = "invokeArgumentsBytes")
+ private List<byte[]> invokeArgsBytes;
/** Partition. */
+ @Order(value = 26, method = "updateCounters")
private GridLongList updateCntrs;
/**
@@ -188,7 +187,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
/** {@inheritDoc} */
@Override public void addWriteValue(KeyCacheObject key,
@Nullable CacheObject val,
- EntryProcessor<Object, Object, Object> entryProcessor,
+ EntryProcessor<Object, Object, Object> entryProc,
long ttl,
long conflictExpireTime,
@Nullable GridCacheVersion conflictVer,
@@ -201,9 +200,9 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
keys.add(key);
if (forceTransformBackups) {
- assert entryProcessor != null;
+ assert entryProc != null;
- entryProcessors.add(entryProcessor);
+ entryProcessors.add(entryProc);
}
else
vals.add(val);
@@ -234,25 +233,21 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
else if (conflictVers != null)
conflictVers.add(null);
- if (ttl >= 0) {
- if (ttls == null) {
- ttls = new GridLongList(keys.size());
+ if (ttl >= 0 && ttls == null) {
+ ttls = new GridLongList(keys.size());
- for (int i = 0; i < keys.size() - 1; i++)
- ttls.add(CU.TTL_NOT_CHANGED);
- }
+ for (int i = 0; i < keys.size() - 1; i++)
+ ttls.add(CU.TTL_NOT_CHANGED);
}
if (ttls != null)
ttls.add(ttl);
- if (conflictExpireTime >= 0) {
- if (conflictExpireTimes == null) {
- conflictExpireTimes = new GridLongList(keys.size());
+ if (conflictExpireTime >= 0 && conflictExpireTimes == null) {
+ conflictExpireTimes = new GridLongList(keys.size());
- for (int i = 0; i < keys.size() - 1; i++)
- conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
- }
+ for (int i = 0; i < keys.size() - 1; i++)
+ conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
}
if (conflictExpireTimes != null)
@@ -262,7 +257,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
/** {@inheritDoc} */
@Override public void addNearWriteValue(KeyCacheObject key,
@Nullable CacheObject val,
- EntryProcessor<Object, Object, Object> entryProcessor,
+ EntryProcessor<Object, Object, Object> entryProc,
long ttl,
long expireTime) {
assert key.partition() >= 0 : key;
@@ -292,32 +287,28 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
nearKeys.add(key);
if (forceTransformBackups) {
- assert entryProcessor != null;
+ assert entryProc != null;
- nearEntryProcessors.add(entryProcessor);
+ nearEntryProcessors.add(entryProc);
}
else
nearVals.add(val);
- if (ttl >= 0) {
- if (nearTtls == null) {
- nearTtls = new GridLongList(nearKeys.size());
+ if (ttl >= 0 && nearTtls == null) {
+ nearTtls = new GridLongList(nearKeys.size());
- for (int i = 0; i < nearKeys.size() - 1; i++)
- nearTtls.add(CU.TTL_NOT_CHANGED);
- }
+ for (int i = 0; i < nearKeys.size() - 1; i++)
+ nearTtls.add(CU.TTL_NOT_CHANGED);
}
if (nearTtls != null)
nearTtls.add(ttl);
- if (expireTime >= 0) {
- if (nearExpireTimes == null) {
- nearExpireTimes = new GridLongList(nearKeys.size());
+ if (expireTime >= 0 && nearExpireTimes == null) {
+ nearExpireTimes = new GridLongList(nearKeys.size());
- for (int i = 0; i < nearKeys.size() - 1; i++)
- nearExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
- }
+ for (int i = 0; i < nearKeys.size() - 1; i++)
+ nearExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
}
if (nearExpireTimes != null)
@@ -329,6 +320,13 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
return forceTransformBackups;
}
+ /**
+ * @param forceTransformBackups New force transform backups flag.
+ */
+ public void forceTransformBackups(boolean forceTransformBackups) {
+ this.forceTransformBackups = forceTransformBackups;
+ }
+
/** {@inheritDoc} */
@Override public int size() {
return keys.size();
@@ -354,6 +352,20 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
return keys.get(idx);
}
+ /**
+ * @return Keys to update.
+ */
+ public List<KeyCacheObject> keys() {
+ return keys;
+ }
+
+ /**
+ * @param keys New keys to update.
+ */
+ public void keys(List<KeyCacheObject> keys) {
+ this.keys = keys;
+ }
+
/** {@inheritDoc} */
@Override public Long updateCounter(int updCntr) {
if (updateCntrs != null && updCntr < updateCntrs.size())
@@ -367,6 +379,20 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
return nearKeys.get(idx);
}
+ /**
+ * @return Near cache keys to update.
+ */
+ public List<KeyCacheObject> nearKeys() {
+ return nearKeys;
+ }
+
+ /**
+ * @param nearKeys New near cache keys to update.
+ */
+ public void nearKeys(List<KeyCacheObject> nearKeys) {
+ this.nearKeys = nearKeys;
+ }
+
/** {@inheritDoc} */
@Override @Nullable public CacheObject value(int idx) {
if (vals != null)
@@ -375,6 +401,20 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
return null;
}
+ /**
+ * @return Values to update.
+ */
+ public List<CacheObject> values() {
+ return vals;
+ }
+
+ /**
+ * @param vals New values to update.
+ */
+ public void values(List<CacheObject> vals) {
+ this.vals = vals;
+ }
+
/** {@inheritDoc} */
@Override @Nullable public CacheObject previousValue(int idx) {
if (prevVals != null)
@@ -383,6 +423,20 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
return null;
}
+ /**
+ * @return Previous values.
+ */
+ public List<CacheObject> previousValues() {
+ return prevVals;
+ }
+
+ /**
+ * @param prevVals New previous values.
+ */
+ public void previousValues(List<CacheObject> prevVals) {
+ this.prevVals = prevVals;
+ }
+
/** {@inheritDoc} */
@Override @Nullable public EntryProcessor<Object, Object, Object>
entryProcessor(int idx) {
return entryProcessors == null ? null : entryProcessors.get(idx);
@@ -396,6 +450,20 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
return null;
}
+ /**
+ * @return Values to update.
+ */
+ public List<CacheObject> nearValues() {
+ return nearVals;
+ }
+
+ /**
+ * @param nearVals New values to update.
+ */
+ public void nearValues(List<CacheObject> nearVals) {
+ this.nearVals = nearVals;
+ }
+
/** {@inheritDoc} */
@Override @Nullable public EntryProcessor<Object, Object, Object>
nearEntryProcessor(int idx) {
return nearEntryProcessors == null ? null :
nearEntryProcessors.get(idx);
@@ -412,6 +480,20 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
return null;
}
+ /**
+ * @return Conflict versions.
+ */
+ public List<GridCacheVersion> conflictVersions() {
+ return conflictVers;
+ }
+
+ /**
+ * @param conflictVers New conflict versions.
+ */
+ public void conflictVersions(List<GridCacheVersion> conflictVers) {
+ this.conflictVers = conflictVers;
+ }
+
/** {@inheritDoc} */
@Override public long ttl(int idx) {
if (ttls != null) {
@@ -423,6 +505,20 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
return CU.TTL_NOT_CHANGED;
}
+ /**
+ * @return TTLs.
+ */
+ public GridLongList ttls() {
+ return ttls;
+ }
+
+ /**
+ * @param ttls New TTLs.
+ */
+ public void ttls(GridLongList ttls) {
+ this.ttls = ttls;
+ }
+
/** {@inheritDoc} */
@Override public long nearTtl(int idx) {
if (nearTtls != null) {
@@ -434,6 +530,20 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
return CU.TTL_NOT_CHANGED;
}
+ /**
+ * @return Near TTLs.
+ */
+ public GridLongList nearTtls() {
+ return nearTtls;
+ }
+
+ /**
+ * @param nearTtls New near TTLs.
+ */
+ public void nearTtls(GridLongList nearTtls) {
+ this.nearTtls = nearTtls;
+ }
+
/** {@inheritDoc} */
@Override public int partition() {
assert !F.isEmpty(keys) || !F.isEmpty(nearKeys);
@@ -456,6 +566,20 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
return CU.EXPIRE_TIME_CALCULATE;
}
+ /**
+ * @return Conflict expire times.
+ */
+ public GridLongList conflictExpireTimes() {
+ return conflictExpireTimes;
+ }
+
+ /**
+ * @param conflictExpireTimes New conflict expire times.
+ */
+ public void conflictExpireTimes(GridLongList conflictExpireTimes) {
+ this.conflictExpireTimes = conflictExpireTimes;
+ }
+
/** {@inheritDoc} */
@Override public long nearExpireTime(int idx) {
if (nearExpireTimes != null) {
@@ -467,16 +591,100 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
return CU.EXPIRE_TIME_CALCULATE;
}
+ /**
+ * @return Near expire times.
+ */
+ public GridLongList nearExpireTimes() {
+ return nearExpireTimes;
+ }
+
+ /**
+ * @param nearExpireTimes New near expire times.
+ */
+ public void nearExpireTimes(GridLongList nearExpireTimes) {
+ this.nearExpireTimes = nearExpireTimes;
+ }
+
+ /**
+ * @return Obsolete near values.
+ */
+ public List<Integer> obsoleteIndexes() {
+ return obsoleteIndexes;
+ }
+
+ /**
+ * @param obsoleteIndexes New obsolete near values.
+ */
+ public void obsoleteIndexes(List<Integer> obsoleteIndexes) {
+ this.obsoleteIndexes = obsoleteIndexes;
+ }
+
+ /**
+ * @return Partition update counters.
+ */
+ public GridLongList updateCounters() {
+ return updateCntrs;
+ }
+
+ /**
+ * @param updateCntrs New partition update counters.
+ */
+ public void updateCounters(GridLongList updateCntrs) {
+ this.updateCntrs = updateCntrs;
+ }
+
+ /**
+ * @return Serialized entry processors.
+ */
+ public List<byte[]> entryProcessorsBytes() {
+ return entryProcessorsBytes;
+ }
+
+ /**
+ * @param entryProcessorsBytes New entry processors.
+ */
+ public void entryProcessorsBytes(List<byte[]> entryProcessorsBytes) {
+ this.entryProcessorsBytes = entryProcessorsBytes;
+ }
+
+ /**
+ * @return Serialized near entry processors.
+ */
+ public List<byte[]> nearEntryProcessorsBytes() {
+ return nearEntryProcessorsBytes;
+ }
+
+ /**
+ * @param nearEntryProcessorsBytes New serialized near entry processors.
+ */
+ public void nearEntryProcessorsBytes(List<byte[]>
nearEntryProcessorsBytes) {
+ this.nearEntryProcessorsBytes = nearEntryProcessorsBytes;
+ }
+
+ /**
+ * @return Serialized optional entry processor arguments.
+ */
+ public List<byte[]> invokeArgumentsBytes() {
+ return invokeArgsBytes;
+ }
+
+ /**
+ * @param invokeArgsBytes New serialized optional entry processor
arguments.
+ */
+ public void invokeArgumentsBytes(List<byte[]> invokeArgsBytes) {
+ this.invokeArgsBytes = invokeArgsBytes;
+ }
+
/** {@inheritDoc} */
@Override @Nullable public Object[] invokeArguments() {
return invokeArgs;
}
/** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws
IgniteCheckedException {
+ @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
super.prepareMarshal(ctx);
- GridCacheContext cctx = ctx.cacheContext(cacheId);
+ GridCacheContext<?, ?> cctx = ctx.cacheContext(cacheId);
prepareMarshalCacheObjects(keys, cctx);
@@ -494,7 +702,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
addDepInfo = true;
if (invokeArgsBytes == null)
- invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+ invokeArgsBytes = F.asList(marshalInvokeArguments(invokeArgs,
cctx));
if (entryProcessorsBytes == null)
entryProcessorsBytes = marshalCollection(entryProcessors,
cctx);
@@ -505,10 +713,10 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
}
/** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx,
ClassLoader ldr) throws IgniteCheckedException {
+ @Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx,
ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- GridCacheContext cctx = ctx.cacheContext(cacheId);
+ GridCacheContext<?, ?> cctx = ctx.cacheContext(cacheId);
finishUnmarshalCacheObjects(keys, cctx, ldr);
@@ -524,271 +732,14 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
if (entryProcessors == null)
entryProcessors = unmarshalCollection(entryProcessorsBytes,
ctx, ldr);
- if (invokeArgs == null)
- invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx,
ldr);
+ if (invokeArgsBytes != null && invokeArgs == null)
+ invokeArgs =
unmarshalInvokeArguments(invokeArgsBytes.toArray(new
byte[invokeArgsBytes.size()][]), ctx, ldr);
if (nearEntryProcessors == null)
nearEntryProcessors =
unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
}
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 11:
- if (!writer.writeGridLongList(conflictExpireTimes))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeCollection(conflictVers,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeCollection(entryProcessorsBytes,
MessageCollectionItemType.BYTE_ARR))
- return false;
-
- writer.incrementState();
-
- case 14:
- if (!writer.writeBoolean(forceTransformBackups))
- return false;
-
- writer.incrementState();
-
- case 15:
- if (!writer.writeObjectArray(invokeArgsBytes,
MessageCollectionItemType.BYTE_ARR))
- return false;
-
- writer.incrementState();
-
- case 16:
- if (!writer.writeCollection(keys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
- return false;
-
- writer.incrementState();
-
- case 17:
- if (!writer.writeCollection(nearEntryProcessorsBytes,
MessageCollectionItemType.BYTE_ARR))
- return false;
-
- writer.incrementState();
-
- case 18:
- if (!writer.writeGridLongList(nearExpireTimes))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeCollection(nearKeys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeGridLongList(nearTtls))
- return false;
-
- writer.incrementState();
-
- case 21:
- if (!writer.writeCollection(nearVals,
MessageCollectionItemType.CACHE_OBJECT))
- return false;
-
- writer.incrementState();
-
- case 22:
- if (!writer.writeCollection(obsoleteIndexes,
MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 23:
- if (!writer.writeCollection(prevVals,
MessageCollectionItemType.CACHE_OBJECT))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeGridLongList(ttls))
- return false;
-
- writer.incrementState();
-
- case 25:
- if (!writer.writeGridLongList(updateCntrs))
- return false;
-
- writer.incrementState();
-
- case 26:
- if (!writer.writeCollection(vals,
MessageCollectionItemType.CACHE_OBJECT))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 11:
- conflictExpireTimes = reader.readGridLongList();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- conflictVers =
reader.readCollection(MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- entryProcessorsBytes =
reader.readCollection(MessageCollectionItemType.BYTE_ARR);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 14:
- forceTransformBackups = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 15:
- invokeArgsBytes =
reader.readObjectArray(MessageCollectionItemType.BYTE_ARR, byte[].class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 16:
- keys =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 17:
- nearEntryProcessorsBytes =
reader.readCollection(MessageCollectionItemType.BYTE_ARR);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 18:
- nearExpireTimes = reader.readGridLongList();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- nearKeys =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 20:
- nearTtls = reader.readGridLongList();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 21:
- nearVals =
reader.readCollection(MessageCollectionItemType.CACHE_OBJECT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 22:
- obsoleteIndexes =
reader.readCollection(MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 23:
- prevVals =
reader.readCollection(MessageCollectionItemType.CACHE_OBJECT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 24:
- ttls = reader.readGridLongList();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 25:
- updateCntrs = reader.readGridLongList();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 26:
- vals =
reader.readCollection(MessageCollectionItemType.CACHE_OBJECT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override protected void cleanup() {
nearVals = null;