This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 7a36d4d736 [ISSUE #7757] Use `CompositeByteBuf` to prevent memory copy. (#7694) 7a36d4d736 is described below commit 7a36d4d736ae8d6d92658e3bdb18f1cd5c0afdb0 Author: 道君 <dao...@apache.org> AuthorDate: Wed Jan 17 09:49:23 2024 +0800 [ISSUE #7757] Use `CompositeByteBuf` to prevent memory copy. (#7694) * Use CompositeByteBuf to prevent mem_copy. * Fix code * Add tests * Remove useless UTs * Remove unused imports. --------- Co-authored-by: RongtongJin <jinrongton...@mails.ucas.ac.cn> --- .../rocketmq/remoting/netty/FileRegionEncoder.java | 20 ++++++++++++++++---- .../remoting/netty/FileRegionEncoderTest.java | 5 +++-- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java index 7373a56070..3522c7965c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java @@ -18,6 +18,9 @@ package org.apache.rocketmq.remoting.netty; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.FileRegion; import io.netty.handler.codec.MessageToByteEncoder; @@ -51,9 +54,12 @@ public class FileRegionEncoder extends MessageToByteEncoder<FileRegion> { WritableByteChannel writableByteChannel = new WritableByteChannel() { @Override public int write(ByteBuffer src) { - int prev = out.writerIndex(); - out.writeBytes(src); - return out.writerIndex() - prev; + // To prevent mem_copy. + CompositeByteBuf b = (CompositeByteBuf) out; + // Have to increase writerIndex manually. + ByteBuf unpooled = Unpooled.wrappedBuffer(src); + b.addComponent(true, unpooled); + return unpooled.readableBytes(); } @Override @@ -76,4 +82,10 @@ public class FileRegionEncoder extends MessageToByteEncoder<FileRegion> { msg.transferTo(writableByteChannel, transferred); } } -} + + @Override + protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, FileRegion msg, boolean preferDirect) throws Exception { + ByteBufAllocator allocator = ctx.alloc(); + return preferDirect ? allocator.compositeDirectBuffer() : allocator.compositeHeapBuffer(); + } +} \ No newline at end of file diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java index 6c7327f258..0cbe627d80 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java @@ -21,14 +21,15 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.DefaultFileRegion; import io.netty.channel.FileRegion; import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Assert; +import org.junit.Test; + import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.util.Random; import java.util.UUID; -import org.junit.Assert; -import org.junit.Test; public class FileRegionEncoderTest {