This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8b70dc0e23c IGNITE-26013 Use MessageSerializer for SnapshotFiles*
messages (#12308)
8b70dc0e23c is described below
commit 8b70dc0e23cc3edae758bc5e11efaf33e4b281bb
Author: Dmitry Werner <[email protected]>
AuthorDate: Tue Sep 2 13:32:48 2025 +0500
IGNITE-26013 Use MessageSerializer for SnapshotFiles* messages (#12308)
---
.../communication/GridIoMessageFactory.java | 8 +-
.../snapshot/AbstractSnapshotMessage.java | 45 ++-----
.../snapshot/SnapshotFilesFailureMessage.java | 53 +--------
.../snapshot/SnapshotFilesRequestMessage.java | 131 ++++++---------------
4 files changed, 54 insertions(+), 183 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index b462c8aa538..665f92e5e7b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -54,6 +54,8 @@ import
org.apache.ignite.internal.codegen.MissingMappingResponseMessageSerialize
import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
import
org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer;
import org.apache.ignite.internal.codegen.SessionChannelMessageSerializer;
+import
org.apache.ignite.internal.codegen.SnapshotFilesFailureMessageSerializer;
+import
org.apache.ignite.internal.codegen.SnapshotFilesRequestMessageSerializer;
import
org.apache.ignite.internal.codegen.TcpInverseConnectionResponseMessageSerializer;
import org.apache.ignite.internal.codegen.TxLockSerializer;
import org.apache.ignite.internal.codegen.TxLocksRequestSerializer;
@@ -359,8 +361,10 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register(SessionChannelMessage.TYPE_CODE,
SessionChannelMessage::new, new SessionChannelMessageSerializer());
factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new);
factory.register((short)177, TcpInverseConnectionResponseMessage::new,
new TcpInverseConnectionResponseMessageSerializer());
- factory.register(SnapshotFilesRequestMessage.TYPE_CODE,
SnapshotFilesRequestMessage::new);
- factory.register(SnapshotFilesFailureMessage.TYPE_CODE,
SnapshotFilesFailureMessage::new);
+ factory.register(SnapshotFilesRequestMessage.TYPE_CODE,
SnapshotFilesRequestMessage::new,
+ new SnapshotFilesRequestMessageSerializer());
+ factory.register(SnapshotFilesFailureMessage.TYPE_CODE,
SnapshotFilesFailureMessage::new,
+ new SnapshotFilesFailureMessageSerializer());
factory.register((short)180,
AtomicApplicationAttributesAwareRequest::new);
factory.register((short)181, TransactionAttributesAwareRequest::new);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotMessage.java
index c21bd53bb5e..3c10a9eb74a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotMessage.java
@@ -18,18 +18,17 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-import java.nio.ByteBuffer;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
*
*/
abstract class AbstractSnapshotMessage implements Message {
/** Unique message ID. */
+ @Order(0)
private String id;
/**
@@ -55,41 +54,11 @@ abstract class AbstractSnapshotMessage implements Message {
return id;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- if (writer.state() == 0) {
- if (!writer.writeString(id))
- return false;
-
- writer.incrementState();
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (reader.state() == 0) {
- id = reader.readString();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
- }
-
- return true;
+ /**
+ * @param id Unique message ID.
+ */
+ public void id(String id) {
+ this.id = id;
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesFailureMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesFailureMessage.java
index 6463cb80f98..80b00b1e228 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesFailureMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesFailureMessage.java
@@ -18,10 +18,8 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-import java.nio.ByteBuffer;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
* Message indicating a failure occurred during processing snapshot files
request.
@@ -31,6 +29,7 @@ public class SnapshotFilesFailureMessage extends
AbstractSnapshotMessage {
public static final short TYPE_CODE = 179;
/** Exception message which is occurred during snapshot request
processing. */
+ @Order(value = 1, method = "errorMessage")
private String errMsg;
/**
@@ -59,55 +58,9 @@ public class SnapshotFilesFailureMessage extends
AbstractSnapshotMessage {
/**
* @param errMsg Response error message.
- * @return {@code this} for chaining.
*/
- public SnapshotFilesFailureMessage errorMessage(String errMsg) {
+ public void errorMessage(String errMsg) {
this.errMsg = errMsg;
-
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- if (writer.state() == 1) {
- if (!writer.writeString(errMsg))
- return false;
-
- writer.incrementState();
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- if (reader.state() == 1) {
- errMsg = reader.readString();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
- }
-
- return true;
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java
index b599bb18924..89622fae573 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java
@@ -18,20 +18,16 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
-import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -42,16 +38,19 @@ public class SnapshotFilesRequestMessage extends
AbstractSnapshotMessage {
public static final short TYPE_CODE = 178;
/** Snapshot operation request ID. */
+ @Order(value = 1, method = "requestId")
private UUID reqId;
/** Snapshot name to request. */
+ @Order(value = 2, method = "snapshotName")
private String snpName;
/** Snapshot directory path. */
+ @Order(value = 3, method = "snapshotPath")
private String snpPath;
/** Map of cache group ids and corresponding set of its partition ids. */
- @GridDirectMap(keyType = Integer.class, valueType = int[].class)
+ @Order(value = 4, method = "partitions")
private Map<Integer, int[]> parts;
/**
@@ -100,6 +99,20 @@ public class SnapshotFilesRequestMessage extends
AbstractSnapshotMessage {
return res;
}
+ /**
+ * @return The demanded cache group partitions per each cache group.
+ */
+ public Map<Integer, int[]> partitions() {
+ return parts;
+ }
+
+ /**
+ * @param parts The demanded cache group partitions per each cache group.
+ */
+ public void partitions(Map<Integer, int[]> parts) {
+ this.parts = parts;
+ }
+
/**
* @return Requested snapshot name.
*/
@@ -107,6 +120,13 @@ public class SnapshotFilesRequestMessage extends
AbstractSnapshotMessage {
return snpName;
}
+ /**
+ * @param snpName Requested snapshot name.
+ */
+ public void snapshotName(String snpName) {
+ this.snpName = snpName;
+ }
+
/**
* @return Snapshot directory path.
*/
@@ -114,6 +134,13 @@ public class SnapshotFilesRequestMessage extends
AbstractSnapshotMessage {
return snpPath;
}
+ /**
+ * @param snpPath Snapshot directory path.
+ */
+ public void snapshotPath(String snpPath) {
+ this.snpPath = snpPath;
+ }
+
/**
* @return Snapshot operation request ID.
*/
@@ -121,93 +148,11 @@ public class SnapshotFilesRequestMessage extends
AbstractSnapshotMessage {
return reqId;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 1:
- if (!writer.writeMap(parts, MessageCollectionItemType.INT,
MessageCollectionItemType.INT_ARR))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeUuid(reqId))
- return false;
-
- writer.incrementState();
-
- case 3:
- if (!writer.writeString(snpName))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeString(snpPath))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 1:
- parts = reader.readMap(MessageCollectionItemType.INT,
MessageCollectionItemType.INT_ARR, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- reqId = reader.readUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 3:
- snpName = reader.readString();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- snpPath = reader.readString();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
+ /**
+ * @param reqId Snapshot operation request ID.
+ */
+ public void requestId(UUID reqId) {
+ this.reqId = reqId;
}
/** {@inheritDoc} */