This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 3c9c710 Fixed unnecessary copy to heap, see
https://github.com/apache/pulsar/pull/10330 (#2701)
3c9c710 is described below
commit 3c9c7102538909fd3764ea7314e7618d6d9458fd
Author: Andrey Yegorov <[email protected]>
AuthorDate: Sun Apr 25 18:16:01 2021 -0700
Fixed unnecessary copy to heap, see
https://github.com/apache/pulsar/pull/10330 (#2701)
Description of the changes in this PR:
Handle CompositeByteBuf in a way that avoids an unnecessary data copy.
### Motivation
https://github.com/apache/pulsar/pull/10330
https://github.com/apache/pulsar/pull/10330#issuecomment-825619753
### Changes
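DigestManager and ByteBufList now detect a CompositeByteBuf (passed directly or obtained via unwrap()) and handle its components one by one instead of treating it as a single buffer, which avoids an unnecessary data copy.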
---
.../bookkeeper/proto/checksum/DigestManager.java | 17 ++++++++--
.../org/apache/bookkeeper/util/ByteBufList.java | 36 ++++++++++++++++++++--
.../apache/bookkeeper/util/ByteBufListTest.java | 34 ++++++++++++++++++++
3 files changed, 82 insertions(+), 5 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index 034dd6e..87d8541 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -19,7 +19,9 @@ package org.apache.bookkeeper.proto.checksum;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
@@ -110,10 +112,21 @@ public abstract class DigestManager {
headersBuffer.writeLong(length);
update(headersBuffer);
- update(data);
+
+ // don't unwrap slices
+ final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
+ ? data.unwrap() : data;
+ ReferenceCountUtil.retain(unwrapped);
+ ReferenceCountUtil.release(data);
+
+ if (unwrapped instanceof CompositeByteBuf) {
+ ((CompositeByteBuf) unwrapped).forEach(this::update);
+ } else {
+ update(unwrapped);
+ }
populateValueAndReset(headersBuffer);
- return ByteBufList.get(headersBuffer, data);
+ return ByteBufList.get(headersBuffer, unwrapped);
}
/**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
index 355cf3f..d136ff0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.util;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
@@ -136,14 +137,43 @@ public class ByteBufList extends AbstractReferenceCounted {
* Append a {@link ByteBuf} at the end of this {@link ByteBufList}.
*/
public void add(ByteBuf buf) {
- buffers.add(buf);
+ final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
+ ? buf.unwrap() : buf;
+ ReferenceCountUtil.retain(unwrapped);
+ ReferenceCountUtil.release(buf);
+
+ if (unwrapped instanceof CompositeByteBuf) {
+ ((CompositeByteBuf) unwrapped).forEach(b -> {
+ ReferenceCountUtil.retain(b);
+ buffers.add(b);
+ });
+ ReferenceCountUtil.release(unwrapped);
+ } else {
+ buffers.add(unwrapped);
+ }
}
/**
* Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}.
*/
public void prepend(ByteBuf buf) {
- buffers.add(0, buf);
+ // don't unwrap slices
+ final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
+ ? buf.unwrap() : buf;
+ ReferenceCountUtil.retain(unwrapped);
+ ReferenceCountUtil.release(buf);
+
+ if (unwrapped instanceof CompositeByteBuf) {
+ CompositeByteBuf composite = (CompositeByteBuf) unwrapped;
+ for (int i = composite.numComponents() - 1; i >= 0; i--) {
+ ByteBuf b = composite.component(i);
+ ReferenceCountUtil.retain(b);
+ buffers.add(0, b);
+ }
+ ReferenceCountUtil.release(unwrapped);
+ } else {
+ buffers.add(0, unwrapped);
+ }
}
/**
@@ -259,7 +289,7 @@ public class ByteBufList extends AbstractReferenceCounted {
@Override
protected void deallocate() {
for (int i = 0; i < buffers.size(); i++) {
- buffers.get(i).release();
+ ReferenceCountUtil.release(buffers.get(i));
}
buffers.clear();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
index 19c841b..65f51e2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
@@ -89,6 +90,39 @@ public class ByteBufListTest {
}
@Test
+ public void testComposite() throws Exception {
+ ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+ b1.writerIndex(b1.capacity());
+ ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+ b2.writerIndex(b2.capacity());
+
+ CompositeByteBuf composite = PooledByteBufAllocator.DEFAULT.compositeBuffer();
+ composite.addComponent(b1);
+ composite.addComponent(b2);
+
+ ByteBufList buf = ByteBufList.get(composite);
+
+ // composite is unwrapped into two parts
+ assertEquals(2, buf.size());
+ // and released
+ assertEquals(composite.refCnt(), 0);
+
+ assertEquals(256, buf.readableBytes());
+ assertEquals(b1, buf.getBuffer(0));
+ assertEquals(b2, buf.getBuffer(1));
+
+ assertEquals(buf.refCnt(), 1);
+ assertEquals(b1.refCnt(), 1);
+ assertEquals(b2.refCnt(), 1);
+
+ buf.release();
+
+ assertEquals(buf.refCnt(), 0);
+ assertEquals(b1.refCnt(), 0);
+ assertEquals(b2.refCnt(), 0);
+ }
+
+ @Test
public void testClone() throws Exception {
ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
b1.writerIndex(b1.capacity());