This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 219054e81 [hotfix] Fix Netty ByteBuf leak on follower fetchLog timeout
(#2926)
219054e81 is described below
commit 219054e81b822db08f89b34b40102f35d9c8698f
Author: Yang Wang <[email protected]>
AuthorDate: Wed Mar 25 19:19:31 2026 +0800
[hotfix] Fix Netty ByteBuf leak on follower fetchLog timeout (#2926)
---
.../replica/fetcher/ReplicaFetcherThread.java | 40 +++++++++--
.../replica/fetcher/ReplicaFetcherThreadTest.java | 82 ++++++++++++++++++++++
.../replica/fetcher/TestingLeaderEndpoint.java | 69 +++++++++++++++++-
3 files changed, 182 insertions(+), 9 deletions(-)
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java
index eaf0c0050..f7099140f 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java
@@ -40,6 +40,7 @@ import org.apache.fluss.server.replica.Replica;
import org.apache.fluss.server.replica.ReplicaManager;
import org.apache.fluss.server.replica.fetcher.LeaderEndpoint.FetchData;
import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.fluss.shaded.netty4.io.netty.util.ReferenceCountUtil;
import org.apache.fluss.utils.FileUtils;
import org.apache.fluss.utils.FlussPaths;
import org.apache.fluss.utils.concurrent.ShutdownableThread;
@@ -60,6 +61,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -82,7 +84,7 @@ final class ReplicaFetcherThread extends ShutdownableThread {
// manually add timeout logic in here, todo remove this timeout logic if
// we support global request timeout in #279
- private final int timeoutSeconds = 30;
+ private final int timeoutSeconds;
// TODO this round-robin fair map will take effect after we introduce
fetch response limit size
// in FetchLogRequest. trace id: FLUSS-56111098
@@ -104,10 +106,20 @@ final class ReplicaFetcherThread extends
ShutdownableThread {
public ReplicaFetcherThread(
String name, ReplicaManager replicaManager, LeaderEndpoint leader,
int fetchBackOffMs) {
+ this(name, replicaManager, leader, fetchBackOffMs, 30);
+ }
+
+ ReplicaFetcherThread(
+ String name,
+ ReplicaManager replicaManager,
+ LeaderEndpoint leader,
+ int fetchBackOffMs,
+ int timeoutSeconds) {
super(name, false);
this.replicaManager = replicaManager;
this.leader = leader;
this.fetchBackOffMs = fetchBackOffMs;
+ this.timeoutSeconds = timeoutSeconds;
this.serverMetricGroup = replicaManager.getServerMetricGroup();
}
@@ -217,6 +229,7 @@ final class ReplicaFetcherThread extends ShutdownableThread
{
Set<TableBucket> bucketsWithError = new HashSet<>();
FetchData responseData = null;
FetchLogRequest fetchLogRequest = fetchLogContext.getFetchLogRequest();
+ CompletableFuture<FetchData> fetchFuture = null;
try {
LOG.trace(
"Sending fetch log request {} to leader {}",
@@ -224,7 +237,8 @@ final class ReplicaFetcherThread extends ShutdownableThread
{
leader.leaderServerId());
// TODO this need not blocking to wait fetch log complete, change
to async, see
// FLUSS-56115172.
- responseData =
leader.fetchLog(fetchLogContext).get(timeoutSeconds, TimeUnit.SECONDS);
+ fetchFuture = leader.fetchLog(fetchLogContext);
+ responseData = fetchFuture.get(timeoutSeconds, TimeUnit.SECONDS);
} catch (Throwable t) {
if (isRunning()) {
LOG.warn("Error in response for fetch log request {}",
fetchLogRequest, t);
@@ -232,6 +246,13 @@ final class ReplicaFetcherThread extends
ShutdownableThread {
bucketStatusMapLock,
() ->
bucketsWithError.addAll(fairBucketStatusMap.bucketSet()));
}
+ // If the fetch timed out, the future may still complete later, so
+ // register a callback to release the ByteBuf when the late
response arrives.
+ // Without this, the pooled ByteBuf held by FetchLogResponse would
never be
+ // released, causing a Netty direct memory leak.
+ if (fetchFuture != null) {
+
fetchFuture.thenAccept(ReplicaFetcherThread::releaseFetchDataBuffer);
+ }
}
if (responseData != null) {
@@ -240,10 +261,7 @@ final class ReplicaFetcherThread extends
ShutdownableThread {
handleFetchLogResponse(responseData.getFetchLogResultMap(),
bucketsWithError);
} finally {
// release buffer handle by fetchLogResponse.
- ByteBuf parsedByteBuf =
responseData.getFetchLogResponse().getParsedByteBuf();
- if (parsedByteBuf != null) {
- parsedByteBuf.release();
- }
+ releaseFetchDataBuffer(responseData);
bucketStatusMapLock.unlock();
}
}
@@ -253,6 +271,16 @@ final class ReplicaFetcherThread extends
ShutdownableThread {
}
}
+ /** Releases the ByteBuf held by a FetchLogResponse in the given
FetchData. */
+ private static void releaseFetchDataBuffer(FetchData data) {
+ if (data != null) {
+ ByteBuf parsedByteBuf =
data.getFetchLogResponse().getParsedByteBuf();
+ if (parsedByteBuf != null) {
+ ReferenceCountUtil.safeRelease(parsedByteBuf);
+ }
+ }
+ }
+
private void handleFetchLogResponse(
Map<TableBucket, FetchLogResultForBucket> responseData,
Set<TableBucket> replicasWithError) {
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
index 0501d0374..7dfef651e 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
@@ -45,6 +45,7 @@ import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.ZooKeeperExtension;
import org.apache.fluss.server.zk.data.LeaderAndIsr;
import org.apache.fluss.server.zk.data.TableRegistration;
+import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.fluss.testutils.common.AllCallbackWrapper;
import org.apache.fluss.utils.clock.Clock;
import org.apache.fluss.utils.clock.ManualClock;
@@ -68,6 +69,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import static org.apache.fluss.record.TestData.DATA1;
@@ -374,6 +376,86 @@ public class ReplicaFetcherThreadTest {
() ->
assertThat(followerReplica.getLocalLogEndOffset()).isEqualTo(30L));
}
+ @Test
+ void testFetchTimeoutReleasesPooledByteBuf() throws Exception {
+ // This test verifies that when a fetchLog RPC times out, the pooled
ByteBuf
+ // held by the late-arriving FetchLogResponse is properly released.
+ // Without the fix, the ByteBuf would leak, causing Netty direct
memory growth.
+
+ ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
+ try {
+ Configuration conf = new Configuration();
+ ServerNode followerNode =
+ new ServerNode(
+ followerServerId,
+ "localhost",
+ 10001,
+ ServerType.TABLET_SERVER,
+ "rack2");
+ TestingLeaderEndpoint testingEndpoint =
+ new TestingLeaderEndpoint(conf, leaderRM, followerNode);
+
+ // Append records to leader so fetch responses carry actual data
+ CompletableFuture<List<ProduceLogResultForBucket>> future = new
CompletableFuture<>();
+ leaderRM.appendRecordsToLog(
+ 1000,
+ 1,
+ Collections.singletonMap(tb,
genMemoryLogRecordsByObject(DATA1)),
+ null,
+ future::complete);
+ assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 0, 10L));
+
+ // Configure the endpoint to delay responses by 3 seconds (longer
than the 1s timeout)
+ testingEndpoint.setFetchDelay(scheduler, 3000);
+
+ // Create a fetcher with a very short timeout (1 second) to
trigger timeout quickly
+ ReplicaFetcherThread timeoutFetcher =
+ new ReplicaFetcherThread(
+ "test-timeout-fetcher",
+ followerRM,
+ testingEndpoint,
+ 1000,
+ 1 /* 1 second timeout */);
+
+ timeoutFetcher.addBuckets(
+ Collections.singletonMap(
+ tb,
+ new InitialFetchStatus(
+ DATA1_TABLE_ID, DATA1_TABLE_PATH,
leader.id(), 0L)));
+
+ // Start the fetcher - it will send fetches, each timing out after
1s,
+ // then the delayed responses arrive after 3s
+ timeoutFetcher.start();
+
+ // Wait until at least one delayed response has been allocated
+ retry(
+ Duration.ofSeconds(10),
+ () ->
assertThat(testingEndpoint.getAllAllocatedByteBufs()).isNotEmpty());
+
+ // Shutdown the fetcher to stop new requests
+ timeoutFetcher.shutdown();
+
+ // Wait until ALL allocated ByteBufs have been released (refCnt ==
0).
+ // The thenAccept callback releases them when late responses
arrive.
+ retry(
+ Duration.ofSeconds(15),
+ () -> {
+ java.util.List<ByteBuf> allBufs =
testingEndpoint.getAllAllocatedByteBufs();
+ for (int i = 0; i < allBufs.size(); i++) {
+ assertThat(allBufs.get(i).refCnt())
+ .as(
+ "Pooled ByteBuf #%d should be
released after"
+ + " fetch timeout. refCnt
> 0 means the"
+ + " buffer leaked.",
+ i)
+ .isEqualTo(0);
+ }
+ });
+ } finally {
+ scheduler.shutdownNow();
+ }
+ }
+
private void registerTableInZkClient() throws Exception {
ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot();
zkClient.registerTable(
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/TestingLeaderEndpoint.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/TestingLeaderEndpoint.java
index 67e8b0ac6..b2ffd2892 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/TestingLeaderEndpoint.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/TestingLeaderEndpoint.java
@@ -31,12 +31,16 @@ import org.apache.fluss.server.entity.FetchReqInfo;
import org.apache.fluss.server.log.FetchParams;
import org.apache.fluss.server.replica.Replica;
import org.apache.fluss.server.replica.ReplicaManager;
+import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocator;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.getFetchLogData;
import static org.apache.fluss.utils.function.ThrowingRunnable.unchecked;
@@ -51,6 +55,18 @@ public class TestingLeaderEndpoint implements LeaderEndpoint
{
/** The max fetch size for a bucket in bytes. */
private final int maxFetchSizeForBucket;
+ /**
+ * If set, fetchLog will return a future that completes after the
specified delay, simulating a
+ * slow leader. The response will carry a pooled ByteBuf to track buffer
release.
+ */
+ private volatile ScheduledExecutorService delayExecutor;
+
+ private volatile long delayMs;
+
+ /** All ByteBufs allocated for delayed fetch responses, for leak detection
in tests. */
+ private final java.util.concurrent.CopyOnWriteArrayList<ByteBuf>
allocatedByteBufs =
+ new java.util.concurrent.CopyOnWriteArrayList<>();
+
public TestingLeaderEndpoint(
Configuration conf, ReplicaManager replicaManager, ServerNode
localNode) {
this.replicaManager = replicaManager;
@@ -93,12 +109,59 @@ public class TestingLeaderEndpoint implements
LeaderEndpoint {
fetchLogRequest.getFollowerServerId(),
fetchLogRequest.getMaxBytes()),
fetchLogData,
null,
- result ->
- response.complete(
- new FetchData(new FetchLogResponse(),
processResult(result))));
+ result -> {
+ FetchData fetchData =
createFetchData(processResult(result));
+ if (delayExecutor != null && delayMs > 0) {
+ // Simulate a slow leader: complete the future after a
delay
+ delayExecutor.schedule(
+ () -> response.complete(fetchData), delayMs,
TimeUnit.MILLISECONDS);
+ } else {
+ response.complete(fetchData);
+ }
+ });
return response;
}
+ /**
+ * Enables delayed fetch responses to simulate a slow/unreachable leader.
+ *
+ * @param executor the scheduler to use for delayed completion
+ * @param delayMs the delay in milliseconds before completing the fetch
future
+ */
+ public void setFetchDelay(ScheduledExecutorService executor, long delayMs)
{
+ this.delayExecutor = executor;
+ this.delayMs = delayMs;
+ }
+
+ /** Clears the fetch delay, returning to immediate completion. */
+ public void clearFetchDelay() {
+ this.delayExecutor = null;
+ this.delayMs = 0;
+ }
+
+ /** Returns all pooled ByteBufs allocated for delayed fetch responses. */
+ public java.util.List<ByteBuf> getAllAllocatedByteBufs() {
+ return allocatedByteBufs;
+ }
+
+ /**
+ * Creates a FetchData with a FetchLogResponse that holds a pooled
ByteBuf, simulating what
+ * NettyClientHandler does for inner client FetchLogResponse (zero-copy
path).
+ */
+ private FetchData createFetchData(Map<TableBucket,
FetchLogResultForBucket> resultMap) {
+ FetchLogResponse fetchLogResponse = new FetchLogResponse();
+ if (delayExecutor != null && delayMs > 0) {
+ // Simulate the real NettyClientHandler behavior: the
FetchLogResponse holds a
+ // pooled ByteBuf that must be manually released via
getParsedByteBuf().release().
+ ByteBuf pooledBuf = PooledByteBufAllocator.DEFAULT.buffer(64);
+ pooledBuf.writeZero(64);
+ // Parse from the pooled buffer so that getParsedByteBuf() returns
it
+ fetchLogResponse.parseFrom(pooledBuf, 0);
+ allocatedByteBufs.add(pooledBuf);
+ }
+ return new FetchData(fetchLogResponse, resultMap);
+ }
+
@Override
public Optional<FetchLogContext> buildFetchLogContext(
Map<TableBucket, BucketFetchStatus> replicas) {