This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c35309a6c13 IGNITE-25911 Use MessageSerializer for GridJobSiblingsRequest (#12237)
c35309a6c13 is described below

commit c35309a6c13457a72eea664d74ebd9c3491099b5
Author: Dmitry Werner <[email protected]>
AuthorDate: Mon Aug 4 15:19:40 2025 +0500

    IGNITE-25911 Use MessageSerializer for GridJobSiblingsRequest (#12237)
---
 .../ignite/internal/GridJobSiblingsRequest.java    | 94 +++++-----------------
 .../communication/GridIoMessageFactory.java        |  3 +-
 .../internal/processors/job/GridJobProcessor.java  |  9 +--
 .../processors/task/GridTaskProcessor.java         |  8 +-
 4 files changed, 25 insertions(+), 89 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsRequest.java
index 5cd037c0ebd..33a55e450b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsRequest.java
@@ -17,26 +17,21 @@
 
 package org.apache.ignite.internal;
 
-import java.nio.ByteBuffer;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
 /**
  * Job siblings request.
  */
 public class GridJobSiblingsRequest implements Message {
     /** */
+    @Order(value = 0, method = "sessionId")
     private IgniteUuid sesId;
 
     /** */
-    @GridDirectTransient
-    private Object topic;
-
-    /** */
-    private byte[] topicBytes;
+    @Order(1)
+    private long topicId;
 
     /**
      * Empty constructor.
@@ -47,16 +42,13 @@ public class GridJobSiblingsRequest implements Message {
 
     /**
      * @param sesId Session ID.
-     * @param topic Topic.
-     * @param topicBytes Serialized topic.
+     * @param topicId Topic ID.
      */
-    public GridJobSiblingsRequest(IgniteUuid sesId, Object topic, byte[] topicBytes) {
+    public GridJobSiblingsRequest(IgniteUuid sesId, long topicId) {
         assert sesId != null;
-        assert topic != null || topicBytes != null;
 
         this.sesId = sesId;
-        this.topic = topic;
-        this.topicBytes = topicBytes;
+        this.topicId = topicId;
     }
 
     /**
@@ -67,77 +59,29 @@ public class GridJobSiblingsRequest implements Message {
     }
 
     /**
-     * @return Topic.
+     * @param sesId New session ID.
      */
-    public Object topic() {
-        return topic;
+    public void sessionId(IgniteUuid sesId) {
+        this.sesId = sesId;
     }
 
     /**
-     * @return Serialized topic.
+     * @return Topic ID.
      */
-    public byte[] topicBytes() {
-        return topicBytes;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
+    public long topicId() {
+        return topicId;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeIgniteUuid(sesId))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeByteArray(topicBytes))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
+    /**
+     * @param topicId New topic ID.
+     */
+    public void topicId(long topicId) {
+        this.topicId = topicId;
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        switch (reader.state()) {
-            case 0:
-                sesId = reader.readIgniteUuid();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                topicBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
+    @Override public void onAckReceived() {
+        // No-op.
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index d445adb3738..4827e722dd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.codegen.GridCacheVersionSerializer;
 import org.apache.ignite.internal.codegen.GridDhtPartitionExchangeIdSerializer;
 import org.apache.ignite.internal.codegen.GridIntListSerializer;
 import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer;
+import org.apache.ignite.internal.codegen.GridJobSiblingsRequestSerializer;
 import org.apache.ignite.internal.codegen.GridQueryKillRequestSerializer;
 import org.apache.ignite.internal.codegen.GridQueryKillResponseSerializer;
 import org.apache.ignite.internal.codegen.LatchAckMessageSerializer;
@@ -220,7 +221,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
         factory.register((short)0, GridJobCancelRequest::new, new GridJobCancelRequestSerializer());
         factory.register((short)1, GridJobExecuteRequest::new);
         factory.register((short)2, GridJobExecuteResponse::new);
-        factory.register((short)3, GridJobSiblingsRequest::new);
+        factory.register((short)3, GridJobSiblingsRequest::new, new GridJobSiblingsRequestSerializer());
         factory.register((short)4, GridJobSiblingsResponse::new);
         factory.register((short)5, GridTaskCancelRequest::new);
         factory.register((short)6, GridTaskSessionRequest::new);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 3b112dae4cc..1b7f7a32df3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -702,10 +702,9 @@ public class GridJobProcessor extends GridProcessorAdapter {
             }
         };
 
-        boolean loc = ctx.localNodeId().equals(taskNodeId);
-
         // 1. Create unique topic name.
-        Object topic = TOPIC_JOB_SIBLINGS.topic(ses.getId(), topicIdGen.getAndIncrement());
+        long topicId = topicIdGen.getAndIncrement();
+        Object topic = TOPIC_JOB_SIBLINGS.topic(ses.getId(), topicId);
 
         try {
             // 2. Register listener.
@@ -713,9 +712,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
 
             // 3. Send message.
             ctx.io().sendToGridTopic(taskNode, TOPIC_JOB_SIBLINGS,
-                new GridJobSiblingsRequest(ses.getId(),
-                    loc ? topic : null,
-                    loc ? null : U.marshal(marsh, topic)),
+                new GridJobSiblingsRequest(ses.getId(), topicId),
                 SYSTEM_POOL);
 
             // 4. Listen to discovery events.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 8b0a863f858..668dd35abe0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1411,13 +1411,7 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
                 }
 
                 try {
-                    Object topic = req.topic();
-
-                    if (topic == null) {
-                        assert req.topicBytes() != null;
-
-                        topic = U.unmarshal(marsh, req.topicBytes(), U.resolveClassLoader(ctx.config()));
-                    }
+                    Object topic = TOPIC_JOB_SIBLINGS.topic(req.sessionId(), req.topicId());
 
                     boolean loc = ctx.localNodeId().equals(nodeId);
 

Reply via email to