This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 72ce8725b6d IGNITE-26024 Reuse allocated DirectMessageWriter instances
(#6317)
72ce8725b6d is described below
commit 72ce8725b6daa3e036737b219d54725c6159c39b
Author: Ivan Bessonov <[email protected]>
AuthorDate: Fri Jul 25 10:17:34 2025 +0300
IGNITE-26024 Reuse allocated DirectMessageWriter instances (#6317)
---
.../internal/network/netty/OutboundEncoder.java | 36 +++++++++++++++++++---
1 file changed, 31 insertions(+), 5 deletions(-)
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
index f3861cbd289..f842190a3f5 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
@@ -17,11 +17,15 @@
package org.apache.ignite.internal.network.netty;
+import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.stream.ChunkedInput;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -32,6 +36,7 @@ import
org.apache.ignite.internal.network.direct.DirectMessageWriter;
import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
import org.apache.ignite.internal.network.serialization.MessageSerializer;
+import org.apache.ignite.internal.network.serialization.MessageWriter;
import
org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
/**
@@ -45,6 +50,9 @@ public class OutboundEncoder extends
MessageToMessageEncoder<OutNetworkObject> {
private static final NetworkMessagesFactory MSG_FACTORY = new
NetworkMessagesFactory();
+ /** Message writer channel attribute key. */
+ private static final AttributeKey<MessageWriter> WRITER_KEY =
AttributeKey.valueOf("WRITER");
+
/** Serialization registry. */
private final PerSessionSerializationService serializationService;
@@ -59,7 +67,16 @@ public class OutboundEncoder extends
MessageToMessageEncoder<OutNetworkObject> {
@Override
protected void encode(ChannelHandlerContext ctx, OutNetworkObject msg,
List<Object> out) throws Exception {
- out.add(new NetworkMessageChunkedInput(msg, serializationService));
+ Attribute<MessageWriter> writerAttr = ctx.channel().attr(WRITER_KEY);
+ MessageWriter writer = writerAttr.get();
+
+ if (writer == null) {
+ writer = new
DirectMessageWriter(serializationService.serializationRegistry(),
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+
+ writerAttr.set(writer);
+ }
+
+ out.add(new NetworkMessageChunkedInput(msg, serializationService,
writer));
}
/**
@@ -75,7 +92,7 @@ public class OutboundEncoder extends
MessageToMessageEncoder<OutNetworkObject> {
private final MessageSerializer<ClassDescriptorListMessage>
descriptorSerializer;
/** Message writer. */
- private final DirectMessageWriter writer;
+ private final MessageWriter writer;
private final ClassDescriptorListMessage descriptors;
private final PerSessionSerializationService serializationService;
@@ -92,7 +109,8 @@ public class OutboundEncoder extends
MessageToMessageEncoder<OutNetworkObject> {
*/
private NetworkMessageChunkedInput(
OutNetworkObject outObject,
- PerSessionSerializationService serializationService
+ PerSessionSerializationService serializationService,
+ MessageWriter writer
) {
this.serializationService = serializationService;
this.msg = outObject.networkMessage();
@@ -100,7 +118,7 @@ public class OutboundEncoder extends
MessageToMessageEncoder<OutNetworkObject> {
List<ClassDescriptorMessage> outDescriptors = null;
List<ClassDescriptorMessage> outObjectDescriptors =
outObject.descriptors();
//noinspection ForLoopReplaceableByForEach
- for (int i = 0, descriptorsedSize = outObjectDescriptors.size(); i
< descriptorsedSize; i++) {
+ for (int i = 0, descriptorsSize = outObjectDescriptors.size(); i <
descriptorsSize; i++) {
ClassDescriptorMessage classDescriptorMessage =
outObjectDescriptors.get(i);
if
(!serializationService.isDescriptorSent(classDescriptorMessage.descriptorId()))
{
if (outDescriptors == null) {
@@ -122,7 +140,7 @@ public class OutboundEncoder extends
MessageToMessageEncoder<OutNetworkObject> {
}
this.serializer =
serializationService.createMessageSerializer(msg.groupType(),
msg.messageType());
- this.writer = new
DirectMessageWriter(serializationService.serializationRegistry(),
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+ this.writer = writer;
}
@Override
@@ -165,12 +183,20 @@ public class OutboundEncoder extends
MessageToMessageEncoder<OutNetworkObject> {
}
} else {
finished = serializer.writeMessage(msg, writer);
+
+ if (finished) {
+ writer.reset();
+ }
+
break;
}
}
buffer.writerIndex(byteBuffer.position() - initialPosition);
+ // Do not hold a reference, might help GC to do its job better.
+ writer.setBuffer(EMPTY_BYTE_BUFFER);
+
return buffer;
}