This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 97d6f34e2 [client] Fix the MetadataRequest sent by LogFetcher timeout
exception when upgrading cluster (#1666)
97d6f34e2 is described below
commit 97d6f34e2b174615841d2b71d248ff0cf6ac97b4
Author: yunhong <[email protected]>
AuthorDate: Mon Sep 15 22:29:33 2025 +0800
[client] Fix the MetadataRequest sent by LogFetcher timeout exception when
upgrading cluster (#1666)
---
.../fluss/client/metadata/MetadataUpdater.java | 2 +-
.../fluss/client/table/scanner/log/LogFetcher.java | 59 ++++++++++++++++------
.../client/admin/ClientToServerITCaseBase.java | 2 +
.../fluss/client/metadata/MetadataUpdaterTest.java | 4 +-
.../client/table/scanner/log/LogFetcherTest.java | 43 ++++++++++++++--
.../org/apache/fluss/config/ConfigOptions.java | 4 +-
website/docs/maintenance/configuration.md | 2 +-
7 files changed, 92 insertions(+), 24 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
index 1730235f4..92d81bd10 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
@@ -254,7 +254,7 @@ public class MetadataUpdater {
}
@VisibleForTesting
- protected void updateMetadata(
+ public void updateMetadata(
@Nullable Set<TablePath> tablePaths,
@Nullable Collection<PhysicalTablePath> tablePartitionNames,
@Nullable Collection<Long> tablePartitionIds)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
index d195dc6f0..ff67575e2 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
@@ -162,14 +162,19 @@ public class LogFetcher implements Closeable {
* Set up a fetch request for any node that we have assigned buckets for
which doesn't already
* have an in-flight fetch or pending fetch data.
*/
- public synchronized void sendFetches() {
- Map<Integer, FetchLogRequest> fetchRequestMap =
prepareFetchLogRequests();
- fetchRequestMap.forEach(
- (nodeId, fetchLogRequest) -> {
- LOG.debug("Adding pending request for node id {}", nodeId);
- nodesWithPendingFetchRequests.add(nodeId);
- sendFetchRequest(nodeId, fetchLogRequest);
- });
+ public void sendFetches() {
+ checkAndUpdateMetadata(fetchableBuckets());
+ synchronized (this) {
+ // NOTE: Don't perform heavy I/O operations or synchronous waits
inside this lock to
+ // avoid blocking the completion of the FetchLogResponse future.
+ Map<Integer, FetchLogRequest> fetchRequestMap =
prepareFetchLogRequests();
+ fetchRequestMap.forEach(
+ (nodeId, fetchLogRequest) -> {
+ LOG.debug("Adding pending request for node id {}",
nodeId);
+ nodesWithPendingFetchRequests.add(nodeId);
+ sendFetchRequest(nodeId, fetchLogRequest);
+ });
+ }
}
/**
@@ -190,6 +195,31 @@ public class LogFetcher implements Closeable {
logFetchBuffer.wakeup();
}
+ private void checkAndUpdateMetadata(List<TableBucket> tableBuckets) {
+ // If the table is partitioned, check whether partition metadata
needs to be updated.
+ List<Long> partitionIds = isPartitioned ? new ArrayList<>() : null;
+ // If the table is non-partitioned, check whether table metadata
needs to be updated.
+ boolean needUpdate = false;
+ for (TableBucket tb : tableBuckets) {
+ if (getTableBucketLeader(tb) != null) {
+ continue;
+ }
+
+ if (isPartitioned) {
+ partitionIds.add(tb.getPartitionId());
+ } else {
+ needUpdate = true;
+ break;
+ }
+ }
+
+ if (isPartitioned && !partitionIds.isEmpty()) {
+ metadataUpdater.updateMetadata(Collections.singleton(tablePath),
null, partitionIds);
+ } else if (needUpdate) {
+ metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
+ }
+ }
+
private void sendFetchRequest(int destination, FetchLogRequest
fetchLogRequest) {
TableOrPartitions tableOrPartitionsInFetchRequest =
getTableOrPartitionsInFetchRequest(fetchLogRequest);
@@ -243,18 +273,21 @@ public class LogFetcher implements Closeable {
return new TableOrPartitions(tableIdsInFetchRequest,
tablePartitionsInFetchRequest);
}
- private static class TableOrPartitions {
+ /** A helper class to hold table ids or table partitions. */
+ @VisibleForTesting
+ static class TableOrPartitions {
private final @Nullable Set<Long> tableIds;
private final @Nullable Set<TablePartition> tablePartitions;
- private TableOrPartitions(
+ TableOrPartitions(
@Nullable Set<Long> tableIds, @Nullable Set<TablePartition>
tablePartitions) {
this.tableIds = tableIds;
this.tablePartitions = tablePartitions;
}
}
- private void invalidTableOrPartitions(TableOrPartitions tableOrPartitions)
{
+ @VisibleForTesting
+ void invalidTableOrPartitions(TableOrPartitions tableOrPartitions) {
Set<PhysicalTablePath> physicalTablePaths =
metadataUpdater.getPhysicalTablePathByIds(
tableOrPartitions.tableIds,
tableOrPartitions.tablePartitions);
@@ -404,9 +437,6 @@ public class LogFetcher implements Closeable {
LOG.trace(
"Skipping fetch request for bucket {} because leader
is not available.",
tb);
- // try to get the latest metadata info of this table because
the leader for this
- // bucket is unknown.
- metadataUpdater.updateTableOrPartitionMetadata(tablePath,
tb.getPartitionId());
} else if (nodesWithPendingFetchRequests.contains(leader)) {
LOG.trace(
"Skipping fetch request for bucket {} because previous
request "
@@ -472,7 +502,6 @@ public class LogFetcher implements Closeable {
}
private Integer getTableBucketLeader(TableBucket tableBucket) {
- metadataUpdater.checkAndUpdateMetadata(tablePath, tableBucket);
if (metadataUpdater.getBucketLocation(tableBucket).isPresent()) {
BucketLocation bucketLocation =
metadataUpdater.getBucketLocation(tableBucket).get();
if (bucketLocation.getLeader() != null) {
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
index 3925384e7..f26ee3d6a 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
@@ -121,6 +121,8 @@ public abstract class ClientToServerITCaseBase {
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE,
MemorySize.parse("1kb"));
conf.set(ConfigOptions.MAX_PARTITION_NUM, 10);
conf.set(ConfigOptions.MAX_BUCKET_NUM, 30);
+
+ conf.set(ConfigOptions.NETTY_CLIENT_NUM_NETWORK_THREADS, 1);
return conf;
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
index 514025ae9..5471d8b60 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
@@ -20,7 +20,6 @@ package org.apache.fluss.client.metadata;
import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.Admin;
-import org.apache.fluss.client.utils.MetadataUtils;
import org.apache.fluss.cluster.Cluster;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.config.Configuration;
@@ -34,6 +33,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.Collections;
import java.util.List;
+import static
org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
import static org.assertj.core.api.Assertions.assertThat;
@@ -64,7 +64,7 @@ class MetadataUpdaterTest {
// any N levels UnmodifiableCollection
for (int i = 0; i < 20000; i++) {
cluster =
- MetadataUtils.sendMetadataRequestAndRebuildCluster(
+ sendMetadataRequestAndRebuildCluster(
FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(),
true,
cluster,
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
index 634bcb5ea..b131986e7 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
@@ -39,6 +39,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import static org.apache.fluss.record.TestData.DATA1;
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
@@ -55,6 +59,7 @@ public class LogFetcherTest extends ClientToServerITCaseBase {
private long tableId;
private final int bucketId0 = 0;
private final int bucketId1 = 1;
+ private LogScannerStatus logScannerStatus;
// TODO covert this test to UT as kafka.
@@ -74,9 +79,8 @@ public class LogFetcherTest extends ClientToServerITCaseBase {
// add bucket 0 and bucket 1 to log scanner status.
scanBuckets.put(new TableBucket(tableId, bucketId0), 0L);
scanBuckets.put(new TableBucket(tableId, bucketId1), 0L);
- LogScannerStatus logScannerStatus = new LogScannerStatus();
+ logScannerStatus = new LogScannerStatus();
logScannerStatus.assignScanBuckets(scanBuckets);
- TestingScannerMetricGroup scannerMetricGroup =
TestingScannerMetricGroup.newInstance();
logFetcher =
new LogFetcher(
DATA1_TABLE_INFO,
@@ -84,7 +88,7 @@ public class LogFetcherTest extends ClientToServerITCaseBase {
logScannerStatus,
clientConf,
metadataUpdater,
- scannerMetricGroup,
+ TestingScannerMetricGroup.newInstance(),
new RemoteFileDownloader(1));
}
@@ -183,6 +187,39 @@ public class LogFetcherTest extends
ClientToServerITCaseBase {
assertThat(records.get(tb0).size()).isEqualTo(10);
}
+ @Test
+ void testFetchWithInvalidTableOrPartitions() throws Exception {
+ MetadataUpdater metadataUpdater1 =
+ new MetadataUpdater(clientConf,
FLUSS_CLUSTER_EXTENSION.getRpcClient());
+ logFetcher =
+ new LogFetcher(
+ DATA1_TABLE_INFO,
+ null,
+ logScannerStatus,
+ clientConf,
+ metadataUpdater1,
+ TestingScannerMetricGroup.newInstance(),
+ new RemoteFileDownloader(1));
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Future<?> future =
+ executor.submit(
+ () -> {
+ // If this test blocks, please check whether it is blocked for
the same reason as
+ // the one fixed in
https://github.com/apache/fluss/pull/1666
+ for (int i = 0; i < 1000; i++) {
+ logFetcher.sendFetches();
+ logFetcher.invalidTableOrPartitions(
+ new LogFetcher.TableOrPartitions(
+
Collections.singleton(tableId), null));
+ }
+ });
+
+ future.get(30, TimeUnit.SECONDS);
+ assertThat(future.isDone()).isTrue();
+ executor.shutdownNow();
+ }
+
private void addRecordsToBucket(
TableBucket tableBucket, MemoryLogRecords logRecords, long
expectedBaseOffset)
throws Exception {
diff --git
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 343434d38..07de50bfc 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -763,10 +763,10 @@ public class ConfigOptions {
public static final ConfigOption<Integer> NETTY_CLIENT_NUM_NETWORK_THREADS
=
key("netty.client.num-network-threads")
.intType()
- .defaultValue(3)
+ .defaultValue(4)
.withDescription(
"The number of threads that the client uses for
sending requests to the "
- + "network and receiving responses from
network. The default value is 3");
+ + "network and receiving responses from
network. The default value is 4");
// ------------------------------------------------------------------------
// Client Settings
diff --git a/website/docs/maintenance/configuration.md
b/website/docs/maintenance/configuration.md
index c5dc0ed7e..79ca3c347 100644
--- a/website/docs/maintenance/configuration.md
+++ b/website/docs/maintenance/configuration.md
@@ -92,7 +92,7 @@ during the Fluss cluster working.
| netty.server.num-worker-threads | Integer | 8 | The number of
threads that the server uses for processing requests, which may include disk
and remote I/O. |
| netty.server.max-queued-requests | Integer | 500 | The number of queued
requests allowed for worker threads, before blocking the I/O threads.
|
| netty.connection.max-idle-time | Duration | 10min | Close idle
connections after the given time specified by this config.
|
-| netty.client.num-network-threads | Integer | 3 | The number of
threads that the client uses for sending requests to the network and receiving
responses from network. The default value is 3 |
+| netty.client.num-network-threads | Integer | 4 | The number of
threads that the client uses for sending requests to the network and receiving
responses from network. The default value is 4 |
## Log