This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7a36d4d736 [ISSUE #7757] Use `CompositeByteBuf` to prevent memory 
copy. (#7694)
7a36d4d736 is described below

commit 7a36d4d736ae8d6d92658e3bdb18f1cd5c0afdb0
Author: 道君 <dao...@apache.org>
AuthorDate: Wed Jan 17 09:49:23 2024 +0800

    [ISSUE #7757] Use `CompositeByteBuf` to prevent memory copy. (#7694)
    
    * Use CompositeByteBuf to prevent mem_copy.
    
    * Fix code
    
    * Add tests
    
    * Remove useless UTs
    
    * Remove unused imports.
    
    ---------
    
    Co-authored-by: RongtongJin <jinrongton...@mails.ucas.ac.cn>
---
 .../rocketmq/remoting/netty/FileRegionEncoder.java   | 20 ++++++++++++++++----
 .../remoting/netty/FileRegionEncoderTest.java        |  5 +++--
 2 files changed, 19 insertions(+), 6 deletions(-)

diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
index 7373a56070..3522c7965c 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
@@ -18,6 +18,9 @@
 package org.apache.rocketmq.remoting.netty;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.FileRegion;
 import io.netty.handler.codec.MessageToByteEncoder;
@@ -51,9 +54,12 @@ public class FileRegionEncoder extends 
MessageToByteEncoder<FileRegion> {
         WritableByteChannel writableByteChannel = new WritableByteChannel() {
             @Override
             public int write(ByteBuffer src) {
-                int prev = out.writerIndex();
-                out.writeBytes(src);
-                return out.writerIndex() - prev;
+                // To prevent mem_copy.
+                CompositeByteBuf b = (CompositeByteBuf) out;
+                // Have to increase writerIndex manually.
+                ByteBuf unpooled = Unpooled.wrappedBuffer(src);
+                b.addComponent(true, unpooled);
+                return unpooled.readableBytes();
             }
 
             @Override
@@ -76,4 +82,10 @@ public class FileRegionEncoder extends 
MessageToByteEncoder<FileRegion> {
             msg.transferTo(writableByteChannel, transferred);
         }
     }
-}
+
+    @Override
+    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, FileRegion 
msg, boolean preferDirect) throws Exception {
+        ByteBufAllocator allocator = ctx.alloc();
+        return preferDirect ? allocator.compositeDirectBuffer() : 
allocator.compositeHeapBuffer();
+    }
+}
\ No newline at end of file
diff --git 
a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java
 
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java
index 6c7327f258..0cbe627d80 100644
--- 
a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java
+++ 
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java
@@ -21,14 +21,15 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.DefaultFileRegion;
 import io.netty.channel.FileRegion;
 import io.netty.channel.embedded.EmbeddedChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Random;
 import java.util.UUID;
-import org.junit.Assert;
-import org.junit.Test;
 
 public class FileRegionEncoderTest {
 

Reply via email to