This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 61c4ece67 [client] Add unavailable tabletServer set for
MetadataUpdater to avoid the serverNode info of all tabletServers being invalid
(#2098)
61c4ece67 is described below
commit 61c4ece67ef9b16f8b5d3d1284bb0abdd9513afe
Author: yunhong <[email protected]>
AuthorDate: Thu Dec 4 13:50:08 2025 +0800
[client] Add unavailable tabletServer set for MetadataUpdater to avoid
the serverNode info of all tabletServers being invalid (#2098)
---
.../org/apache/fluss/client/FlussConnection.java | 16 +++++--
.../fluss/client/metadata/MetadataUpdater.java | 50 +++++++++++++++++-----
.../apache/fluss/client/utils/MetadataUtils.java | 25 +++++++----
.../client/metadata/MetadataUpdaterITCase.java | 2 +-
.../client/metadata/TestingMetadataUpdater.java | 17 +++++---
.../client/table/scanner/log/LogFetcherTest.java | 2 +-
6 files changed, 84 insertions(+), 28 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
index 7af5d6b89..0990756ce 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
@@ -29,6 +29,7 @@ import
org.apache.fluss.client.token.DefaultSecurityTokenProvider;
import org.apache.fluss.client.token.SecurityTokenManager;
import org.apache.fluss.client.token.SecurityTokenProvider;
import org.apache.fluss.client.write.WriterClient;
+import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.FlussRuntimeException;
@@ -42,6 +43,7 @@ import org.apache.fluss.rpc.metrics.ClientMetricGroup;
import java.time.Duration;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import static
org.apache.fluss.client.utils.MetadataUtils.getOneAvailableTabletServerNode;
@@ -162,9 +164,17 @@ public final class FlussConnection implements Connection {
// todo: may add retry logic when no any available tablet
server?
AdminReadOnlyGateway gateway =
GatewayClientProxy.createGatewayProxy(
- () ->
- getOneAvailableTabletServerNode(
-
metadataUpdater.getCluster()),
+ () -> {
+ ServerNode serverNode =
+
getOneAvailableTabletServerNode(
+
metadataUpdater.getCluster(),
+ new HashSet<>());
+ if (serverNode == null) {
+ throw new FlussRuntimeException(
+ "no available tablet
server");
+ }
+ return serverNode;
+ },
rpcClient,
AdminReadOnlyGateway.class);
SecurityTokenProvider securityTokenProvider =
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
index 5f838ada8..00e80e463 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
@@ -50,11 +50,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
+import static
org.apache.fluss.client.utils.MetadataUtils.getOneAvailableTabletServerNode;
import static
org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
import static org.apache.fluss.utils.ExceptionUtils.stripExecutionException;
@@ -65,16 +68,19 @@ public class MetadataUpdater {
private static final int MAX_RETRY_TIMES = 3;
private static final int RETRY_INTERVAL_MS = 100;
+ private final Configuration conf;
private final RpcClient rpcClient;
+ private final Set<Integer> unavailableTabletServerIds = new
CopyOnWriteArraySet<>();
protected volatile Cluster cluster;
- public MetadataUpdater(Configuration configuration, RpcClient rpcClient) {
- this(rpcClient, initializeCluster(configuration, rpcClient));
+ public MetadataUpdater(Configuration conf, RpcClient rpcClient) {
+ this(rpcClient, conf, initializeCluster(conf, rpcClient));
}
@VisibleForTesting
- public MetadataUpdater(RpcClient rpcClient, Cluster cluster) {
+ public MetadataUpdater(RpcClient rpcClient, Configuration conf, Cluster
cluster) {
this.rpcClient = rpcClient;
+ this.conf = conf;
this.cluster = cluster;
}
@@ -240,19 +246,43 @@ public class MetadataUpdater {
@Nullable Collection<PhysicalTablePath> tablePartitionNames,
@Nullable Collection<Long> tablePartitionIds)
throws PartitionNotExistException {
+ ServerNode serverNode =
+ getOneAvailableTabletServerNode(cluster,
unavailableTabletServerIds);
try {
synchronized (this) {
- cluster =
- sendMetadataRequestAndRebuildCluster(
- cluster,
- rpcClient,
- tablePaths,
- tablePartitionNames,
- tablePartitionIds);
+ if (serverNode == null) {
+ LOG.info(
+ "No available tablet server to update metadata,
try to re-initialize cluster using bootstrap server.");
+ cluster = initializeCluster(conf, rpcClient);
+ } else {
+ cluster =
+ sendMetadataRequestAndRebuildCluster(
+ cluster,
+ rpcClient,
+ tablePaths,
+ tablePartitionNames,
+ tablePartitionIds,
+ serverNode);
+ }
+ }
+
+ Map<Integer, ServerNode> aliveTabletServers =
cluster.getAliveTabletServers();
+
unavailableTabletServerIds.removeIf(aliveTabletServers::containsKey);
+ if (!unavailableTabletServerIds.isEmpty()) {
+ LOG.info(
+ "After update metadata, unavailable tabletServer set:
{}",
+ unavailableTabletServerIds);
}
} catch (Exception e) {
Throwable t = stripExecutionException(e);
if (t instanceof RetriableException || t instanceof
TimeoutException) {
+ if (serverNode != null) {
+ unavailableTabletServerIds.add(serverNode.id());
+ LOG.warn(
+ "tabletServer {} is unavailable for updating
metadata for retriable exception. unavailable tabletServer set {}",
+ serverNode,
+ unavailableTabletServerIds);
+ }
LOG.warn("Failed to update metadata, but the exception is
re-triable.", t);
} else if (t instanceof PartitionNotExistException) {
LOG.warn("Failed to update metadata because the partition does
not exist", t);
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java
b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java
index 9ef857f17..14aa480da 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java
@@ -21,7 +21,6 @@ import org.apache.fluss.cluster.BucketLocation;
import org.apache.fluss.cluster.Cluster;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.ServerType;
-import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.StaleMetadataException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
@@ -39,6 +38,9 @@ import org.apache.fluss.rpc.messages.PbServerNode;
import org.apache.fluss.rpc.messages.PbTableMetadata;
import org.apache.fluss.rpc.messages.PbTablePath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -55,6 +57,8 @@ import java.util.concurrent.TimeoutException;
/** Utils for metadata for client. */
public class MetadataUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(MetadataUtils.class);
+
private static final Random randOffset = new Random();
/**
@@ -79,13 +83,12 @@ public class MetadataUtils {
RpcClient client,
@Nullable Set<TablePath> tablePaths,
@Nullable Collection<PhysicalTablePath> tablePartitionNames,
- @Nullable Collection<Long> tablePartitionIds)
+ @Nullable Collection<Long> tablePartitionIds,
+ ServerNode serverNode)
throws ExecutionException, InterruptedException, TimeoutException {
AdminReadOnlyGateway gateway =
GatewayClientProxy.createGatewayProxy(
- () -> getOneAvailableTabletServerNode(cluster),
- client,
- AdminReadOnlyGateway.class);
+ () -> serverNode, client, AdminReadOnlyGateway.class);
return sendMetadataRequestAndRebuildCluster(
gateway, true, cluster, tablePaths, tablePartitionNames,
tablePartitionIds);
}
@@ -253,10 +256,16 @@ public class MetadataUtils {
}
}
- public static ServerNode getOneAvailableTabletServerNode(Cluster cluster) {
- List<ServerNode> aliveTabletServers =
cluster.getAliveTabletServerList();
+ public static @Nullable ServerNode getOneAvailableTabletServerNode(
+ Cluster cluster, Set<Integer> unavailableTabletServerIds) {
+ List<ServerNode> aliveTabletServers = new
ArrayList<>(cluster.getAliveTabletServerList());
+ if (!unavailableTabletServerIds.isEmpty()) {
+ aliveTabletServers.removeIf(
+ serverNode ->
unavailableTabletServerIds.contains(serverNode.id()));
+ }
+
if (aliveTabletServers.isEmpty()) {
- throw new FlussRuntimeException("no alive tablet server in
cluster");
+ return null;
}
// just pick one random server node
int offset = randOffset.nextInt(aliveTabletServers.size());
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
index f7151dbda..caecbe545 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
@@ -111,7 +111,7 @@ class MetadataUpdaterITCase {
Collections.emptyMap(),
Collections.emptyMap());
- metadataUpdater = new MetadataUpdater(rpcClient, newCluster);
+ metadataUpdater = new MetadataUpdater(rpcClient, new Configuration(),
newCluster);
// shouldn't update metadata to empty since the empty metadata will be
ignored
metadataUpdater.updateMetadata(null, null, null);
assertThat(metadataUpdater.getCluster().getAliveTabletServers())
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
index b89025a78..4d863a29a 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
@@ -55,17 +55,23 @@ public class TestingMetadataUpdater extends MetadataUpdater
{
private final Map<Integer, TestTabletServerGateway> tabletServerGatewayMap;
public TestingMetadataUpdater(Map<TablePath, TableInfo> tableInfos) {
- this(COORDINATOR, Arrays.asList(NODE1, NODE2, NODE3), tableInfos,
null);
+ this(
+ COORDINATOR,
+ Arrays.asList(NODE1, NODE2, NODE3),
+ tableInfos,
+ null,
+ new Configuration());
}
private TestingMetadataUpdater(
ServerNode coordinatorServer,
List<ServerNode> tabletServers,
Map<TablePath, TableInfo> tableInfos,
- Map<Integer, TestTabletServerGateway> customGateways) {
+ Map<Integer, TestTabletServerGateway> customGateways,
+ Configuration conf) {
super(
- RpcClient.create(
- new Configuration(),
TestingClientMetricGroup.newInstance(), false),
+ RpcClient.create(conf, TestingClientMetricGroup.newInstance(),
false),
+ conf,
Cluster.empty());
initializeCluster(coordinatorServer, tabletServers, tableInfos);
coordinatorGateway = new TestCoordinatorGateway();
@@ -122,7 +128,8 @@ public class TestingMetadataUpdater extends MetadataUpdater
{
COORDINATOR,
Arrays.asList(NODE1, NODE2, NODE3),
tableInfos,
- customGateways.isEmpty() ? null : customGateways);
+ customGateways.isEmpty() ? null : customGateways,
+ new Configuration());
}
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
index 0f65685c8..3393a37a3 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
@@ -262,7 +262,7 @@ public class LogFetcherTest extends
ClientToServerITCaseBase {
oldCluster.getTableIdByPath(),
oldCluster.getPartitionIdByPath(),
oldCluster.getTableInfoByPath());
- metadataUpdater = new MetadataUpdater(rpcClient, newCluster);
+ metadataUpdater = new MetadataUpdater(rpcClient, clientConf,
newCluster);
LogScannerStatus logScannerStatus = new LogScannerStatus();
logScannerStatus.assignScanBuckets(Collections.singletonMap(tb0, 0L));