This is an automated email from the ASF dual-hosted git repository.

ipolyzos pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new cec9650d2 [rpc] Client zero-copy lazy parse ByteBuf to avoid deep memory copy (#2948)
cec9650d2 is described below

commit cec9650d26e1a8223c789d63a737277247479e96
Author: Anton Borisov <[email protected]>
AuthorDate: Thu Apr 2 13:50:46 2026 +0100

    [rpc] Client zero-copy lazy parse ByteBuf to avoid deep memory copy (#2948)
    
    * [rpc] Client zero-copy lazy parse ByteBuf to avoid deep memory copy (#1184)
    
    * add some notes for future reference
    
    * address comment
    
    * fix CoordinatorHighAvailabilityITCase: remove isInnerClient arg
    
    ---------
    
    Co-authored-by: ipolyzos <[email protected]>
---
 .../org/apache/fluss/client/FlussConnection.java   |   2 +-
 .../table/scanner/log/DefaultCompletedFetch.java   |  29 +++-
 .../fluss/client/table/scanner/log/LogFetcher.java |  41 ++++-
 .../fluss/client/metadata/MetadataUpdaterTest.java |   2 +-
 .../client/metadata/TestingClientSchemaGetter.java |   2 +-
 .../client/metadata/TestingMetadataUpdater.java    |   2 +-
 .../security/acl/FlussAuthorizationITCase.java     |   4 +-
 .../DefaultCompletedFetchBufferLifecycleTest.java  | 165 +++++++++++++++++++++
 .../scanner/log/DefaultCompletedFetchTest.java     |   6 +-
 .../table/scanner/log/LogFetchBufferTest.java      |   3 +-
 .../table/scanner/log/LogFetchCollectorTest.java   |   2 +-
 .../fluss/client/write/RecordAccumulatorTest.java  |   3 +-
 .../org/apache/fluss/rpc/messages/ApiMessage.java  |   4 +-
 .../committer/FlussTableLakeSnapshotCommitter.java |   3 +-
 .../source/enumerator/TieringSourceEnumerator.java |   2 +-
 .../main/java/org/apache/fluss/rpc/RpcClient.java  |   7 +-
 .../apache/fluss/rpc/netty/client/NettyClient.java |  13 +-
 .../fluss/rpc/netty/client/NettyClientHandler.java |  36 +----
 .../fluss/rpc/netty/client/ServerConnection.java   |  11 +-
 .../apache/fluss/rpc/netty/NettyMetricsTest.java   |   2 +-
 .../rpc/netty/authenticate/AuthenticationTest.java |  22 +--
 .../authenticate/SaslAuthenticationITCase.java     |   2 +-
 .../fluss/rpc/netty/client/NettyClientTest.java    |   4 +-
 .../rpc/netty/client/ServerConnectionTest.java     |  20 +--
 .../fluss/rpc/protocol/MessageCodecTest.java       |   2 +-
 .../server/coordinator/CoordinatorServer.java      |   2 +-
 .../apache/fluss/server/tablet/TabletServer.java   |   2 +-
 .../org/apache/fluss/server/ServerITCaseBase.java  |   3 +-
 .../coordinator/CoordinatorChannelManagerTest.java |   3 +-
 .../CoordinatorHighAvailabilityITCase.java         |   2 +-
 .../server/coordinator/TableManagerITCase.java     |   3 +-
 .../coordinator/TestCoordinatorChannelManager.java |   2 +-
 .../statemachine/ReplicaStateMachineTest.java      |   3 +-
 .../statemachine/TableBucketStateMachineTest.java  |   3 +-
 .../fluss/server/replica/ReplicaTestBase.java      |   2 +-
 .../replica/fetcher/ReplicaFetcherThreadTest.java  |   2 +-
 .../server/testutils/FlussClusterExtension.java    |   3 +-
 .../fluss/server/utils/RpcGatewayManagerTest.java  |   2 +-
 38 files changed, 292 insertions(+), 129 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java 
b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
index 0990756ce..d98e45869 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
@@ -81,7 +81,7 @@ public final class FlussConnection implements Connection {
         String clientId = conf.getString(ConfigOptions.CLIENT_ID);
         this.metricRegistry = metricRegistry;
         this.clientMetricGroup = new ClientMetricGroup(metricRegistry, 
clientId);
-        this.rpcClient = RpcClient.create(conf, clientMetricGroup, false);
+        this.rpcClient = RpcClient.create(conf, clientMetricGroup);
 
         // TODO this maybe remove after we introduce client metadata.
         this.metadataUpdater = new MetadataUpdater(conf, rpcClient);
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetch.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetch.java
index 95078bf36..8f9cc9639 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetch.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetch.java
@@ -22,6 +22,9 @@ import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.LogRecordReadContext;
 import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
 import org.apache.fluss.rpc.messages.FetchLogRequest;
+import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import javax.annotation.Nullable;
 
 /**
  * {@link DefaultCompletedFetch} is a {@link CompletedFetch} that represents a 
completed fetch that
@@ -30,13 +33,21 @@ import org.apache.fluss.rpc.messages.FetchLogRequest;
 @Internal
 class DefaultCompletedFetch extends CompletedFetch {
 
+    /**
+     * The parsed ByteBuf backing the lazily-parsed records data. This 
reference is retained to keep
+     * the underlying network buffer alive while the records are being 
consumed. Released in {@link
+     * #drain()}.
+     */
+    @Nullable private ByteBuf parsedByteBuf;
+
     public DefaultCompletedFetch(
             TableBucket tableBucket,
             FetchLogResultForBucket fetchLogResultForBucket,
             LogRecordReadContext readContext,
             LogScannerStatus logScannerStatus,
             boolean isCheckCrc,
-            Long fetchOffset) {
+            Long fetchOffset,
+            @Nullable ByteBuf parsedByteBuf) {
         super(
                 tableBucket,
                 fetchLogResultForBucket.getError(),
@@ -47,5 +58,21 @@ class DefaultCompletedFetch extends CompletedFetch {
                 logScannerStatus,
                 isCheckCrc,
                 fetchOffset);
+        this.parsedByteBuf = parsedByteBuf;
+    }
+
+    @Override
+    void drain() {
+        try {
+            // Note: if super.drain() throws, isConsumed stays false in the 
parent.
+            // The finally block below still nulls parsedByteBuf after 
releasing it,
+            // so a subsequent drain() call will not double-release the buffer.
+            super.drain();
+        } finally {
+            if (parsedByteBuf != null) {
+                parsedByteBuf.release();
+                parsedByteBuf = null;
+            }
+        }
     }
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
index c8f87984b..14a2a3d31 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
@@ -51,6 +51,7 @@ import org.apache.fluss.rpc.messages.PbFetchLogRespForBucket;
 import org.apache.fluss.rpc.messages.PbFetchLogRespForTable;
 import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.rpc.protocol.Errors;
+import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.fluss.utils.IOUtils;
 import org.apache.fluss.utils.Projection;
 
@@ -339,6 +340,11 @@ public class LogFetcher implements Closeable {
     /** Implements the core logic for a successful fetch log response. */
     private synchronized void handleFetchLogResponse(
             int destination, long requestStartTime, FetchLogResponse 
fetchLogResponse) {
+        // Capture the parsed ByteBuf for buffer lifecycle management. The 
response may
+        // have been lazily parsed from the network buffer. Each 
DefaultCompletedFetch
+        // that references the buffer's records data must retain it. We 
release the base
+        // reference in the finally block.
+        ByteBuf parsedByteBuf = fetchLogResponse.getParsedByteBuf();
         try {
             if (isClosed) {
                 return;
@@ -384,11 +390,14 @@ public class LogFetcher implements Closeable {
                                     fetchResultForBucket.getHighWatermark());
                         } else {
                             LogRecords logRecords = 
fetchResultForBucket.recordsOrEmpty();
-                            if (!MemoryLogRecords.EMPTY.equals(logRecords)
-                                    || fetchResultForBucket.getErrorCode() != 
Errors.NONE.code()) {
-                                // In oder to not signal notEmptyCondition, 
add completed
-                                // fetch to buffer until log records is not 
empty.
-                                DefaultCompletedFetch completedFetch =
+                            boolean hasRecords = 
!MemoryLogRecords.EMPTY.equals(logRecords);
+                            if (hasRecords) {
+                                // Retain the parsed buffer so it stays alive 
while
+                                // this CompletedFetch's records are being 
consumed.
+                                if (parsedByteBuf != null) {
+                                    parsedByteBuf.retain();
+                                }
+                                logFetchBuffer.add(
                                         new DefaultCompletedFetch(
                                                 tb,
                                                 fetchResultForBucket,
@@ -397,14 +406,32 @@ public class LogFetcher implements Closeable {
                                                 // skipping CRC check if 
projection push downed as
                                                 // the data is pruned
                                                 isCheckCrcs,
-                                                fetchOffset);
-                                logFetchBuffer.add(completedFetch);
+                                                fetchOffset,
+                                                parsedByteBuf));
+                            } else if (fetchResultForBucket.getErrorCode() != 
Errors.NONE.code()) {
+                                // Error-only bucket: no records to back, so no
+                                // buffer reference needed.
+                                logFetchBuffer.add(
+                                        new DefaultCompletedFetch(
+                                                tb,
+                                                fetchResultForBucket,
+                                                readContext,
+                                                logScannerStatus,
+                                                isCheckCrcs,
+                                                fetchOffset,
+                                                null));
                             }
                         }
                     }
                 }
             }
         } finally {
+            // Release the base reference from the network buffer. Any 
CompletedFetch
+            // objects created above hold their own retained references, 
keeping the
+            // buffer alive until they are drained.
+            if (parsedByteBuf != null) {
+                parsedByteBuf.release();
+            }
             LOG.debug("Removing pending request for node: {}", destination);
             nodesWithPendingFetchRequests.remove(destination);
         }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
index 6717cfe5a..1cda13b93 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
@@ -50,7 +50,7 @@ public class MetadataUpdaterTest {
     void testInitializeClusterWithRetries() throws Exception {
         Configuration configuration = new Configuration();
         RpcClient rpcClient =
-                RpcClient.create(configuration, 
TestingClientMetricGroup.newInstance(), false);
+                RpcClient.create(configuration, 
TestingClientMetricGroup.newInstance());
 
         // retry lower than max retry count.
         AdminReadOnlyGateway gateway = new TestingAdminReadOnlyGateway(2);
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java
index ffb2c54dc..055017383 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java
@@ -38,7 +38,7 @@ public class TestingClientSchemaGetter extends 
ClientSchemaGetter {
                 tablePath,
                 latestSchemaInfo,
                 new FlussAdmin(
-                        RpcClient.create(conf, 
TestingClientMetricGroup.newInstance(), false),
+                        RpcClient.create(conf, 
TestingClientMetricGroup.newInstance()),
                         metadataUpdater));
     }
 
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
index 206395135..6c67d297e 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
@@ -70,7 +70,7 @@ public class TestingMetadataUpdater extends MetadataUpdater {
             Map<Integer, TestTabletServerGateway> customGateways,
             Configuration conf) {
         super(
-                RpcClient.create(conf, TestingClientMetricGroup.newInstance(), 
false),
+                RpcClient.create(conf, TestingClientMetricGroup.newInstance()),
                 conf,
                 Cluster.empty());
         initializeCluster(coordinatorServer, tabletServers, tableInfos);
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
index 8c301c89a..e606052ef 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
@@ -512,7 +512,7 @@ public class FlussAuthorizationITCase {
                         Collections.singleton(DATA1_TABLE_PATH_PK), null, 
null);
 
         try (RpcClient rpcClient =
-                RpcClient.create(guestConf, 
TestingClientMetricGroup.newInstance(), false)) {
+                RpcClient.create(guestConf, 
TestingClientMetricGroup.newInstance())) {
             AdminGateway guestGateway =
                     GatewayClientProxy.createGatewayProxy(
                             () -> 
FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
@@ -836,7 +836,7 @@ public class FlussAuthorizationITCase {
                 new 
ControlledShutdownRequest().setTabletServerId(-1).setTabletServerEpoch(-1);
 
         try (RpcClient rpcClient =
-                RpcClient.create(guestConf, 
TestingClientMetricGroup.newInstance(), false)) {
+                RpcClient.create(guestConf, 
TestingClientMetricGroup.newInstance())) {
             CoordinatorGateway guestGateway =
                     GatewayClientProxy.createGatewayProxy(
                             () -> 
FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchBufferLifecycleTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchBufferLifecycleTest.java
new file mode 100644
index 000000000..090bc932f
--- /dev/null
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchBufferLifecycleTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.log;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.record.LogRecordReadContext;
+import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
+import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.fluss.shaded.netty4.io.netty.buffer.Unpooled;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.fluss.record.TestData.DATA1;
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
+import static org.apache.fluss.record.TestData.TEST_SCHEMA_GETTER;
+import static 
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DefaultCompletedFetch} zero-copy buffer lifecycle (issue 
#1184). */
+class DefaultCompletedFetchBufferLifecycleTest {
+
+    private final TableBucket tb0 = new TableBucket(1, 0);
+    private final TableBucket tb1 = new TableBucket(1, 1);
+    private LogScannerStatus logScannerStatus;
+    private LogRecordReadContext readContext;
+
+    @BeforeEach
+    void setup() {
+        Map<TableBucket, Long> scanBuckets = new HashMap<>();
+        scanBuckets.put(tb0, 0L);
+        scanBuckets.put(tb1, 0L);
+        logScannerStatus = new LogScannerStatus();
+        logScannerStatus.assignScanBuckets(scanBuckets);
+        readContext =
+                LogRecordReadContext.createArrowReadContext(
+                        DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID, TEST_SCHEMA_GETTER);
+    }
+
+    @AfterEach
+    void afterEach() {
+        if (readContext != null) {
+            readContext.close();
+            readContext = null;
+        }
+    }
+
+    @Test
+    void testBufferReleasedOnDrain() throws Exception {
+        ByteBuf buf = Unpooled.buffer(64);
+        buf.writeBytes(new byte[64]);
+
+        buf.retain();
+        DefaultCompletedFetch fetch = makeCompletedFetch(tb0, buf);
+        buf.release(); // base reference
+        assertThat(buf.refCnt()).isEqualTo(1);
+
+        fetch.drain();
+        assertThat(buf.refCnt()).isEqualTo(0);
+    }
+
+    @Test
+    void testMultipleCompletedFetchShareBuffer() throws Exception {
+        ByteBuf buf = Unpooled.buffer(64);
+        buf.writeBytes(new byte[64]);
+
+        buf.retain();
+        DefaultCompletedFetch fetch1 = makeCompletedFetch(tb0, buf);
+        buf.retain();
+        DefaultCompletedFetch fetch2 = makeCompletedFetch(tb1, buf);
+        buf.release(); // base reference
+        assertThat(buf.refCnt()).isEqualTo(2);
+
+        fetch1.drain();
+        assertThat(buf.refCnt()).isEqualTo(1);
+
+        fetch2.drain();
+        assertThat(buf.refCnt()).isEqualTo(0);
+    }
+
+    @Test
+    void testDrainIsIdempotent() throws Exception {
+        ByteBuf buf = Unpooled.buffer(64);
+        buf.writeBytes(new byte[64]);
+        buf.retain();
+
+        DefaultCompletedFetch fetch = makeCompletedFetch(tb0, buf);
+        buf.release();
+
+        fetch.drain();
+        assertThat(buf.refCnt()).isEqualTo(0);
+        fetch.drain(); // no-op, no exception
+    }
+
+    @Test
+    void testNullBufferHandledGracefully() throws Exception {
+        DefaultCompletedFetch fetch = makeCompletedFetch(tb0, null);
+        fetch.drain();
+    }
+
+    @Test
+    void testLogFetchBufferCloseReleasesBuffer() throws Exception {
+        ByteBuf buf = Unpooled.buffer(64);
+        buf.writeBytes(new byte[64]);
+
+        buf.retain();
+        DefaultCompletedFetch fetch = makeCompletedFetch(tb0, buf);
+        buf.release();
+
+        try (LogFetchBuffer logFetchBuffer = new LogFetchBuffer()) {
+            logFetchBuffer.add(fetch);
+            assertThat(buf.refCnt()).isEqualTo(1);
+        }
+        assertThat(buf.refCnt()).isEqualTo(0);
+    }
+
+    @Test
+    void testRetainAllUnsubscribeReleasesBuffer() throws Exception {
+        ByteBuf buf = Unpooled.buffer(64);
+        buf.writeBytes(new byte[64]);
+
+        buf.retain();
+        DefaultCompletedFetch fetch = makeCompletedFetch(tb0, buf);
+        buf.release();
+
+        try (LogFetchBuffer logFetchBuffer = new LogFetchBuffer()) {
+            logFetchBuffer.add(fetch);
+            logFetchBuffer.retainAll(Collections.singleton(tb1));
+            assertThat(buf.refCnt()).isEqualTo(0);
+        }
+    }
+
+    private DefaultCompletedFetch makeCompletedFetch(TableBucket tableBucket, 
ByteBuf parsedByteBuf)
+            throws Exception {
+        return new DefaultCompletedFetch(
+                tableBucket,
+                new FetchLogResultForBucket(tableBucket, 
genMemoryLogRecordsByObject(DATA1), 10L),
+                readContext,
+                logScannerStatus,
+                true,
+                0L,
+                parsedByteBuf);
+    }
+}
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
index c4624617d..427f59d4d 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
@@ -345,7 +345,8 @@ public class DefaultCompletedFetchTest {
                                         tableInfo.getSchemaId(), 
tableInfo.getSchema())),
                         logScannerStatus,
                         true,
-                        fetchOffset);
+                        fetchOffset,
+                        null);
         List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(3);
         // close the read context to release arrow root resource,
         // this is important to test complex types
@@ -398,7 +399,8 @@ public class DefaultCompletedFetchTest {
                         new TestingSchemaGetter(tableInfo.getSchemaId(), 
tableInfo.getSchema())),
                 logScannerStatus,
                 true,
-                offset);
+                offset,
+                null);
     }
 
     private static Collection<Arguments> typeAndMagic() {
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchBufferTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchBufferTest.java
index 095131b1e..abfd0f1d1 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchBufferTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchBufferTest.java
@@ -264,7 +264,8 @@ public class LogFetchBufferTest {
                 readContext,
                 logScannerStatus,
                 true,
-                0L);
+                0L,
+                null);
     }
 
     private PendingFetch makePendingFetch(TableBucket tableBucket) throws 
Exception {
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
index 0fface6b1..2ce40ce55 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
@@ -208,6 +208,6 @@ public class LogFetchCollectorTest {
     private DefaultCompletedFetch makeCompletedFetch(
             TableBucket tableBucket, FetchLogResultForBucket resultForBucket, 
long offset) {
         return new DefaultCompletedFetch(
-                tableBucket, resultForBucket, readContext, logScannerStatus, 
true, offset);
+                tableBucket, resultForBucket, readContext, logScannerStatus, 
true, offset, null);
     }
 }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index d9e2291ce..0e4ab9c97 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -678,8 +678,7 @@ class RecordAccumulatorTest {
                         
conf.getInt(ConfigOptions.CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET),
                         GatewayClientProxy.createGatewayProxy(
                                 () -> cluster.getRandomTabletServer(),
-                                RpcClient.create(
-                                        conf, 
TestingClientMetricGroup.newInstance(), false),
+                                RpcClient.create(conf, 
TestingClientMetricGroup.newInstance()),
                                 TabletServerGateway.class),
                         null),
                 TestingWriterMetricGroup.newInstance(),
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/rpc/messages/ApiMessage.java 
b/fluss-common/src/main/java/org/apache/fluss/rpc/messages/ApiMessage.java
index 4e58cc02b..55c000e89 100644
--- a/fluss-common/src/main/java/org/apache/fluss/rpc/messages/ApiMessage.java
+++ b/fluss-common/src/main/java/org/apache/fluss/rpc/messages/ApiMessage.java
@@ -52,8 +52,8 @@ public interface ApiMessage {
      * (i.e. zero-copy) only for {@code "[optional|required] bytes records = 
?"} (nested) fields. If
      * there is any lazy deserialization happens, the {@link 
#isLazilyParsed()} returns true.
      *
-     * <p>Note: the current message will hold the reference of {@link 
ByteBuf}, please remember to
-     * release the {@link ByteBuf} until the message has been fully consumed.
+     * <p>Note: the caller takes ownership of the {@link ByteBuf} reference. 
Do not release the
+     * buffer before the message has been fully consumed; release it only 
after processing is done.
      */
     void parseFrom(ByteBuf buffer, int size);
 
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
index 30c3a4bf3..76474a5b2 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
@@ -83,8 +83,7 @@ public class FlussTableLakeSnapshotCommitter implements 
AutoCloseable {
         String clientId = flussConf.getString(ConfigOptions.CLIENT_ID);
         MetricRegistry metricRegistry = MetricRegistry.create(flussConf, null);
         // don't care about metrics, but pass a ClientMetricGroup to make 
compiler happy
-        rpcClient =
-                RpcClient.create(flussConf, new 
ClientMetricGroup(metricRegistry, clientId), false);
+        rpcClient = RpcClient.create(flussConf, new 
ClientMetricGroup(metricRegistry, clientId));
         MetadataUpdater metadataUpdater = new MetadataUpdater(flussConf, 
rpcClient);
         this.coordinatorGateway =
                 GatewayClientProxy.createGatewayProxy(
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
index 337222f4e..e81362fa8 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
@@ -147,7 +147,7 @@ public class TieringSourceEnumerator
         FlinkMetricRegistry metricRegistry = new 
FlinkMetricRegistry(enumeratorMetricGroup);
         ClientMetricGroup clientMetricGroup =
                 new ClientMetricGroup(metricRegistry, "LakeTieringService");
-        this.rpcClient = RpcClient.create(flussConf, clientMetricGroup, false);
+        this.rpcClient = RpcClient.create(flussConf, clientMetricGroup);
         MetadataUpdater metadataUpdater = new MetadataUpdater(flussConf, 
rpcClient);
         this.coordinatorGateway =
                 GatewayClientProxy.createGatewayProxy(
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/RpcClient.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/RpcClient.java
index 804058efb..ce4e6fd2d 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/RpcClient.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/RpcClient.java
@@ -37,13 +37,10 @@ public interface RpcClient extends AutoCloseable {
      *
      * @param conf The configuration to use.
      * @param clientMetricGroup The client metric group
-     * @param isInnerClient Whether it is an inner client used for communicate 
from server to
-     *     server.
      * @return The RPC client.
      */
-    static RpcClient create(
-            Configuration conf, ClientMetricGroup clientMetricGroup, boolean 
isInnerClient) {
-        return new NettyClient(conf, clientMetricGroup, isInnerClient);
+    static RpcClient create(Configuration conf, ClientMetricGroup 
clientMetricGroup) {
+        return new NettyClient(conf, clientMetricGroup);
     }
 
     /**
diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java
index f567b8584..ebeab7170 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java
@@ -76,16 +76,9 @@ public final class NettyClient implements RpcClient {
 
     private final Supplier<ClientAuthenticator> authenticatorSupplier;
 
-    /**
-     * Whether the NettyClient is used as inner network client (Communicating 
between Fluss's
-     * servers).
-     */
-    private final boolean isInnerClient;
-
     private volatile boolean isClosed = false;
 
-    public NettyClient(
-            Configuration conf, ClientMetricGroup clientMetricGroup, boolean 
isInnerClient) {
+    public NettyClient(Configuration conf, ClientMetricGroup 
clientMetricGroup) {
         this.connections = MapUtils.newConcurrentHashMap();
 
         // build bootstrap
@@ -106,7 +99,6 @@ public final class NettyClient implements RpcClient {
                         .option(ChannelOption.TCP_NODELAY, true)
                         .option(ChannelOption.SO_KEEPALIVE, true)
                         .handler(new 
ClientChannelInitializer(connectionMaxIdle));
-        this.isInnerClient = isInnerClient;
         this.clientMetricGroup = clientMetricGroup;
         this.authenticatorSupplier = 
AuthenticationFactory.loadClientAuthenticatorSupplier(conf);
         NettyMetrics.registerNettyMetrics(clientMetricGroup, pooledAllocator);
@@ -197,8 +189,7 @@ public final class NettyClient implements RpcClient {
                             node,
                             clientMetricGroup,
                             authenticatorSupplier.get(),
-                            (con, ignore) -> connections.remove(serverId, con),
-                            isInnerClient);
+                            (con, ignore) -> connections.remove(serverId, 
con));
                 });
     }
 
diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClientHandler.java
 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClientHandler.java
index a2c03028d..1eef39f05 100644
--- 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClientHandler.java
+++ 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClientHandler.java
@@ -20,12 +20,10 @@ package org.apache.fluss.rpc.netty.client;
 import org.apache.fluss.exception.CorruptMessageException;
 import org.apache.fluss.rpc.messages.ApiMessage;
 import org.apache.fluss.rpc.messages.ErrorResponse;
-import org.apache.fluss.rpc.messages.FetchLogResponse;
 import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.rpc.protocol.ApiMethod;
 import org.apache.fluss.rpc.protocol.ResponseType;
 import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.fluss.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import 
org.apache.fluss.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.fluss.shaded.netty4.io.netty.handler.timeout.IdleState;
@@ -49,15 +47,8 @@ public final class NettyClientHandler extends 
ChannelInboundHandlerAdapter {
 
     private final ClientHandlerCallback callback;
 
-    /**
-     * Whether the NettyClientHandler is used as inner network client 
(Communicating between Fluss's
-     * servers).
-     */
-    private final boolean isInnerClient;
-
-    public NettyClientHandler(ClientHandlerCallback callback, boolean 
isInnerClient) {
+    public NettyClientHandler(ClientHandlerCallback callback) {
         this.callback = callback;
-        this.isInnerClient = isInnerClient;
     }
 
     @Override
@@ -90,26 +81,11 @@ public final class NettyClientHandler extends 
ChannelInboundHandlerAdapter {
                 }
                 ApiMessage response = apiMethod.getResponseConstructor().get();
                 if (response.isLazilyParsed()) {
-                    if (isInnerClient && response instanceof FetchLogResponse) 
{
-                        // For the FetchLogResponse returned by the 
FetchLogRequest sent by the
-                        // follower's TabletServer, we needn't perform an 
unHeap-to-heap memory
-                        // copy to preserve zero-copy capabilities. This 
requires users to manually
-                        // call ApiMessage#getParsedByteBuf().release() to 
release the ByteBuf after
-                        // processing the response.
-                        // TODO for the FetchLogResponse returned by the 
FetchLogRequest sent by the
-                        // Fluss client, We also aim to avoid this memory copy 
operation, traced by
-                        // https://github.com/apache/fluss/issues/1184
-                        response.parseFrom(buffer, messageSize);
-                    } else {
-                        // copy the buffer into a heap buffer, this can avoid 
the network buffer
-                        // being released before the bytes fields of the 
response are lazily parsed.
-                        ByteBuf copiedBuffer = Unpooled.buffer(messageSize, 
messageSize);
-                        copiedBuffer.writeBytes(buffer, messageSize);
-                        // response parsed from the copied buffer can be 
safely cached in user
-                        // queues.
-                        response.parseFrom(copiedBuffer, messageSize);
-                        buffer.release();
-                    }
+                    // Parse lazily from the original buffer without copying. 
The
+                    // consumer is responsible for releasing the buffer via
+                    // ApiMessage#getParsedByteBuf().release() after the 
response
+                    // has been fully consumed.
+                    response.parseFrom(buffer, messageSize);
                 } else {
                     response.parseFrom(buffer, messageSize);
                     // eagerly release the buffer to make the buffer recycle 
faster
diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
index a3fd7afbd..7d4af5333 100644
--- 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
+++ 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
@@ -106,8 +106,7 @@ final class ServerConnection {
             ServerNode node,
             ClientMetricGroup clientMetricGroup,
             ClientAuthenticator authenticator,
-            BiConsumer<ServerConnection, Throwable> closeCallback,
-            boolean isInnerClient) {
+            BiConsumer<ServerConnection, Throwable> closeCallback) {
         this.node = node;
         this.state = ConnectionState.CONNECTING;
         this.connectionMetrics = 
clientMetricGroup.createConnectionMetricGroup(node.uid());
@@ -119,7 +118,7 @@ final class ServerConnection {
         // callback is not registered when connection established.
         bootstrap
                 .connect(node.host(), node.port())
-                .addListener(future -> establishConnection((ChannelFuture) 
future, isInnerClient));
+                .addListener(future -> establishConnection((ChannelFuture) 
future));
     }
 
     public ServerNode getServerNode() {
@@ -250,7 +249,7 @@ final class ServerConnection {
 
     // 
------------------------------------------------------------------------------------------
 
-    private void establishConnection(ChannelFuture future, boolean 
isInnerClient) {
+    private void establishConnection(ChannelFuture future) {
         synchronized (lock) {
             if (future.isSuccess()) {
                 if (state.isDisconnected()) {
@@ -262,9 +261,7 @@ final class ServerConnection {
                 LOG.debug("Established connection to server {}.", node);
                 channel = future.channel();
                 channel.pipeline()
-                        .addLast(
-                                "handler",
-                                new NettyClientHandler(new ResponseCallback(), 
isInnerClient));
+                        .addLast("handler", new NettyClientHandler(new 
ResponseCallback()));
                 // start checking api versions
                 switchState(ConnectionState.CHECKING_API_VERSIONS);
                 // TODO: set correct client software name and version, used 
for metrics in server
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/NettyMetricsTest.java 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/NettyMetricsTest.java
index b56c476cd..7b6c26440 100644
--- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/NettyMetricsTest.java
+++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/NettyMetricsTest.java
@@ -73,7 +73,7 @@ public class NettyMetricsTest {
         }
         ClientMetricGroup clientMetricGroup = 
TestingClientMetricGroup.newInstance();
         clientGroup = 
clientMetricGroup.addGroup(NettyMetrics.NETTY_METRIC_GROUP);
-        nettyClient = new NettyClient(conf, clientMetricGroup, false);
+        nettyClient = new NettyClient(conf, clientMetricGroup);
     }
 
     @AfterEach
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java
 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java
index 8ad62d94f..e96fa283a 100644
--- 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java
+++ 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java
@@ -73,7 +73,7 @@ public class AuthenticationTest {
         clientConfig.setString("client.security.sasl.username", "root");
         clientConfig.setString("client.security.sasl.password", "password");
         try (NettyClient nettyClient =
-                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance(), false)) {
+                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance())) {
             verifyGetTableNamesList(nettyClient, usernamePasswordServerNode);
         }
     }
@@ -85,14 +85,14 @@ public class AuthenticationTest {
 
         // test normal mutual auth
         try (NettyClient nettyClient =
-                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance(), false)) {
+                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance())) {
             verifyGetTableNamesList(nettyClient, mutualAuthServerNode);
         }
 
         // test invalid challenge from server
         clientConfig.setString("client.security.mutual.error-type", 
"SERVER_ERROR_CHALLENGE");
         try (NettyClient nettyClient =
-                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance(), false)) {
+                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance())) {
             assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient, 
mutualAuthServerNode))
                     
.hasRootCauseExactlyInstanceOf(AuthenticationException.class)
                     .rootCause()
@@ -102,7 +102,7 @@ public class AuthenticationTest {
         // test invalid token from client
         clientConfig.setString("client.security.mutual.error-type", 
"CLIENT_ERROR_SECOND_TOKEN");
         try (NettyClient nettyClient =
-                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance(), false)) {
+                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance())) {
             assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient, 
mutualAuthServerNode))
                     .rootCause()
                     .hasMessageContaining("Invalid token value");
@@ -115,7 +115,7 @@ public class AuthenticationTest {
         clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual");
         clientConfig.setString("client.security.mutual.error-type", 
"SERVER_NO_CHALLENGE");
         try (NettyClient nettyClient =
-                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance(), false)) {
+                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance())) {
 
             assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient, 
mutualAuthServerNode))
                     .hasRootCauseExactlyInstanceOf(IllegalStateException.class)
@@ -130,7 +130,7 @@ public class AuthenticationTest {
         clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual");
         clientConfig.setString("client.security.mutual.error-type", 
"RETRIABLE_EXCEPTION");
         try (NettyClient nettyClient =
-                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance(), false)) {
+                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance())) {
             verifyGetTableNamesList(nettyClient, mutualAuthServerNode);
         }
     }
@@ -139,7 +139,7 @@ public class AuthenticationTest {
     void testClientLackAuthenticateProtocol() throws Exception {
         Configuration clientConfig = new Configuration();
         try (NettyClient nettyClient =
-                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance(), false)) {
+                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance())) {
             assertThatThrownBy(
                             () -> verifyGetTableNamesList(nettyClient, 
usernamePasswordServerNode))
                     .cause()
@@ -154,7 +154,7 @@ public class AuthenticationTest {
         Configuration clientConfig = new Configuration();
         clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual");
         try (NettyClient nettyClient =
-                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance(), false)) {
+                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance())) {
             assertThatThrownBy(
                             () -> verifyGetTableNamesList(nettyClient, 
usernamePasswordServerNode))
                     .cause()
@@ -172,7 +172,7 @@ public class AuthenticationTest {
         clientConfig.setString("client.security.sasl.username", "root");
         clientConfig.setString("client.security.sasl.password", "password2");
         try (NettyClient nettyClient =
-                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance(), false)) {
+                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance())) {
             assertThatThrownBy(
                             () -> verifyGetTableNamesList(nettyClient, 
usernamePasswordServerNode))
                     .cause()
@@ -190,12 +190,12 @@ public class AuthenticationTest {
         clientConfig.setString("client.security.sasl.password", "password");
 
         try (NettyClient nettyClient =
-                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance(), false)) {
+                new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance())) {
             verifyGetTableNamesList(nettyClient, usernamePasswordServerNode);
             // client2 with wrong password after client1 successes to 
authenticate.
             clientConfig.setString("client.security.sasl.password", 
"password2");
             try (NettyClient nettyClient2 =
-                    new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance(), false)) {
+                    new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance())) {
                 assertThatThrownBy(
                                 () ->
                                         verifyGetTableNamesList(
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java
 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java
index a96a826aa..92e837e18 100644
--- 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java
+++ 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java
@@ -206,7 +206,7 @@ public class SaslAuthenticationITCase {
                     new ServerNode(
                             1, "localhost", availablePort1.getPort(), 
ServerType.TABLET_SERVER);
             try (NettyClient nettyClient =
-                    new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance(), false)) {
+                    new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance())) {
                 ListTablesRequest request =
                         new 
ListTablesRequest().setDatabaseName("test-database");
                 ListTablesResponse listTablesResponse =
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
index 4c0ba7b14..329f5d2f2 100644
--- 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
+++ 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
@@ -70,7 +70,7 @@ final class NettyClientTest {
         conf = new Configuration();
         // 3 worker threads is enough for this test
         conf.setInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS, 3);
-        nettyClient = new NettyClient(conf, 
TestingClientMetricGroup.newInstance(), false);
+        nettyClient = new NettyClient(conf, 
TestingClientMetricGroup.newInstance());
         buildNettyServer(1);
     }
 
@@ -219,7 +219,7 @@ final class NettyClientTest {
                     .get();
             assertThat(nettyClient.connections().size()).isEqualTo(1);
             try (NettyClient client =
-                    new NettyClient(conf, 
TestingClientMetricGroup.newInstance(), false)) {
+                    new NettyClient(conf, 
TestingClientMetricGroup.newInstance())) {
                 client.sendRequest(
                                 new ServerNode(
                                         2,
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
index 41e4bef2c..8ea452928 100644
--- 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
+++ 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
@@ -126,8 +126,7 @@ public class ServerConnectionTest {
                         serverNode,
                         TestingClientMetricGroup.newInstance(),
                         clientAuthenticator,
-                        (con, ignore) -> {},
-                        false);
+                        (con, ignore) -> {});
         ConnectionState connectionState = connection.getConnectionState();
         assertThat(connectionState).isEqualTo(ConnectionState.CONNECTING);
 
@@ -157,20 +156,10 @@ public class ServerConnectionTest {
         ClientMetricGroup client = new ClientMetricGroup(metricRegistry, 
"client");
         ServerConnection connection =
                 new ServerConnection(
-                        bootstrap,
-                        serverNode,
-                        client,
-                        clientAuthenticator,
-                        (con, ignore) -> {},
-                        false);
+                        bootstrap, serverNode, client, clientAuthenticator, 
(con, ignore) -> {});
         ServerConnection connection2 =
                 new ServerConnection(
-                        bootstrap,
-                        serverNode2,
-                        client,
-                        clientAuthenticator,
-                        (con, ignore) -> {},
-                        false);
+                        bootstrap, serverNode2, client, clientAuthenticator, 
(con, ignore) -> {});
         LookupRequest request = new LookupRequest().setTableId(1);
         PbLookupReqForBucket pbLookupReqForBucket = request.addBucketsReq();
         pbLookupReqForBucket.setBucketId(1);
@@ -226,8 +215,7 @@ public class ServerConnectionTest {
                         wrongServerTypeNode,
                         TestingClientMetricGroup.newInstance(),
                         clientAuthenticator,
-                        (con, ignore) -> {},
-                        false);
+                        (con, ignore) -> {});
 
         // Pending request will be rejected with InvalidServerTypeException 
which is
         // InvalidRequestException.
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/MessageCodecTest.java 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/MessageCodecTest.java
index 29c9f1489..eb5d492cb 100644
--- 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/MessageCodecTest.java
+++ 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/MessageCodecTest.java
@@ -62,7 +62,7 @@ class MessageCodecTest {
     @BeforeEach
     void beforeEach() {
         this.responseReceiver = new ResponseReceiver();
-        this.clientHandler = new NettyClientHandler(responseReceiver, false);
+        this.clientHandler = new NettyClientHandler(responseReceiver);
         this.requestChannel = new RequestChannel(100);
         MetricGroup metricGroup = NOPMetricsGroup.newInstance();
         this.serverHandler =
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
index 55ba87108..25a0d866f 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
@@ -297,7 +297,7 @@ public class CoordinatorServer extends ServerBase {
 
         synchronized (lock) {
             this.clientMetricGroup = new ClientMetricGroup(metricRegistry, 
SERVER_NAME);
-            this.rpcClient = RpcClient.create(conf, clientMetricGroup, true);
+            this.rpcClient = RpcClient.create(conf, clientMetricGroup);
 
             this.coordinatorChannelManager = new 
CoordinatorChannelManager(rpcClient);
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index 65ab96324..26f45d23c 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -240,7 +240,7 @@ public class TabletServer extends ServerBase {
             // to fetch log.
             this.clientMetricGroup =
                     new ClientMetricGroup(metricRegistry, SERVER_NAME + "-" + 
serverId);
-            this.rpcClient = RpcClient.create(conf, clientMetricGroup, true);
+            this.rpcClient = RpcClient.create(conf, clientMetricGroup);
 
             this.coordinatorGateway =
                     GatewayClientProxy.createGatewayProxy(
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/ServerITCaseBase.java 
b/fluss-server/src/test/java/org/apache/fluss/server/ServerITCaseBase.java
index 254d1a4ac..4055613ce 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/ServerITCaseBase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/ServerITCaseBase.java
@@ -126,8 +126,7 @@ public abstract class ServerITCaseBase {
 
     private void testConnectionToServer() throws Exception {
         try (NettyClient client =
-                new NettyClient(
-                        new Configuration(), 
TestingClientMetricGroup.newInstance(), false)) {
+                new NettyClient(new Configuration(), 
TestingClientMetricGroup.newInstance())) {
             RpcGateway gateway =
                     GatewayClientProxy.createGatewayProxy(
                             this::getServerNode, client, getRpcGatewayClass());
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java
index bd4467c0a..9d8eda15b 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java
@@ -54,8 +54,7 @@ class CoordinatorChannelManagerTest {
         Configuration configuration = new Configuration();
         CoordinatorChannelManager coordinatorChannelManager =
                 new CoordinatorChannelManager(
-                        RpcClient.create(
-                                configuration, 
TestingClientMetricGroup.newInstance(), false));
+                        RpcClient.create(configuration, 
TestingClientMetricGroup.newInstance()));
         List<ServerNode> tabletServersNode = 
FLUSS_CLUSTER_EXTENSION.getTabletServerNodes();
 
         // test start up using server 0
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java
index 9fa4f2ad2..5d2bcd574 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java
@@ -91,7 +91,7 @@ class CoordinatorHighAvailabilityITCase {
     @BeforeEach
     void setUp() throws Exception {
         Configuration clientConf = new Configuration();
-        rpcClient = RpcClient.create(clientConf, 
TestingClientMetricGroup.newInstance(), false);
+        rpcClient = RpcClient.create(clientConf, 
TestingClientMetricGroup.newInstance());
     }
 
     @AfterEach
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
index 4233f7c94..9f6c13064 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
@@ -544,8 +544,7 @@ class TableManagerITCase {
                         configuration,
                         new ClientMetricGroup(
                                 MetricRegistry.create(configuration, null),
-                                "fluss-cluster-extension"),
-                        false)) {
+                                "fluss-cluster-extension"))) {
             ServerNode serverNode =
                     
FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode(CLIENT_LISTENER);
             AdminGateway adminGatewayForClient =
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorChannelManager.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorChannelManager.java
index 3ed99d6c9..a25c4f4ce 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorChannelManager.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorChannelManager.java
@@ -36,7 +36,7 @@ public class TestCoordinatorChannelManager extends 
CoordinatorChannelManager {
     }
 
     public TestCoordinatorChannelManager(Map<Integer, TabletServerGateway> 
gateways) {
-        super(RpcClient.create(new Configuration(), 
TestingClientMetricGroup.newInstance(), false));
+        super(RpcClient.create(new Configuration(), 
TestingClientMetricGroup.newInstance()));
         this.gateways = gateways;
     }
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
index 70578b614..c7c7591d5 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
@@ -294,8 +294,7 @@ class ReplicaStateMachineTest {
                         new CoordinatorChannelManager(
                                 RpcClient.create(
                                         new Configuration(),
-                                        TestingClientMetricGroup.newInstance(),
-                                        false)),
+                                        
TestingClientMetricGroup.newInstance())),
                         (event) -> {
                             // do nothing
                         },
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
index b029c75e9..bbac45c6b 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
@@ -272,8 +272,7 @@ class TableBucketStateMachineTest {
                         new CoordinatorChannelManager(
                                 RpcClient.create(
                                         new Configuration(),
-                                        TestingClientMetricGroup.newInstance(),
-                                        false)),
+                                        
TestingClientMetricGroup.newInstance())),
                         coordinatorContext,
                         autoPartitionManager,
                         lakeTableTieringManager,
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
index 4b1474f64..f829ca892 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
@@ -210,7 +210,7 @@ public class ReplicaTestBase {
                                 new LakeCatalogDynamicLoader(new 
Configuration(), null, true)));
         initMetadataCache(serverMetadataCache);
 
-        rpcClient = RpcClient.create(conf, 
TestingClientMetricGroup.newInstance(), false);
+        rpcClient = RpcClient.create(conf, 
TestingClientMetricGroup.newInstance());
 
         snapshotReporter = new TestingCompletedKvSnapshotCommitter();
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
index 7dfef651e..556c5bea4 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
@@ -524,7 +524,7 @@ public class ReplicaFetcherThreadTest {
                                         null,
                                         conf,
                                         new LakeCatalogDynamicLoader(conf, 
null, true))),
-                        RpcClient.create(conf, 
TestingClientMetricGroup.newInstance(), false),
+                        RpcClient.create(conf, 
TestingClientMetricGroup.newInstance()),
                         TestingMetricGroups.TABLET_SERVER_METRICS,
                         manualClock,
                         ioExecutor);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
index 37d3f6c46..9498419e0 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
@@ -219,8 +219,7 @@ public final class FlussClusterExtension
                 RpcClient.create(
                         conf,
                         new ClientMetricGroup(
-                                MetricRegistry.create(conf, null), 
"fluss-cluster-extension"),
-                        false);
+                                MetricRegistry.create(conf, null), 
"fluss-cluster-extension"));
         startCoordinatorServer();
         startTabletServers();
         // wait coordinator knows all tablet servers to make cluster
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/utils/RpcGatewayManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/utils/RpcGatewayManagerTest.java
index f059358a3..8e07921a7 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/utils/RpcGatewayManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/utils/RpcGatewayManagerTest.java
@@ -36,7 +36,7 @@ class RpcGatewayManagerTest {
         RpcGatewayManager<TabletServerGateway> gatewayRpcGatewayManager =
                 new RpcGatewayManager<>(
                         new NettyClient(
-                                new Configuration(), 
TestingClientMetricGroup.newInstance(), false),
+                                new Configuration(), 
TestingClientMetricGroup.newInstance()),
                         TabletServerGateway.class);
 
         ServerNode serverNode1 =

Reply via email to