This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new eb19eeaf0 [server] Enhance ReplicaManager#getReplicaOrException to
return more accurate error messages (#1573)
eb19eeaf0 is described below
commit eb19eeaf018bfd1e8084e46c209ce64d91a7365a
Author: Liebing <[email protected]>
AuthorDate: Tue Aug 26 23:06:41 2025 +0800
[server] Enhance ReplicaManager#getReplicaOrException to return more
accurate error messages (#1573)
---
.../server/metadata/ServerMetadataSnapshot.java | 15 +++++
.../server/metadata/TabletServerMetadataCache.java | 5 ++
.../fluss/server/replica/ReplicaManager.java | 6 +-
.../metadata/TabletServerMetadataCacheTest.java | 57 ++++++++++++++++
.../fluss/server/replica/ReplicaManagerTest.java | 75 ++++++++++++++++++++++
5 files changed, 157 insertions(+), 1 deletion(-)
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerMetadataSnapshot.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerMetadataSnapshot.java
index 11d45873e..a3faeb90e 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerMetadataSnapshot.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerMetadataSnapshot.java
@@ -22,6 +22,7 @@ import org.apache.fluss.cluster.Cluster;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.TabletServerInfo;
import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
import javax.annotation.Nullable;
@@ -179,4 +180,18 @@ public class ServerMetadataSnapshot {
public Map<Integer, ServerInfo> getAliveTabletServers() {
return aliveTabletServers;
}
+
+ public boolean contains(TableBucket tableBucket) {
+ if (tableBucket.getPartitionId() == null) {
+ return bucketMetadataMapForTables.containsKey(tableBucket.getTableId())
+ && bucketMetadataMapForTables
+ .get(tableBucket.getTableId())
+ .containsKey(tableBucket.getBucket());
+ } else {
+ return bucketMetadataMapForPartitions.containsKey(tableBucket.getPartitionId())
+ && bucketMetadataMapForPartitions
+ .get(tableBucket.getPartitionId())
+ .containsKey(tableBucket.getBucket());
+ }
+ }
}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
index 7a9bfda59..ff6855e13 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
@@ -21,6 +21,7 @@ import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.TabletServerInfo;
import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.coordinator.MetadataManager;
@@ -160,6 +161,10 @@ public class TabletServerMetadataCache implements ServerMetadataCache {
}
}
+ public boolean contains(TableBucket tableBucket) {
+ return serverMetadataSnapshot.contains(tableBucket);
+ }
+
public void updateClusterMetadata(ClusterMetadata clusterMetadata) {
inLock(
metadataLock,
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 3f901ec5c..e85985767 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -1499,8 +1499,12 @@ public class ReplicaManager {
HostedReplica replica = getReplica(tableBucket);
if (replica instanceof OnlineReplica) {
return ((OnlineReplica) replica).getReplica();
+ } else if (replica instanceof OfflineReplica) {
+ throw new StorageException(tableBucket + " is offline on server " + serverId);
+ } else if ((replica instanceof NoneReplica) && metadataCache.contains(tableBucket)) {
+ throw new NotLeaderOrFollowerException(
+ "server " + serverId + " is not leader or follower for " + tableBucket);
} else {
- // TODO add metadata cache to judge.
throw new UnknownTableOrBucketException("Unknown table or bucket: " + tableBucket);
}
}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/TabletServerMetadataCacheTest.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/TabletServerMetadataCacheTest.java
index 334949897..90f012b83 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/metadata/TabletServerMetadataCacheTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/TabletServerMetadataCacheTest.java
@@ -23,6 +23,7 @@ import org.apache.fluss.cluster.TabletServerInfo;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.coordinator.MetadataManager;
@@ -244,6 +245,62 @@ public class TabletServerMetadataCacheTest {
initialBucketMetadata);
}
+ @Test
+ void testContainsTableBucket() {
+ tableMetadataList =
+ Collections.singletonList(
+ new TableMetadata(DATA1_TABLE_INFO, initialBucketMetadata));
+ partitionMetadataList =
+ Collections.singletonList(
+ new PartitionMetadata(
+ partitionTableId,
+ partitionName1,
+ partitionId1,
+ initialBucketMetadata));
+ serverMetadataCache.updateClusterMetadata(
+ new ClusterMetadata(
+ coordinatorServer,
+ aliveTableServers,
+ tableMetadataList,
+ partitionMetadataList));
+
+ assertThat(
+ serverMetadataCache.contains(
+ new TableBucket(
+ DATA1_TABLE_INFO.getTableId(),
+ initialBucketMetadata.get(0).getBucketId())))
+ .isTrue();
+ assertThat(
+ serverMetadataCache.contains(
+ new TableBucket(
+ DATA1_TABLE_INFO.getTableId(),
+ 1L,
+ initialBucketMetadata.get(0).getBucketId())))
+ .isFalse();
+ assertThat(
+ serverMetadataCache.contains(
+ new TableBucket(DATA1_TABLE_INFO.getTableId(), Integer.MAX_VALUE)))
+ .isFalse();
+
+ assertThat(
+ serverMetadataCache.contains(
+ new TableBucket(
+ partitionTableId,
+ partitionId1,
+ initialBucketMetadata.get(0).getBucketId())))
+ .isTrue();
+ assertThat(
+ serverMetadataCache.contains(
+ new TableBucket(
+ partitionTableId,
+ initialBucketMetadata.get(0).getBucketId())))
+ .isFalse();
+ assertThat(
+ serverMetadataCache.contains(
+ new TableBucket(partitionTableId, partitionId1, Integer.MAX_VALUE)))
+ .isFalse();
+ }
+
private void assertTableMetadataEquals(
long tableId,
TableInfo expectedTableInfo,
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
index f4d8ba5c1..0d90a9a25 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
@@ -22,8 +22,10 @@ import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.exception.InvalidCoordinatorException;
import org.apache.fluss.exception.InvalidRequiredAcksException;
+import org.apache.fluss.exception.NotLeaderOrFollowerException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.TableNotExistException;
+import org.apache.fluss.exception.UnknownTableOrBucketException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
@@ -58,6 +60,7 @@ import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
import org.apache.fluss.server.log.FetchParams;
import org.apache.fluss.server.log.ListOffsetsParam;
+import org.apache.fluss.server.metadata.BucketMetadata;
import org.apache.fluss.server.metadata.ClusterMetadata;
import org.apache.fluss.server.metadata.PartitionMetadata;
import org.apache.fluss.server.metadata.ServerInfo;
@@ -1637,6 +1640,78 @@ class ReplicaManagerTest extends ReplicaTestBase {
+ "The latest known coordinator epoch is 2");
}
+ @Test
+ void testGetReplicaOrException() {
+ // 1. Test online replica
+ TableBucket tb = new TableBucket(DATA1_TABLE_ID, 1);
+ makeLogTableAsLeader(tb.getBucket());
+ assertThat(replicaManager.getReplicaOrException(tb).getTableBucket()).isEqualTo(tb);
+
+ // 2. Test offline replica
+ // TODO: Add test for offline replica
+
+ // 3. Test not leader or follower replica
+ Set<ServerInfo> tsServerInfoList =
+ new HashSet<>(
+ Arrays.asList(
+ new ServerInfo(
+ TABLET_SERVER_ID,
+ "rack0",
+ Endpoint.fromListenersString("CLIENT://localhost:90"),
+ ServerType.TABLET_SERVER),
+ new ServerInfo(
+ 2,
+ "rack1",
+ Endpoint.fromListenersString("CLIENT://localhost:91"),
+ ServerType.TABLET_SERVER),
+ new ServerInfo(
+ 3,
+ "rack2",
+ Endpoint.fromListenersString("CLIENT://localhost:92"),
+ ServerType.TABLET_SERVER)));
+ TablePath tablePath = TablePath.of("test_db_1", "test_get_replica_or_exception");
+ long tableId = 150004L;
+ TableInfo tableInfo =
+ TableInfo.of(
+ tablePath,
+ tableId,
+ 1,
+ DATA1_TABLE_DESCRIPTOR,
+ System.currentTimeMillis(),
+ System.currentTimeMillis());
+ TableBucket tableBucket1 = new TableBucket(tableId, 1);
+ TableMetadata tableMetadata1 =
+ new TableMetadata(
+ tableInfo,
+ Collections.singletonList(
+ new BucketMetadata(
+ tableBucket1.getBucket(),
+ null,
+ null,
+ Arrays.asList(1, 2, 3))));
+ replicaManager.maybeUpdateMetadataCache(
+ 0,
+ buildClusterMetadata(
+ null,
+ tsServerInfoList,
+ Collections.singletonList(tableMetadata1),
+ Collections.emptyList()));
+ assertThatThrownBy(() -> replicaManager.getReplicaOrException(tableBucket1))
+ .isInstanceOf(NotLeaderOrFollowerException.class);
+ // Online the replica
+ makeLogTableAsLeader(tableBucket1, false);
+ // Now it should return an online replica
+ assertThat(replicaManager.getReplicaOrException(tableBucket1).getTableBucket())
+ .isEqualTo(tableBucket1);
+
+ // 4. Test really non-exist replica
+ assertThatThrownBy(
+ () ->
+ replicaManager.getReplicaOrException(
+ new TableBucket(DATA1_TABLE_ID, Integer.MAX_VALUE)))
+ .isInstanceOf(UnknownTableOrBucketException.class);
+ }
+
private void assertReplicaEpochEquals(
Replica replica, boolean isLeader, int leaderEpoch, int bucketEpoch) {
assertThat(replica.isLeader()).isEqualTo(isLeader);