This is an automated email from the ASF dual-hosted git repository.
ipolyzos pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new cec9650d2 [rpc] Client zero-copy lazy parse ByteBuf to avoid deep
memory copy (#2948)
cec9650d2 is described below
commit cec9650d26e1a8223c789d63a737277247479e96
Author: Anton Borisov <[email protected]>
AuthorDate: Thu Apr 2 13:50:46 2026 +0100
[rpc] Client zero-copy lazy parse ByteBuf to avoid deep memory copy (#2948)
* [rpc] Client zero-copy lazy parse ByteBuf to avoid deep memory copy
(#1184)
* add some notes for future reference
* address comment
* fix CoordinatorHighAvailabilityITCase: remove isInnerClient arg
---------
Co-authored-by: ipolyzos <[email protected]>
---
.../org/apache/fluss/client/FlussConnection.java | 2 +-
.../table/scanner/log/DefaultCompletedFetch.java | 29 +++-
.../fluss/client/table/scanner/log/LogFetcher.java | 41 ++++-
.../fluss/client/metadata/MetadataUpdaterTest.java | 2 +-
.../client/metadata/TestingClientSchemaGetter.java | 2 +-
.../client/metadata/TestingMetadataUpdater.java | 2 +-
.../security/acl/FlussAuthorizationITCase.java | 4 +-
.../DefaultCompletedFetchBufferLifecycleTest.java | 165 +++++++++++++++++++++
.../scanner/log/DefaultCompletedFetchTest.java | 6 +-
.../table/scanner/log/LogFetchBufferTest.java | 3 +-
.../table/scanner/log/LogFetchCollectorTest.java | 2 +-
.../fluss/client/write/RecordAccumulatorTest.java | 3 +-
.../org/apache/fluss/rpc/messages/ApiMessage.java | 4 +-
.../committer/FlussTableLakeSnapshotCommitter.java | 3 +-
.../source/enumerator/TieringSourceEnumerator.java | 2 +-
.../main/java/org/apache/fluss/rpc/RpcClient.java | 7 +-
.../apache/fluss/rpc/netty/client/NettyClient.java | 13 +-
.../fluss/rpc/netty/client/NettyClientHandler.java | 36 +----
.../fluss/rpc/netty/client/ServerConnection.java | 11 +-
.../apache/fluss/rpc/netty/NettyMetricsTest.java | 2 +-
.../rpc/netty/authenticate/AuthenticationTest.java | 22 +--
.../authenticate/SaslAuthenticationITCase.java | 2 +-
.../fluss/rpc/netty/client/NettyClientTest.java | 4 +-
.../rpc/netty/client/ServerConnectionTest.java | 20 +--
.../fluss/rpc/protocol/MessageCodecTest.java | 2 +-
.../server/coordinator/CoordinatorServer.java | 2 +-
.../apache/fluss/server/tablet/TabletServer.java | 2 +-
.../org/apache/fluss/server/ServerITCaseBase.java | 3 +-
.../coordinator/CoordinatorChannelManagerTest.java | 3 +-
.../CoordinatorHighAvailabilityITCase.java | 2 +-
.../server/coordinator/TableManagerITCase.java | 3 +-
.../coordinator/TestCoordinatorChannelManager.java | 2 +-
.../statemachine/ReplicaStateMachineTest.java | 3 +-
.../statemachine/TableBucketStateMachineTest.java | 3 +-
.../fluss/server/replica/ReplicaTestBase.java | 2 +-
.../replica/fetcher/ReplicaFetcherThreadTest.java | 2 +-
.../server/testutils/FlussClusterExtension.java | 3 +-
.../fluss/server/utils/RpcGatewayManagerTest.java | 2 +-
38 files changed, 292 insertions(+), 129 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
index 0990756ce..d98e45869 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
@@ -81,7 +81,7 @@ public final class FlussConnection implements Connection {
String clientId = conf.getString(ConfigOptions.CLIENT_ID);
this.metricRegistry = metricRegistry;
this.clientMetricGroup = new ClientMetricGroup(metricRegistry,
clientId);
- this.rpcClient = RpcClient.create(conf, clientMetricGroup, false);
+ this.rpcClient = RpcClient.create(conf, clientMetricGroup);
// TODO this maybe remove after we introduce client metadata.
this.metadataUpdater = new MetadataUpdater(conf, rpcClient);
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetch.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetch.java
index 95078bf36..8f9cc9639 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetch.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetch.java
@@ -22,6 +22,9 @@ import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.LogRecordReadContext;
import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
import org.apache.fluss.rpc.messages.FetchLogRequest;
+import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import javax.annotation.Nullable;
/**
* {@link DefaultCompletedFetch} is a {@link CompletedFetch} that represents a
completed fetch that
@@ -30,13 +33,21 @@ import org.apache.fluss.rpc.messages.FetchLogRequest;
@Internal
class DefaultCompletedFetch extends CompletedFetch {
+ /**
+ * The parsed ByteBuf backing the lazily-parsed records data. This
reference is retained to keep
+ * the underlying network buffer alive while the records are being
consumed. Released in {@link
+ * #drain()}.
+ */
+ @Nullable private ByteBuf parsedByteBuf;
+
public DefaultCompletedFetch(
TableBucket tableBucket,
FetchLogResultForBucket fetchLogResultForBucket,
LogRecordReadContext readContext,
LogScannerStatus logScannerStatus,
boolean isCheckCrc,
- Long fetchOffset) {
+ Long fetchOffset,
+ @Nullable ByteBuf parsedByteBuf) {
super(
tableBucket,
fetchLogResultForBucket.getError(),
@@ -47,5 +58,21 @@ class DefaultCompletedFetch extends CompletedFetch {
logScannerStatus,
isCheckCrc,
fetchOffset);
+ this.parsedByteBuf = parsedByteBuf;
+ }
+
+ @Override
+ void drain() {
+ try {
+ // Note: if super.drain() throws, isConsumed stays false in the
parent.
+ // The finally block below still nulls parsedByteBuf after
releasing it,
+ // so a subsequent drain() call will not double-release the buffer.
+ super.drain();
+ } finally {
+ if (parsedByteBuf != null) {
+ parsedByteBuf.release();
+ parsedByteBuf = null;
+ }
+ }
}
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
index c8f87984b..14a2a3d31 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
@@ -51,6 +51,7 @@ import org.apache.fluss.rpc.messages.PbFetchLogRespForBucket;
import org.apache.fluss.rpc.messages.PbFetchLogRespForTable;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.rpc.protocol.Errors;
+import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.fluss.utils.IOUtils;
import org.apache.fluss.utils.Projection;
@@ -339,6 +340,11 @@ public class LogFetcher implements Closeable {
/** Implements the core logic for a successful fetch log response. */
private synchronized void handleFetchLogResponse(
int destination, long requestStartTime, FetchLogResponse
fetchLogResponse) {
+ // Capture the parsed ByteBuf for buffer lifecycle management. The
response may
+ // have been lazily parsed from the network buffer. Each
DefaultCompletedFetch
+ // that references the buffer's records data must retain it. We
release the base
+ // reference in the finally block.
+ ByteBuf parsedByteBuf = fetchLogResponse.getParsedByteBuf();
try {
if (isClosed) {
return;
@@ -384,11 +390,14 @@ public class LogFetcher implements Closeable {
fetchResultForBucket.getHighWatermark());
} else {
LogRecords logRecords =
fetchResultForBucket.recordsOrEmpty();
- if (!MemoryLogRecords.EMPTY.equals(logRecords)
- || fetchResultForBucket.getErrorCode() !=
Errors.NONE.code()) {
- // In oder to not signal notEmptyCondition,
add completed
- // fetch to buffer until log records is not
empty.
- DefaultCompletedFetch completedFetch =
+ boolean hasRecords =
!MemoryLogRecords.EMPTY.equals(logRecords);
+ if (hasRecords) {
+ // Retain the parsed buffer so it stays alive
while
+ // this CompletedFetch's records are being
consumed.
+ if (parsedByteBuf != null) {
+ parsedByteBuf.retain();
+ }
+ logFetchBuffer.add(
new DefaultCompletedFetch(
tb,
fetchResultForBucket,
@@ -397,14 +406,32 @@ public class LogFetcher implements Closeable {
// skipping CRC check if
projection push downed as
// the data is pruned
isCheckCrcs,
- fetchOffset);
- logFetchBuffer.add(completedFetch);
+ fetchOffset,
+ parsedByteBuf));
+ } else if (fetchResultForBucket.getErrorCode() !=
Errors.NONE.code()) {
+ // Error-only bucket: no records to back, so no
+ // buffer reference needed.
+ logFetchBuffer.add(
+ new DefaultCompletedFetch(
+ tb,
+ fetchResultForBucket,
+ readContext,
+ logScannerStatus,
+ isCheckCrcs,
+ fetchOffset,
+ null));
}
}
}
}
}
} finally {
+ // Release the base reference from the network buffer. Any
CompletedFetch
+ // objects created above hold their own retained references,
keeping the
+ // buffer alive until they are drained.
+ if (parsedByteBuf != null) {
+ parsedByteBuf.release();
+ }
LOG.debug("Removing pending request for node: {}", destination);
nodesWithPendingFetchRequests.remove(destination);
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
index 6717cfe5a..1cda13b93 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
@@ -50,7 +50,7 @@ public class MetadataUpdaterTest {
void testInitializeClusterWithRetries() throws Exception {
Configuration configuration = new Configuration();
RpcClient rpcClient =
- RpcClient.create(configuration,
TestingClientMetricGroup.newInstance(), false);
+ RpcClient.create(configuration,
TestingClientMetricGroup.newInstance());
// retry lower than max retry count.
AdminReadOnlyGateway gateway = new TestingAdminReadOnlyGateway(2);
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java
index ffb2c54dc..055017383 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java
@@ -38,7 +38,7 @@ public class TestingClientSchemaGetter extends
ClientSchemaGetter {
tablePath,
latestSchemaInfo,
new FlussAdmin(
- RpcClient.create(conf,
TestingClientMetricGroup.newInstance(), false),
+ RpcClient.create(conf,
TestingClientMetricGroup.newInstance()),
metadataUpdater));
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
index 206395135..6c67d297e 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
@@ -70,7 +70,7 @@ public class TestingMetadataUpdater extends MetadataUpdater {
Map<Integer, TestTabletServerGateway> customGateways,
Configuration conf) {
super(
- RpcClient.create(conf, TestingClientMetricGroup.newInstance(),
false),
+ RpcClient.create(conf, TestingClientMetricGroup.newInstance()),
conf,
Cluster.empty());
initializeCluster(coordinatorServer, tabletServers, tableInfos);
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
index 8c301c89a..e606052ef 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
@@ -512,7 +512,7 @@ public class FlussAuthorizationITCase {
Collections.singleton(DATA1_TABLE_PATH_PK), null,
null);
try (RpcClient rpcClient =
- RpcClient.create(guestConf,
TestingClientMetricGroup.newInstance(), false)) {
+ RpcClient.create(guestConf,
TestingClientMetricGroup.newInstance())) {
AdminGateway guestGateway =
GatewayClientProxy.createGatewayProxy(
() ->
FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
@@ -836,7 +836,7 @@ public class FlussAuthorizationITCase {
new
ControlledShutdownRequest().setTabletServerId(-1).setTabletServerEpoch(-1);
try (RpcClient rpcClient =
- RpcClient.create(guestConf,
TestingClientMetricGroup.newInstance(), false)) {
+ RpcClient.create(guestConf,
TestingClientMetricGroup.newInstance())) {
CoordinatorGateway guestGateway =
GatewayClientProxy.createGatewayProxy(
() ->
FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchBufferLifecycleTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchBufferLifecycleTest.java
new file mode 100644
index 000000000..090bc932f
--- /dev/null
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchBufferLifecycleTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.log;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.record.LogRecordReadContext;
+import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
+import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.fluss.shaded.netty4.io.netty.buffer.Unpooled;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.fluss.record.TestData.DATA1;
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
+import static org.apache.fluss.record.TestData.TEST_SCHEMA_GETTER;
+import static
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DefaultCompletedFetch} zero-copy buffer lifecycle (issue
#1184). */
+class DefaultCompletedFetchBufferLifecycleTest {
+
+ private final TableBucket tb0 = new TableBucket(1, 0);
+ private final TableBucket tb1 = new TableBucket(1, 1);
+ private LogScannerStatus logScannerStatus;
+ private LogRecordReadContext readContext;
+
+ @BeforeEach
+ void setup() {
+ Map<TableBucket, Long> scanBuckets = new HashMap<>();
+ scanBuckets.put(tb0, 0L);
+ scanBuckets.put(tb1, 0L);
+ logScannerStatus = new LogScannerStatus();
+ logScannerStatus.assignScanBuckets(scanBuckets);
+ readContext =
+ LogRecordReadContext.createArrowReadContext(
+ DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID, TEST_SCHEMA_GETTER);
+ }
+
+ @AfterEach
+ void afterEach() {
+ if (readContext != null) {
+ readContext.close();
+ readContext = null;
+ }
+ }
+
+ @Test
+ void testBufferReleasedOnDrain() throws Exception {
+ ByteBuf buf = Unpooled.buffer(64);
+ buf.writeBytes(new byte[64]);
+
+ buf.retain();
+ DefaultCompletedFetch fetch = makeCompletedFetch(tb0, buf);
+ buf.release(); // base reference
+ assertThat(buf.refCnt()).isEqualTo(1);
+
+ fetch.drain();
+ assertThat(buf.refCnt()).isEqualTo(0);
+ }
+
+ @Test
+ void testMultipleCompletedFetchShareBuffer() throws Exception {
+ ByteBuf buf = Unpooled.buffer(64);
+ buf.writeBytes(new byte[64]);
+
+ buf.retain();
+ DefaultCompletedFetch fetch1 = makeCompletedFetch(tb0, buf);
+ buf.retain();
+ DefaultCompletedFetch fetch2 = makeCompletedFetch(tb1, buf);
+ buf.release(); // base reference
+ assertThat(buf.refCnt()).isEqualTo(2);
+
+ fetch1.drain();
+ assertThat(buf.refCnt()).isEqualTo(1);
+
+ fetch2.drain();
+ assertThat(buf.refCnt()).isEqualTo(0);
+ }
+
+ @Test
+ void testDrainIsIdempotent() throws Exception {
+ ByteBuf buf = Unpooled.buffer(64);
+ buf.writeBytes(new byte[64]);
+ buf.retain();
+
+ DefaultCompletedFetch fetch = makeCompletedFetch(tb0, buf);
+ buf.release();
+
+ fetch.drain();
+ assertThat(buf.refCnt()).isEqualTo(0);
+ fetch.drain(); // no-op, no exception
+ }
+
+ @Test
+ void testNullBufferHandledGracefully() throws Exception {
+ DefaultCompletedFetch fetch = makeCompletedFetch(tb0, null);
+ fetch.drain();
+ }
+
+ @Test
+ void testLogFetchBufferCloseReleasesBuffer() throws Exception {
+ ByteBuf buf = Unpooled.buffer(64);
+ buf.writeBytes(new byte[64]);
+
+ buf.retain();
+ DefaultCompletedFetch fetch = makeCompletedFetch(tb0, buf);
+ buf.release();
+
+ try (LogFetchBuffer logFetchBuffer = new LogFetchBuffer()) {
+ logFetchBuffer.add(fetch);
+ assertThat(buf.refCnt()).isEqualTo(1);
+ }
+ assertThat(buf.refCnt()).isEqualTo(0);
+ }
+
+ @Test
+ void testRetainAllUnsubscribeReleasesBuffer() throws Exception {
+ ByteBuf buf = Unpooled.buffer(64);
+ buf.writeBytes(new byte[64]);
+
+ buf.retain();
+ DefaultCompletedFetch fetch = makeCompletedFetch(tb0, buf);
+ buf.release();
+
+ try (LogFetchBuffer logFetchBuffer = new LogFetchBuffer()) {
+ logFetchBuffer.add(fetch);
+ logFetchBuffer.retainAll(Collections.singleton(tb1));
+ assertThat(buf.refCnt()).isEqualTo(0);
+ }
+ }
+
+ private DefaultCompletedFetch makeCompletedFetch(TableBucket tableBucket,
ByteBuf parsedByteBuf)
+ throws Exception {
+ return new DefaultCompletedFetch(
+ tableBucket,
+ new FetchLogResultForBucket(tableBucket,
genMemoryLogRecordsByObject(DATA1), 10L),
+ readContext,
+ logScannerStatus,
+ true,
+ 0L,
+ parsedByteBuf);
+ }
+}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
index c4624617d..427f59d4d 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
@@ -345,7 +345,8 @@ public class DefaultCompletedFetchTest {
tableInfo.getSchemaId(),
tableInfo.getSchema())),
logScannerStatus,
true,
- fetchOffset);
+ fetchOffset,
+ null);
List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(3);
// close the read context to release arrow root resource,
// this is important to test complex types
@@ -398,7 +399,8 @@ public class DefaultCompletedFetchTest {
new TestingSchemaGetter(tableInfo.getSchemaId(),
tableInfo.getSchema())),
logScannerStatus,
true,
- offset);
+ offset,
+ null);
}
private static Collection<Arguments> typeAndMagic() {
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchBufferTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchBufferTest.java
index 095131b1e..abfd0f1d1 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchBufferTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchBufferTest.java
@@ -264,7 +264,8 @@ public class LogFetchBufferTest {
readContext,
logScannerStatus,
true,
- 0L);
+ 0L,
+ null);
}
private PendingFetch makePendingFetch(TableBucket tableBucket) throws
Exception {
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
index 0fface6b1..2ce40ce55 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
@@ -208,6 +208,6 @@ public class LogFetchCollectorTest {
private DefaultCompletedFetch makeCompletedFetch(
TableBucket tableBucket, FetchLogResultForBucket resultForBucket,
long offset) {
return new DefaultCompletedFetch(
- tableBucket, resultForBucket, readContext, logScannerStatus,
true, offset);
+ tableBucket, resultForBucket, readContext, logScannerStatus,
true, offset, null);
}
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index d9e2291ce..0e4ab9c97 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -678,8 +678,7 @@ class RecordAccumulatorTest {
conf.getInt(ConfigOptions.CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET),
GatewayClientProxy.createGatewayProxy(
() -> cluster.getRandomTabletServer(),
- RpcClient.create(
- conf,
TestingClientMetricGroup.newInstance(), false),
+ RpcClient.create(conf,
TestingClientMetricGroup.newInstance()),
TabletServerGateway.class),
null),
TestingWriterMetricGroup.newInstance(),
diff --git
a/fluss-common/src/main/java/org/apache/fluss/rpc/messages/ApiMessage.java
b/fluss-common/src/main/java/org/apache/fluss/rpc/messages/ApiMessage.java
index 4e58cc02b..55c000e89 100644
--- a/fluss-common/src/main/java/org/apache/fluss/rpc/messages/ApiMessage.java
+++ b/fluss-common/src/main/java/org/apache/fluss/rpc/messages/ApiMessage.java
@@ -52,8 +52,8 @@ public interface ApiMessage {
* (i.e. zero-copy) only for {@code "[optional|required] bytes records =
?"} (nested) fields. If
* there is any lazy deserialization happens, the {@link
#isLazilyParsed()} returns true.
*
- * <p>Note: the current message will hold the reference of {@link
ByteBuf}, please remember to
- * release the {@link ByteBuf} until the message has been fully consumed.
+ * <p>Note: the caller takes ownership of the {@link ByteBuf} reference.
Do not release the
+ * buffer before the message has been fully consumed; release it only
after processing is done.
*/
void parseFrom(ByteBuf buffer, int size);
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
index 30c3a4bf3..76474a5b2 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
@@ -83,8 +83,7 @@ public class FlussTableLakeSnapshotCommitter implements
AutoCloseable {
String clientId = flussConf.getString(ConfigOptions.CLIENT_ID);
MetricRegistry metricRegistry = MetricRegistry.create(flussConf, null);
// don't care about metrics, but pass a ClientMetricGroup to make
compiler happy
- rpcClient =
- RpcClient.create(flussConf, new
ClientMetricGroup(metricRegistry, clientId), false);
+ rpcClient = RpcClient.create(flussConf, new
ClientMetricGroup(metricRegistry, clientId));
MetadataUpdater metadataUpdater = new MetadataUpdater(flussConf,
rpcClient);
this.coordinatorGateway =
GatewayClientProxy.createGatewayProxy(
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
index 337222f4e..e81362fa8 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
@@ -147,7 +147,7 @@ public class TieringSourceEnumerator
FlinkMetricRegistry metricRegistry = new
FlinkMetricRegistry(enumeratorMetricGroup);
ClientMetricGroup clientMetricGroup =
new ClientMetricGroup(metricRegistry, "LakeTieringService");
- this.rpcClient = RpcClient.create(flussConf, clientMetricGroup, false);
+ this.rpcClient = RpcClient.create(flussConf, clientMetricGroup);
MetadataUpdater metadataUpdater = new MetadataUpdater(flussConf,
rpcClient);
this.coordinatorGateway =
GatewayClientProxy.createGatewayProxy(
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/RpcClient.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/RpcClient.java
index 804058efb..ce4e6fd2d 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/RpcClient.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/RpcClient.java
@@ -37,13 +37,10 @@ public interface RpcClient extends AutoCloseable {
*
* @param conf The configuration to use.
* @param clientMetricGroup The client metric group
- * @param isInnerClient Whether it is an inner client used for communicate
from server to
- * server.
* @return The RPC client.
*/
- static RpcClient create(
- Configuration conf, ClientMetricGroup clientMetricGroup, boolean
isInnerClient) {
- return new NettyClient(conf, clientMetricGroup, isInnerClient);
+ static RpcClient create(Configuration conf, ClientMetricGroup
clientMetricGroup) {
+ return new NettyClient(conf, clientMetricGroup);
}
/**
diff --git
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java
index f567b8584..ebeab7170 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java
@@ -76,16 +76,9 @@ public final class NettyClient implements RpcClient {
private final Supplier<ClientAuthenticator> authenticatorSupplier;
- /**
- * Whether the NettyClient is used as inner network client (Communicating
between Fluss's
- * servers).
- */
- private final boolean isInnerClient;
-
private volatile boolean isClosed = false;
- public NettyClient(
- Configuration conf, ClientMetricGroup clientMetricGroup, boolean
isInnerClient) {
+ public NettyClient(Configuration conf, ClientMetricGroup
clientMetricGroup) {
this.connections = MapUtils.newConcurrentHashMap();
// build bootstrap
@@ -106,7 +99,6 @@ public final class NettyClient implements RpcClient {
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new
ClientChannelInitializer(connectionMaxIdle));
- this.isInnerClient = isInnerClient;
this.clientMetricGroup = clientMetricGroup;
this.authenticatorSupplier =
AuthenticationFactory.loadClientAuthenticatorSupplier(conf);
NettyMetrics.registerNettyMetrics(clientMetricGroup, pooledAllocator);
@@ -197,8 +189,7 @@ public final class NettyClient implements RpcClient {
node,
clientMetricGroup,
authenticatorSupplier.get(),
- (con, ignore) -> connections.remove(serverId, con),
- isInnerClient);
+ (con, ignore) -> connections.remove(serverId,
con));
});
}
diff --git
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClientHandler.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClientHandler.java
index a2c03028d..1eef39f05 100644
---
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClientHandler.java
+++
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClientHandler.java
@@ -20,12 +20,10 @@ package org.apache.fluss.rpc.netty.client;
import org.apache.fluss.exception.CorruptMessageException;
import org.apache.fluss.rpc.messages.ApiMessage;
import org.apache.fluss.rpc.messages.ErrorResponse;
-import org.apache.fluss.rpc.messages.FetchLogResponse;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.rpc.protocol.ApiMethod;
import org.apache.fluss.rpc.protocol.ResponseType;
import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.fluss.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import
org.apache.fluss.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.fluss.shaded.netty4.io.netty.handler.timeout.IdleState;
@@ -49,15 +47,8 @@ public final class NettyClientHandler extends
ChannelInboundHandlerAdapter {
private final ClientHandlerCallback callback;
- /**
- * Whether the NettyClientHandler is used as inner network client
(Communicating between Fluss's
- * servers).
- */
- private final boolean isInnerClient;
-
- public NettyClientHandler(ClientHandlerCallback callback, boolean
isInnerClient) {
+ public NettyClientHandler(ClientHandlerCallback callback) {
this.callback = callback;
- this.isInnerClient = isInnerClient;
}
@Override
@@ -90,26 +81,11 @@ public final class NettyClientHandler extends
ChannelInboundHandlerAdapter {
}
ApiMessage response = apiMethod.getResponseConstructor().get();
if (response.isLazilyParsed()) {
- if (isInnerClient && response instanceof FetchLogResponse)
{
- // For the FetchLogResponse returned by the
FetchLogRequest sent by the
- // follower's TabletServer, we needn't perform an
unHeap-to-heap memory
- // copy to preserve zero-copy capabilities. This
requires users to manually
- // call ApiMessage#getParsedByteBuf().release() to
release the ByteBuf after
- // processing the response.
- // TODO for the FetchLogResponse returned by the
FetchLogRequest sent by the
- // Fluss client, We also aim to avoid this memory copy
operation, traced by
- // https://github.com/apache/fluss/issues/1184
- response.parseFrom(buffer, messageSize);
- } else {
- // copy the buffer into a heap buffer, this can avoid
the network buffer
- // being released before the bytes fields of the
response are lazily parsed.
- ByteBuf copiedBuffer = Unpooled.buffer(messageSize,
messageSize);
- copiedBuffer.writeBytes(buffer, messageSize);
- // response parsed from the copied buffer can be
safely cached in user
- // queues.
- response.parseFrom(copiedBuffer, messageSize);
- buffer.release();
- }
+ // Parse lazily from the original buffer without copying.
The
+ // consumer is responsible for releasing the buffer via
+ // ApiMessage#getParsedByteBuf().release() after the
response
+ // has been fully consumed.
+ response.parseFrom(buffer, messageSize);
} else {
response.parseFrom(buffer, messageSize);
// eagerly release the buffer to make the buffer recycle
faster
diff --git
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
index a3fd7afbd..7d4af5333 100644
---
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
+++
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
@@ -106,8 +106,7 @@ final class ServerConnection {
ServerNode node,
ClientMetricGroup clientMetricGroup,
ClientAuthenticator authenticator,
- BiConsumer<ServerConnection, Throwable> closeCallback,
- boolean isInnerClient) {
+ BiConsumer<ServerConnection, Throwable> closeCallback) {
this.node = node;
this.state = ConnectionState.CONNECTING;
this.connectionMetrics =
clientMetricGroup.createConnectionMetricGroup(node.uid());
@@ -119,7 +118,7 @@ final class ServerConnection {
// callback is not registered when connection established.
bootstrap
.connect(node.host(), node.port())
- .addListener(future -> establishConnection((ChannelFuture)
future, isInnerClient));
+ .addListener(future -> establishConnection((ChannelFuture)
future));
}
public ServerNode getServerNode() {
@@ -250,7 +249,7 @@ final class ServerConnection {
//
------------------------------------------------------------------------------------------
- private void establishConnection(ChannelFuture future, boolean
isInnerClient) {
+ private void establishConnection(ChannelFuture future) {
synchronized (lock) {
if (future.isSuccess()) {
if (state.isDisconnected()) {
@@ -262,9 +261,7 @@ final class ServerConnection {
LOG.debug("Established connection to server {}.", node);
channel = future.channel();
channel.pipeline()
- .addLast(
- "handler",
- new NettyClientHandler(new ResponseCallback(),
isInnerClient));
+ .addLast("handler", new NettyClientHandler(new
ResponseCallback()));
// start checking api versions
switchState(ConnectionState.CHECKING_API_VERSIONS);
// TODO: set correct client software name and version, used
for metrics in server
diff --git
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/NettyMetricsTest.java
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/NettyMetricsTest.java
index b56c476cd..7b6c26440 100644
--- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/NettyMetricsTest.java
+++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/NettyMetricsTest.java
@@ -73,7 +73,7 @@ public class NettyMetricsTest {
}
ClientMetricGroup clientMetricGroup =
TestingClientMetricGroup.newInstance();
clientGroup =
clientMetricGroup.addGroup(NettyMetrics.NETTY_METRIC_GROUP);
- nettyClient = new NettyClient(conf, clientMetricGroup, false);
+ nettyClient = new NettyClient(conf, clientMetricGroup);
}
@AfterEach
diff --git
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java
index 8ad62d94f..e96fa283a 100644
---
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java
+++
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java
@@ -73,7 +73,7 @@ public class AuthenticationTest {
clientConfig.setString("client.security.sasl.username", "root");
clientConfig.setString("client.security.sasl.password", "password");
try (NettyClient nettyClient =
- new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance(), false)) {
+ new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance())) {
verifyGetTableNamesList(nettyClient, usernamePasswordServerNode);
}
}
@@ -85,14 +85,14 @@ public class AuthenticationTest {
// test normal mutual auth
try (NettyClient nettyClient =
- new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance(), false)) {
+ new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance())) {
verifyGetTableNamesList(nettyClient, mutualAuthServerNode);
}
// test invalid challenge from server
clientConfig.setString("client.security.mutual.error-type",
"SERVER_ERROR_CHALLENGE");
try (NettyClient nettyClient =
- new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance(), false)) {
+ new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance())) {
assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient,
mutualAuthServerNode))
.hasRootCauseExactlyInstanceOf(AuthenticationException.class)
.rootCause()
@@ -102,7 +102,7 @@ public class AuthenticationTest {
// test invalid token from client
clientConfig.setString("client.security.mutual.error-type",
"CLIENT_ERROR_SECOND_TOKEN");
try (NettyClient nettyClient =
- new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance(), false)) {
+ new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance())) {
assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient,
mutualAuthServerNode))
.rootCause()
.hasMessageContaining("Invalid token value");
@@ -115,7 +115,7 @@ public class AuthenticationTest {
clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual");
clientConfig.setString("client.security.mutual.error-type",
"SERVER_NO_CHALLENGE");
try (NettyClient nettyClient =
- new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance(), false)) {
+ new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance())) {
assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient,
mutualAuthServerNode))
.hasRootCauseExactlyInstanceOf(IllegalStateException.class)
@@ -130,7 +130,7 @@ public class AuthenticationTest {
clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual");
clientConfig.setString("client.security.mutual.error-type",
"RETRIABLE_EXCEPTION");
try (NettyClient nettyClient =
- new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance(), false)) {
+ new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance())) {
verifyGetTableNamesList(nettyClient, mutualAuthServerNode);
}
}
@@ -139,7 +139,7 @@ public class AuthenticationTest {
void testClientLackAuthenticateProtocol() throws Exception {
Configuration clientConfig = new Configuration();
try (NettyClient nettyClient =
- new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance(), false)) {
+ new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance())) {
assertThatThrownBy(
() -> verifyGetTableNamesList(nettyClient,
usernamePasswordServerNode))
.cause()
@@ -154,7 +154,7 @@ public class AuthenticationTest {
Configuration clientConfig = new Configuration();
clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual");
try (NettyClient nettyClient =
- new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance(), false)) {
+ new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance())) {
assertThatThrownBy(
() -> verifyGetTableNamesList(nettyClient,
usernamePasswordServerNode))
.cause()
@@ -172,7 +172,7 @@ public class AuthenticationTest {
clientConfig.setString("client.security.sasl.username", "root");
clientConfig.setString("client.security.sasl.password", "password2");
try (NettyClient nettyClient =
- new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance(), false)) {
+ new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance())) {
assertThatThrownBy(
() -> verifyGetTableNamesList(nettyClient,
usernamePasswordServerNode))
.cause()
@@ -190,12 +190,12 @@ public class AuthenticationTest {
clientConfig.setString("client.security.sasl.password", "password");
try (NettyClient nettyClient =
- new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance(), false)) {
+ new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance())) {
verifyGetTableNamesList(nettyClient, usernamePasswordServerNode);
// client2 with wrong password after client1 successes to
authenticate.
clientConfig.setString("client.security.sasl.password",
"password2");
try (NettyClient nettyClient2 =
- new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance(), false)) {
+ new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance())) {
assertThatThrownBy(
() ->
verifyGetTableNamesList(
diff --git
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java
index a96a826aa..92e837e18 100644
---
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java
+++
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java
@@ -206,7 +206,7 @@ public class SaslAuthenticationITCase {
new ServerNode(
1, "localhost", availablePort1.getPort(),
ServerType.TABLET_SERVER);
try (NettyClient nettyClient =
- new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance(), false)) {
+ new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance())) {
ListTablesRequest request =
new
ListTablesRequest().setDatabaseName("test-database");
ListTablesResponse listTablesResponse =
diff --git
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
index 4c0ba7b14..329f5d2f2 100644
---
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
+++
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
@@ -70,7 +70,7 @@ final class NettyClientTest {
conf = new Configuration();
// 3 worker threads is enough for this test
conf.setInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS, 3);
- nettyClient = new NettyClient(conf,
TestingClientMetricGroup.newInstance(), false);
+ nettyClient = new NettyClient(conf,
TestingClientMetricGroup.newInstance());
buildNettyServer(1);
}
@@ -219,7 +219,7 @@ final class NettyClientTest {
.get();
assertThat(nettyClient.connections().size()).isEqualTo(1);
try (NettyClient client =
- new NettyClient(conf,
TestingClientMetricGroup.newInstance(), false)) {
+ new NettyClient(conf,
TestingClientMetricGroup.newInstance())) {
client.sendRequest(
new ServerNode(
2,
diff --git
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
index 41e4bef2c..8ea452928 100644
---
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
+++
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
@@ -126,8 +126,7 @@ public class ServerConnectionTest {
serverNode,
TestingClientMetricGroup.newInstance(),
clientAuthenticator,
- (con, ignore) -> {},
- false);
+ (con, ignore) -> {});
ConnectionState connectionState = connection.getConnectionState();
assertThat(connectionState).isEqualTo(ConnectionState.CONNECTING);
@@ -157,20 +156,10 @@ public class ServerConnectionTest {
ClientMetricGroup client = new ClientMetricGroup(metricRegistry,
"client");
ServerConnection connection =
new ServerConnection(
- bootstrap,
- serverNode,
- client,
- clientAuthenticator,
- (con, ignore) -> {},
- false);
+ bootstrap, serverNode, client, clientAuthenticator,
(con, ignore) -> {});
ServerConnection connection2 =
new ServerConnection(
- bootstrap,
- serverNode2,
- client,
- clientAuthenticator,
- (con, ignore) -> {},
- false);
+ bootstrap, serverNode2, client, clientAuthenticator,
(con, ignore) -> {});
LookupRequest request = new LookupRequest().setTableId(1);
PbLookupReqForBucket pbLookupReqForBucket = request.addBucketsReq();
pbLookupReqForBucket.setBucketId(1);
@@ -226,8 +215,7 @@ public class ServerConnectionTest {
wrongServerTypeNode,
TestingClientMetricGroup.newInstance(),
clientAuthenticator,
- (con, ignore) -> {},
- false);
+ (con, ignore) -> {});
// Pending request will be rejected with InvalidServerTypeException
which is
// InvalidRequestException.
diff --git
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/MessageCodecTest.java
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/MessageCodecTest.java
index 29c9f1489..eb5d492cb 100644
---
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/MessageCodecTest.java
+++
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/MessageCodecTest.java
@@ -62,7 +62,7 @@ class MessageCodecTest {
@BeforeEach
void beforeEach() {
this.responseReceiver = new ResponseReceiver();
- this.clientHandler = new NettyClientHandler(responseReceiver, false);
+ this.clientHandler = new NettyClientHandler(responseReceiver);
this.requestChannel = new RequestChannel(100);
MetricGroup metricGroup = NOPMetricsGroup.newInstance();
this.serverHandler =
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
index 55ba87108..25a0d866f 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
@@ -297,7 +297,7 @@ public class CoordinatorServer extends ServerBase {
synchronized (lock) {
this.clientMetricGroup = new ClientMetricGroup(metricRegistry,
SERVER_NAME);
- this.rpcClient = RpcClient.create(conf, clientMetricGroup, true);
+ this.rpcClient = RpcClient.create(conf, clientMetricGroup);
this.coordinatorChannelManager = new
CoordinatorChannelManager(rpcClient);
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index 65ab96324..26f45d23c 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -240,7 +240,7 @@ public class TabletServer extends ServerBase {
// to fetch log.
this.clientMetricGroup =
new ClientMetricGroup(metricRegistry, SERVER_NAME + "-" +
serverId);
- this.rpcClient = RpcClient.create(conf, clientMetricGroup, true);
+ this.rpcClient = RpcClient.create(conf, clientMetricGroup);
this.coordinatorGateway =
GatewayClientProxy.createGatewayProxy(
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/ServerITCaseBase.java
b/fluss-server/src/test/java/org/apache/fluss/server/ServerITCaseBase.java
index 254d1a4ac..4055613ce 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/ServerITCaseBase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/ServerITCaseBase.java
@@ -126,8 +126,7 @@ public abstract class ServerITCaseBase {
private void testConnectionToServer() throws Exception {
try (NettyClient client =
- new NettyClient(
- new Configuration(),
TestingClientMetricGroup.newInstance(), false)) {
+ new NettyClient(new Configuration(),
TestingClientMetricGroup.newInstance())) {
RpcGateway gateway =
GatewayClientProxy.createGatewayProxy(
this::getServerNode, client, getRpcGatewayClass());
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java
index bd4467c0a..9d8eda15b 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java
@@ -54,8 +54,7 @@ class CoordinatorChannelManagerTest {
Configuration configuration = new Configuration();
CoordinatorChannelManager coordinatorChannelManager =
new CoordinatorChannelManager(
- RpcClient.create(
- configuration,
TestingClientMetricGroup.newInstance(), false));
+ RpcClient.create(configuration,
TestingClientMetricGroup.newInstance()));
List<ServerNode> tabletServersNode =
FLUSS_CLUSTER_EXTENSION.getTabletServerNodes();
// test start up using server 0
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java
index 9fa4f2ad2..5d2bcd574 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java
@@ -91,7 +91,7 @@ class CoordinatorHighAvailabilityITCase {
@BeforeEach
void setUp() throws Exception {
Configuration clientConf = new Configuration();
- rpcClient = RpcClient.create(clientConf,
TestingClientMetricGroup.newInstance(), false);
+ rpcClient = RpcClient.create(clientConf,
TestingClientMetricGroup.newInstance());
}
@AfterEach
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
index 4233f7c94..9f6c13064 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
@@ -544,8 +544,7 @@ class TableManagerITCase {
configuration,
new ClientMetricGroup(
MetricRegistry.create(configuration, null),
- "fluss-cluster-extension"),
- false)) {
+ "fluss-cluster-extension"))) {
ServerNode serverNode =
FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode(CLIENT_LISTENER);
AdminGateway adminGatewayForClient =
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorChannelManager.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorChannelManager.java
index 3ed99d6c9..a25c4f4ce 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorChannelManager.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorChannelManager.java
@@ -36,7 +36,7 @@ public class TestCoordinatorChannelManager extends
CoordinatorChannelManager {
}
public TestCoordinatorChannelManager(Map<Integer, TabletServerGateway>
gateways) {
- super(RpcClient.create(new Configuration(),
TestingClientMetricGroup.newInstance(), false));
+ super(RpcClient.create(new Configuration(),
TestingClientMetricGroup.newInstance()));
this.gateways = gateways;
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
index 70578b614..c7c7591d5 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
@@ -294,8 +294,7 @@ class ReplicaStateMachineTest {
new CoordinatorChannelManager(
RpcClient.create(
new Configuration(),
- TestingClientMetricGroup.newInstance(),
- false)),
+
TestingClientMetricGroup.newInstance())),
(event) -> {
// do nothing
},
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
index b029c75e9..bbac45c6b 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
@@ -272,8 +272,7 @@ class TableBucketStateMachineTest {
new CoordinatorChannelManager(
RpcClient.create(
new Configuration(),
- TestingClientMetricGroup.newInstance(),
- false)),
+
TestingClientMetricGroup.newInstance())),
coordinatorContext,
autoPartitionManager,
lakeTableTieringManager,
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
index 4b1474f64..f829ca892 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
@@ -210,7 +210,7 @@ public class ReplicaTestBase {
new LakeCatalogDynamicLoader(new
Configuration(), null, true)));
initMetadataCache(serverMetadataCache);
- rpcClient = RpcClient.create(conf,
TestingClientMetricGroup.newInstance(), false);
+ rpcClient = RpcClient.create(conf,
TestingClientMetricGroup.newInstance());
snapshotReporter = new TestingCompletedKvSnapshotCommitter();
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
index 7dfef651e..556c5bea4 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
@@ -524,7 +524,7 @@ public class ReplicaFetcherThreadTest {
null,
conf,
new LakeCatalogDynamicLoader(conf,
null, true))),
- RpcClient.create(conf,
TestingClientMetricGroup.newInstance(), false),
+ RpcClient.create(conf,
TestingClientMetricGroup.newInstance()),
TestingMetricGroups.TABLET_SERVER_METRICS,
manualClock,
ioExecutor);
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
index 37d3f6c46..9498419e0 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
@@ -219,8 +219,7 @@ public final class FlussClusterExtension
RpcClient.create(
conf,
new ClientMetricGroup(
- MetricRegistry.create(conf, null),
"fluss-cluster-extension"),
- false);
+ MetricRegistry.create(conf, null),
"fluss-cluster-extension"));
startCoordinatorServer();
startTabletServers();
// wait coordinator knows all tablet servers to make cluster
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/utils/RpcGatewayManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/utils/RpcGatewayManagerTest.java
index f059358a3..8e07921a7 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/utils/RpcGatewayManagerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/utils/RpcGatewayManagerTest.java
@@ -36,7 +36,7 @@ class RpcGatewayManagerTest {
RpcGatewayManager<TabletServerGateway> gatewayRpcGatewayManager =
new RpcGatewayManager<>(
new NettyClient(
- new Configuration(),
TestingClientMetricGroup.newInstance(), false),
+ new Configuration(),
TestingClientMetricGroup.newInstance()),
TabletServerGateway.class);
ServerNode serverNode1 =