This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new eb19eeaf0 [server] Enhance ReplicaManager#getReplicaOrException to return more accurate error messages (#1573)
eb19eeaf0 is described below

commit eb19eeaf018bfd1e8084e46c209ce64d91a7365a
Author: Liebing <[email protected]>
AuthorDate: Tue Aug 26 23:06:41 2025 +0800

    [server] Enhance ReplicaManager#getReplicaOrException to return more accurate error messages (#1573)
---
 .../server/metadata/ServerMetadataSnapshot.java    | 15 +++++
 .../server/metadata/TabletServerMetadataCache.java |  5 ++
 .../fluss/server/replica/ReplicaManager.java       |  6 +-
 .../metadata/TabletServerMetadataCacheTest.java    | 57 ++++++++++++++++
 .../fluss/server/replica/ReplicaManagerTest.java   | 75 ++++++++++++++++++++++
 5 files changed, 157 insertions(+), 1 deletion(-)

diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerMetadataSnapshot.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerMetadataSnapshot.java
index 11d45873e..a3faeb90e 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerMetadataSnapshot.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerMetadataSnapshot.java
@@ -22,6 +22,7 @@ import org.apache.fluss.cluster.Cluster;
 import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.cluster.TabletServerInfo;
 import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
 
 import javax.annotation.Nullable;
@@ -179,4 +180,18 @@ public class ServerMetadataSnapshot {
     public Map<Integer, ServerInfo> getAliveTabletServers() {
         return aliveTabletServers;
     }
+
+    public boolean contains(TableBucket tableBucket) {
+        if (tableBucket.getPartitionId() == null) {
+            return 
bucketMetadataMapForTables.containsKey(tableBucket.getTableId())
+                    && bucketMetadataMapForTables
+                            .get(tableBucket.getTableId())
+                            .containsKey(tableBucket.getBucket());
+        } else {
+            return 
bucketMetadataMapForPartitions.containsKey(tableBucket.getPartitionId())
+                    && bucketMetadataMapForPartitions
+                            .get(tableBucket.getPartitionId())
+                            .containsKey(tableBucket.getBucket());
+        }
+    }
 }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
index 7a9bfda59..ff6855e13 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
@@ -21,6 +21,7 @@ import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.cluster.TabletServerInfo;
 import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.coordinator.MetadataManager;
@@ -160,6 +161,10 @@ public class TabletServerMetadataCache implements ServerMetadataCache {
         }
     }
 
+    public boolean contains(TableBucket tableBucket) {
+        return serverMetadataSnapshot.contains(tableBucket);
+    }
+
     public void updateClusterMetadata(ClusterMetadata clusterMetadata) {
         inLock(
                 metadataLock,
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 3f901ec5c..e85985767 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -1499,8 +1499,12 @@ public class ReplicaManager {
         HostedReplica replica = getReplica(tableBucket);
         if (replica instanceof OnlineReplica) {
             return ((OnlineReplica) replica).getReplica();
+        } else if (replica instanceof OfflineReplica) {
+            throw new StorageException(tableBucket + " is offline on server " 
+ serverId);
+        } else if ((replica instanceof NoneReplica) && 
metadataCache.contains(tableBucket)) {
+            throw new NotLeaderOrFollowerException(
+                    "server " + serverId + " is not leader or follower for " + 
tableBucket);
         } else {
-            // TODO add metadata cache to judge.
             throw new UnknownTableOrBucketException("Unknown table or bucket: 
" + tableBucket);
         }
     }
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/TabletServerMetadataCacheTest.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/TabletServerMetadataCacheTest.java
index 334949897..90f012b83 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/metadata/TabletServerMetadataCacheTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/TabletServerMetadataCacheTest.java
@@ -23,6 +23,7 @@ import org.apache.fluss.cluster.TabletServerInfo;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.coordinator.MetadataManager;
@@ -244,6 +245,62 @@ public class TabletServerMetadataCacheTest {
                 initialBucketMetadata);
     }
 
+    @Test
+    void testContainsTableBucket() {
+        tableMetadataList =
+                Collections.singletonList(
+                        new TableMetadata(DATA1_TABLE_INFO, 
initialBucketMetadata));
+        partitionMetadataList =
+                Collections.singletonList(
+                        new PartitionMetadata(
+                                partitionTableId,
+                                partitionName1,
+                                partitionId1,
+                                initialBucketMetadata));
+        serverMetadataCache.updateClusterMetadata(
+                new ClusterMetadata(
+                        coordinatorServer,
+                        aliveTableServers,
+                        tableMetadataList,
+                        partitionMetadataList));
+
+        assertThat(
+                        serverMetadataCache.contains(
+                                new TableBucket(
+                                        DATA1_TABLE_INFO.getTableId(),
+                                        
initialBucketMetadata.get(0).getBucketId())))
+                .isTrue();
+        assertThat(
+                        serverMetadataCache.contains(
+                                new TableBucket(
+                                        DATA1_TABLE_INFO.getTableId(),
+                                        1L,
+                                        
initialBucketMetadata.get(0).getBucketId())))
+                .isFalse();
+        assertThat(
+                        serverMetadataCache.contains(
+                                new TableBucket(DATA1_TABLE_INFO.getTableId(), 
Integer.MAX_VALUE)))
+                .isFalse();
+
+        assertThat(
+                        serverMetadataCache.contains(
+                                new TableBucket(
+                                        partitionTableId,
+                                        partitionId1,
+                                        
initialBucketMetadata.get(0).getBucketId())))
+                .isTrue();
+        assertThat(
+                        serverMetadataCache.contains(
+                                new TableBucket(
+                                        partitionTableId,
+                                        
initialBucketMetadata.get(0).getBucketId())))
+                .isFalse();
+        assertThat(
+                        serverMetadataCache.contains(
+                                new TableBucket(partitionTableId, 
partitionId1, Integer.MAX_VALUE)))
+                .isFalse();
+    }
+
     private void assertTableMetadataEquals(
             long tableId,
             TableInfo expectedTableInfo,
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
index f4d8ba5c1..0d90a9a25 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
@@ -22,8 +22,10 @@ import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.cluster.ServerType;
 import org.apache.fluss.exception.InvalidCoordinatorException;
 import org.apache.fluss.exception.InvalidRequiredAcksException;
+import org.apache.fluss.exception.NotLeaderOrFollowerException;
 import org.apache.fluss.exception.PartitionNotExistException;
 import org.apache.fluss.exception.TableNotExistException;
+import org.apache.fluss.exception.UnknownTableOrBucketException;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
@@ -58,6 +60,7 @@ import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
 import org.apache.fluss.server.log.FetchParams;
 import org.apache.fluss.server.log.ListOffsetsParam;
+import org.apache.fluss.server.metadata.BucketMetadata;
 import org.apache.fluss.server.metadata.ClusterMetadata;
 import org.apache.fluss.server.metadata.PartitionMetadata;
 import org.apache.fluss.server.metadata.ServerInfo;
@@ -1637,6 +1640,78 @@ class ReplicaManagerTest extends ReplicaTestBase {
                                 + "The latest known coordinator epoch is 2");
     }
 
+    @Test
+    void testGetReplicaOrException() {
+        // 1. Test online replica
+        TableBucket tb = new TableBucket(DATA1_TABLE_ID, 1);
+        makeLogTableAsLeader(tb.getBucket());
+        
assertThat(replicaManager.getReplicaOrException(tb).getTableBucket()).isEqualTo(tb);
+
+        // 2. Test offline replica
+        // TODO: Add test for offline replica
+
+        // 3. Test not leader or follower replica
+        Set<ServerInfo> tsServerInfoList =
+                new HashSet<>(
+                        Arrays.asList(
+                                new ServerInfo(
+                                        TABLET_SERVER_ID,
+                                        "rack0",
+                                        
Endpoint.fromListenersString("CLIENT://localhost:90"),
+                                        ServerType.TABLET_SERVER),
+                                new ServerInfo(
+                                        2,
+                                        "rack1",
+                                        
Endpoint.fromListenersString("CLIENT://localhost:91"),
+                                        ServerType.TABLET_SERVER),
+                                new ServerInfo(
+                                        3,
+                                        "rack2",
+                                        
Endpoint.fromListenersString("CLIENT://localhost:92"),
+                                        ServerType.TABLET_SERVER)));
+        TablePath tablePath = TablePath.of("test_db_1", 
"test_get_replica_or_exception");
+        long tableId = 150004L;
+        TableInfo tableInfo =
+                TableInfo.of(
+                        tablePath,
+                        tableId,
+                        1,
+                        DATA1_TABLE_DESCRIPTOR,
+                        System.currentTimeMillis(),
+                        System.currentTimeMillis());
+        TableBucket tableBucket1 = new TableBucket(tableId, 1);
+        TableMetadata tableMetadata1 =
+                new TableMetadata(
+                        tableInfo,
+                        Collections.singletonList(
+                                new BucketMetadata(
+                                        tableBucket1.getBucket(),
+                                        null,
+                                        null,
+                                        Arrays.asList(1, 2, 3))));
+        replicaManager.maybeUpdateMetadataCache(
+                0,
+                buildClusterMetadata(
+                        null,
+                        tsServerInfoList,
+                        Collections.singletonList(tableMetadata1),
+                        Collections.emptyList()));
+        assertThatThrownBy(() -> 
replicaManager.getReplicaOrException(tableBucket1))
+                .isInstanceOf(NotLeaderOrFollowerException.class);
+        // Online the replica
+        makeLogTableAsLeader(tableBucket1, false);
+        // Now it should return an online replica
+        
assertThat(replicaManager.getReplicaOrException(tableBucket1).getTableBucket())
+                .isEqualTo(tableBucket1);
+
+        // 4. Test really non-exist replica
+        assertThatThrownBy(
+                        () ->
+                                replicaManager.getReplicaOrException(
+                                        new TableBucket(DATA1_TABLE_ID, 
Integer.MAX_VALUE)))
+                .isInstanceOf(UnknownTableOrBucketException.class);
+    }
+
     private void assertReplicaEpochEquals(
             Replica replica, boolean isLeader, int leaderEpoch, int 
bucketEpoch) {
         assertThat(replica.isLeader()).isEqualTo(isLeader);

Reply via email to