This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 61c4ece67 [client] Add unavailable tabletServer set for MetadataUpdater to avoid serverNode info of all tabletServers are invalid (#2098)
61c4ece67 is described below

commit 61c4ece67ef9b16f8b5d3d1284bb0abdd9513afe
Author: yunhong <[email protected]>
AuthorDate: Thu Dec 4 13:50:08 2025 +0800

    [client] Add unavailable tabletServer set for MetadataUpdater to avoid serverNode info of all tabletServers are invalid (#2098)
---
 .../org/apache/fluss/client/FlussConnection.java   | 16 +++++--
 .../fluss/client/metadata/MetadataUpdater.java     | 50 +++++++++++++++++-----
 .../apache/fluss/client/utils/MetadataUtils.java   | 25 +++++++----
 .../client/metadata/MetadataUpdaterITCase.java     |  2 +-
 .../client/metadata/TestingMetadataUpdater.java    | 17 +++++---
 .../client/table/scanner/log/LogFetcherTest.java   |  2 +-
 6 files changed, 84 insertions(+), 28 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java 
b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
index 7af5d6b89..0990756ce 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
@@ -29,6 +29,7 @@ import 
org.apache.fluss.client.token.DefaultSecurityTokenProvider;
 import org.apache.fluss.client.token.SecurityTokenManager;
 import org.apache.fluss.client.token.SecurityTokenProvider;
 import org.apache.fluss.client.write.WriterClient;
+import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.FlussRuntimeException;
@@ -42,6 +43,7 @@ import org.apache.fluss.rpc.metrics.ClientMetricGroup;
 
 import java.time.Duration;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 
 import static 
org.apache.fluss.client.utils.MetadataUtils.getOneAvailableTabletServerNode;
@@ -162,9 +164,17 @@ public final class FlussConnection implements Connection {
                     // todo: may add retry logic when no any available tablet 
server?
                     AdminReadOnlyGateway gateway =
                             GatewayClientProxy.createGatewayProxy(
-                                    () ->
-                                            getOneAvailableTabletServerNode(
-                                                    
metadataUpdater.getCluster()),
+                                    () -> {
+                                        ServerNode serverNode =
+                                                
getOneAvailableTabletServerNode(
+                                                        
metadataUpdater.getCluster(),
+                                                        new HashSet<>());
+                                        if (serverNode == null) {
+                                            throw new FlussRuntimeException(
+                                                    "no available tablet 
server");
+                                        }
+                                        return serverNode;
+                                    },
                                     rpcClient,
                                     AdminReadOnlyGateway.class);
                     SecurityTokenProvider securityTokenProvider =
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
index 5f838ada8..00e80e463 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
@@ -50,11 +50,14 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
+import static 
org.apache.fluss.client.utils.MetadataUtils.getOneAvailableTabletServerNode;
 import static 
org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
 import static org.apache.fluss.utils.ExceptionUtils.stripExecutionException;
 
@@ -65,16 +68,19 @@ public class MetadataUpdater {
     private static final int MAX_RETRY_TIMES = 3;
     private static final int RETRY_INTERVAL_MS = 100;
 
+    private final Configuration conf;
     private final RpcClient rpcClient;
+    private final Set<Integer> unavailableTabletServerIds = new 
CopyOnWriteArraySet<>();
     protected volatile Cluster cluster;
 
-    public MetadataUpdater(Configuration configuration, RpcClient rpcClient) {
-        this(rpcClient, initializeCluster(configuration, rpcClient));
+    public MetadataUpdater(Configuration conf, RpcClient rpcClient) {
+        this(rpcClient, conf, initializeCluster(conf, rpcClient));
     }
 
     @VisibleForTesting
-    public MetadataUpdater(RpcClient rpcClient, Cluster cluster) {
+    public MetadataUpdater(RpcClient rpcClient, Configuration conf, Cluster 
cluster) {
         this.rpcClient = rpcClient;
+        this.conf = conf;
         this.cluster = cluster;
     }
 
@@ -240,19 +246,43 @@ public class MetadataUpdater {
             @Nullable Collection<PhysicalTablePath> tablePartitionNames,
             @Nullable Collection<Long> tablePartitionIds)
             throws PartitionNotExistException {
+        ServerNode serverNode =
+                getOneAvailableTabletServerNode(cluster, 
unavailableTabletServerIds);
         try {
             synchronized (this) {
-                cluster =
-                        sendMetadataRequestAndRebuildCluster(
-                                cluster,
-                                rpcClient,
-                                tablePaths,
-                                tablePartitionNames,
-                                tablePartitionIds);
+                if (serverNode == null) {
+                    LOG.info(
+                            "No available tablet server to update metadata, 
try to re-initialize cluster using bootstrap server.");
+                    cluster = initializeCluster(conf, rpcClient);
+                } else {
+                    cluster =
+                            sendMetadataRequestAndRebuildCluster(
+                                    cluster,
+                                    rpcClient,
+                                    tablePaths,
+                                    tablePartitionNames,
+                                    tablePartitionIds,
+                                    serverNode);
+                }
+            }
+
+            Map<Integer, ServerNode> aliveTabletServers = 
cluster.getAliveTabletServers();
+            
unavailableTabletServerIds.removeIf(aliveTabletServers::containsKey);
+            if (!unavailableTabletServerIds.isEmpty()) {
+                LOG.info(
+                        "After update metadata, unavailable tabletServer set: 
{}",
+                        unavailableTabletServerIds);
             }
         } catch (Exception e) {
             Throwable t = stripExecutionException(e);
             if (t instanceof RetriableException || t instanceof 
TimeoutException) {
+                if (serverNode != null) {
+                    unavailableTabletServerIds.add(serverNode.id());
+                    LOG.warn(
+                            "tabletServer {} is unavailable for updating 
metadata for retriable exception. unavailable tabletServer set {}",
+                            serverNode,
+                            unavailableTabletServerIds);
+                }
                 LOG.warn("Failed to update metadata, but the exception is 
re-triable.", t);
             } else if (t instanceof PartitionNotExistException) {
                 LOG.warn("Failed to update metadata because the partition does 
not exist", t);
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java 
b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java
index 9ef857f17..14aa480da 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java
@@ -21,7 +21,6 @@ import org.apache.fluss.cluster.BucketLocation;
 import org.apache.fluss.cluster.Cluster;
 import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.cluster.ServerType;
-import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.StaleMetadataException;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
@@ -39,6 +38,9 @@ import org.apache.fluss.rpc.messages.PbServerNode;
 import org.apache.fluss.rpc.messages.PbTableMetadata;
 import org.apache.fluss.rpc.messages.PbTablePath;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -55,6 +57,8 @@ import java.util.concurrent.TimeoutException;
 /** Utils for metadata for client. */
 public class MetadataUtils {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(MetadataUtils.class);
+
     private static final Random randOffset = new Random();
 
     /**
@@ -79,13 +83,12 @@ public class MetadataUtils {
             RpcClient client,
             @Nullable Set<TablePath> tablePaths,
             @Nullable Collection<PhysicalTablePath> tablePartitionNames,
-            @Nullable Collection<Long> tablePartitionIds)
+            @Nullable Collection<Long> tablePartitionIds,
+            ServerNode serverNode)
             throws ExecutionException, InterruptedException, TimeoutException {
         AdminReadOnlyGateway gateway =
                 GatewayClientProxy.createGatewayProxy(
-                        () -> getOneAvailableTabletServerNode(cluster),
-                        client,
-                        AdminReadOnlyGateway.class);
+                        () -> serverNode, client, AdminReadOnlyGateway.class);
         return sendMetadataRequestAndRebuildCluster(
                 gateway, true, cluster, tablePaths, tablePartitionNames, 
tablePartitionIds);
     }
@@ -253,10 +256,16 @@ public class MetadataUtils {
         }
     }
 
-    public static ServerNode getOneAvailableTabletServerNode(Cluster cluster) {
-        List<ServerNode> aliveTabletServers = 
cluster.getAliveTabletServerList();
+    public static @Nullable ServerNode getOneAvailableTabletServerNode(
+            Cluster cluster, Set<Integer> unavailableTabletServerIds) {
+        List<ServerNode> aliveTabletServers = new 
ArrayList<>(cluster.getAliveTabletServerList());
+        if (!unavailableTabletServerIds.isEmpty()) {
+            aliveTabletServers.removeIf(
+                    serverNode -> 
unavailableTabletServerIds.contains(serverNode.id()));
+        }
+
         if (aliveTabletServers.isEmpty()) {
-            throw new FlussRuntimeException("no alive tablet server in 
cluster");
+            return null;
         }
         // just pick one random server node
         int offset = randOffset.nextInt(aliveTabletServers.size());
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
index f7151dbda..caecbe545 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
@@ -111,7 +111,7 @@ class MetadataUpdaterITCase {
                         Collections.emptyMap(),
                         Collections.emptyMap());
 
-        metadataUpdater = new MetadataUpdater(rpcClient, newCluster);
+        metadataUpdater = new MetadataUpdater(rpcClient, new Configuration(), 
newCluster);
         // shouldn't update metadata to empty since the empty metadata will be 
ignored
         metadataUpdater.updateMetadata(null, null, null);
         assertThat(metadataUpdater.getCluster().getAliveTabletServers())
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
index b89025a78..4d863a29a 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
@@ -55,17 +55,23 @@ public class TestingMetadataUpdater extends MetadataUpdater 
{
     private final Map<Integer, TestTabletServerGateway> tabletServerGatewayMap;
 
     public TestingMetadataUpdater(Map<TablePath, TableInfo> tableInfos) {
-        this(COORDINATOR, Arrays.asList(NODE1, NODE2, NODE3), tableInfos, 
null);
+        this(
+                COORDINATOR,
+                Arrays.asList(NODE1, NODE2, NODE3),
+                tableInfos,
+                null,
+                new Configuration());
     }
 
     private TestingMetadataUpdater(
             ServerNode coordinatorServer,
             List<ServerNode> tabletServers,
             Map<TablePath, TableInfo> tableInfos,
-            Map<Integer, TestTabletServerGateway> customGateways) {
+            Map<Integer, TestTabletServerGateway> customGateways,
+            Configuration conf) {
         super(
-                RpcClient.create(
-                        new Configuration(), 
TestingClientMetricGroup.newInstance(), false),
+                RpcClient.create(conf, TestingClientMetricGroup.newInstance(), 
false),
+                conf,
                 Cluster.empty());
         initializeCluster(coordinatorServer, tabletServers, tableInfos);
         coordinatorGateway = new TestCoordinatorGateway();
@@ -122,7 +128,8 @@ public class TestingMetadataUpdater extends MetadataUpdater 
{
                     COORDINATOR,
                     Arrays.asList(NODE1, NODE2, NODE3),
                     tableInfos,
-                    customGateways.isEmpty() ? null : customGateways);
+                    customGateways.isEmpty() ? null : customGateways,
+                    new Configuration());
         }
     }
 
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
index 0f65685c8..3393a37a3 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
@@ -262,7 +262,7 @@ public class LogFetcherTest extends 
ClientToServerITCaseBase {
                         oldCluster.getTableIdByPath(),
                         oldCluster.getPartitionIdByPath(),
                         oldCluster.getTableInfoByPath());
-        metadataUpdater = new MetadataUpdater(rpcClient, newCluster);
+        metadataUpdater = new MetadataUpdater(rpcClient, clientConf, 
newCluster);
 
         LogScannerStatus logScannerStatus = new LogScannerStatus();
         logScannerStatus.assignScanBuckets(Collections.singletonMap(tb0, 0L));

Reply via email to