This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 72ce8725b6d IGNITE-26024 Reuse allocated DirectMessageWriter instances 
(#6317)
72ce8725b6d is described below

commit 72ce8725b6daa3e036737b219d54725c6159c39b
Author: Ivan Bessonov <[email protected]>
AuthorDate: Fri Jul 25 10:17:34 2025 +0300

    IGNITE-26024 Reuse allocated DirectMessageWriter instances (#6317)
---
 .../internal/network/netty/OutboundEncoder.java    | 36 +++++++++++++++++++---
 1 file changed, 31 insertions(+), 5 deletions(-)

diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
index f3861cbd289..f842190a3f5 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
@@ -17,11 +17,15 @@
 
 package org.apache.ignite.internal.network.netty;
 
+import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageEncoder;
 import io.netty.handler.stream.ChunkedInput;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -32,6 +36,7 @@ import 
org.apache.ignite.internal.network.direct.DirectMessageWriter;
 import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
 import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
 import org.apache.ignite.internal.network.serialization.MessageSerializer;
+import org.apache.ignite.internal.network.serialization.MessageWriter;
 import 
org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 
 /**
@@ -45,6 +50,9 @@ public class OutboundEncoder extends 
MessageToMessageEncoder<OutNetworkObject> {
 
     private static final NetworkMessagesFactory MSG_FACTORY = new 
NetworkMessagesFactory();
 
+    /** Message writer channel attribute key. */
+    private static final AttributeKey<MessageWriter> WRITER_KEY = 
AttributeKey.valueOf("WRITER");
+
     /** Serialization registry. */
     private final PerSessionSerializationService serializationService;
 
@@ -59,7 +67,16 @@ public class OutboundEncoder extends 
MessageToMessageEncoder<OutNetworkObject> {
 
     @Override
     protected void encode(ChannelHandlerContext ctx, OutNetworkObject msg, 
List<Object> out) throws Exception {
-        out.add(new NetworkMessageChunkedInput(msg, serializationService));
+        Attribute<MessageWriter> writerAttr = ctx.channel().attr(WRITER_KEY);
+        MessageWriter writer = writerAttr.get();
+
+        if (writer == null) {
+            writer = new 
DirectMessageWriter(serializationService.serializationRegistry(), 
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+
+            writerAttr.set(writer);
+        }
+
+        out.add(new NetworkMessageChunkedInput(msg, serializationService, 
writer));
     }
 
     /**
@@ -75,7 +92,7 @@ public class OutboundEncoder extends 
MessageToMessageEncoder<OutNetworkObject> {
         private final MessageSerializer<ClassDescriptorListMessage> 
descriptorSerializer;
 
         /** Message writer. */
-        private final DirectMessageWriter writer;
+        private final MessageWriter writer;
 
         private final ClassDescriptorListMessage descriptors;
         private final PerSessionSerializationService serializationService;
@@ -92,7 +109,8 @@ public class OutboundEncoder extends 
MessageToMessageEncoder<OutNetworkObject> {
          */
         private NetworkMessageChunkedInput(
                 OutNetworkObject outObject,
-                PerSessionSerializationService serializationService
+                PerSessionSerializationService serializationService,
+                MessageWriter writer
         ) {
             this.serializationService = serializationService;
             this.msg = outObject.networkMessage();
@@ -100,7 +118,7 @@ public class OutboundEncoder extends 
MessageToMessageEncoder<OutNetworkObject> {
             List<ClassDescriptorMessage> outDescriptors = null;
             List<ClassDescriptorMessage> outObjectDescriptors = 
outObject.descriptors();
             //noinspection ForLoopReplaceableByForEach
-            for (int i = 0, descriptorsedSize = outObjectDescriptors.size(); i 
< descriptorsedSize; i++) {
+            for (int i = 0, descriptorsSize = outObjectDescriptors.size(); i < 
descriptorsSize; i++) {
                 ClassDescriptorMessage classDescriptorMessage = 
outObjectDescriptors.get(i);
                 if 
(!serializationService.isDescriptorSent(classDescriptorMessage.descriptorId())) 
{
                     if (outDescriptors == null) {
@@ -122,7 +140,7 @@ public class OutboundEncoder extends 
MessageToMessageEncoder<OutNetworkObject> {
             }
 
             this.serializer = 
serializationService.createMessageSerializer(msg.groupType(), 
msg.messageType());
-            this.writer = new 
DirectMessageWriter(serializationService.serializationRegistry(), 
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+            this.writer = writer;
         }
 
         @Override
@@ -165,12 +183,20 @@ public class OutboundEncoder extends 
MessageToMessageEncoder<OutNetworkObject> {
                     }
                 } else {
                     finished = serializer.writeMessage(msg, writer);
+
+                    if (finished) {
+                        writer.reset();
+                    }
+
                     break;
                 }
             }
 
             buffer.writerIndex(byteBuffer.position() - initialPosition);
 
+            // Do not hold a reference, might help GC to do its job better.
+            writer.setBuffer(EMPTY_BYTE_BUFFER);
+
             return buffer;
         }
 

Reply via email to