This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new ca1ac020a3e IGNITE-25961 Use MessageSerializer for
GridNearAtomicUpdateResponse and GridNearAtomicCheckUpdateRequest (#12201)
ca1ac020a3e is described below
commit ca1ac020a3e943ecc4e4b0b8fb0aa71184e9e9ce
Author: Maksim Davydov <[email protected]>
AuthorDate: Thu Sep 18 12:41:02 2025 +0300
IGNITE-25961 Use MessageSerializer for GridNearAtomicUpdateResponse and
GridNearAtomicCheckUpdateRequest (#12201)
---
.../communication/GridIoMessageFactory.java | 6 +-
.../processors/cache/GridCacheIdMessage.java | 4 +
.../atomic/GridNearAtomicCheckUpdateRequest.java | 84 ++-------
.../dht/atomic/GridNearAtomicUpdateResponse.java | 198 ++++++---------------
4 files changed, 81 insertions(+), 211 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index f48f7c3a599..4a2ac144135 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -41,6 +41,8 @@ import
org.apache.ignite.internal.codegen.GridDhtTxOnePhaseCommitAckRequestSeria
import org.apache.ignite.internal.codegen.GridIntListSerializer;
import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer;
import org.apache.ignite.internal.codegen.GridJobSiblingsRequestSerializer;
+import
org.apache.ignite.internal.codegen.GridNearAtomicCheckUpdateRequestSerializer;
+import
org.apache.ignite.internal.codegen.GridNearAtomicUpdateResponseSerializer;
import org.apache.ignite.internal.codegen.GridQueryCancelRequestSerializer;
import org.apache.ignite.internal.codegen.GridQueryFailResponseSerializer;
import org.apache.ignite.internal.codegen.GridQueryKillRequestSerializer;
@@ -221,7 +223,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)-53, SchemaOperationStatusMessage::new, new
SchemaOperationStatusMessageSerializer());
factory.register((short)-52, GridIntList::new, new
GridIntListSerializer());
factory.register((short)-51, NearCacheUpdates::new, new
NearCacheUpdatesSerializer());
- factory.register((short)-50, GridNearAtomicCheckUpdateRequest::new);
+ factory.register((short)-50, GridNearAtomicCheckUpdateRequest::new,
new GridNearAtomicCheckUpdateRequestSerializer());
factory.register((short)-49, UpdateErrors::new);
factory.register((short)-48, GridDhtAtomicNearResponse::new);
factory.register((short)-45,
GridChangeGlobalStateMessageResponse::new);
@@ -273,7 +275,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)38, GridDhtAtomicUpdateRequest::new);
factory.register((short)39, GridDhtAtomicUpdateResponse::new);
factory.register((short)40, GridNearAtomicFullUpdateRequest::new);
- factory.register((short)41, GridNearAtomicUpdateResponse::new);
+ factory.register((short)41, GridNearAtomicUpdateResponse::new, new
GridNearAtomicUpdateResponseSerializer());
factory.register((short)42, GridDhtForceKeysRequest::new);
factory.register((short)43, GridDhtForceKeysResponse::new);
factory.register((short)45, GridDhtPartitionDemandMessage::new);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
index be87ddff1f1..8a9c8a4da58 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache;
import java.nio.ByteBuffer;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -29,6 +30,7 @@ import
org.apache.ignite.plugin.extensions.communication.MessageWriter;
public abstract class GridCacheIdMessage extends GridCacheMessage {
/** Cache ID. */
@GridToStringInclude
+ @Order(3)
protected int cacheId;
/**
@@ -52,6 +54,7 @@ public abstract class GridCacheIdMessage extends
GridCacheMessage {
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ // TODO: Remove #writeTo() after all inheritors have migrated to the
new ser/der scheme (IGNITE-25490).
writer.setBuffer(buf);
if (!super.writeTo(buf, writer))
@@ -78,6 +81,7 @@ public abstract class GridCacheIdMessage extends
GridCacheMessage {
/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ // TODO: Remove #readFrom() after all inheritors have migrated to the
new ser/der scheme (IGNITE-25490).
reader.setBuffer(buf);
if (!super.readFrom(buf, reader))
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
index 39ba66599dc..78a11bf56d2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
@@ -17,12 +17,9 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
*
@@ -32,13 +29,14 @@ public class GridNearAtomicCheckUpdateRequest extends
GridCacheIdMessage {
public static final int CACHE_MSG_IDX = nextIndexId();
/** */
- @GridDirectTransient
private GridNearAtomicAbstractUpdateRequest updateReq;
/** */
+ @Order(value = 4, method = "partition")
private int partId;
/** */
+ @Order(value = 5, method = "futureId")
private long futId;
/**
@@ -69,6 +67,14 @@ public class GridNearAtomicCheckUpdateRequest extends
GridCacheIdMessage {
return futId;
}
+ /**
+ * @param futId Future ID on near node.
+ */
+ public void futureId(long futId) {
+ this.futId = futId;
+ }
+
+
/**
* @return Related update request.
*/
@@ -81,6 +87,13 @@ public class GridNearAtomicCheckUpdateRequest extends
GridCacheIdMessage {
return partId;
}
+ /**
+ * @param partId Partition ID this message is targeted to or {@code -1} if
it cannot be determined.
+ */
+ public void partition(int partId) {
+ this.partId = partId;
+ }
+
/** {@inheritDoc} */
@Override public int lookupIndex() {
return CACHE_MSG_IDX;
@@ -96,67 +109,6 @@ public class GridNearAtomicCheckUpdateRequest extends
GridCacheIdMessage {
return -50;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 4:
- if (!writer.writeLong(futId))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeInt(partId))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 4:
- futId = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- partId = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNearAtomicCheckUpdateRequest.class, this);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 086391bbe39..ff631a36d47 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -17,14 +17,12 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -36,9 +34,6 @@ import
org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -49,35 +44,39 @@ public class GridNearAtomicUpdateResponse extends
GridCacheIdMessage implements
public static final int CACHE_MSG_IDX = nextIndexId();
/** Node ID this reply should be sent to. */
- @GridDirectTransient
private UUID nodeId;
/** Future ID. */
+ @Order(value = 4, method = "futureId")
private long futId;
/** */
+ @Order(value = 5, method = "errors")
private UpdateErrors errs;
/** Return value. */
@GridToStringInclude
+ @Order(value = 6, method = "returnValue")
private GridCacheReturn ret;
/** */
+ @Order(value = 7, method = "remapTopologyVersion")
private AffinityTopologyVersion remapTopVer;
/** Data for near cache update. */
+ @Order(8)
private NearCacheUpdates nearUpdates;
/** Partition ID. */
- private int partId = -1;
+ @Order(value = 9, method = "partition")
+ private int partId;
/** */
- @GridDirectCollection(UUID.class)
@GridToStringInclude
+ @Order(10)
private List<UUID> mapping;
/** */
- @GridDirectTransient
private boolean nodeLeft;
/**
@@ -107,6 +106,8 @@ public class GridNearAtomicUpdateResponse extends
GridCacheIdMessage implements
this.partId = partId;
this.nodeLeft = nodeLeft;
this.addDepInfo = addDepInfo;
+
+ assert partId >= 0;
}
/**
@@ -156,6 +157,27 @@ public class GridNearAtomicUpdateResponse extends
GridCacheIdMessage implements
return futId;
}
+ /**
+ * @param futId New future ID.
+ */
+ public void futureId(long futId) {
+ this.futId = futId;
+ }
+
+ /**
+ * @return Update errors.
+ */
+ public UpdateErrors errors() {
+ return errs;
+ }
+
+ /**
+ * @param errs New update errors.
+ */
+ public void errors(UpdateErrors errs) {
+ this.errs = errs;
+ }
+
/**
* Sets update error.
*
@@ -197,14 +219,14 @@ public class GridNearAtomicUpdateResponse extends
GridCacheIdMessage implements
/**
* @param remapTopVer Topology version to remap update.
*/
- void remapTopologyVersion(AffinityTopologyVersion remapTopVer) {
+ public void remapTopologyVersion(AffinityTopologyVersion remapTopVer) {
this.remapTopVer = remapTopVer;
}
/**
* @return Topology version if update should be remapped.
*/
- @Nullable AffinityTopologyVersion remapTopologyVersion() {
+ @Nullable public AffinityTopologyVersion remapTopologyVersion() {
return remapTopVer;
}
@@ -369,11 +391,32 @@ public class GridNearAtomicUpdateResponse extends
GridCacheIdMessage implements
ret.finishUnmarshal(cctx, ldr);
}
+ /**
+ * @return Data for near cache update.
+ */
+ public NearCacheUpdates nearUpdates() {
+ return nearUpdates;
+ }
+
+ /**
+ * @param nearUpdates New data for near cache update.
+ */
+ public void nearUpdates(NearCacheUpdates nearUpdates) {
+ this.nearUpdates = nearUpdates;
+ }
+
/** {@inheritDoc} */
@Override public int partition() {
return partId;
}
+ /**
+ * @param partId New partition ID.
+ */
+ public void partition(int partId) {
+ this.partId = partId;
+ }
+
/** {@inheritDoc} */
@Override public boolean addDeploymentInfo() {
return addDepInfo;
@@ -384,137 +427,6 @@ public class GridNearAtomicUpdateResponse extends
GridCacheIdMessage implements
return ctx.atomicMessageLogger();
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 4:
- if (!writer.writeMessage(errs))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeLong(futId))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeCollection(mapping,
MessageCollectionItemType.UUID))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeMessage(nearUpdates))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeInt(partId))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeAffinityTopologyVersion(remapTopVer))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeMessage(ret))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 4:
- errs = reader.readMessage();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- futId = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- mapping =
reader.readCollection(MessageCollectionItemType.UUID);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- nearUpdates = reader.readMessage();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- partId = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- remapTopVer = reader.readAffinityTopologyVersion();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- ret = reader.readMessage();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 41;