This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c9c710  Fixed unnecessary copy to heap, see 
https://github.com/apache/pulsar/pull/10330 (#2701)
3c9c710 is described below

commit 3c9c7102538909fd3764ea7314e7618d6d9458fd
Author: Andrey Yegorov <[email protected]>
AuthorDate: Sun Apr 25 18:16:01 2021 -0700

    Fixed unnecessary copy to heap, see 
https://github.com/apache/pulsar/pull/10330 (#2701)
    
    Descriptions of the changes in this PR:
    
    Handling CompositeByteBuf in a way that avoids unnecessary data copy.
    
    ### Motivation
    
    https://github.com/apache/pulsar/pull/10330
    
    https://github.com/apache/pulsar/pull/10330#issuecomment-825619753
    
    ### Changes
    
    Handling CompositeByteBuf in a way that avoids unnecessary data copy.
---
 .../bookkeeper/proto/checksum/DigestManager.java   | 17 ++++++++--
 .../org/apache/bookkeeper/util/ByteBufList.java    | 36 ++++++++++++++++++++--
 .../apache/bookkeeper/util/ByteBufListTest.java    | 34 ++++++++++++++++++++
 3 files changed, 82 insertions(+), 5 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index 034dd6e..87d8541 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -19,7 +19,9 @@ package org.apache.bookkeeper.proto.checksum;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
 
 import java.security.GeneralSecurityException;
 import java.security.NoSuchAlgorithmException;
@@ -110,10 +112,21 @@ public abstract class DigestManager {
         headersBuffer.writeLong(length);
 
         update(headersBuffer);
-        update(data);
+
+        // don't unwrap slices
+        final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() 
instanceof CompositeByteBuf
+                ? data.unwrap() : data;
+        ReferenceCountUtil.retain(unwrapped);
+        ReferenceCountUtil.release(data);
+
+        if (unwrapped instanceof CompositeByteBuf) {
+            ((CompositeByteBuf) unwrapped).forEach(this::update);
+        } else {
+            update(unwrapped);
+        }
         populateValueAndReset(headersBuffer);
 
-        return ByteBufList.get(headersBuffer, data);
+        return ByteBufList.get(headersBuffer, unwrapped);
     }
 
     /**
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
index 355cf3f..d136ff0 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.util;
 import com.google.common.annotations.VisibleForTesting;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
@@ -136,14 +137,43 @@ public class ByteBufList extends AbstractReferenceCounted 
{
      * Append a {@link ByteBuf} at the end of this {@link ByteBufList}.
      */
     public void add(ByteBuf buf) {
-        buffers.add(buf);
+        final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() 
instanceof CompositeByteBuf
+                ? buf.unwrap() : buf;
+        ReferenceCountUtil.retain(unwrapped);
+        ReferenceCountUtil.release(buf);
+
+        if (unwrapped instanceof CompositeByteBuf) {
+            ((CompositeByteBuf) unwrapped).forEach(b -> {
+                ReferenceCountUtil.retain(b);
+                buffers.add(b);
+            });
+            ReferenceCountUtil.release(unwrapped);
+        } else {
+            buffers.add(unwrapped);
+        }
     }
 
     /**
      * Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}.
      */
     public void prepend(ByteBuf buf) {
-        buffers.add(0, buf);
+        // don't unwrap slices
+        final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() 
instanceof CompositeByteBuf
+                ? buf.unwrap() : buf;
+        ReferenceCountUtil.retain(unwrapped);
+        ReferenceCountUtil.release(buf);
+
+        if (unwrapped instanceof CompositeByteBuf) {
+            CompositeByteBuf composite = (CompositeByteBuf) unwrapped;
+            for (int i = composite.numComponents() - 1; i >= 0; i--) {
+                ByteBuf b = composite.component(i);
+                ReferenceCountUtil.retain(b);
+                buffers.add(0, b);
+            }
+            ReferenceCountUtil.release(unwrapped);
+        } else {
+            buffers.add(0, unwrapped);
+        }
     }
 
     /**
@@ -259,7 +289,7 @@ public class ByteBufList extends AbstractReferenceCounted {
     @Override
     protected void deallocate() {
         for (int i = 0; i < buffers.size(); i++) {
-            buffers.get(i).release();
+            ReferenceCountUtil.release(buffers.get(i));
         }
 
         buffers.clear();
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
index 19c841b..65f51e2 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
@@ -89,6 +90,39 @@ public class ByteBufListTest {
     }
 
     @Test
+    public void testComposite() throws Exception {
+        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b1.writerIndex(b1.capacity());
+        ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b2.writerIndex(b2.capacity());
+
+        CompositeByteBuf composite = 
PooledByteBufAllocator.DEFAULT.compositeBuffer();
+        composite.addComponent(b1);
+        composite.addComponent(b2);
+
+        ByteBufList buf = ByteBufList.get(composite);
+
+        // composite is unwrapped into two parts
+        assertEquals(2, buf.size());
+        // and released
+        assertEquals(composite.refCnt(), 0);
+
+        assertEquals(256, buf.readableBytes());
+        assertEquals(b1, buf.getBuffer(0));
+        assertEquals(b2, buf.getBuffer(1));
+
+        assertEquals(buf.refCnt(), 1);
+        assertEquals(b1.refCnt(), 1);
+        assertEquals(b2.refCnt(), 1);
+
+        buf.release();
+
+        assertEquals(buf.refCnt(), 0);
+        assertEquals(b1.refCnt(), 0);
+        assertEquals(b2.refCnt(), 0);
+    }
+
+    @Test
     public void testClone() throws Exception {
         ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
         b1.writerIndex(b1.capacity());

Reply via email to