This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f021bbd67dc IGNITE-26307 Use MessageSerializer for
GridDistributedTxPrepareRequest (#12350)
f021bbd67dc is described below
commit f021bbd67dc3f7ee4410ed04a2efede4def2a3cd
Author: Dmitry Werner <[email protected]>
AuthorDate: Thu Sep 18 18:02:58 2025 +0500
IGNITE-26307 Use MessageSerializer for GridDistributedTxPrepareRequest
(#12350)
---
.../communication/GridIoMessageFactory.java | 9 +-
.../distributed/GridDistributedBaseMessage.java | 20 ++
.../GridDistributedTxPrepareRequest.java | 388 ++++++++-------------
.../distributed/dht/GridDhtTxPrepareRequest.java | 382 ++++++++------------
.../distributed/near/GridNearTxPrepareRequest.java | 169 +++------
5 files changed, 363 insertions(+), 605 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 4a2ac144135..3f0d245b619 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -38,11 +38,14 @@ import
org.apache.ignite.internal.codegen.GridDeploymentResponseSerializer;
import org.apache.ignite.internal.codegen.GridDhtPartitionExchangeIdSerializer;
import
org.apache.ignite.internal.codegen.GridDhtPartitionsSingleRequestSerializer;
import
org.apache.ignite.internal.codegen.GridDhtTxOnePhaseCommitAckRequestSerializer;
+import org.apache.ignite.internal.codegen.GridDhtTxPrepareRequestSerializer;
+import
org.apache.ignite.internal.codegen.GridDistributedTxPrepareRequestSerializer;
import org.apache.ignite.internal.codegen.GridIntListSerializer;
import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer;
import org.apache.ignite.internal.codegen.GridJobSiblingsRequestSerializer;
import
org.apache.ignite.internal.codegen.GridNearAtomicCheckUpdateRequestSerializer;
import
org.apache.ignite.internal.codegen.GridNearAtomicUpdateResponseSerializer;
+import org.apache.ignite.internal.codegen.GridNearTxPrepareRequestSerializer;
import org.apache.ignite.internal.codegen.GridQueryCancelRequestSerializer;
import org.apache.ignite.internal.codegen.GridQueryFailResponseSerializer;
import org.apache.ignite.internal.codegen.GridQueryKillRequestSerializer;
@@ -259,7 +262,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)22, GridDistributedLockResponse::new);
factory.register((short)23, GridDistributedTxFinishRequest::new);
factory.register((short)24, GridDistributedTxFinishResponse::new);
- factory.register((short)25, GridDistributedTxPrepareRequest::new);
+ factory.register((short)25, GridDistributedTxPrepareRequest::new, new
GridDistributedTxPrepareRequestSerializer());
factory.register((short)26, GridDistributedTxPrepareResponse::new);
factory.register((short)27, GridDistributedUnlockRequest::new);
factory.register((short)28, GridDhtAffinityAssignmentRequest::new);
@@ -268,7 +271,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)31, GridDhtLockResponse::new);
factory.register((short)32, GridDhtTxFinishRequest::new);
factory.register((short)33, GridDhtTxFinishResponse::new);
- factory.register((short)34, GridDhtTxPrepareRequest::new);
+ factory.register((short)34, GridDhtTxPrepareRequest::new, new
GridDhtTxPrepareRequestSerializer());
factory.register((short)35, GridDhtTxPrepareResponse::new);
factory.register((short)36, GridDhtUnlockRequest::new);
factory.register((short)37, GridDhtAtomicDeferredUpdateResponse::new);
@@ -288,7 +291,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)52, GridNearLockResponse::new);
factory.register((short)53, GridNearTxFinishRequest::new);
factory.register((short)54, GridNearTxFinishResponse::new);
- factory.register((short)55, GridNearTxPrepareRequest::new);
+ factory.register((short)55, GridNearTxPrepareRequest::new, new
GridNearTxPrepareRequestSerializer());
factory.register((short)56, GridNearTxPrepareResponse::new);
factory.register((short)57, GridNearUnlockRequest::new);
factory.register((short)58, GridCacheQueryRequest::new);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
index 3ee379d57ab..264caa27e6c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Collections;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -38,15 +39,18 @@ import
org.apache.ignite.plugin.extensions.communication.MessageWriter;
public abstract class GridDistributedBaseMessage extends GridCacheIdMessage
implements GridCacheDeployable,
GridCacheVersionable {
/** Lock or transaction version. */
+ @Order(value = 4, method = "version")
@GridToStringInclude
protected GridCacheVersion ver;
/** Committed versions with order higher than one for this message (needed
for commit ordering). */
+ @Order(value = 5, method = "committedVersions")
@GridToStringInclude
@GridDirectCollection(GridCacheVersion.class)
private Collection<GridCacheVersion> committedVers;
/** Rolled back versions with order higher than one for this message
(needed for commit ordering). */
+ @Order(value = 6, method = "rolledbackVersions")
@GridToStringInclude
@GridDirectCollection(GridCacheVersion.class)
private Collection<GridCacheVersion> rolledbackVers;
@@ -123,6 +127,13 @@ public abstract class GridDistributedBaseMessage extends
GridCacheIdMessage impl
return committedVers == null ?
Collections.<GridCacheVersion>emptyList() : committedVers;
}
+ /**
+ * @param committedVers Committed versions.
+ */
+ public void committedVersions(Collection<GridCacheVersion> committedVers) {
+ this.committedVers = committedVers;
+ }
+
/**
* @return Rolled back versions.
*/
@@ -130,6 +141,13 @@ public abstract class GridDistributedBaseMessage extends
GridCacheIdMessage impl
return rolledbackVers == null ?
Collections.<GridCacheVersion>emptyList() : rolledbackVers;
}
+ /**
+ * @param rolledbackVers Rolled back versions.
+ */
+ public void rolledbackVersions(Collection<GridCacheVersion>
rolledbackVers) {
+ this.rolledbackVers = rolledbackVers;
+ }
+
/**
* @return Count of keys referenced in candidates array (needed only
locally for optimization).
*/
@@ -139,6 +157,7 @@ public abstract class GridDistributedBaseMessage extends
GridCacheIdMessage impl
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ // TODO: Remove #writeTo() after all inheritors have migrated to the
new ser/der scheme (IGNITE-25490).
writer.setBuffer(buf);
if (!super.writeTo(buf, writer))
@@ -177,6 +196,7 @@ public abstract class GridDistributedBaseMessage extends
GridCacheIdMessage impl
/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ // TODO: Remove #readFrom() after all inheritors have migrated to the
new ser/der scheme (IGNITE-25490).
reader.setBuffer(buf);
if (!super.readFrom(buf, reader))
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index eae5ba8439f..017d19d82b1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache.distributed;
-import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -26,9 +25,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectMap;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -44,9 +41,6 @@ import
org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
@@ -81,6 +75,7 @@ public class GridDistributedTxPrepareRequest extends
GridDistributedBaseMessage
private static final C1<UUIDCollectionMessage, Collection<UUID>>
MSG_TO_COL = UUIDCollectionMessage::uuids;
/** Thread ID. */
+ @Order(7)
@GridToStringInclude
private long threadId;
@@ -88,65 +83,74 @@ public class GridDistributedTxPrepareRequest extends
GridDistributedBaseMessage
@GridToStringInclude
private TransactionConcurrency concurrency;
+ /** Transaction concurrency ordinal. */
+ @Order(value = 8, method = "concurrencyOrdinal")
+ private byte concurrencyOrd;
+
/** Transaction isolation. */
@GridToStringInclude
private TransactionIsolation isolation;
+ /** Transaction isolation ordinal. */
+ @Order(value = 9, method = "isolationOrdinal")
+ private byte isolationOrd;
+
/** Commit version for EC transactions. */
+ @Order(value = 10, method = "writeVersion")
@GridToStringInclude
private GridCacheVersion writeVer;
/** Transaction timeout. */
+ @Order(11)
@GridToStringInclude
private long timeout;
/** Transaction read set. */
+ @Order(12)
@GridToStringInclude
- @GridDirectCollection(IgniteTxEntry.class)
private Collection<IgniteTxEntry> reads;
/** Transaction write entries. */
+ @Order(13)
@GridToStringInclude
- @GridDirectCollection(IgniteTxEntry.class)
private Collection<IgniteTxEntry> writes;
/** DHT versions to verify. */
@GridToStringInclude
- @GridDirectTransient
private Map<IgniteTxKey, GridCacheVersion> dhtVers;
/** */
- @GridDirectCollection(IgniteTxKey.class)
+ @Order(value = 14, method = "dhtVersionKeys")
private Collection<IgniteTxKey> dhtVerKeys;
/** */
- @GridDirectCollection(GridCacheVersion.class)
+ @Order(value = 15, method = "dhtVersionValues")
private Collection<GridCacheVersion> dhtVerVals;
/** Expected transaction size. */
+ @Order(16)
private int txSize;
/** Transaction nodes mapping (primary node -> related backup nodes). */
- @GridDirectTransient
private Map<UUID, Collection<UUID>> txNodes;
/** Tx nodes direct marshallable message. */
- @GridDirectMap(keyType = UUID.class, valueType =
UUIDCollectionMessage.class)
+ @Order(value = 17, method = "txNodesMessages")
private Map<UUID, UUIDCollectionMessage> txNodesMsg;
/** IO policy. */
+ @Order(value = 18, method = "policy")
private byte plc;
/** Transient TX state. */
- @GridDirectTransient
private IgniteTxState txState;
/** */
+ @Order(19)
@GridToStringExclude
private byte flags;
/** Application attributes. */
- @GridDirectTransient
@GridToStringExclude
private @Nullable Map<String, String> appAttrs;
@@ -184,7 +188,9 @@ public class GridDistributedTxPrepareRequest extends
GridDistributedBaseMessage
writeVer = tx.writeVersion();
threadId = tx.threadId();
concurrency = tx.concurrency();
+ concurrencyOrd = concurrency != null ? (byte)concurrency.ordinal() :
-1;
isolation = tx.isolation();
+ isolationOrd = isolation != null ? (byte)isolation.ordinal() : -1;
txSize = tx.size();
plc = tx.ioPolicy();
@@ -252,6 +258,13 @@ public class GridDistributedTxPrepareRequest extends
GridDistributedBaseMessage
return plc;
}
+ /**
+ * @param plc IO policy.
+ */
+ public void policy(byte plc) {
+ this.plc = plc;
+ }
+
/**
* Adds version to be verified on remote node.
*
@@ -279,6 +292,13 @@ public class GridDistributedTxPrepareRequest extends
GridDistributedBaseMessage
return threadId;
}
+ /**
+ * @param threadId Thread ID.
+ */
+ public void threadId(long threadId) {
+ this.threadId = threadId;
+ }
+
/**
* @return Commit version.
*/
@@ -286,6 +306,13 @@ public class GridDistributedTxPrepareRequest extends
GridDistributedBaseMessage
return writeVer;
}
+ /**
+ * @param writeVer Commit version.
+ */
+ public void writeVersion(GridCacheVersion writeVer) {
+ this.writeVer = writeVer;
+ }
+
/**
* @return Invalidate flag.
*/
@@ -300,6 +327,13 @@ public class GridDistributedTxPrepareRequest extends
GridDistributedBaseMessage
return timeout;
}
+ /**
+ * @param timeout Transaction timeout.
+ */
+ public void timeout(long timeout) {
+ this.timeout = timeout;
+ }
+
/**
* @return Concurrency.
*/
@@ -307,6 +341,22 @@ public class GridDistributedTxPrepareRequest extends
GridDistributedBaseMessage
return concurrency;
}
+ /**
+ * @return Concurrency ordinal.
+ */
+ public byte concurrencyOrdinal() {
+ return concurrencyOrd;
+ }
+
+ /**
+ * @param concurrencyOrd Concurrency ordinal.
+ */
+ public void concurrencyOrdinal(byte concurrencyOrd) {
+ this.concurrencyOrd = concurrencyOrd;
+
+ concurrency = TransactionConcurrency.fromOrdinal(concurrencyOrd);
+ }
+
/**
* @return Isolation level.
*/
@@ -314,6 +364,22 @@ public class GridDistributedTxPrepareRequest extends
GridDistributedBaseMessage
return isolation;
}
+ /**
+ * @return Isolation level ordinal.
+ */
+ public byte isolationOrdinal() {
+ return isolationOrd;
+ }
+
+ /**
+ * @param isolationOrd Isolation level ordinal.
+ */
+ public void isolationOrdinal(byte isolationOrd) {
+ this.isolationOrd = isolationOrd;
+
+ isolation = TransactionIsolation.fromOrdinal(isolationOrd);
+ }
+
/**
* @return Read set.
*/
@@ -331,17 +397,45 @@ public class GridDistributedTxPrepareRequest extends
GridDistributedBaseMessage
/**
* @param reads Reads.
*/
- protected void reads(Collection<IgniteTxEntry> reads) {
+ public void reads(Collection<IgniteTxEntry> reads) {
this.reads = reads;
}
/**
* @param writes Writes.
*/
- protected void writes(Collection<IgniteTxEntry> writes) {
+ public void writes(Collection<IgniteTxEntry> writes) {
this.writes = writes;
}
+ /**
+ * @return DHT version keys.
+ */
+ public Collection<IgniteTxKey> dhtVersionKeys() {
+ return dhtVerKeys;
+ }
+
+ /**
+ * @param dhtVerKeys DHT version keys.
+ */
+ public void dhtVersionKeys(Collection<IgniteTxKey> dhtVerKeys) {
+ this.dhtVerKeys = dhtVerKeys;
+ }
+
+ /**
+ * @return DHT version values.
+ */
+ public Collection<GridCacheVersion> dhtVersionValues() {
+ return dhtVerVals;
+ }
+
+ /**
+ * @param dhtVerVals DHT version values.
+ */
+ public void dhtVersionValues(Collection<GridCacheVersion> dhtVerVals) {
+ this.dhtVerVals = dhtVerVals;
+ }
+
/**
* @return Expected transaction size.
*/
@@ -349,6 +443,41 @@ public class GridDistributedTxPrepareRequest extends
GridDistributedBaseMessage
return txSize;
}
+ /**
+ * @param txSize Expected transaction size.
+ */
+ public void txSize(int txSize) {
+ this.txSize = txSize;
+ }
+
+ /**
+ * @return Tx nodes direct marshallable message.
+ */
+ public Map<UUID, UUIDCollectionMessage> txNodesMessages() {
+ return txNodesMsg;
+ }
+
+ /**
+ * @param txNodesMsg Tx nodes direct marshallable message.
+ */
+ public void txNodesMessages(Map<UUID, UUIDCollectionMessage> txNodesMsg) {
+ this.txNodesMsg = txNodesMsg;
+ }
+
+ /**
+ * @return Flags.
+ */
+ public byte flags() {
+ return flags;
+ }
+
+ /**
+ * @param flags Flags.
+ */
+ public void flags(byte flags) {
+ this.flags = flags;
+ }
+
/**
* @return One phase commit flag.
*/
@@ -474,229 +603,6 @@ public class GridDistributedTxPrepareRequest extends
GridDistributedBaseMessage
return (flags & mask) != 0;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 7:
- if (!writer.writeByte(concurrency != null ?
(byte)concurrency.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeCollection(dhtVerKeys,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeCollection(dhtVerVals,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeByte(flags))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeByte(isolation != null ?
(byte)isolation.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeByte(plc))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeCollection(reads,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 14:
- if (!writer.writeLong(threadId))
- return false;
-
- writer.incrementState();
-
- case 15:
- if (!writer.writeLong(timeout))
- return false;
-
- writer.incrementState();
-
- case 16:
- if (!writer.writeMap(txNodesMsg,
MessageCollectionItemType.UUID, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 17:
- if (!writer.writeInt(txSize))
- return false;
-
- writer.incrementState();
-
- case 18:
- if (!writer.writeMessage(writeVer))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeCollection(writes,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 7:
- byte concurrencyOrd;
-
- concurrencyOrd = reader.readByte();
-
- if (!reader.isLastRead())
- return false;
-
- concurrency =
TransactionConcurrency.fromOrdinal(concurrencyOrd);
-
- reader.incrementState();
-
- case 8:
- dhtVerKeys =
reader.readCollection(MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- dhtVerVals =
reader.readCollection(MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- flags = reader.readByte();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- byte isolationOrd;
-
- isolationOrd = reader.readByte();
-
- if (!reader.isLastRead())
- return false;
-
- isolation = TransactionIsolation.fromOrdinal(isolationOrd);
-
- reader.incrementState();
-
- case 12:
- plc = reader.readByte();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- reads = reader.readCollection(MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 14:
- threadId = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 15:
- timeout = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 16:
- txNodesMsg = reader.readMap(MessageCollectionItemType.UUID,
MessageCollectionItemType.MSG, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 17:
- txSize = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 18:
- writeVer = reader.readMessage();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- writes = reader.readCollection(MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 25;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 6bc0853059c..250be9692be 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
@@ -27,8 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -41,9 +39,6 @@ import
org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -51,59 +46,67 @@ import org.jetbrains.annotations.Nullable;
*/
public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** Max order. */
+ @Order(20)
private UUID nearNodeId;
/** Future ID. */
+ @Order(value = 21, method = "futureId")
private IgniteUuid futId;
/** Mini future ID. */
+ @Order(22)
private int miniId;
/** Topology version. */
+ @Order(value = 23, method = "topologyVersion")
private AffinityTopologyVersion topVer;
/** Invalidate near entries flags. */
+ @Order(24)
private BitSet invalidateNearEntries;
/** Near writes. */
+ @Order(25)
@GridToStringInclude
- @GridDirectCollection(IgniteTxEntry.class)
private Collection<IgniteTxEntry> nearWrites;
/** Owned versions by key. */
@GridToStringInclude
- @GridDirectTransient
private Map<IgniteTxKey, GridCacheVersion> owned;
/** Owned keys. */
- @GridDirectCollection(IgniteTxKey.class)
+ @Order(26)
private Collection<IgniteTxKey> ownedKeys;
/** Owned values. */
- @GridDirectCollection(GridCacheVersion.class)
+ @Order(value = 27, method = "ownedValues")
private Collection<GridCacheVersion> ownedVals;
/** */
- @GridDirectCollection(PartitionUpdateCountersMessage.class)
+ @Order(value = 28, method = "updateCounters")
private Collection<PartitionUpdateCountersMessage> updCntrs;
/** Near transaction ID. */
+ @Order(value = 29, method = "nearXidVersion")
private GridCacheVersion nearXidVer;
/** Task name hash. */
+ @Order(30)
private int taskNameHash;
/** Preload keys. */
+ @Order(31)
private BitSet preloadKeys;
/** */
- @GridDirectTransient
private List<IgniteTxKey> nearWritesCacheMissed;
/** {@code True} if remote tx should skip adding itself to completed
versions map on finish. */
+ @Order(value = 32, method = "skipCompletedVersion")
private boolean skipCompletedVers;
/** Transaction label. */
+ @Order(value = 33, method = "txLabel")
@GridToStringInclude
@Nullable private String txLbl;
@@ -187,6 +190,13 @@ public class GridDhtTxPrepareRequest extends
GridDistributedTxPrepareRequest {
return updCntrs;
}
+ /**
+ * @param updCntrs Update counters list.
+ */
+ public void updateCounters(Collection<PartitionUpdateCountersMessage>
updCntrs) {
+ this.updCntrs = updCntrs;
+ }
+
/**
* @return Near cache writes for which cache was not found (possible if
client near cache was closed).
*/
@@ -201,6 +211,13 @@ public class GridDhtTxPrepareRequest extends
GridDistributedTxPrepareRequest {
return nearXidVer;
}
+ /**
+ * @param nearXidVer Near transaction ID.
+ */
+ public void nearXidVersion(GridCacheVersion nearXidVer) {
+ this.nearXidVer = nearXidVer;
+ }
+
/**
* @return Near node ID.
*/
@@ -208,6 +225,27 @@ public class GridDhtTxPrepareRequest extends
GridDistributedTxPrepareRequest {
return nearNodeId;
}
+ /**
+ * @param nodeId Near node ID.
+ */
+ public void nearNodeId(UUID nodeId) {
+ nearNodeId = nodeId;
+ }
+
+ /**
+ * @return Invalidate near entries flags.
+ */
+ public BitSet invalidateNearEntries() {
+ return invalidateNearEntries;
+ }
+
+ /**
+ * @param invalidateNearEntries Invalidate near entries flags.
+ */
+ public void invalidateNearEntries(BitSet invalidateNearEntries) {
+ this.invalidateNearEntries = invalidateNearEntries;
+ }
+
/**
* @return Task name hash.
*/
@@ -215,6 +253,13 @@ public class GridDhtTxPrepareRequest extends
GridDistributedTxPrepareRequest {
return taskNameHash;
}
+ /**
+ * @param taskNameHash Task name hash.
+ */
+ public void taskNameHash(int taskNameHash) {
+ this.taskNameHash = taskNameHash;
+ }
+
/**
* @return Near writes.
*/
@@ -222,6 +267,13 @@ public class GridDhtTxPrepareRequest extends
GridDistributedTxPrepareRequest {
return nearWrites == null ? Collections.emptyList() : nearWrites;
}
+ /**
+ * @param nearWrites Near writes.
+ */
+ public void nearWrites(Collection<IgniteTxEntry> nearWrites) {
+ this.nearWrites = nearWrites;
+ }
+
/**
* @param idx Entry index to set invalidation flag.
* @param invalidate Invalidation flag value.
@@ -267,6 +319,13 @@ public class GridDhtTxPrepareRequest extends
GridDistributedTxPrepareRequest {
return futId;
}
+ /**
+ * @param futId Future ID.
+ */
+ public void futureId(IgniteUuid futId) {
+ this.futId = futId;
+ }
+
/**
* @return Mini future ID.
*/
@@ -274,6 +333,13 @@ public class GridDhtTxPrepareRequest extends
GridDistributedTxPrepareRequest {
return miniId;
}
+ /**
+ * @param miniId Mini future ID.
+ */
+ public void miniId(int miniId) {
+ this.miniId = miniId;
+ }
+
/**
* @return Topology version.
*/
@@ -281,6 +347,55 @@ public class GridDhtTxPrepareRequest extends
GridDistributedTxPrepareRequest {
return topVer;
}
+ /**
+ * @param topVer Topology version.
+ */
+ public void topologyVersion(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+ }
+
+ /**
+ * @return Owned keys.
+ */
+ public Collection<IgniteTxKey> ownedKeys() {
+ return ownedKeys;
+ }
+
+ /**
+ * @param ownedKeys Owned keys.
+ */
+ public void ownedKeys(Collection<IgniteTxKey> ownedKeys) {
+ this.ownedKeys = ownedKeys;
+ }
+
+ /**
+ * @return Owned values.
+ */
+ public Collection<GridCacheVersion> ownedValues() {
+ return ownedVals;
+ }
+
+ /**
+ * @param ownedVals Owned values.
+ */
+ public void ownedValues(Collection<GridCacheVersion> ownedVals) {
+ this.ownedVals = ownedVals;
+ }
+
+ /**
+ * @return Preload keys.
+ */
+ public BitSet preloadKeys() {
+ return preloadKeys;
+ }
+
+ /**
+ * @param preloadKeys Preload keys.
+ */
+ public void preloadKeys(BitSet preloadKeys) {
+ this.preloadKeys = preloadKeys;
+ }
+
/**
* Sets owner and its mapped version.
*
@@ -308,6 +423,13 @@ public class GridDhtTxPrepareRequest extends
GridDistributedTxPrepareRequest {
return skipCompletedVers;
}
+ /**
+ * @param skipCompletedVers {@code True} if remote tx should skip adding
itself to completed versions map on finish.
+ */
+ public void skipCompletedVersion(boolean skipCompletedVers) {
+ this.skipCompletedVers = skipCompletedVers;
+ }
+
/**
* @return Transaction label.
*/
@@ -315,6 +437,13 @@ public class GridDhtTxPrepareRequest extends
GridDistributedTxPrepareRequest {
return txLbl;
}
+ /**
+ * @param txLbl Transaction label.
+ */
+ public void txLabel(String txLbl) {
+ this.txLbl = txLbl;
+ }
+
/**
* {@inheritDoc}
*
@@ -391,235 +520,6 @@ public class GridDhtTxPrepareRequest extends
GridDistributedTxPrepareRequest {
}
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 20:
- if (!writer.writeIgniteUuid(futId))
- return false;
-
- writer.incrementState();
-
- case 21:
- if (!writer.writeBitSet(invalidateNearEntries))
- return false;
-
- writer.incrementState();
-
- case 22:
- if (!writer.writeInt(miniId))
- return false;
-
- writer.incrementState();
-
- case 23:
- if (!writer.writeUuid(nearNodeId))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeCollection(nearWrites,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 25:
- if (!writer.writeMessage(nearXidVer))
- return false;
-
- writer.incrementState();
-
- case 26:
- if (!writer.writeCollection(ownedKeys,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 27:
- if (!writer.writeCollection(ownedVals,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 28:
- if (!writer.writeBitSet(preloadKeys))
- return false;
-
- writer.incrementState();
-
- case 29:
- if (!writer.writeBoolean(skipCompletedVers))
- return false;
-
- writer.incrementState();
-
- case 30:
- if (!writer.writeInt(taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 31:
- if (!writer.writeAffinityTopologyVersion(topVer))
- return false;
-
- writer.incrementState();
-
- case 32:
- if (!writer.writeString(txLbl))
- return false;
-
- writer.incrementState();
-
- case 33:
- if (!writer.writeCollection(updCntrs,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 20:
- futId = reader.readIgniteUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 21:
- invalidateNearEntries = reader.readBitSet();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 22:
- miniId = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 23:
- nearNodeId = reader.readUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 24:
- nearWrites =
reader.readCollection(MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 25:
- nearXidVer = reader.readMessage();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 26:
- ownedKeys =
reader.readCollection(MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 27:
- ownedVals =
reader.readCollection(MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 28:
- preloadKeys = reader.readBitSet();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 29:
- skipCompletedVers = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 30:
- taskNameHash = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 31:
- topVer = reader.readAffinityTopologyVersion();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 32:
- txLbl = reader.readString();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 33:
- updCntrs =
reader.readCollection(MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 34;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index c09ced3b812..22f53066376 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -17,12 +17,12 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest;
@@ -33,8 +33,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -60,22 +58,28 @@ public class GridNearTxPrepareRequest extends
GridDistributedTxPrepareRequest {
private static final int RECOVERY_FLAG_MASK = 0x40;
/** Future ID. */
+ @Order(value = 20, method = "futureId")
private IgniteUuid futId;
/** Mini future ID. */
+ @Order(21)
private int miniId;
/** Topology version. */
+ @Order(value = 22, method = "topologyVersion")
private AffinityTopologyVersion topVer;
/** Task name hash. */
+ @Order(23)
private int taskNameHash;
/** */
+ @Order(value = 24, method = "nearFlags")
@GridToStringExclude
private byte flags;
/** Transaction label. */
+ @Order(value = 25, method = "txLabel")
@GridToStringInclude
@Nullable private String txLbl;
@@ -195,6 +199,13 @@ public class GridNearTxPrepareRequest extends
GridDistributedTxPrepareRequest {
return futId;
}
+ /**
+ * @param futId Future ID.
+ */
+ public void futureId(IgniteUuid futId) {
+ this.futId = futId;
+ }
+
/**
* @return Mini future ID.
*/
@@ -216,6 +227,13 @@ public class GridNearTxPrepareRequest extends
GridDistributedTxPrepareRequest {
return taskNameHash;
}
+ /**
+ * @param taskNameHash Task name hash.
+ */
+ public void taskNameHash(int taskNameHash) {
+ this.taskNameHash = taskNameHash;
+ }
+
/**
* @return Implicit single flag.
*/
@@ -237,6 +255,13 @@ public class GridNearTxPrepareRequest extends
GridDistributedTxPrepareRequest {
return topVer;
}
+ /**
+ * @param topVer Topology version.
+ */
+ public void topologyVersion(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+ }
+
/**
* @return Transaction label.
*/
@@ -244,6 +269,27 @@ public class GridNearTxPrepareRequest extends
GridDistributedTxPrepareRequest {
return txLbl;
}
+ /**
+ * @param txLbl Transaction label.
+ */
+ public void txLabel(String txLbl) {
+ this.txLbl = txLbl;
+ }
+
+ /**
+ * @return Flags.
+ */
+ public byte nearFlags() {
+ return flags;
+ }
+
+ /**
+ * @param flags Flags.
+ */
+ public void nearFlags(byte flags) {
+ this.flags = flags;
+ }
+
/**
*
*/
@@ -303,123 +349,6 @@ public class GridNearTxPrepareRequest extends
GridDistributedTxPrepareRequest {
return (flags & mask) != 0;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 20:
- if (!writer.writeByte(flags))
- return false;
-
- writer.incrementState();
-
- case 21:
- if (!writer.writeIgniteUuid(futId))
- return false;
-
- writer.incrementState();
-
- case 22:
- if (!writer.writeInt(miniId))
- return false;
-
- writer.incrementState();
-
- case 23:
- if (!writer.writeInt(taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeAffinityTopologyVersion(topVer))
- return false;
-
- writer.incrementState();
-
- case 25:
- if (!writer.writeString(txLbl))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 20:
- flags = reader.readByte();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 21:
- futId = reader.readIgniteUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 22:
- miniId = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 23:
- taskNameHash = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 24:
- topVer = reader.readAffinityTopologyVersion();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 25:
- txLbl = reader.readString();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 55;