This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 8664dd9629 Fix read thread blocking in sendResponseAndWait causing READ_ENTRY_REQUEST p99 latency spike (#4730)
8664dd9629 is described below
commit 8664dd962935dc23d40218f1bf228661776580fd
Author: Hang Chen <[email protected]>
AuthorDate: Thu Mar 19 23:37:54 2026 -0700
Fix read thread blocking in sendResponseAndWait causing READ_ENTRY_REQUEST p99 latency spike (#4730)
* Fix read thread blocking in sendResponseAndWait causing READ_ENTRY_REQUEST p99 latency spike
* address comments
---
.../bookkeeper/conf/ServerConfiguration.java | 8 +-
.../bookkeeper/proto/PacketProcessorBase.java | 51 ++++--
.../bookkeeper/proto/ReadEntryProcessorTest.java | 178 +++++++++++++++++++++
3 files changed, 220 insertions(+), 17 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 79daeb3f5d..4f95443e31 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -1075,10 +1075,16 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
/**
* Get max number of reads in progress. 0 == unlimited.
*
+ * <p>This limit bounds the memory used by read responses that have been
read from storage
+ * but not yet flushed to the network. Since read response writes are
non-blocking,
+ * without this limit a slow consumer could cause unbounded memory growth.
+ * The default value of 10000 provides a reasonable balance between
throughput and memory usage.
+ * Tune based on your average entry size: memoryBudget / avgEntrySize.
+ *
* @return Max number of reads in progress.
*/
public int getMaxReadsInProgressLimit() {
- return this.getInt(MAX_READS_IN_PROGRESS_LIMIT, 0);
+ return this.getInt(MAX_READS_IN_PROGRESS_LIMIT, 10000);
}
/**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index f0c079de0b..1139fd3130 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -19,8 +19,8 @@ package org.apache.bookkeeper.proto;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.proto.BookieProtocol.Request;
@@ -74,10 +74,12 @@ abstract class PacketProcessorBase<T extends Request> implements Runnable {
protected void sendReadReqResponse(int rc, Object response, OpStatsLogger
statsLogger, boolean throttle) {
if (throttle) {
sendResponseAndWait(rc, response, statsLogger);
+ // onReadRequestFinish is called asynchronously in the
ChannelFutureListener
+ // inside sendResponseAndWait to maintain throttling without
blocking the thread.
} else {
sendResponse(rc, response, statsLogger);
+ requestProcessor.onReadRequestFinish();
}
- requestProcessor.onReadRequestFinish();
}
protected void sendResponse(int rc, Object response, OpStatsLogger
statsLogger) {
@@ -150,27 +152,44 @@ abstract class PacketProcessorBase<T extends Request> implements Runnable {
}
/**
- * Write on the channel and wait until the write is completed.
+ * Write on the channel and notify completion via a listener.
*
- * <p>That will make the thread to get blocked until we're able to
- * write everything on the TCP stack, providing auto-throttling
- * and avoiding using too much memory when handling read-requests.
+ * <p>This provides auto-throttling by holding the read semaphore until
the write completes,
+ * without blocking the read thread pool thread. The read thread is freed
immediately to
+ * process other requests, while the semaphore prevents unbounded read
concurrency.
*/
protected void sendResponseAndWait(int rc, Object response, OpStatsLogger
statsLogger) {
+ // Capture fields before the processor may be recycled after this
method returns.
+ final long capturedEnqueueNanos = this.enqueueNanos;
+ final BookieRequestProcessor processor = this.requestProcessor;
try {
Channel channel = requestHandler.ctx().channel();
ChannelFuture future = channel.writeAndFlush(response);
- if (!channel.eventLoop().inEventLoop()) {
- future.get();
+ future.addListener((ChannelFutureListener) f -> {
+ if (!f.isSuccess() && logger.isDebugEnabled()) {
+ logger.debug("Netty channel write exception. ", f.cause());
+ }
+ if (BookieProtocol.EOK == rc) {
+ statsLogger.registerSuccessfulEvent(
+ MathUtils.elapsedNanos(capturedEnqueueNanos),
TimeUnit.NANOSECONDS);
+ } else {
+ statsLogger.registerFailedEvent(
+ MathUtils.elapsedNanos(capturedEnqueueNanos),
TimeUnit.NANOSECONDS);
+ }
+ processor.onReadRequestFinish();
+ });
+ } catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Netty channel write exception. ", e);
}
- } catch (ExecutionException | InterruptedException e) {
- logger.debug("Netty channel write exception. ", e);
- return;
- }
- if (BookieProtocol.EOK == rc) {
-
statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos),
TimeUnit.NANOSECONDS);
- } else {
-
statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos),
TimeUnit.NANOSECONDS);
+ if (BookieProtocol.EOK == rc) {
+ statsLogger.registerSuccessfulEvent(
+ MathUtils.elapsedNanos(capturedEnqueueNanos),
TimeUnit.NANOSECONDS);
+ } else {
+ statsLogger.registerFailedEvent(
+ MathUtils.elapsedNanos(capturedEnqueueNanos),
TimeUnit.NANOSECONDS);
+ }
+ processor.onReadRequestFinish();
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
index 251f900c09..2cdf744f24 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
@@ -19,11 +19,13 @@
package org.apache.bookkeeper.proto;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -38,10 +40,12 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieProtocol.ReadRequest;
import org.apache.bookkeeper.proto.BookieProtocol.Response;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -195,4 +199,178 @@ public class ReadEntryProcessorTest {
assertEquals(BookieProtocol.READENTRY, response.getOpCode());
assertEquals(BookieProtocol.EOK, response.getErrorCode());
}
+
+ /**
+ * Test that when throttleReadResponses=true and the caller is not in the
Netty event loop,
+ * the read thread is not blocked by the write. onReadRequestFinish()
should only be called
+ * after the write future completes, preserving throttling without
blocking the thread.
+ */
+ @Test
+ public void testThrottledReadNonBlockingOnSuccess() throws Exception {
+ // Setup event loop to simulate read worker thread (not event loop
thread)
+ EventLoop eventLoop = mock(EventLoop.class);
+ when(eventLoop.inEventLoop()).thenReturn(false);
+ doAnswer(inv -> {
+ ((Runnable) inv.getArgument(0)).run();
+ return null;
+ }).when(eventLoop).execute(any(Runnable.class));
+ when(channel.eventLoop()).thenReturn(eventLoop);
+
+ // Use a controllable promise so we can verify deferred behavior
+ DefaultChannelPromise writeFuture = new DefaultChannelPromise(channel);
+ doAnswer(inv ->
writeFuture).when(channel).writeAndFlush(any(Response.class));
+
+ long ledgerId = System.currentTimeMillis();
+ ReadRequest request = ReadRequest.create(
+ BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short)
0, new byte[]{});
+ ReadEntryProcessor processor = ReadEntryProcessor.create(
+ request, requestHandler, requestProcessor, null, true /*
throttle */);
+
+ // run() should return immediately without blocking on the write
+ processor.run();
+
+ // Write should have been issued
+ verify(channel, times(1)).writeAndFlush(any(Response.class));
+ // But onReadRequestFinish should NOT have been called yet — write not
completed
+ verify(requestProcessor, never()).onReadRequestFinish();
+
+ // Complete the write
+ writeFuture.setSuccess();
+
+ // Now onReadRequestFinish should have been called
+ verify(requestProcessor, times(1)).onReadRequestFinish();
+ }
+
+ /**
+ * Test that onReadRequestFinish() is still called even when the write
fails,
+ * so the read semaphore is always released.
+ */
+ @Test
+ public void testThrottledReadNonBlockingOnWriteFailure() throws Exception {
+ EventLoop eventLoop = mock(EventLoop.class);
+ when(eventLoop.inEventLoop()).thenReturn(false);
+ doAnswer(inv -> {
+ ((Runnable) inv.getArgument(0)).run();
+ return null;
+ }).when(eventLoop).execute(any(Runnable.class));
+ when(channel.eventLoop()).thenReturn(eventLoop);
+
+ DefaultChannelPromise writeFuture = new DefaultChannelPromise(channel);
+ doAnswer(inv ->
writeFuture).when(channel).writeAndFlush(any(Response.class));
+
+ long ledgerId = System.currentTimeMillis();
+ ReadRequest request = ReadRequest.create(
+ BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short)
0, new byte[]{});
+ ReadEntryProcessor processor = ReadEntryProcessor.create(
+ request, requestHandler, requestProcessor, null, true /*
throttle */);
+
+ processor.run();
+
+ verify(channel, times(1)).writeAndFlush(any(Response.class));
+ verify(requestProcessor, never()).onReadRequestFinish();
+
+ // Fail the write
+ writeFuture.setFailure(new IOException("channel write failed"));
+
+ // onReadRequestFinish must still be called to release the read
semaphore
+ verify(requestProcessor, times(1)).onReadRequestFinish();
+ }
+
+ /**
+ * Test that when throttleReadResponses=false, onReadRequestFinish() is
called
+ * synchronously before run() returns.
+ */
+ @Test
+ public void testNonThrottledReadCallsOnFinishSynchronously() throws
Exception {
+ // sendResponse (non-throttle path) uses channel.isActive() and
two-arg writeAndFlush
+ when(channel.isActive()).thenReturn(true);
+ when(channel.writeAndFlush(any(),
any(ChannelPromise.class))).thenReturn(mock(ChannelPromise.class));
+
+ long ledgerId = System.currentTimeMillis();
+ ReadRequest request = ReadRequest.create(
+ BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short)
0, new byte[]{});
+ ReadEntryProcessor processor = ReadEntryProcessor.create(
+ request, requestHandler, requestProcessor, null, false /* no
throttle */);
+
+ processor.run();
+
+ verify(channel, times(1)).writeAndFlush(any(),
any(ChannelPromise.class));
+ // onReadRequestFinish should have been called synchronously
+ verify(requestProcessor, times(1)).onReadRequestFinish();
+ }
+
+ /**
+ * Verify that maxReadsInProgressLimit defaults to 10000 (enabled),
+ * ensuring non-blocking read response writes are bounded by default.
+ */
+ @Test
+ public void testDefaultMaxReadsInProgressLimitIsEnabled() {
+ ServerConfiguration conf = new ServerConfiguration();
+ assertEquals("maxReadsInProgressLimit should default to 10000",
+ 10000, conf.getMaxReadsInProgressLimit());
+ }
+
+ /**
+ * Test that the read semaphore is held from request creation until the
write future completes,
+ * not released when the read thread returns. This ensures that
maxReadsInProgressLimit correctly
+ * bounds the number of read responses buffered in memory, even though the
read thread is
+ * non-blocking.
+ */
+ @Test
+ public void testThrottledReadHoldsSemaphoreUntilWriteCompletes() throws
Exception {
+ // Simulate maxReadsInProgressLimit=1 with a real semaphore
+ Semaphore readsSemaphore = new Semaphore(1);
+
+ doAnswer(inv -> {
+ readsSemaphore.acquireUninterruptibly();
+ return null;
+ }).when(requestProcessor).onReadRequestStart(any(Channel.class));
+ doAnswer(inv -> {
+ readsSemaphore.release();
+ return null;
+ }).when(requestProcessor).onReadRequestFinish();
+
+ // Setup non-event-loop thread
+ EventLoop eventLoop = mock(EventLoop.class);
+ when(eventLoop.inEventLoop()).thenReturn(false);
+ doAnswer(inv -> {
+ ((Runnable) inv.getArgument(0)).run();
+ return null;
+ }).when(eventLoop).execute(any(Runnable.class));
+ when(channel.eventLoop()).thenReturn(eventLoop);
+
+ // Controllable write future
+ DefaultChannelPromise writeFuture = new DefaultChannelPromise(channel);
+ doAnswer(inv ->
writeFuture).when(channel).writeAndFlush(any(Response.class));
+
+ long ledgerId = System.currentTimeMillis();
+ ReadRequest request = ReadRequest.create(
+ BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short)
0, new byte[]{});
+
+ // create() calls onReadRequestStart → semaphore acquired
+ ReadEntryProcessor processor = ReadEntryProcessor.create(
+ request, requestHandler, requestProcessor, null, true /*
throttle */);
+
+ // Semaphore should be acquired (1 permit used)
+ assertEquals("semaphore should have 0 permits after read started",
+ 0, readsSemaphore.availablePermits());
+
+ // Run the processor — thread returns immediately (non-blocking)
+ processor.run();
+
+ // Semaphore should STILL be held (write not completed)
+ assertEquals("semaphore should still have 0 permits while write is in
progress",
+ 0, readsSemaphore.availablePermits());
+
+ // A second read would be unable to acquire the semaphore
+ assertFalse("second read should not be able to acquire semaphore",
+ readsSemaphore.tryAcquire());
+
+ // Complete the write
+ writeFuture.setSuccess();
+
+ // Now semaphore should be released — a new read can proceed
+ assertEquals("semaphore should have 1 permit after write completes",
+ 1, readsSemaphore.availablePermits());
+ }
}