This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.17
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit ff7ea0d7efd69fd2988cfb5c6891e1e28e903f61
Author: Hang Chen <[email protected]>
AuthorDate: Thu Mar 19 23:37:54 2026 -0700

    Fix read thread blocking in sendResponseAndWait causing READ_ENTRY_REQUEST p99 latency spike (#4730)
    
    * Fix read thread blocking in sendResponseAndWait causing READ_ENTRY_REQUEST p99 latency spike
    
    * address comments
    
    (cherry picked from commit 8664dd962935dc23d40218f1bf228661776580fd)
---
 .../bookkeeper/conf/ServerConfiguration.java       |   8 +-
 .../bookkeeper/proto/PacketProcessorBase.java      |  51 ++++--
 .../bookkeeper/proto/ReadEntryProcessorTest.java   | 178 +++++++++++++++++++++
 3 files changed, 220 insertions(+), 17 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 4d9eabb241..4bf0ded738 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -1075,10 +1075,16 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     /**
      * Get max number of reads in progress. 0 == unlimited.
      *
+     * <p>This limit bounds the memory used by read responses that have been 
read from storage
+     * but not yet flushed to the network. Since read response writes are 
non-blocking,
+     * without this limit a slow consumer could cause unbounded memory growth.
+     * The default value of 10000 provides a reasonable balance between 
throughput and memory usage.
+     * Tune based on your average entry size: memoryBudget / avgEntrySize.
+     *
      * @return Max number of reads in progress.
      */
     public int getMaxReadsInProgressLimit() {
-        return this.getInt(MAX_READS_IN_PROGRESS_LIMIT, 0);
+        return this.getInt(MAX_READS_IN_PROGRESS_LIMIT, 10000);
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index 889d3790d2..5ecda0355c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -19,8 +19,8 @@ package org.apache.bookkeeper.proto;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelPromise;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.proto.BookieProtocol.Request;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -74,10 +74,12 @@ abstract class PacketProcessorBase<T extends Request> implements Runnable {
     protected void sendReadReqResponse(int rc, Object response, OpStatsLogger 
statsLogger, boolean throttle) {
         if (throttle) {
             sendResponseAndWait(rc, response, statsLogger);
+            // onReadRequestFinish is called asynchronously in the 
ChannelFutureListener
+            // inside sendResponseAndWait to maintain throttling without 
blocking the thread.
         } else {
             sendResponse(rc, response, statsLogger);
+            requestProcessor.onReadRequestFinish();
         }
-        requestProcessor.onReadRequestFinish();
     }
 
     protected void sendResponse(int rc, Object response, OpStatsLogger 
statsLogger) {
@@ -150,27 +152,44 @@ abstract class PacketProcessorBase<T extends Request> implements Runnable {
     }
 
     /**
-     * Write on the channel and wait until the write is completed.
+     * Write on the channel and notify completion via a listener.
      *
-     * <p>That will make the thread to get blocked until we're able to
-     * write everything on the TCP stack, providing auto-throttling
-     * and avoiding using too much memory when handling read-requests.
+     * <p>This provides auto-throttling by holding the read semaphore until 
the write completes,
+     * without blocking the read thread pool thread. The read thread is freed 
immediately to
+     * process other requests, while the semaphore prevents unbounded read 
concurrency.
      */
     protected void sendResponseAndWait(int rc, Object response, OpStatsLogger 
statsLogger) {
+        // Capture fields before the processor may be recycled after this 
method returns.
+        final long capturedEnqueueNanos = this.enqueueNanos;
+        final BookieRequestProcessor processor = this.requestProcessor;
         try {
             Channel channel = requestHandler.ctx().channel();
             ChannelFuture future = channel.writeAndFlush(response);
-            if (!channel.eventLoop().inEventLoop()) {
-                future.get();
+            future.addListener((ChannelFutureListener) f -> {
+                if (!f.isSuccess() && logger.isDebugEnabled()) {
+                    logger.debug("Netty channel write exception. ", f.cause());
+                }
+                if (BookieProtocol.EOK == rc) {
+                    statsLogger.registerSuccessfulEvent(
+                            MathUtils.elapsedNanos(capturedEnqueueNanos), 
TimeUnit.NANOSECONDS);
+                } else {
+                    statsLogger.registerFailedEvent(
+                            MathUtils.elapsedNanos(capturedEnqueueNanos), 
TimeUnit.NANOSECONDS);
+                }
+                processor.onReadRequestFinish();
+            });
+        } catch (Exception e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Netty channel write exception. ", e);
             }
-        } catch (ExecutionException | InterruptedException e) {
-            logger.debug("Netty channel write exception. ", e);
-            return;
-        }
-        if (BookieProtocol.EOK == rc) {
-            
statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), 
TimeUnit.NANOSECONDS);
-        } else {
-            
statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), 
TimeUnit.NANOSECONDS);
+            if (BookieProtocol.EOK == rc) {
+                statsLogger.registerSuccessfulEvent(
+                        MathUtils.elapsedNanos(capturedEnqueueNanos), 
TimeUnit.NANOSECONDS);
+            } else {
+                statsLogger.registerFailedEvent(
+                        MathUtils.elapsedNanos(capturedEnqueueNanos), 
TimeUnit.NANOSECONDS);
+            }
+            processor.onReadRequestFinish();
         }
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
index 251f900c09..2cdf744f24 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
@@ -19,11 +19,13 @@
 package org.apache.bookkeeper.proto;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -38,10 +40,12 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookieProtocol.ReadRequest;
 import org.apache.bookkeeper.proto.BookieProtocol.Response;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -195,4 +199,178 @@ public class ReadEntryProcessorTest {
         assertEquals(BookieProtocol.READENTRY, response.getOpCode());
         assertEquals(BookieProtocol.EOK, response.getErrorCode());
     }
+
+    /**
+     * Test that when throttleReadResponses=true and the caller is not in the 
Netty event loop,
+     * the read thread is not blocked by the write. onReadRequestFinish() 
should only be called
+     * after the write future completes, preserving throttling without 
blocking the thread.
+     */
+    @Test
+    public void testThrottledReadNonBlockingOnSuccess() throws Exception {
+        // Setup event loop to simulate read worker thread (not event loop 
thread)
+        EventLoop eventLoop = mock(EventLoop.class);
+        when(eventLoop.inEventLoop()).thenReturn(false);
+        doAnswer(inv -> {
+            ((Runnable) inv.getArgument(0)).run();
+            return null;
+        }).when(eventLoop).execute(any(Runnable.class));
+        when(channel.eventLoop()).thenReturn(eventLoop);
+
+        // Use a controllable promise so we can verify deferred behavior
+        DefaultChannelPromise writeFuture = new DefaultChannelPromise(channel);
+        doAnswer(inv -> 
writeFuture).when(channel).writeAndFlush(any(Response.class));
+
+        long ledgerId = System.currentTimeMillis();
+        ReadRequest request = ReadRequest.create(
+                BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short) 
0, new byte[]{});
+        ReadEntryProcessor processor = ReadEntryProcessor.create(
+                request, requestHandler, requestProcessor, null, true /* 
throttle */);
+
+        // run() should return immediately without blocking on the write
+        processor.run();
+
+        // Write should have been issued
+        verify(channel, times(1)).writeAndFlush(any(Response.class));
+        // But onReadRequestFinish should NOT have been called yet — write not 
completed
+        verify(requestProcessor, never()).onReadRequestFinish();
+
+        // Complete the write
+        writeFuture.setSuccess();
+
+        // Now onReadRequestFinish should have been called
+        verify(requestProcessor, times(1)).onReadRequestFinish();
+    }
+
+    /**
+     * Test that onReadRequestFinish() is still called even when the write 
fails,
+     * so the read semaphore is always released.
+     */
+    @Test
+    public void testThrottledReadNonBlockingOnWriteFailure() throws Exception {
+        EventLoop eventLoop = mock(EventLoop.class);
+        when(eventLoop.inEventLoop()).thenReturn(false);
+        doAnswer(inv -> {
+            ((Runnable) inv.getArgument(0)).run();
+            return null;
+        }).when(eventLoop).execute(any(Runnable.class));
+        when(channel.eventLoop()).thenReturn(eventLoop);
+
+        DefaultChannelPromise writeFuture = new DefaultChannelPromise(channel);
+        doAnswer(inv -> 
writeFuture).when(channel).writeAndFlush(any(Response.class));
+
+        long ledgerId = System.currentTimeMillis();
+        ReadRequest request = ReadRequest.create(
+                BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short) 
0, new byte[]{});
+        ReadEntryProcessor processor = ReadEntryProcessor.create(
+                request, requestHandler, requestProcessor, null, true /* 
throttle */);
+
+        processor.run();
+
+        verify(channel, times(1)).writeAndFlush(any(Response.class));
+        verify(requestProcessor, never()).onReadRequestFinish();
+
+        // Fail the write
+        writeFuture.setFailure(new IOException("channel write failed"));
+
+        // onReadRequestFinish must still be called to release the read 
semaphore
+        verify(requestProcessor, times(1)).onReadRequestFinish();
+    }
+
+    /**
+     * Test that when throttleReadResponses=false, onReadRequestFinish() is 
called
+     * synchronously before run() returns.
+     */
+    @Test
+    public void testNonThrottledReadCallsOnFinishSynchronously() throws 
Exception {
+        // sendResponse (non-throttle path) uses channel.isActive() and 
two-arg writeAndFlush
+        when(channel.isActive()).thenReturn(true);
+        when(channel.writeAndFlush(any(), 
any(ChannelPromise.class))).thenReturn(mock(ChannelPromise.class));
+
+        long ledgerId = System.currentTimeMillis();
+        ReadRequest request = ReadRequest.create(
+                BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short) 
0, new byte[]{});
+        ReadEntryProcessor processor = ReadEntryProcessor.create(
+                request, requestHandler, requestProcessor, null, false /* no 
throttle */);
+
+        processor.run();
+
+        verify(channel, times(1)).writeAndFlush(any(), 
any(ChannelPromise.class));
+        // onReadRequestFinish should have been called synchronously
+        verify(requestProcessor, times(1)).onReadRequestFinish();
+    }
+
+    /**
+     * Verify that maxReadsInProgressLimit defaults to 10000 (enabled),
+     * ensuring non-blocking read response writes are bounded by default.
+     */
+    @Test
+    public void testDefaultMaxReadsInProgressLimitIsEnabled() {
+        ServerConfiguration conf = new ServerConfiguration();
+        assertEquals("maxReadsInProgressLimit should default to 10000",
+                10000, conf.getMaxReadsInProgressLimit());
+    }
+
+    /**
+     * Test that the read semaphore is held from request creation until the 
write future completes,
+     * not released when the read thread returns. This ensures that 
maxReadsInProgressLimit correctly
+     * bounds the number of read responses buffered in memory, even though the 
read thread is
+     * non-blocking.
+     */
+    @Test
+    public void testThrottledReadHoldsSemaphoreUntilWriteCompletes() throws 
Exception {
+        // Simulate maxReadsInProgressLimit=1 with a real semaphore
+        Semaphore readsSemaphore = new Semaphore(1);
+
+        doAnswer(inv -> {
+            readsSemaphore.acquireUninterruptibly();
+            return null;
+        }).when(requestProcessor).onReadRequestStart(any(Channel.class));
+        doAnswer(inv -> {
+            readsSemaphore.release();
+            return null;
+        }).when(requestProcessor).onReadRequestFinish();
+
+        // Setup non-event-loop thread
+        EventLoop eventLoop = mock(EventLoop.class);
+        when(eventLoop.inEventLoop()).thenReturn(false);
+        doAnswer(inv -> {
+            ((Runnable) inv.getArgument(0)).run();
+            return null;
+        }).when(eventLoop).execute(any(Runnable.class));
+        when(channel.eventLoop()).thenReturn(eventLoop);
+
+        // Controllable write future
+        DefaultChannelPromise writeFuture = new DefaultChannelPromise(channel);
+        doAnswer(inv -> 
writeFuture).when(channel).writeAndFlush(any(Response.class));
+
+        long ledgerId = System.currentTimeMillis();
+        ReadRequest request = ReadRequest.create(
+                BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short) 
0, new byte[]{});
+
+        // create() calls onReadRequestStart → semaphore acquired
+        ReadEntryProcessor processor = ReadEntryProcessor.create(
+                request, requestHandler, requestProcessor, null, true /* 
throttle */);
+
+        // Semaphore should be acquired (1 permit used)
+        assertEquals("semaphore should have 0 permits after read started",
+                0, readsSemaphore.availablePermits());
+
+        // Run the processor — thread returns immediately (non-blocking)
+        processor.run();
+
+        // Semaphore should STILL be held (write not completed)
+        assertEquals("semaphore should still have 0 permits while write is in 
progress",
+                0, readsSemaphore.availablePermits());
+
+        // A second read would be unable to acquire the semaphore
+        assertFalse("second read should not be able to acquire semaphore",
+                readsSemaphore.tryAcquire());
+
+        // Complete the write
+        writeFuture.setSuccess();
+
+        // Now semaphore should be released — a new read can proceed
+        assertEquals("semaphore should have 1 permit after write completes",
+                1, readsSemaphore.availablePermits());
+    }
 }

Reply via email to