This is an automated email from the ASF dual-hosted git repository.
shenwenbing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 951f10d9e7 [client] fix writeLac memory leak and thread safety issue
(#4713)
951f10d9e7 is described below
commit 951f10d9e7efb94da9f128fdc990f8bdab342a2c
Author: wenbingshen <[email protected]>
AuthorDate: Mon Mar 16 21:08:05 2026 +0800
[client] fix writeLac memory leak and thread safety issue (#4713)
* fix writeLac memory leak
---------
Co-authored-by: wenbingshen <[email protected]>
---
.../bookkeeper/client/PendingWriteLacOp.java | 28 ++--
.../bookkeeper/client/PendingWriteLacOpTest.java | 178 +++++++++++++++++++++
2 files changed, 194 insertions(+), 12 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index f9a5397daf..32e5f8a848 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -17,6 +17,7 @@
*/
package org.apache.bookkeeper.client;
+import io.netty.util.ReferenceCountUtil;
import java.util.BitSet;
import java.util.List;
import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
@@ -37,7 +38,6 @@ import org.slf4j.LoggerFactory;
*/
class PendingWriteLacOp implements WriteLacCallback {
private static final Logger LOG =
LoggerFactory.getLogger(PendingWriteLacOp.class);
- ByteBufList toSend;
AddLacCallback cb;
long lac;
Object ctx;
@@ -59,11 +59,11 @@ class PendingWriteLacOp implements WriteLacCallback {
this.cb = cb;
this.ctx = ctx;
this.lac = LedgerHandle.INVALID_ENTRY_ID;
- ackSet = lh.distributionSchedule.getAckSet();
+ ackSet = lh.getDistributionSchedule().getAckSet();
currentEnsemble = ensemble;
}
- void setLac(long lac) {
+ synchronized void setLac(long lac) {
this.lac = lac;
this.receivedResponseSet = new BitSet(
@@ -72,23 +72,29 @@ class PendingWriteLacOp implements WriteLacCallback {
lh.getLedgerMetadata().getWriteQuorumSize());
}
- void sendWriteLacRequest(int bookieIndex) {
+ void sendWriteLacRequest(int bookieIndex, ByteBufList toSend) {
clientCtx.getBookieClient().writeLac(currentEnsemble.get(bookieIndex),
lh.ledgerId, lh.ledgerKey, lac,
toSend, this, bookieIndex);
}
void initiate(ByteBufList toSend) {
- this.toSend = toSend;
-
- for (int i = 0; i < lh.distributionSchedule.getWriteQuorumSize(); i++)
{
-
sendWriteLacRequest(lh.distributionSchedule.getWriteSetBookieIndex(lac, i));
+ try {
+ for (int i = 0; i <
lh.getDistributionSchedule().getWriteQuorumSize(); i++) {
+
sendWriteLacRequest(lh.getDistributionSchedule().getWriteSetBookieIndex(lac,
i), toSend);
+ }
+ } finally {
+ ReferenceCountUtil.release(toSend);
}
+
}
@Override
- public void writeLacComplete(int rc, long ledgerId, BookieId addr, Object
ctx) {
+ public synchronized void writeLacComplete(int rc, long ledgerId, BookieId
addr, Object ctx) {
int bookieIndex = (Integer) ctx;
+ // We got response.
+ receivedResponseSet.clear(bookieIndex);
+
if (completed) {
return;
}
@@ -97,9 +103,6 @@ class PendingWriteLacOp implements WriteLacCallback {
lastSeenError = rc;
}
- // We got response.
- receivedResponseSet.clear(bookieIndex);
-
if (rc == BKException.Code.OK) {
if (ackSet.completeBookieAndCheck(bookieIndex) && !completed) {
completed = true;
@@ -115,4 +118,5 @@ class PendingWriteLacOp implements WriteLacCallback {
cb.addLacComplete(lastSeenError, lh, ctx);
}
}
+
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingWriteLacOpTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingWriteLacOpTest.java
new file mode 100644
index 0000000000..0a8fa83b70
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingWriteLacOpTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link PendingWriteLacOp}.
+ */
+public class PendingWriteLacOpTest implements AsyncCallback.AddLacCallback {
+
+ private LedgerHandle lh;
+ private ClientContext mockClientContext;
+ private BookieClient mockBookieClient;
+ private boolean callbackInvoked;
+
+ @Before
+ public void setup() {
+ lh = mock(LedgerHandle.class);
+ mockClientContext = mock(ClientContext.class);
+ mockBookieClient = mock(BookieClient.class);
+ doNothing().when(mockBookieClient).writeLac(any(BookieId.class),
anyLong(), any(byte[].class), anyLong(),
+ any(ByteBufList.class),
any(BookkeeperInternalCallbacks.WriteLacCallback.class), any(Object.class));
+ when(mockClientContext.getBookieClient()).thenReturn(mockBookieClient);
+ callbackInvoked = false;
+ }
+
+ @Test
+ public void testWriteLacOp332() {
+ // 3-3-2: ack quorum=2, complete after 2 OK responses, release toSend
after 3rd response
+ when(lh.getDistributionSchedule())
+ .thenReturn(new RoundRobinDistributionSchedule(3, 2, 3));
+
+ LedgerMetadata ledgerMetadata = mock(LedgerMetadata.class);
+ when(ledgerMetadata.getWriteQuorumSize()).thenReturn(3);
+ when(ledgerMetadata.getAckQuorumSize()).thenReturn(2);
+ when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata);
+
+ PendingWriteLacOp writeLacOp = new PendingWriteLacOp(lh,
mockClientContext,
+ lh.getCurrentEnsemble(), this, null);
+ writeLacOp.setLac(1000);
+
+ assertEquals(1000, writeLacOp.lac);
+ assertFalse(writeLacOp.completed);
+ assertFalse(writeLacOp.receivedResponseSet.isEmpty());
+
+ writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 0);
+ writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 1);
+
+ assertTrue(callbackInvoked);
+ assertTrue(writeLacOp.completed);
+ assertFalse(writeLacOp.receivedResponseSet.isEmpty());
+
+ writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 2);
+ assertTrue(writeLacOp.receivedResponseSet.isEmpty());
+ }
+
+ @Test
+ public void testWriteLacOp333() {
+ // 3-3-3: ack quorum=3, complete only after all 3 responses
+ when(lh.getDistributionSchedule())
+ .thenReturn(new RoundRobinDistributionSchedule(3, 3, 3));
+
+ LedgerMetadata ledgerMetadata = mock(LedgerMetadata.class);
+ when(ledgerMetadata.getWriteQuorumSize()).thenReturn(3);
+ when(ledgerMetadata.getAckQuorumSize()).thenReturn(3);
+ when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata);
+
+ PendingWriteLacOp writeLacOp = new PendingWriteLacOp(lh,
mockClientContext,
+ lh.getCurrentEnsemble(), this, null);
+ writeLacOp.setLac(1000);
+
+ assertEquals(1000, writeLacOp.lac);
+ assertFalse(writeLacOp.completed);
+ assertFalse(writeLacOp.receivedResponseSet.isEmpty());
+
+ writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 0);
+ writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 1);
+
+ assertFalse(callbackInvoked);
+ assertFalse(writeLacOp.completed);
+ assertFalse(writeLacOp.receivedResponseSet.isEmpty());
+
+ writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 2);
+ assertTrue(callbackInvoked);
+ assertTrue(writeLacOp.completed);
+ assertTrue(writeLacOp.receivedResponseSet.isEmpty());
+ }
+
+ @Test
+ public void testWriteLacOp111() {
+ // 1-1-1: single bookie, complete immediately on first response
+ when(lh.getDistributionSchedule())
+ .thenReturn(new RoundRobinDistributionSchedule(1, 1, 1));
+
+ LedgerMetadata ledgerMetadata = mock(LedgerMetadata.class);
+ when(ledgerMetadata.getWriteQuorumSize()).thenReturn(1);
+ when(ledgerMetadata.getAckQuorumSize()).thenReturn(1);
+ when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata);
+
+ PendingWriteLacOp writeLacOp = new PendingWriteLacOp(lh,
mockClientContext,
+ lh.getCurrentEnsemble(), this, null);
+ writeLacOp.setLac(1000);
+
+ assertFalse(writeLacOp.completed);
+ assertFalse(writeLacOp.receivedResponseSet.isEmpty());
+
+ writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 0);
+
+ assertTrue(callbackInvoked);
+ assertTrue(writeLacOp.completed);
+ assertTrue(writeLacOp.receivedResponseSet.isEmpty());
+ }
+
+ @Test
+ public void testInitiateReleasesBuffer() {
+ // Verify toSend buffer is released by initiate() after all requests
are sent
+ when(lh.getDistributionSchedule())
+ .thenReturn(new RoundRobinDistributionSchedule(3, 2, 3));
+
+ LedgerMetadata ledgerMetadata = mock(LedgerMetadata.class);
+ when(ledgerMetadata.getWriteQuorumSize()).thenReturn(3);
+ when(ledgerMetadata.getAckQuorumSize()).thenReturn(2);
+ when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata);
+
when(lh.getCurrentEnsemble()).thenReturn(Arrays.asList(BookieId.parse("bookie1"),
+ BookieId.parse("bookie2"), BookieId.parse("bookie3")));
+
+ PendingWriteLacOp writeLacOp = new PendingWriteLacOp(lh,
mockClientContext,
+ lh.getCurrentEnsemble(), this, null);
+
+ writeLacOp.setLac(1000);
+
+ ByteBufList toSend = ByteBufList.get();
+ assertEquals(1, toSend.refCnt());
+
+ writeLacOp.initiate(toSend);
+
+ // After initiate(), the caller's reference should be released
+ assertEquals(0, toSend.refCnt());
+ }
+
+ @Override
+ public synchronized void addLacComplete(int rc, LedgerHandle lh, Object
ctx) {
+ callbackInvoked = true;
+ }
+}