This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.17
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 9b0e9e823efe463f4b3cd030b9c381c926d9dcf9
Author: wenbingshen <[email protected]>
AuthorDate: Mon Mar 16 21:08:05 2026 +0800

    [client] fix writeLac memory leak and thread safety issue (#4713)
    
    * fix writeLac memory leak
    
    ---------
    
    Co-authored-by: wenbingshen <[email protected]>
    (cherry picked from commit 951f10d9e7efb94da9f128fdc990f8bdab342a2c)
---
 .../bookkeeper/client/PendingWriteLacOp.java       |  28 ++--
 .../bookkeeper/client/PendingWriteLacOpTest.java   | 178 +++++++++++++++++++++
 2 files changed, 194 insertions(+), 12 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index f9a5397daf..32e5f8a848 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -17,6 +17,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import io.netty.util.ReferenceCountUtil;
 import java.util.BitSet;
 import java.util.List;
 import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
@@ -37,7 +38,6 @@ import org.slf4j.LoggerFactory;
  */
 class PendingWriteLacOp implements WriteLacCallback {
     private static final Logger LOG = 
LoggerFactory.getLogger(PendingWriteLacOp.class);
-    ByteBufList toSend;
     AddLacCallback cb;
     long lac;
     Object ctx;
@@ -59,11 +59,11 @@ class PendingWriteLacOp implements WriteLacCallback {
         this.cb = cb;
         this.ctx = ctx;
         this.lac = LedgerHandle.INVALID_ENTRY_ID;
-        ackSet = lh.distributionSchedule.getAckSet();
+        ackSet = lh.getDistributionSchedule().getAckSet();
         currentEnsemble = ensemble;
     }
 
-    void setLac(long lac) {
+    synchronized void setLac(long lac) {
         this.lac = lac;
 
         this.receivedResponseSet = new BitSet(
@@ -72,23 +72,29 @@ class PendingWriteLacOp implements WriteLacCallback {
                 lh.getLedgerMetadata().getWriteQuorumSize());
     }
 
-    void sendWriteLacRequest(int bookieIndex) {
+    void sendWriteLacRequest(int bookieIndex, ByteBufList toSend) {
         clientCtx.getBookieClient().writeLac(currentEnsemble.get(bookieIndex),
                                              lh.ledgerId, lh.ledgerKey, lac, 
toSend, this, bookieIndex);
     }
 
     void initiate(ByteBufList toSend) {
-        this.toSend = toSend;
-
-        for (int i = 0; i < lh.distributionSchedule.getWriteQuorumSize(); i++) 
{
-            
sendWriteLacRequest(lh.distributionSchedule.getWriteSetBookieIndex(lac, i));
+        try {
+            for (int i = 0; i < 
lh.getDistributionSchedule().getWriteQuorumSize(); i++) {
+                
sendWriteLacRequest(lh.getDistributionSchedule().getWriteSetBookieIndex(lac, 
i), toSend);
+            }
+        } finally {
+            ReferenceCountUtil.release(toSend);
         }
+
     }
 
     @Override
-    public void writeLacComplete(int rc, long ledgerId, BookieId addr, Object 
ctx) {
+    public synchronized void writeLacComplete(int rc, long ledgerId, BookieId 
addr, Object ctx) {
         int bookieIndex = (Integer) ctx;
 
+        // We got response.
+        receivedResponseSet.clear(bookieIndex);
+
         if (completed) {
             return;
         }
@@ -97,9 +103,6 @@ class PendingWriteLacOp implements WriteLacCallback {
             lastSeenError = rc;
         }
 
-        // We got response.
-        receivedResponseSet.clear(bookieIndex);
-
         if (rc == BKException.Code.OK) {
             if (ackSet.completeBookieAndCheck(bookieIndex) && !completed) {
                 completed = true;
@@ -115,4 +118,5 @@ class PendingWriteLacOp implements WriteLacCallback {
             cb.addLacComplete(lastSeenError, lh, ctx);
         }
     }
+
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingWriteLacOpTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingWriteLacOpTest.java
new file mode 100644
index 0000000000..0a8fa83b70
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingWriteLacOpTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link PendingWriteLacOp}.
+ */
+public class PendingWriteLacOpTest implements AsyncCallback.AddLacCallback {
+
+    private LedgerHandle lh;
+    private ClientContext mockClientContext;
+    private BookieClient mockBookieClient;
+    private boolean callbackInvoked;
+
+    @Before
+    public void setup() {
+        lh = mock(LedgerHandle.class);
+        mockClientContext = mock(ClientContext.class);
+        mockBookieClient = mock(BookieClient.class);
+        doNothing().when(mockBookieClient).writeLac(any(BookieId.class), 
anyLong(), any(byte[].class), anyLong(),
+                any(ByteBufList.class), 
any(BookkeeperInternalCallbacks.WriteLacCallback.class), any(Object.class));
+        when(mockClientContext.getBookieClient()).thenReturn(mockBookieClient);
+        callbackInvoked = false;
+    }
+
+    @Test
+    public void testWriteLacOp332() {
+        // 3-3-2: ack quorum=2, complete after 2 OK responses, release toSend 
after 3rd response
+        when(lh.getDistributionSchedule())
+                .thenReturn(new RoundRobinDistributionSchedule(3, 2, 3));
+
+        LedgerMetadata ledgerMetadata = mock(LedgerMetadata.class);
+        when(ledgerMetadata.getWriteQuorumSize()).thenReturn(3);
+        when(ledgerMetadata.getAckQuorumSize()).thenReturn(2);
+        when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata);
+
+        PendingWriteLacOp writeLacOp = new PendingWriteLacOp(lh, 
mockClientContext,
+                lh.getCurrentEnsemble(), this, null);
+        writeLacOp.setLac(1000);
+
+        assertEquals(1000, writeLacOp.lac);
+        assertFalse(writeLacOp.completed);
+        assertFalse(writeLacOp.receivedResponseSet.isEmpty());
+
+        writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 0);
+        writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 1);
+
+        assertTrue(callbackInvoked);
+        assertTrue(writeLacOp.completed);
+        assertFalse(writeLacOp.receivedResponseSet.isEmpty());
+
+        writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 2);
+        assertTrue(writeLacOp.receivedResponseSet.isEmpty());
+    }
+
+    @Test
+    public void testWriteLacOp333() {
+        // 3-3-3: ack quorum=3, complete only after all 3 responses
+        when(lh.getDistributionSchedule())
+                .thenReturn(new RoundRobinDistributionSchedule(3, 3, 3));
+
+        LedgerMetadata ledgerMetadata = mock(LedgerMetadata.class);
+        when(ledgerMetadata.getWriteQuorumSize()).thenReturn(3);
+        when(ledgerMetadata.getAckQuorumSize()).thenReturn(3);
+        when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata);
+
+        PendingWriteLacOp writeLacOp = new PendingWriteLacOp(lh, 
mockClientContext,
+                lh.getCurrentEnsemble(), this, null);
+        writeLacOp.setLac(1000);
+
+        assertEquals(1000, writeLacOp.lac);
+        assertFalse(writeLacOp.completed);
+        assertFalse(writeLacOp.receivedResponseSet.isEmpty());
+
+        writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 0);
+        writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 1);
+
+        assertFalse(callbackInvoked);
+        assertFalse(writeLacOp.completed);
+        assertFalse(writeLacOp.receivedResponseSet.isEmpty());
+
+        writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 2);
+        assertTrue(callbackInvoked);
+        assertTrue(writeLacOp.completed);
+        assertTrue(writeLacOp.receivedResponseSet.isEmpty());
+    }
+
+    @Test
+    public void testWriteLacOp111() {
+        // 1-1-1: single bookie, complete immediately on first response
+        when(lh.getDistributionSchedule())
+                .thenReturn(new RoundRobinDistributionSchedule(1, 1, 1));
+
+        LedgerMetadata ledgerMetadata = mock(LedgerMetadata.class);
+        when(ledgerMetadata.getWriteQuorumSize()).thenReturn(1);
+        when(ledgerMetadata.getAckQuorumSize()).thenReturn(1);
+        when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata);
+
+        PendingWriteLacOp writeLacOp = new PendingWriteLacOp(lh, 
mockClientContext,
+                lh.getCurrentEnsemble(), this, null);
+        writeLacOp.setLac(1000);
+
+        assertFalse(writeLacOp.completed);
+        assertFalse(writeLacOp.receivedResponseSet.isEmpty());
+
+        writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 0);
+
+        assertTrue(callbackInvoked);
+        assertTrue(writeLacOp.completed);
+        assertTrue(writeLacOp.receivedResponseSet.isEmpty());
+    }
+
+    @Test
+    public void testInitiateReleasesBuffer() {
+        // Verify toSend buffer is released by initiate() after all requests 
are sent
+        when(lh.getDistributionSchedule())
+                .thenReturn(new RoundRobinDistributionSchedule(3, 2, 3));
+
+        LedgerMetadata ledgerMetadata = mock(LedgerMetadata.class);
+        when(ledgerMetadata.getWriteQuorumSize()).thenReturn(3);
+        when(ledgerMetadata.getAckQuorumSize()).thenReturn(2);
+        when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata);
+        
when(lh.getCurrentEnsemble()).thenReturn(Arrays.asList(BookieId.parse("bookie1"),
+                BookieId.parse("bookie2"), BookieId.parse("bookie3")));
+
+        PendingWriteLacOp writeLacOp = new PendingWriteLacOp(lh, 
mockClientContext,
+                lh.getCurrentEnsemble(), this, null);
+
+        writeLacOp.setLac(1000);
+
+        ByteBufList toSend = ByteBufList.get();
+        assertEquals(1, toSend.refCnt());
+
+        writeLacOp.initiate(toSend);
+
+        // After initiate(), the caller's reference should be released
+        assertEquals(0, toSend.refCnt());
+    }
+
+    @Override
+    public synchronized void addLacComplete(int rc, LedgerHandle lh, Object 
ctx) {
+        callbackInvoked = true;
+    }
+}

Reply via email to