This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 1694d67713 [fix] Fix ByteBuf release/retain in PerChannelBookClient
(#4289)
1694d67713 is described below
commit 1694d67713496a67efdf2e3e076cf798713346c8
Author: fengyubiao <[email protected]>
AuthorDate: Thu Apr 18 06:37:29 2024 +0800
[fix] Fix ByteBuf release/retain in PerChannelBookClient (#4289)
* [fix] ByteBuf release/retain incorrect
* improve the code comment
* fix other cases
* modify the code comment
* improve the code
* improve the test
* add description
---
.../bookkeeper/proto/PerChannelBookieClient.java | 11 +-
.../apache/bookkeeper/test/BookieClientTest.java | 171 +++++++++++++++++++++
2 files changed, 179 insertions(+), 3 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 95eef54111..8485d21b74 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -842,10 +842,10 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
cb, ctx, ledgerId,
entryId));
final Channel c = channel;
if (c == null) {
- // usually checked in writeAndFlush, but we have extra check
- // because we need to release toSend.
+ // Manually release the binary data (variable "request") that we manually created when it cannot be sent out
+ // because the channel is switching.
errorOut(completionKey);
- ReferenceCountUtil.release(toSend);
+ ReferenceCountUtil.release(request);
return;
} else {
// addEntry times out on backpressure
@@ -1180,6 +1180,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
if (channel == null) {
LOG.warn("Operation {} failed: channel == null",
StringUtils.requestToString(request));
errorOut(key);
+ ReferenceCountUtil.release(request);
return;
}
@@ -1194,6 +1195,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
StringUtils.requestToString(request));
errorOut(key, BKException.Code.TooManyRequestsException);
+ ReferenceCountUtil.release(request);
return;
}
@@ -1215,6 +1217,9 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
} catch (Throwable e) {
LOG.warn("Operation {} failed",
StringUtils.requestToString(request), e);
errorOut(key);
+ // If the request reaches writeAndFlush, it should be handled well by Netty. So for any exception we
+ // get here, we can release the request.
+ ReferenceCountUtil.release(request);
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 60f89159a0..2fadbbd2c2 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -27,8 +27,10 @@ import static org.junit.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.ReferenceCounted;
@@ -36,16 +38,20 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.MockUncleanShutdownDetection;
import org.apache.bookkeeper.bookie.TestBookieImpl;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperClientStats;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.api.WriteFlag;
@@ -57,6 +63,7 @@ import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieClientImpl;
+import org.apache.bookkeeper.proto.BookieProtoEncoding;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookieServer;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
@@ -64,6 +71,8 @@ import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol;
import org.apache.bookkeeper.proto.DataFormats;
+import org.apache.bookkeeper.proto.PerChannelBookieClient;
+import org.apache.bookkeeper.proto.PerChannelBookieClientPool;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
@@ -71,6 +80,7 @@ import
org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.IOUtils;
import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -78,6 +88,7 @@ import org.junit.Test;
/**
* Test the bookie client.
*/
+@Slf4j
public class BookieClientTest {
BookieServer bs;
File tmpDir;
@@ -745,4 +756,164 @@ public class BookieClientTest {
assertTrue(Arrays.equals(kbData, bytes));
}
}
+
+ /**
+ * Checks the reference count of the data handed to "BookieClientImpl.addEntry" while another thread closes the
+ * channel that is being used. After each add-entry callback has run, the packaged data is expected to have a
+ * reference count of `1` again.
+ *
+ * The call stack of "BookieClientImpl.addEntry" is:
+ * 1.`BookieClientImpl.addEntry`.
+ * a.Retain the `ByteBuf` before getting the `PerChannelBookieClient`. This `ByteBuf` is called `toSend` in the
+ * following sections. `toSend.refCnt` is `2` now.
+ * 2.`Get PerChannelBookieClient`.
+ * 3.`ChannelReadyForAddEntryCallback.operationComplete`
+ * a.`PerChannelBookieClient.addEntry`
+ * a-1.Build a new ByteBuf for the request command. This new `ByteBuf` is called `request` in the following
+ * sections.
+ * a-2.`channel.writeAndFlush(request)`, or release the ByteBuf when the `channel` is switching.
+ * Note the callback will be called immediately if the channel is switching.
+ * b.Release the `ByteBuf` since it has been retained at `step 1`. `toSend.refCnt` should be `1` now.
+ *
+ * @param useV2WireProtocol passed to both the digest manager and the client configuration
+ * @param smallPayload if true the payload is a single byte, otherwise it is
+ * `BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD` bytes long
+ * @param withDelayReconnect if true the thread that closes the channel calls `sleep(i)` before closing it
+ * @param withDelayAddEntry if true the test thread calls `sleep(i)` before calling `addEntry`
+ * @param tryTimes the number of add-entry attempts, each one paired with a channel close
+ * @throws Exception if building the client, adding an entry or waiting for the result fails
+ */
+ public void testDataRefCnfWhenReconnect(boolean useV2WireProtocol, boolean
smallPayload,
+ boolean withDelayReconnect,
boolean withDelayAddEntry,
+ int tryTimes) throws Exception {
+ final long ledgerId = 1;
+ final BookieId addr = bs.getBookieId();
+ // Build the password.
+ byte[] passwd = new byte[20];
+ Arrays.fill(passwd, (byte) 'a');
+ // Build the digest manager.
+ DigestManager digestManager = DigestManager.instantiate(1, passwd,
+
BookKeeper.DigestType.toProtoDigestType(BookKeeper.DigestType.DUMMY),
+ PooledByteBufAllocator.DEFAULT, useV2WireProtocol);
+ // Build the client.
+ ClientConfiguration clientConf = new ClientConfiguration();
+ clientConf.setUseV2WireProtocol(useV2WireProtocol);
+ BookieClientImpl client = new BookieClientImpl(clientConf,
eventLoopGroup,
+ UnpooledByteBufAllocator.DEFAULT, executor, scheduler,
NullStatsLogger.INSTANCE,
+ BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+
+ // Inject a reconnect event.
+ // 1. Get the channel that will be used.
+ // 2. Call add entry.
+ // 3. Another thread closes the channel that is in use.
+ for (int i = 0; i < tryTimes; i++) {
+ long entryId = i + 1;
+ long lac = i;
+ // Build the payload.
+ int payloadLen;
+ ByteBuf payload;
+ if (smallPayload) {
+ payloadLen = 1;
+ payload = PooledByteBufAllocator.DEFAULT.buffer(1);
+ payload.writeByte(1);
+ } else {
+ payloadLen = BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD;
+ payload = PooledByteBufAllocator.DEFAULT.buffer();
+ byte[] bs = new byte[payloadLen];
+ payload.writeBytes(bs);
+ }
+
+ // Compute the digest and package the payload for sending.
+ ReferenceCounted bb =
digestManager.computeDigestAndPackageForSending(entryId, lac,
+ payloadLen * entryId, payload, passwd,
BookieProtocol.FLAG_NONE);
+ log.info("Before send. bb.refCnf: {}", bb.refCnt());
+
+ // Step: get the channel that will be used.
+ PerChannelBookieClientPool perChannelBookieClientPool =
client.lookupClient(addr);
+ AtomicReference<PerChannelBookieClient> perChannelBookieClient =
new AtomicReference<>();
+ perChannelBookieClientPool.obtain((rc, result) ->
perChannelBookieClient.set(result), ledgerId);
+ Awaitility.await().untilAsserted(() -> {
+ assertNotNull(perChannelBookieClient.get());
+ });
+
+ // Step: inject a reconnect event by closing the channel from another thread.
+ final int delayMillis = i;
+ new Thread(() -> {
+ if (withDelayReconnect) {
+ sleep(delayMillis);
+ }
+ Channel channel =
WhiteboxImpl.getInternalState(perChannelBookieClient.get(), "channel");
+ if (channel != null) {
+ channel.close();
+ }
+ }).start();
+ if (withDelayAddEntry) {
+ sleep(delayMillis);
+ }
+
+ // Step: add the entry.
+ AtomicBoolean callbackExecuted = new AtomicBoolean();
+ WriteCallback callback = (rc, lId, eId, socketAddr, ctx) -> {
+ log.info("Writing is finished. rc: {}, withDelayReconnect: {},
withDelayAddEntry: {}, ledgerId: {},"
+ + " entryId: {}, socketAddr: {}, ctx: {}",
+ rc, withDelayReconnect, withDelayAddEntry, lId, eId,
socketAddr, ctx);
+ callbackExecuted.set(true);
+ };
+ client.addEntry(addr, ledgerId, passwd, entryId, bb, callback, i,
BookieProtocol.FLAG_NONE, false,
+ WriteFlag.NONE);
+ // Wait until the add-entry callback has been executed.
+ Awaitility.await().untilAsserted(() ->
assertTrue(callbackExecuted.get()));
+ // The steps have been explained in the method description.
+ // Since the step "3-a-2" always runs before the step "3-b", so
the "callbackExecuted" will be finished
+ // before the step "3-b". Add a sleep to wait the step "3-a-2" is
finish.
+ Thread.sleep(100);
+ // Check the reference counts.
+ Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(()
-> {
+ assertEquals(1, bb.refCnt());
+ // V2 will release this original data if it is a small payload.
+ if (!useV2WireProtocol && !smallPayload) {
+ assertEquals(1, payload.refCnt());
+ }
+ });
+ bb.release();
+ // V2 will release this original data if it is a small payload.
+ if (!useV2WireProtocol && !smallPayload) {
+ payload.release();
+ }
+ }
+ // Clean up.
+ client.close();
+ }
+
+ private void sleep(int milliSeconds) {
+ try {
+ if (milliSeconds > 0) {
+ Thread.sleep(1);
+ }
+ } catch (InterruptedException e) {
+ log.warn("Error occurs", e);
+ }
+ }
+
+    /**
+     * Runs the reconnect reference-count scenario with the V2 wire protocol, first with a large payload and
+     * then with a small one, each with three delay combinations.
+     * Relate to https://github.com/apache/bookkeeper/pull/4289.
+     */
+    @Test
+    public void testDataRefCnfWhenReconnectV2() throws Exception {
+        final int attempts = 10;
+        // Large payload (first pass):
+        // Running this test may not reproduce the issue; it can be reproduced this way:
+        // 1. Add two break points.
+        //   a. At the line "Channel c = channel" in the method PerChannelBookieClient.addEntry.
+        //   b. At the line "channel = null" in the method "PerChannelBookieClient.channelInactive".
+        // 2. Make the break point b run earlier than the break point a during debugging.
+        //
+        // Small payload (second pass):
+        // There is no issue without https://github.com/apache/bookkeeper/pull/4289, the scenario is only
+        // covered by a test.
+        for (boolean small : new boolean[] {false, true}) {
+            testDataRefCnfWhenReconnect(true, small, false, false, attempts);
+            testDataRefCnfWhenReconnect(true, small, true, false, attempts);
+            testDataRefCnfWhenReconnect(true, small, false, true, attempts);
+        }
+    }
+
+    /**
+     * Runs the reconnect reference-count scenario without the V2 wire protocol, using a small payload and
+     * three delay combinations.
+     * If the issue can not be reproduced by running this test, see the comment of the scenario
+     * "Large payload" in {@link #testDataRefCnfWhenReconnectV2()}.
+     * Relate to https://github.com/apache/bookkeeper/pull/4289.
+     */
+    @Test
+    public void testDataRefCnfWhenReconnectV3() throws Exception {
+        // Each row is {withDelayReconnect, withDelayAddEntry}.
+        final boolean[][] delayModes = {{false, false}, {true, false}, {false, true}};
+        for (boolean[] mode : delayModes) {
+            testDataRefCnfWhenReconnect(false, true, mode[0], mode[1], 10);
+        }
+    }
}