This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c35309a6c13 IGNITE-25911 Use MessageSerializer for GridJobSiblingsRequest (#12237)
c35309a6c13 is described below
commit c35309a6c13457a72eea664d74ebd9c3491099b5
Author: Dmitry Werner <[email protected]>
AuthorDate: Mon Aug 4 15:19:40 2025 +0500
IGNITE-25911 Use MessageSerializer for GridJobSiblingsRequest (#12237)
---
.../ignite/internal/GridJobSiblingsRequest.java | 94 +++++-----------------
.../communication/GridIoMessageFactory.java | 3 +-
.../internal/processors/job/GridJobProcessor.java | 9 +--
.../processors/task/GridTaskProcessor.java | 8 +-
4 files changed, 25 insertions(+), 89 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsRequest.java
index 5cd037c0ebd..33a55e450b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsRequest.java
@@ -17,26 +17,21 @@
package org.apache.ignite.internal;
-import java.nio.ByteBuffer;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
* Job siblings request.
*/
public class GridJobSiblingsRequest implements Message {
/** */
+ @Order(value = 0, method = "sessionId")
private IgniteUuid sesId;
/** */
- @GridDirectTransient
- private Object topic;
-
- /** */
- private byte[] topicBytes;
+ @Order(1)
+ private long topicId;
/**
* Empty constructor.
@@ -47,16 +42,13 @@ public class GridJobSiblingsRequest implements Message {
/**
* @param sesId Session ID.
- * @param topic Topic.
- * @param topicBytes Serialized topic.
+ * @param topicId Topic ID.
*/
- public GridJobSiblingsRequest(IgniteUuid sesId, Object topic, byte[] topicBytes) {
+ public GridJobSiblingsRequest(IgniteUuid sesId, long topicId) {
assert sesId != null;
- assert topic != null || topicBytes != null;
this.sesId = sesId;
- this.topic = topic;
- this.topicBytes = topicBytes;
+ this.topicId = topicId;
}
/**
@@ -67,77 +59,29 @@ public class GridJobSiblingsRequest implements Message {
}
/**
- * @return Topic.
+ * @param sesId New session ID.
*/
- public Object topic() {
- return topic;
+ public void sessionId(IgniteUuid sesId) {
+ this.sesId = sesId;
}
/**
- * @return Serialized topic.
+ * @return Topic ID.
*/
- public byte[] topicBytes() {
- return topicBytes;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
+ public long topicId() {
+ return topicId;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeIgniteUuid(sesId))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeByteArray(topicBytes))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
+ /**
+ * @param topicId New topic ID.
+ */
+ public void topicId(long topicId) {
+ this.topicId = topicId;
}
/** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- sesId = reader.readIgniteUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- topicBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
+ @Override public void onAckReceived() {
+ // No-op.
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index d445adb3738..4827e722dd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.codegen.GridCacheVersionSerializer;
import org.apache.ignite.internal.codegen.GridDhtPartitionExchangeIdSerializer;
import org.apache.ignite.internal.codegen.GridIntListSerializer;
import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer;
+import org.apache.ignite.internal.codegen.GridJobSiblingsRequestSerializer;
import org.apache.ignite.internal.codegen.GridQueryKillRequestSerializer;
import org.apache.ignite.internal.codegen.GridQueryKillResponseSerializer;
import org.apache.ignite.internal.codegen.LatchAckMessageSerializer;
@@ -220,7 +221,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
factory.register((short)0, GridJobCancelRequest::new, new GridJobCancelRequestSerializer());
factory.register((short)1, GridJobExecuteRequest::new);
factory.register((short)2, GridJobExecuteResponse::new);
- factory.register((short)3, GridJobSiblingsRequest::new);
+ factory.register((short)3, GridJobSiblingsRequest::new, new GridJobSiblingsRequestSerializer());
factory.register((short)4, GridJobSiblingsResponse::new);
factory.register((short)5, GridTaskCancelRequest::new);
factory.register((short)6, GridTaskSessionRequest::new);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 3b112dae4cc..1b7f7a32df3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -702,10 +702,9 @@ public class GridJobProcessor extends GridProcessorAdapter {
}
};
- boolean loc = ctx.localNodeId().equals(taskNodeId);
-
// 1. Create unique topic name.
- Object topic = TOPIC_JOB_SIBLINGS.topic(ses.getId(), topicIdGen.getAndIncrement());
+ long topicId = topicIdGen.getAndIncrement();
+ Object topic = TOPIC_JOB_SIBLINGS.topic(ses.getId(), topicId);
try {
// 2. Register listener.
@@ -713,9 +712,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
// 3. Send message.
ctx.io().sendToGridTopic(taskNode, TOPIC_JOB_SIBLINGS,
- new GridJobSiblingsRequest(ses.getId(),
- loc ? topic : null,
- loc ? null : U.marshal(marsh, topic)),
+ new GridJobSiblingsRequest(ses.getId(), topicId),
SYSTEM_POOL);
// 4. Listen to discovery events.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 8b0a863f858..668dd35abe0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1411,13 +1411,7 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
}
try {
- Object topic = req.topic();
-
- if (topic == null) {
- assert req.topicBytes() != null;
-
- topic = U.unmarshal(marsh, req.topicBytes(), U.resolveClassLoader(ctx.config()));
- }
+ Object topic = TOPIC_JOB_SIBLINGS.topic(req.sessionId(), req.topicId());
boolean loc = ctx.localNodeId().equals(nodeId);