This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 57166d866e427c79a4e133095442fb261e6fbdd1
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 1 19:19:48 2025 +0800

    ServerSchemaCache TablePath -> tableId
---
 .../org/apache/fluss/server/log/FetchParams.java   |  2 +-
 .../java/org/apache/fluss/server/log/LocalLog.java |  2 -
 .../server/log/remote/FsRemoteLogOutputStream.java |  2 +-
 .../fluss/server/log/remote/LogSegmentFiles.java   |  2 +-
 .../fluss/server/log/remote/LogTieringTask.java    |  4 +-
 .../fluss/server/metadata/ServerSchemaCache.java   | 74 ++++++++++++----------
 .../server/metadata/TabletServerMetadataCache.java | 10 +--
 .../org/apache/fluss/server/replica/Replica.java   |  1 +
 .../server/metadata/ServerSchemaCacheTest.java     | 46 ++++++--------
 .../fluss/server/replica/ReplicaManagerTest.java   | 16 +++--
 .../apache/fluss/server/replica/ReplicaTest.java   |  6 +-
 .../server/testutils/FlussClusterExtension.java    | 12 ++--
 12 files changed, 89 insertions(+), 88 deletions(-)

diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java
index b08ad67f8..0ec42a090 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java
@@ -67,7 +67,7 @@ public final class FetchParams {
 
     private final int minFetchBytes;
     private final long maxWaitMs;
-    // TODO:add more params like epoch etc.
+    // TODO: add more params like epoch etc.
 
     public FetchParams(int replicaId, int maxFetchBytes) {
         this(replicaId, true, maxFetchBytes, DEFAULT_MIN_FETCH_BYTES, 
DEFAULT_MAX_WAIT_MS);
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java
index 086e23b0e..130b3855d 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java
@@ -405,8 +405,6 @@ public final class LocalLog {
                         (maxOffsetMetadata.getSegmentBaseOffset() == 
segment.getBaseOffset())
                                 ? 
maxOffsetMetadata.getRelativePositionInSegment()
                                 : segment.getSizeInBytes();
-
-                // segment需要查询
                 fetchDataInfo =
                         segment.read(readOffset, maxLength, maxPosition, 
minOneMessage, projection);
                 if (fetchDataInfo == null) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/FsRemoteLogOutputStream.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/FsRemoteLogOutputStream.java
index 05233e0ff..835a94ff6 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/FsRemoteLogOutputStream.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/FsRemoteLogOutputStream.java
@@ -203,7 +203,7 @@ public class FsRemoteLogOutputStream extends 
FSDataOutputStream {
         Exception latestException = null;
         for (int attempt = 0; attempt < 10; attempt++) {
             try {
-                // todo: mayadd entropy injection?
+                // todo: may add entropy injection?
                 this.outStream = fs.create(remoteLogFilePath, 
FileSystem.WriteMode.NO_OVERWRITE);
                 return;
             } catch (Exception e) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogSegmentFiles.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogSegmentFiles.java
index 86a6bbf9e..73aee521e 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogSegmentFiles.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogSegmentFiles.java
@@ -40,7 +40,7 @@ public class LogSegmentFiles {
     private final Path offsetIndex;
     private final Path timeIndex;
     private final @Nullable Path writerIdIndex;
-    // TODOadd leader epoch index after introduce leader epoch.
+    // TODO add leader epoch index after introduce leader epoch.
 
     public LogSegmentFiles(
             Path logSegment, Path offsetIndex, Path timeIndex, @Nullable Path 
writerIdIndex) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
index 0b425b8fc..cdde3842d 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
@@ -227,7 +227,7 @@ public class LogTieringTask implements Runnable {
     }
 
     /**
-     * Copy the given log segments to remote andadd the successfully copied 
segment to the {@code
+     * Copy the given log segments to remote and add the successfully copied 
segment to the {@code
      * copiedSegments} parameter.
      *
      * @return the end offset of the last segment copied to remote.
@@ -347,7 +347,7 @@ public class LogTieringTask implements Runnable {
                     // the commit failed, it means the commit snapshot is 
invalid or register zk
                     // failed, we will revert this commit and delete the 
remote log manifest
                     // file.
-                    // TODO:add the fail reason in the future.
+                    // TODO: add the fail reason in the future.
                     LOG.error(
                             "Commit remote log manifest failed for table 
bucket {}. We will delete the"
                                     + " written remote log manifest file",
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java
index 0f98b092d..374e20501 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java
@@ -29,6 +29,8 @@ import 
org.apache.fluss.shaded.guava32.com.google.common.cache.CacheBuilder;
 import 
org.apache.fluss.shaded.guava32.com.google.common.util.concurrent.UncheckedExecutionException;
 import org.apache.fluss.utils.MapUtils;
 
+import javax.annotation.concurrent.ThreadSafe;
+
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
@@ -41,74 +43,77 @@ import java.util.concurrent.ExecutionException;
  * 1. latest schema of each subscribed table, updated by UpdateMetadata 
request. 2. history schemas
  * of each table, updated by lookup from tablet server.
  */
+@ThreadSafe
 public class ServerSchemaCache {
 
     private final MetadataManager metadataManager;
-    private final Map<TablePath, SchemaInfo> latestSchemaByTablePath;
-    private final Map<TablePath, Integer> subscriberCounters;
+    private final Map<Long, SchemaInfo> latestSchemaByTableId;
+    // table_id -> subscriber count
+    private final Map<Long, Integer> subscriberCounters;
     private final Cache<TableSchemaKey, Schema> schemaCache;
 
     public ServerSchemaCache(MetadataManager metadataManager) {
         this.metadataManager = metadataManager;
         // thread safe is guaranteed by subscriberCounters.
         this.subscriberCounters = MapUtils.newConcurrentHashMap();
-        this.latestSchemaByTablePath = MapUtils.newConcurrentHashMap();
+        this.latestSchemaByTableId = MapUtils.newConcurrentHashMap();
         this.schemaCache = CacheBuilder.newBuilder().maximumSize(100).build();
     }
 
     public SchemaGetter subscribeWithInitialSchema(
-            TablePath tablePath, int initialSchemaId, Schema initialSchema) {
+            long tableId, TablePath tablePath, int initialSchemaId, Schema 
initialSchema) {
         subscriberCounters.compute(
-                tablePath,
+                tableId,
                 (key, oldValue) -> {
-                    updateSchema(tablePath, initialSchemaId, initialSchema);
+                    updateSchema(tableId, initialSchemaId, initialSchema);
                     if (oldValue == null) {
                         return 1;
                     } else {
                         return oldValue + 1;
                     }
                 });
-        return new SchemaMetadataSubscriberImpl(tablePath);
+        return new SchemaMetadataSubscriberImpl(tableId, tablePath);
     }
 
-    public void updateLatestSchema(TablePath tablePath, short schemaId, Schema 
schema) {
+    public void updateLatestSchema(long tableId, short schemaId, Schema 
schema) {
         // only update if tablePath is subscribed.
         subscriberCounters.computeIfPresent(
-                tablePath,
+                tableId,
                 (key, oldValue) -> {
-                    updateSchema(tablePath, schemaId, schema);
+                    updateSchema(tableId, schemaId, schema);
                     return oldValue;
                 });
     }
 
-    private void unsubscribe(TablePath tablePath) {
+    private void unsubscribe(long tableId) {
         subscriberCounters.compute(
-                tablePath,
+                tableId,
                 (key, oldValue) -> {
                     if (oldValue != null && oldValue > 1) {
                         return oldValue - 1;
                     }
-                    latestSchemaByTablePath.remove(tablePath);
+                    latestSchemaByTableId.remove(tableId);
                     return null;
                 });
     }
 
-    private Schema getFlussSchema(TablePath tablePath, int schemaId) throws 
ExecutionException {
+    private Schema getFlussSchema(long tableId, TablePath tablePath, int 
schemaId)
+            throws ExecutionException {
         return schemaCache.get(
-                new TableSchemaKey(tablePath, schemaId),
+                new TableSchemaKey(tableId, schemaId),
                 () -> {
-                    SchemaInfo schemaInfo = 
latestSchemaByTablePath.get(tablePath);
+                    SchemaInfo schemaInfo = latestSchemaByTableId.get(tableId);
                     if (schemaInfo != null && schemaInfo.getSchemaId() == 
schemaId) {
-                        return 
latestSchemaByTablePath.get(tablePath).getSchema();
+                        return latestSchemaByTableId.get(tableId).getSchema();
                     } else {
                         return metadataManager.getSchemaById(tablePath, 
schemaId).getSchema();
                     }
                 });
     }
 
-    private void updateSchema(TablePath tablePath, int schemaId, Schema 
schema) {
-        latestSchemaByTablePath.compute(
-                tablePath,
+    private void updateSchema(long tableId, int schemaId, Schema schema) {
+        latestSchemaByTableId.compute(
+                tableId,
                 (key, oldValue) -> {
                     if (oldValue == null || oldValue.getSchemaId() < schemaId) 
{
                         return new SchemaInfo(schema, schemaId);
@@ -117,25 +122,25 @@ public class ServerSchemaCache {
                     }
                 });
 
-        schemaCache.put(new TableSchemaKey(tablePath, schemaId), schema);
+        schemaCache.put(new TableSchemaKey(tableId, schemaId), schema);
     }
 
     @VisibleForTesting
-    public Map<TablePath, SchemaInfo> getLatestSchemaByTablePath() {
-        return latestSchemaByTablePath;
+    public Map<Long, SchemaInfo> getLatestSchemaByTableId() {
+        return latestSchemaByTableId;
     }
 
     @VisibleForTesting
-    Map<TablePath, Integer> getSubscriberCounters() {
+    Map<Long, Integer> getSubscriberCounters() {
         return subscriberCounters;
     }
 
     private static class TableSchemaKey {
-        private final TablePath tablePath;
+        private final long tableId;
         private final int schemaId;
 
-        TableSchemaKey(TablePath tablePath, int schemaId) {
-            this.tablePath = tablePath;
+        TableSchemaKey(long tableId, int schemaId) {
+            this.tableId = tableId;
             this.schemaId = schemaId;
         }
 
@@ -145,34 +150,35 @@ public class ServerSchemaCache {
                 return false;
             }
             TableSchemaKey that = (TableSchemaKey) o;
-            return schemaId == that.schemaId && Objects.equals(tablePath, 
that.tablePath);
+            return schemaId == that.schemaId && Objects.equals(tableId, 
that.tableId);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(tablePath, schemaId);
+            return Objects.hash(tableId, schemaId);
         }
     }
 
     private class SchemaMetadataSubscriberImpl implements SchemaGetter {
+        private final long tableId;
         private final TablePath tablePath;
         private volatile boolean released;
 
-        public SchemaMetadataSubscriberImpl(TablePath tablePath) {
+        public SchemaMetadataSubscriberImpl(long tableId, TablePath tablePath) 
{
+            this.tableId = tableId;
             this.tablePath = tablePath;
             this.released = false;
         }
 
         @Override
         public SchemaInfo getLatestSchemaInfo() {
-            return latestSchemaByTablePath.get(tablePath);
+            return latestSchemaByTableId.get(tableId);
         }
 
         @Override
         public Schema getSchema(int schemaId) {
-            // todo:  support with get table.
             try {
-                return getFlussSchema(tablePath, schemaId);
+                return getFlussSchema(tableId, tablePath, schemaId);
             } catch (ExecutionException | UncheckedExecutionException 
executionException) {
                 Throwable cause = executionException.getCause();
                 if (cause instanceof SchemaNotExistException) {
@@ -192,7 +198,7 @@ public class ServerSchemaCache {
         @Override
         public void release() {
             if (!released) {
-                unsubscribe(tablePath);
+                unsubscribe(tableId);
                 released = true;
             }
         }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
index 9a73975ce..666a74aae 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
@@ -141,14 +141,14 @@ public class TabletServerMetadataCache implements 
ServerMetadataCache {
     }
 
     public SchemaGetter subscribeWithInitialSchema(
-            TablePath tablePath, int initialSchemaId, Schema initialSchema) {
+            TablePath tablePath, long tableId, int initialSchemaId, Schema 
initialSchema) {
         return serverSchemaCache.subscribeWithInitialSchema(
-                tablePath, initialSchemaId, initialSchema);
+                tableId, tablePath, initialSchemaId, initialSchema);
     }
 
-    public void updateLatestSchema(TablePath tablePath, SchemaInfo schemaInfo) 
{
+    public void updateLatestSchema(long tableId, SchemaInfo schemaInfo) {
         serverSchemaCache.updateLatestSchema(
-                tablePath, (short) schemaInfo.getSchemaId(), 
schemaInfo.getSchema());
+                tableId, (short) schemaInfo.getSchemaId(), 
schemaInfo.getSchema());
     }
 
     public Optional<PartitionMetadata> getPartitionMetadata(PhysicalTablePath 
partitionPath) {
@@ -207,7 +207,7 @@ public class TabletServerMetadataCache implements 
ServerMetadataCache {
                         // todo: apply schema id and schema info if needs
                         int schemaId = tableInfo.getSchemaId();
                         Schema schema = tableInfo.getSchema();
-                        serverSchemaCache.updateLatestSchema(tablePath, 
(short) schemaId, schema);
+                        serverSchemaCache.updateLatestSchema(tableId, (short) 
schemaId, schema);
 
                         if (tableId == DELETED_TABLE_ID) {
                             Long removedTableId = 
tableIdByPath.remove(tablePath);
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index ad2dcb24d..c1933bd5c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -237,6 +237,7 @@ public final class Replica {
         this.schemaGetter =
                 metadataCache.subscribeWithInitialSchema(
                         physicalPath.getTablePath(),
+                        tableInfo.getTableId(),
                         tableInfo.getSchemaId(),
                         tableInfo.getSchema());
         this.tableConfig = tableInfo.getTableConfig();
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/metadata/ServerSchemaCacheTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/metadata/ServerSchemaCacheTest.java
index f72277ba8..c42c7b296 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/metadata/ServerSchemaCacheTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/metadata/ServerSchemaCacheTest.java
@@ -48,36 +48,33 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 public class ServerSchemaCacheTest {
 
     @Test
-    void testPublishAndsubscribeSchemaChange() {
+    void testPublishAndSubscribeSchemaChange() {
         ServerSchemaCache manager =
                 new ServerSchemaCache(new 
TestingMetadataManager(Collections.emptyList()));
 
         SchemaGetter subscriber1 =
                 manager.subscribeWithInitialSchema(
-                        new TablePath("test_tb", "test_table_1"), (short) 1, 
DATA1_SCHEMA);
+                        1L, new TablePath("test_tb", "test_table_1"), (short) 
1, DATA1_SCHEMA);
         SchemaGetter subscriber2 =
                 manager.subscribeWithInitialSchema(
-                        new TablePath("test_tb", "test_table_2"), (short) 1, 
DATA1_SCHEMA);
+                        2L, new TablePath("test_tb", "test_table_2"), (short) 
1, DATA1_SCHEMA);
         assertThat(subscriber1.getLatestSchemaInfo()).isEqualTo(new 
SchemaInfo(DATA1_SCHEMA, 1));
         assertThat(subscriber1.getLatestSchemaInfo()).isEqualTo(new 
SchemaInfo(DATA1_SCHEMA, 1));
 
-        manager.updateLatestSchema(
-                new TablePath("test_tb", "test_table_2"), (short) 2, 
DATA2_SCHEMA);
+        manager.updateLatestSchema(2L, (short) 2, DATA2_SCHEMA);
         assertThat(subscriber1.getLatestSchemaInfo()).isNotNull();
         assertThat(subscriber1.getLatestSchemaInfo()).isEqualTo(new 
SchemaInfo(DATA1_SCHEMA, 1));
         assertThat(subscriber2.getLatestSchemaInfo()).isNotNull();
         assertThat(subscriber2.getLatestSchemaInfo()).isEqualTo(new 
SchemaInfo(DATA2_SCHEMA, 2));
 
-        manager.updateLatestSchema(
-                new TablePath("test_tb", "test_table_1"), (short) 2, 
DATA2_SCHEMA);
+        manager.updateLatestSchema(1L, (short) 2, DATA2_SCHEMA);
         assertThat(subscriber1.getLatestSchemaInfo()).isNotNull();
         assertThat(subscriber1.getLatestSchemaInfo()).isEqualTo(new 
SchemaInfo(DATA2_SCHEMA, 2));
         assertThat(subscriber2.getLatestSchemaInfo()).isNotNull();
         assertThat(subscriber2.getLatestSchemaInfo()).isEqualTo(new 
SchemaInfo(DATA2_SCHEMA, 2));
 
         // one more redundant schema change.
-        manager.updateLatestSchema(
-                new TablePath("test_tb", "test_table_1"), (short) 1, 
DATA2_SCHEMA);
+        manager.updateLatestSchema(1L, (short) 1, DATA2_SCHEMA);
         assertThat(subscriber1.getLatestSchemaInfo()).isNotNull();
         assertThat(subscriber1.getLatestSchemaInfo()).isEqualTo(new 
SchemaInfo(DATA2_SCHEMA, 2));
         assertThat(subscriber2.getLatestSchemaInfo()).isNotNull();
@@ -98,7 +95,8 @@ public class ServerSchemaCacheTest {
                 new ServerSchemaCache(
                         new 
TestingMetadataManager(Arrays.asList(DATA1_TABLE_INFO, tableInfo2)));
         SchemaGetter schemaGetter =
-                manager.subscribeWithInitialSchema(DATA1_TABLE_PATH, (short) 
2, DATA2_SCHEMA);
+                manager.subscribeWithInitialSchema(
+                        DATA1_TABLE_ID, DATA1_TABLE_PATH, (short) 2, 
DATA2_SCHEMA);
         assertThat(schemaGetter.getSchema(2)).isEqualTo(DATA2_SCHEMA);
         assertThat(schemaGetter.getSchema(1)).isEqualTo(DATA1_SCHEMA);
         assertThatThrownBy(() -> schemaGetter.getSchema(3))
@@ -111,39 +109,35 @@ public class ServerSchemaCacheTest {
                 new ServerSchemaCache(new 
TestingMetadataManager(Collections.emptyList()));
         SchemaGetter subscriber1 =
                 manager.subscribeWithInitialSchema(
-                        new TablePath("test_tb", "test_table_1"), (short) 1, 
DATA1_SCHEMA);
+                        1L, new TablePath("test_tb", "test_table_1"), (short) 
1, DATA1_SCHEMA);
         SchemaGetter subscriber2 =
                 manager.subscribeWithInitialSchema(
-                        new TablePath("test_tb", "test_table_1"), (short) 1, 
DATA1_SCHEMA);
+                        1L, new TablePath("test_tb", "test_table_1"), (short) 
1, DATA1_SCHEMA);
         SchemaGetter subscriber3 =
                 manager.subscribeWithInitialSchema(
-                        new TablePath("test_tb", "test_table_2"), (short) 1, 
DATA1_SCHEMA);
-        assertThat(manager.getLatestSchemaByTablePath()).hasSize(2);
+                        2L, new TablePath("test_tb", "test_table_2"), (short) 
1, DATA1_SCHEMA);
+        assertThat(manager.getLatestSchemaByTableId()).hasSize(2);
         assertThat(manager.getSubscriberCounters()).hasSize(2);
-        assertThat(manager.getSubscriberCounters().get(new 
TablePath("test_tb", "test_table_1")))
-                .isEqualTo(2);
+        assertThat(manager.getSubscriberCounters().get(1L)).isEqualTo(2);
 
         subscriber1.release();
-        assertThat(manager.getLatestSchemaByTablePath()).hasSize(2);
+        assertThat(manager.getLatestSchemaByTableId()).hasSize(2);
         assertThat(manager.getSubscriberCounters()).hasSize(2);
-        assertThat(manager.getSubscriberCounters().get(new 
TablePath("test_tb", "test_table_1")))
-                .isEqualTo(1);
+        assertThat(manager.getSubscriberCounters().get(1L)).isEqualTo(1);
 
         // one more redundant unsubscribe.
         subscriber1.release();
-        assertThat(manager.getLatestSchemaByTablePath()).hasSize(2);
+        assertThat(manager.getLatestSchemaByTableId()).hasSize(2);
         assertThat(manager.getSubscriberCounters()).hasSize(2);
-        assertThat(manager.getSubscriberCounters().get(new 
TablePath("test_tb", "test_table_1")))
-                .isEqualTo(1);
+        assertThat(manager.getSubscriberCounters().get(1L)).isEqualTo(1);
 
         subscriber2.release();
-        assertThat(manager.getLatestSchemaByTablePath()).hasSize(1);
+        assertThat(manager.getLatestSchemaByTableId()).hasSize(1);
         assertThat(manager.getSubscriberCounters()).hasSize(1);
-        assertThat(manager.getSubscriberCounters().get(new 
TablePath("test_tb", "test_table_1")))
-                .isNull();
+        assertThat(manager.getSubscriberCounters().get(1L)).isNull();
 
         subscriber3.release();
-        assertThat(manager.getLatestSchemaByTablePath()).isEmpty();
+        assertThat(manager.getLatestSchemaByTableId()).isEmpty();
         assertThat(manager.getSubscriberCounters()).isEmpty();
         subscriber3.release();
     }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
index 92c24f871..a669af2c1 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
@@ -193,7 +193,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
     void testFetchLog() throws Exception {
         SchemaGetter schemaGetter =
                 serverMetadataCache.subscribeWithInitialSchema(
-                        DATA1_TABLE_PATH, DEFAULT_SCHEMA_ID, DATA1_SCHEMA);
+                        DATA1_TABLE_PATH, DATA1_TABLE_ID, DEFAULT_SCHEMA_ID, 
DATA1_SCHEMA);
         TableBucket tb = new TableBucket(DATA1_TABLE_ID, 1);
         makeLogTableAsLeader(tb.getBucket());
 
@@ -309,7 +309,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
     void testFetchLogWithMaxBytesLimit() throws Exception {
         SchemaGetter schemaGetter =
                 serverMetadataCache.subscribeWithInitialSchema(
-                        DATA1_TABLE_PATH, DEFAULT_SCHEMA_ID, DATA1_SCHEMA);
+                        DATA1_TABLE_PATH, DATA1_TABLE_ID, DEFAULT_SCHEMA_ID, 
DATA1_SCHEMA);
         TableBucket tb = new TableBucket(DATA1_TABLE_ID, 1);
         makeLogTableAsLeader(tb.getBucket());
 
@@ -432,7 +432,8 @@ class ReplicaManagerTest extends ReplicaTestBase {
         assertThat(records1).isNotNull();
         assertThat(records2).isNotNull();
         SchemaGetter schemaGetter =
-                
serverMetadataCache.subscribeWithInitialSchema(DATA1_TABLE_PATH, 1, 
DATA1_SCHEMA);
+                serverMetadataCache.subscribeWithInitialSchema(
+                        DATA1_TABLE_PATH, DATA1_TABLE_ID, 1, DATA1_SCHEMA);
         if (records1.sizeInBytes() == 0) {
             assertThat(records2.sizeInBytes() > 0).isTrue();
             assertMemoryRecordsEquals(
@@ -498,7 +499,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
         makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, 
tb.getBucket());
         SchemaGetter schemaGetter =
                 serverMetadataCache.subscribeWithInitialSchema(
-                        DATA1_TABLE_PATH_PK, 1, DATA1_SCHEMA_PK);
+                        DATA1_TABLE_PATH_PK, DATA1_TABLE_ID_PK, 1, 
DATA1_SCHEMA_PK);
 
         // 1. put kv records to kv store.
         List<Tuple2<Object[], Object[]>> data1 =
@@ -628,7 +629,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
         makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, 
tb.getBucket());
         SchemaGetter schemaGetter =
                 serverMetadataCache.subscribeWithInitialSchema(
-                        DATA1_TABLE_PATH_PK, 1, DATA1_SCHEMA_PK);
+                        DATA1_TABLE_PATH_PK, DATA1_TABLE_ID_PK, 1, 
DATA1_SCHEMA_PK);
 
         // put 10 batches delete non-exists key batch to kv store.
         CompletableFuture<List<PutKvResultForBucket>> future;
@@ -906,7 +907,8 @@ class ReplicaManagerTest extends ReplicaTestBase {
         CompletableFuture<LimitScanResultForBucket> limitFuture = new 
CompletableFuture<>();
         replicaManager.limitScan(tb, 10, limitFuture::complete);
         SchemaGetter schemaGetter =
-                
serverMetadataCache.subscribeWithInitialSchema(DATA1_TABLE_PATH, 1, 
DATA1_SCHEMA);
+                serverMetadataCache.subscribeWithInitialSchema(
+                        DATA1_TABLE_PATH, DATA1_TABLE_ID, 1, DATA1_SCHEMA);
         assertMemoryRecordsEquals(
                 DATA1_ROW_TYPE,
                 schemaGetter,
@@ -1463,7 +1465,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
             FetchLogResultForBucket r = r1.getValue();
             SchemaGetter schemaGetter =
                     serverMetadataCache.subscribeWithInitialSchema(
-                            DATA1_TABLE_PATH, 1, DATA1_SCHEMA);
+                            DATA1_TABLE_PATH, DATA1_TABLE_ID, 1, DATA1_SCHEMA);
             assertLogRecordsEqualsWithRowKind(
                     DEFAULT_SCHEMA_ID,
                     DATA1_ROW_TYPE,
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
index fcb4ec88c..a18d76442 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -153,7 +153,7 @@ final class ReplicaTest extends ReplicaTestBase {
 
         // schema evolution.
         serverMetadataCache.updateLatestSchema(
-                DATA1_TABLE_PATH, new SchemaInfo(DATA2_SCHEMA, (short) 2));
+                DATA1_TABLE_ID, new SchemaInfo(DATA2_SCHEMA, (short) 2));
         mr =
                 createRecordsWithoutBaseLogOffset(
                         DATA2_ROW_TYPE,
@@ -402,7 +402,7 @@ final class ReplicaTest extends ReplicaTestBase {
         currentOffset += 2;
         short newSchemaId = 2;
         serverMetadataCache.updateLatestSchema(
-                DATA1_TABLE_PATH_PK, new SchemaInfo(DATA2_SCHEMA, 
newSchemaId));
+                DATA1_TABLE_ID, new SchemaInfo(DATA2_SCHEMA, newSchemaId));
         KvRecordTestUtils.KvRecordBatchFactory kvRecordBatchFactory2 =
                 KvRecordTestUtils.KvRecordBatchFactory.of(newSchemaId);
         KvRecordTestUtils.KvRecordFactory kvRecordFactory2 =
@@ -727,7 +727,7 @@ final class ReplicaTest extends ReplicaTestBase {
         // update schema.
         zkClient.registerSchema(DATA1_TABLE_PATH_PK, DATA2_SCHEMA, 
newSchemaId);
         serverMetadataCache.updateLatestSchema(
-                DATA1_TABLE_PATH_PK, new SchemaInfo(DATA2_SCHEMA, 
newSchemaId));
+                DATA1_TABLE_ID, new SchemaInfo(DATA2_SCHEMA, newSchemaId));
         // write data with new schema
         putRecordsToLeader(
                 kvReplica,
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
index ef1cb92ba..8cd737e51 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
@@ -670,9 +670,9 @@ public final class FlussClusterExtension
                 () -> {
                     TableRegistration tableRegistration = 
zkClient.getTable(tablePath).get();
                     int bucketCount = tableRegistration.bucketCount;
+                    long tableId = tableRegistration.tableId;
                     for (int bucketId = 0; bucketId < bucketCount; bucketId++) 
{
-                        TableBucket tableBucket =
-                                new TableBucket(tableRegistration.tableId, 
bucketId);
+                        TableBucket tableBucket = new TableBucket(tableId, 
bucketId);
                         Optional<LeaderAndIsr> leaderAndIsrOpt =
                                 zkClient.getLeaderAndIsr(tableBucket);
                         assertThat(leaderAndIsrOpt).isPresent();
@@ -680,10 +680,10 @@ public final class FlussClusterExtension
                         TabletServer tabletServer = 
getTabletServerById(leader);
                         ServerSchemaCache serverSchemaCache =
                                 
tabletServer.getMetadataCache().getSchemaMetadataManager();
-                        Map<TablePath, SchemaInfo> latestSchemaByTablePath =
-                                serverSchemaCache.getLatestSchemaByTablePath();
-                        
assertThat(latestSchemaByTablePath).containsKey(tablePath);
-                        
assertThat(latestSchemaByTablePath.get(tablePath).getSchemaId())
+                        Map<Long, SchemaInfo> latestSchemaByTablePath =
+                                serverSchemaCache.getLatestSchemaByTableId();
+                        
assertThat(latestSchemaByTablePath).containsKey(tableId);
+                        
assertThat(latestSchemaByTablePath.get(tableId).getSchemaId())
                                 .isEqualTo(schemaId);
                     }
                 });

Reply via email to