This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 219054e81 [hotfix] Fix Netty ByteBuf leak on follower fetchLog timeout (#2926)
219054e81 is described below

commit 219054e81b822db08f89b34b40102f35d9c8698f
Author: Yang Wang <[email protected]>
AuthorDate: Wed Mar 25 19:19:31 2026 +0800

    [hotfix] Fix Netty ByteBuf leak on follower fetchLog timeout (#2926)
---
 .../replica/fetcher/ReplicaFetcherThread.java      | 40 +++++++++--
 .../replica/fetcher/ReplicaFetcherThreadTest.java  | 82 ++++++++++++++++++++++
 .../replica/fetcher/TestingLeaderEndpoint.java     | 69 +++++++++++++++++-
 3 files changed, 182 insertions(+), 9 deletions(-)

diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java
index eaf0c0050..f7099140f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java
@@ -40,6 +40,7 @@ import org.apache.fluss.server.replica.Replica;
 import org.apache.fluss.server.replica.ReplicaManager;
 import org.apache.fluss.server.replica.fetcher.LeaderEndpoint.FetchData;
 import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.fluss.shaded.netty4.io.netty.util.ReferenceCountUtil;
 import org.apache.fluss.utils.FileUtils;
 import org.apache.fluss.utils.FlussPaths;
 import org.apache.fluss.utils.concurrent.ShutdownableThread;
@@ -60,6 +61,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -82,7 +84,7 @@ final class ReplicaFetcherThread extends ShutdownableThread {
 
     // manually add timout logic in here, todo remove this timeout logic if
     // we support global request timeout in #279
-    private final int timeoutSeconds = 30;
+    private final int timeoutSeconds;
 
     // TODO this range-robin fair map will take effect after we introduce 
fetch response limit size
     // in FetchLogRequest. trace id: FLUSS-56111098
@@ -104,10 +106,20 @@ final class ReplicaFetcherThread extends ShutdownableThread {
 
     public ReplicaFetcherThread(
             String name, ReplicaManager replicaManager, LeaderEndpoint leader, 
int fetchBackOffMs) {
+        this(name, replicaManager, leader, fetchBackOffMs, 30);
+    }
+
+    ReplicaFetcherThread(
+            String name,
+            ReplicaManager replicaManager,
+            LeaderEndpoint leader,
+            int fetchBackOffMs,
+            int timeoutSeconds) {
         super(name, false);
         this.replicaManager = replicaManager;
         this.leader = leader;
         this.fetchBackOffMs = fetchBackOffMs;
+        this.timeoutSeconds = timeoutSeconds;
         this.serverMetricGroup = replicaManager.getServerMetricGroup();
     }
 
@@ -217,6 +229,7 @@ final class ReplicaFetcherThread extends ShutdownableThread {
         Set<TableBucket> bucketsWithError = new HashSet<>();
         FetchData responseData = null;
         FetchLogRequest fetchLogRequest = fetchLogContext.getFetchLogRequest();
+        CompletableFuture<FetchData> fetchFuture = null;
         try {
             LOG.trace(
                     "Sending fetch log request {} to leader {}",
@@ -224,7 +237,8 @@ final class ReplicaFetcherThread extends ShutdownableThread {
                     leader.leaderServerId());
             // TODO this need not blocking to wait fetch log complete, change 
to async, see
             // FLUSS-56115172.
-            responseData = 
leader.fetchLog(fetchLogContext).get(timeoutSeconds, TimeUnit.SECONDS);
+            fetchFuture = leader.fetchLog(fetchLogContext);
+            responseData = fetchFuture.get(timeoutSeconds, TimeUnit.SECONDS);
         } catch (Throwable t) {
             if (isRunning()) {
                 LOG.warn("Error in response for fetch log request {}", 
fetchLogRequest, t);
@@ -232,6 +246,13 @@ final class ReplicaFetcherThread extends ShutdownableThread {
                         bucketStatusMapLock,
                         () -> 
bucketsWithError.addAll(fairBucketStatusMap.bucketSet()));
             }
+            // If the fetch timed out but the future may still complete later,
+            // register a callback to release the ByteBuf when the late 
response arrives.
+            // Without this, the pooled ByteBuf held by FetchLogResponse would 
never be
+            // released, causing a Netty direct memory leak.
+            if (fetchFuture != null) {
+                
fetchFuture.thenAccept(ReplicaFetcherThread::releaseFetchDataBuffer);
+            }
         }
 
         if (responseData != null) {
@@ -240,10 +261,7 @@ final class ReplicaFetcherThread extends ShutdownableThread {
                 handleFetchLogResponse(responseData.getFetchLogResultMap(), 
bucketsWithError);
             } finally {
                 // release buffer handle by fetchLogResponse.
-                ByteBuf parsedByteBuf = 
responseData.getFetchLogResponse().getParsedByteBuf();
-                if (parsedByteBuf != null) {
-                    parsedByteBuf.release();
-                }
+                releaseFetchDataBuffer(responseData);
                 bucketStatusMapLock.unlock();
             }
         }
@@ -253,6 +271,16 @@ final class ReplicaFetcherThread extends ShutdownableThread {
         }
     }
 
+    /** Releases the ByteBuf held by a FetchLogResponse in the given 
FetchData. */
+    private static void releaseFetchDataBuffer(FetchData data) {
+        if (data != null) {
+            ByteBuf parsedByteBuf = 
data.getFetchLogResponse().getParsedByteBuf();
+            if (parsedByteBuf != null) {
+                ReferenceCountUtil.safeRelease(parsedByteBuf);
+            }
+        }
+    }
+
     private void handleFetchLogResponse(
             Map<TableBucket, FetchLogResultForBucket> responseData,
             Set<TableBucket> replicasWithError) {
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
index 0501d0374..7dfef651e 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
@@ -45,6 +45,7 @@ import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.ZooKeeperExtension;
 import org.apache.fluss.server.zk.data.LeaderAndIsr;
 import org.apache.fluss.server.zk.data.TableRegistration;
+import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.fluss.testutils.common.AllCallbackWrapper;
 import org.apache.fluss.utils.clock.Clock;
 import org.apache.fluss.utils.clock.ManualClock;
@@ -68,6 +69,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Consumer;
 
 import static org.apache.fluss.record.TestData.DATA1;
@@ -374,6 +376,86 @@ public class ReplicaFetcherThreadTest {
                 () -> 
assertThat(followerReplica.getLocalLogEndOffset()).isEqualTo(30L));
     }
 
+    @Test
+    void testFetchTimeoutReleasesPooledByteBuf() throws Exception {
+        // This test verifies that when a fetchLog RPC times out, the pooled 
ByteBuf
+        // held by the late-arriving FetchLogResponse is properly released.
+        // Without the fix, the ByteBuf would leak, causing Netty direct 
memory growth.
+
+        ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
+        try {
+            Configuration conf = new Configuration();
+            ServerNode followerNode =
+                    new ServerNode(
+                            followerServerId,
+                            "localhost",
+                            10001,
+                            ServerType.TABLET_SERVER,
+                            "rack2");
+            TestingLeaderEndpoint testingEndpoint =
+                    new TestingLeaderEndpoint(conf, leaderRM, followerNode);
+
+            // Append records to leader so fetch responses carry actual data
+            CompletableFuture<List<ProduceLogResultForBucket>> future = new 
CompletableFuture<>();
+            leaderRM.appendRecordsToLog(
+                    1000,
+                    1,
+                    Collections.singletonMap(tb, 
genMemoryLogRecordsByObject(DATA1)),
+                    null,
+                    future::complete);
+            assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 0, 10L));
+
+            // Configure the endpoint to delay responses by 3 seconds (longer 
than 1s timeout)
+            testingEndpoint.setFetchDelay(scheduler, 3000);
+
+            // Create a fetcher with a very short timeout (1 second) to 
trigger timeout quickly
+            ReplicaFetcherThread timeoutFetcher =
+                    new ReplicaFetcherThread(
+                            "test-timeout-fetcher",
+                            followerRM,
+                            testingEndpoint,
+                            1000,
+                            1 /* 1 second timeout */);
+
+            timeoutFetcher.addBuckets(
+                    Collections.singletonMap(
+                            tb,
+                            new InitialFetchStatus(
+                                    DATA1_TABLE_ID, DATA1_TABLE_PATH, 
leader.id(), 0L)));
+
+            // Start the fetcher - it will send fetches, each timing out after 
1s,
+            // then the delayed responses arrive after 3s
+            timeoutFetcher.start();
+
+            // Wait until at least one delayed response has been allocated
+            retry(
+                    Duration.ofSeconds(10),
+                    () -> 
assertThat(testingEndpoint.getAllAllocatedByteBufs()).isNotEmpty());
+
+            // Shutdown the fetcher to stop new requests
+            timeoutFetcher.shutdown();
+
+            // Wait until ALL allocated ByteBufs have been released (refCnt == 
0).
+            // The thenAccept callback releases them when late responses 
arrive.
+            retry(
+                    Duration.ofSeconds(15),
+                    () -> {
+                        java.util.List<ByteBuf> allBufs = 
testingEndpoint.getAllAllocatedByteBufs();
+                        for (int i = 0; i < allBufs.size(); i++) {
+                            assertThat(allBufs.get(i).refCnt())
+                                    .as(
+                                            "Pooled ByteBuf #%d should be 
released after"
+                                                    + " fetch timeout. refCnt 
> 0 means the"
+                                                    + " buffer leaked.",
+                                            i)
+                                    .isEqualTo(0);
+                        }
+                    });
+        } finally {
+            scheduler.shutdownNow();
+        }
+    }
+
     private void registerTableInZkClient() throws Exception {
         ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot();
         zkClient.registerTable(
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/TestingLeaderEndpoint.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/TestingLeaderEndpoint.java
index 67e8b0ac6..b2ffd2892 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/TestingLeaderEndpoint.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/TestingLeaderEndpoint.java
@@ -31,12 +31,16 @@ import org.apache.fluss.server.entity.FetchReqInfo;
 import org.apache.fluss.server.log.FetchParams;
 import org.apache.fluss.server.replica.Replica;
 import org.apache.fluss.server.replica.ReplicaManager;
+import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocator;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.getFetchLogData;
 import static org.apache.fluss.utils.function.ThrowingRunnable.unchecked;
@@ -51,6 +55,18 @@ public class TestingLeaderEndpoint implements LeaderEndpoint {
     /** The max fetch size for a bucket in bytes. */
     private final int maxFetchSizeForBucket;
 
+    /**
+     * If set, fetchLog will return a future that completes after the 
specified delay, simulating a
+     * slow leader. The response will carry a pooled ByteBuf to track buffer 
release.
+     */
+    private volatile ScheduledExecutorService delayExecutor;
+
+    private volatile long delayMs;
+
+    /** All ByteBufs allocated for delayed fetch responses, for leak detection 
in tests. */
+    private final java.util.concurrent.CopyOnWriteArrayList<ByteBuf> 
allocatedByteBufs =
+            new java.util.concurrent.CopyOnWriteArrayList<>();
+
     public TestingLeaderEndpoint(
             Configuration conf, ReplicaManager replicaManager, ServerNode 
localNode) {
         this.replicaManager = replicaManager;
@@ -93,12 +109,59 @@ public class TestingLeaderEndpoint implements LeaderEndpoint {
                         fetchLogRequest.getFollowerServerId(), 
fetchLogRequest.getMaxBytes()),
                 fetchLogData,
                 null,
-                result ->
-                        response.complete(
-                                new FetchData(new FetchLogResponse(), 
processResult(result))));
+                result -> {
+                    FetchData fetchData = 
createFetchData(processResult(result));
+                    if (delayExecutor != null && delayMs > 0) {
+                        // Simulate a slow leader: complete the future after a 
delay
+                        delayExecutor.schedule(
+                                () -> response.complete(fetchData), delayMs, 
TimeUnit.MILLISECONDS);
+                    } else {
+                        response.complete(fetchData);
+                    }
+                });
         return response;
     }
 
+    /**
+     * Enables delayed fetch responses to simulate a slow/unreachable leader.
+     *
+     * @param executor the scheduler to use for delayed completion
+     * @param delayMs the delay in milliseconds before completing the fetch 
future
+     */
+    public void setFetchDelay(ScheduledExecutorService executor, long delayMs) 
{
+        this.delayExecutor = executor;
+        this.delayMs = delayMs;
+    }
+
+    /** Clears the fetch delay, returning to immediate completion. */
+    public void clearFetchDelay() {
+        this.delayExecutor = null;
+        this.delayMs = 0;
+    }
+
+    /** Returns all pooled ByteBufs allocated for delayed fetch responses. */
+    public java.util.List<ByteBuf> getAllAllocatedByteBufs() {
+        return allocatedByteBufs;
+    }
+
+    /**
+     * Creates a FetchData with a FetchLogResponse that holds a pooled 
ByteBuf, simulating what
+     * NettyClientHandler does for inner client FetchLogResponse (zero-copy 
path).
+     */
+    private FetchData createFetchData(Map<TableBucket, 
FetchLogResultForBucket> resultMap) {
+        FetchLogResponse fetchLogResponse = new FetchLogResponse();
+        if (delayExecutor != null && delayMs > 0) {
+            // Simulate the real NettyClientHandler behavior: the 
FetchLogResponse holds a
+            // pooled ByteBuf that must be manually released via 
getParsedByteBuf().release().
+            ByteBuf pooledBuf = PooledByteBufAllocator.DEFAULT.buffer(64);
+            pooledBuf.writeZero(64);
+            // Parse from the pooled buffer so that getParsedByteBuf() returns 
it
+            fetchLogResponse.parseFrom(pooledBuf, 0);
+            allocatedByteBufs.add(pooledBuf);
+        }
+        return new FetchData(fetchLogResponse, resultMap);
+    }
+
     @Override
     public Optional<FetchLogContext> buildFetchLogContext(
             Map<TableBucket, BucketFetchStatus> replicas) {

Reply via email to