This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-4.17
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 143e5a8d58703c349196cd16701e9b9c63df79c3
Author: fengyubiao <[email protected]>
AuthorDate: Wed May 14 09:37:52 2025 +0800

    [fix] Write stuck due to pending add callback by multiple threads (#4557)
    
    [fix] Write stuck due to pending add callback by multiple threads (#4557)
    
    (cherry picked from commit e47926bc71c8f4b9fc548646b2abb46700923b37)
---
 .../apache/bookkeeper/proto/BookieClientImpl.java  |  30 +++--
 .../bookkeeper/proto/PerChannelBookieClient.java   |   8 +-
 .../proto/ClientSocketDisconnectTest.java          | 144 +++++++++++++++++++++
 3 files changed, 171 insertions(+), 11 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index a12d9fd64d..bd04e8b5cc 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -22,6 +22,7 @@ package org.apache.bookkeeper.proto;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ExtensionRegistry;
 import io.netty.buffer.ByteBuf;
@@ -370,7 +371,9 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
         }
     }
 
-    private static class ChannelReadyForAddEntryCallback
+    // Package-private only so tests can access it; this class would otherwise be "private".
+    @VisibleForTesting
+    static class ChannelReadyForAddEntryCallback
         implements GenericCallback<PerChannelBookieClient> {
         private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
 
@@ -380,7 +383,9 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
         private long entryId;
         private BookieId addr;
         private Object ctx;
-        private WriteCallback cb;
+        // Package-private only so tests can access it; this field would otherwise be "private".
+        @VisibleForTesting
+        WriteCallback cb;
         private int options;
         private byte[] masterKey;
         private boolean allowFastFail;
@@ -409,17 +414,24 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
         @Override
         public void operationComplete(final int rc,
                                       PerChannelBookieClient pcbc) {
-            try {
-                if (rc != BKException.Code.OK) {
-                    bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, 
ctx);
-                } else {
+            if (rc != BKException.Code.OK) {
+                bookieClient.executor.executeOrdered(ledgerId, () -> {
+                    try {
+                        bookieClient.completeAdd(rc, ledgerId, entryId, addr, 
cb, ctx);
+                    } finally {
+                        ReferenceCountUtil.release(toSend);
+                    }
+                    recycle();
+                });
+            } else {
+                try {
                     pcbc.addEntry(ledgerId, masterKey, entryId,
                             toSend, cb, ctx, options, allowFastFail, 
writeFlags);
+                } finally {
+                    ReferenceCountUtil.release(toSend);
                 }
-            } finally {
-                ReferenceCountUtil.release(toSend);
+                recycle();
             }
-            recycle();
         }
 
         private ChannelReadyForAddEntryCallback(
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index d5faaa2d71..b163918b89 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -600,11 +600,15 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         }
 
         ChannelFuture future = bootstrap.connect(bookieAddr);
-        future.addListener(contextPreservingListener(new 
ConnectionFutureListener(startTime)));
-        future.addListener(x -> makeWritable());
+        addChannelListeners(future, startTime);
         return future;
     }
 
+    protected void addChannelListeners(ChannelFuture future, long 
connectStartTime) {
+        future.addListener(contextPreservingListener(new 
ConnectionFutureListener(connectStartTime)));
+        future.addListener(x -> makeWritable());
+    }
+
     void cleanDisconnectAndClose() {
         disconnect();
         close();
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ClientSocketDisconnectTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ClientSocketDisconnectTest.java
new file mode 100644
index 0000000000..2b4eb74397
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ClientSocketDisconnectTest.java
@@ -0,0 +1,144 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.BookKeeper;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.tls.SecurityException;
+import org.apache.bookkeeper.util.EventLoopUtil;
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+@Slf4j
+public class ClientSocketDisconnectTest extends BookKeeperClusterTestCase {
+
+    public ClientSocketDisconnectTest() {
+        super(1);
+        this.useUUIDasBookieId = true;
+    }
+
+    public static class PerChannelBookieClientDecorator extends 
PerChannelBookieClient {
+
+        private final ThreadCounter threadCounter;
+        private final AtomicInteger failurePredicate = new AtomicInteger();
+
+        public PerChannelBookieClientDecorator(PerChannelBookieClient client, 
BookieId addr, ThreadCounter tCounter)
+                throws SecurityException {
+            super(client.executor, client.eventLoopGroup, addr, 
client.bookieAddressResolver);
+            this.threadCounter = tCounter;
+        }
+
+        // Inject a disconnection on every other connection attempt (the 1st, 3rd, ...).
+        protected void addChannelListeners(ChannelFuture future, long 
connectStartTime) {
+            future.addListener((ChannelFutureListener) future1 -> {
+                if (failurePredicate.incrementAndGet() % 2 == 1) {
+                    future1.channel().close();
+                }
+            });
+            super.addChannelListeners(future, connectStartTime);
+        }
+
+        // Records the thread that runs "PendingAddOp.writeComplete".
+        @Override
+        protected void 
connectIfNeededAndDoOp(BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>
 op) {
+            BookieClientImpl.ChannelReadyForAddEntryCallback callback =
+                    (BookieClientImpl.ChannelReadyForAddEntryCallback) op;
+            BookkeeperInternalCallbacks.WriteCallback originalCallback = 
callback.cb;
+            callback.cb = (rc, ledgerId, entryId, addr, ctx) -> {
+                threadCounter.record();
+                originalCallback.writeComplete(rc, ledgerId, entryId, addr, 
ctx);
+            };
+            super.connectIfNeededAndDoOp(op);
+        }
+    }
+
+    private static class ThreadCounter {
+
+        private final Map<Thread, AtomicInteger> records = new 
ConcurrentHashMap<>();
+
+        public void record() {
+            Thread currentThread = Thread.currentThread();
+            records.computeIfAbsent(currentThread, k -> new AtomicInteger());
+            records.get(currentThread).incrementAndGet();
+        }
+    }
+
+    @Test
+    public void testAddEntriesCallbackWithBKClientThread() throws Exception {
+        // Create BKC and a ledger handle.
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        org.apache.bookkeeper.client.BookKeeper bkc =
+                (org.apache.bookkeeper.client.BookKeeper) 
BookKeeper.newBuilder(conf)
+                .eventLoopGroup(
+                        EventLoopUtil.getClientEventLoopGroup(conf, new 
DefaultThreadFactory("test-io")))
+                .build();
+        final BookieClientImpl bookieClient = (BookieClientImpl) 
bkc.getClientCtx().getBookieClient();
+        LedgerHandle lh = (LedgerHandle) bkc.newCreateLedgerOp()
+                .withEnsembleSize(1)
+                .withWriteQuorumSize(1)
+                .withAckQuorumSize(1)
+                .withDigestType(DigestType.CRC32C)
+                .withPassword(new byte[0])
+                .execute().join();
+
+        // Inject two behaviors.
+        // 1. Inject a disconnection as soon as a connection attempt completes.
+        // 2. Record the thread that runs "PendingAddOp.writeComplete".
+        final ThreadCounter callbackThreadRecorder = new ThreadCounter();
+        List<BookieId> ensemble = lh.getLedgerMetadata()
+                .getAllEnsembles().entrySet().iterator().next().getValue();
+        DefaultPerChannelBookieClientPool clientPool =
+                (DefaultPerChannelBookieClientPool) 
bookieClient.lookupClient(ensemble.get(0));
+        PerChannelBookieClient[] clients = clientPool.clients;
+
+        // Swap in the decorated clients, then write 1000 entries and wait for them to finish.
+        for (int i = 0; i < clients.length; i++) {
+            clients[i] = new PerChannelBookieClientDecorator(clients[i], 
ensemble.get(0), callbackThreadRecorder);
+        }
+        int addCount = 1000;
+        CountDownLatch countDownLatch = new CountDownLatch(addCount);
+        for (int i = 0; i < addCount; i++) {
+            lh.asyncAddEntry(new byte[]{1}, (rc, lh1, entryId, ctx) -> {
+                countDownLatch.countDown();
+            }, i);
+        }
+        countDownLatch.await();
+
+        // Verify: every callback ran on a "BookKeeperClientWorker" thread.
+        for (Thread callbackThread : callbackThreadRecorder.records.keySet()) {
+            Assert.assertTrue(callbackThread.getName(), 
callbackThread.getName().startsWith("BookKeeperClientWorker"));
+        }
+    }
+}
\ No newline at end of file

Reply via email to