This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 639c492037d6a91d214b1e145ed8574113ced652
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 1 20:23:30 2025 +0800

    WIP
---
 .../org/apache/fluss/server/RpcServiceBase.java    |  2 +-
 .../fluss/server/coordinator/MetadataManager.java  |  4 +-
 .../server/metadata/TabletServerMetadataCache.java |  3 +-
 .../fluss/server/replica/AdjustIsrManager.java     |  2 +-
 .../org/apache/fluss/server/replica/Replica.java   |  4 +-
 .../replica/fetcher/ReplicaFetcherManager.java     |  2 +-
 .../replica/fetcher/ReplicaFetcherThread.java      |  2 +-
 .../fluss/server/utils/RpcGatewayManager.java      |  2 +-
 .../fluss/server/utils/ServerRpcMessageUtils.java  |  2 +-
 .../org/apache/fluss/server/utils/timer/Timer.java |  2 +-
 .../fluss/server/utils/timer/TimingWheel.java      |  4 +-
 .../apache/fluss/server/zk/ZooKeeperClient.java    | 50 +++++++++++++++++-----
 .../server/authorizer/DefaultAuthorizerTest.java   |  4 +-
 .../CompletedSnapshotStoreManagerTest.java         |  8 ++--
 .../coordinator/CoordinatorChannelManagerTest.java |  6 +--
 .../fluss/server/coordinator/TableManagerTest.java |  2 -
 .../server/kv/prewrite/KvPreWriteBufferTest.java   |  4 +-
 .../kv/snapshot/CompletedSnapshotStoreTest.java    |  2 +-
 .../kv/snapshot/KvTabletSnapshotTargetTest.java    |  2 +-
 .../ZooKeeperCompletedSnapshotStoreTest.java       |  4 +-
 .../apache/fluss/server/log/LogManagerTest.java    |  2 +-
 .../org/apache/fluss/server/log/LogTabletTest.java |  4 +-
 .../server/log/remote/RemoteLogTabletTest.java     |  4 +-
 .../server/metadata/MetadataUpdateITCase.java      |  2 +-
 .../fluss/server/replica/AdjustIsrITCase.java      |  2 +-
 .../fluss/server/replica/AdjustIsrManagerTest.java |  2 +-
 .../replica/HighWatermarkPersistenceTest.java      |  2 +-
 .../replica/RemoveOfflineReplicaFromIsrITCase.java |  2 +-
 .../apache/fluss/server/replica/ReplicaTest.java   |  4 +-
 .../fluss/server/replica/ReplicaTestBase.java      |  4 +-
 .../fluss/server/tablet/TabletServiceITCase.java   |  4 +-
 .../server/testutils/FlussClusterExtension.java    |  4 +-
 fluss-test-coverage/pom.xml                        |  4 +-
 33 files changed, 88 insertions(+), 63 deletions(-)

diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java 
b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
index ededf510f..82f3d5ec2 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
@@ -384,7 +384,7 @@ public abstract class RpcServiceBase extends 
RpcGatewayService implements AdminR
     @Override
     public CompletableFuture<GetFileSystemSecurityTokenResponse> 
getFileSystemSecurityToken(
             GetFileSystemSecurityTokenRequest request) {
-        // TODO:add ACL for per-table in 
https://github.com/apache/fluss/issues/752
+        // TODO: add ACL for per-table in 
https://github.com/apache/fluss/issues/752
         try {
             // In order to avoid repeatedly obtaining security token, cache it 
for a while.
             long currentTimeMs = System.currentTimeMillis();
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index bf2cfc9fb..3d63c8ba1 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -576,14 +576,14 @@ public class MetadataManager {
                     zookeeperClient.getTables(tablePaths);
             // currently, we don't support schema evolution, so all schemas 
are version 1
             Map<TablePath, SchemaInfo> tablePath2SchemaInfos =
-                    zookeeperClient.getV1Schemas(tablePaths);
+                    zookeeperClient.getLatestSchemas(tablePaths);
             for (TablePath tablePath : tablePaths) {
                 if (!tablePath2TableRegistrations.containsKey(tablePath)) {
                     throw new TableNotExistException("Table '" + tablePath + 
"' does not exist.");
                 }
                 if (!tablePath2SchemaInfos.containsKey(tablePath)) {
                     throw new SchemaNotExistException(
-                            "Schema for '" + tablePath + "' with schema_id=1 
does not exist.");
+                            "Schema for '" + tablePath + "' does not exist.");
                 }
                 TableRegistration tableReg = 
tablePath2TableRegistrations.get(tablePath);
                 SchemaInfo schemaInfo = tablePath2SchemaInfos.get(tablePath);
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
index 666a74aae..f4ecff6f0 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
@@ -70,7 +70,6 @@ public class TabletServerMetadataCache implements 
ServerMetadataCache {
 
     private final ServerSchemaCache serverSchemaCache;
 
-    // todo: replace this in test with schemaMetadataManager.
     public TabletServerMetadataCache(MetadataManager metadataManager) {
         this.serverMetadataSnapshot = ServerMetadataSnapshot.empty();
         this.metadataManager = metadataManager;
@@ -436,7 +435,7 @@ public class TabletServerMetadataCache implements 
ServerMetadataCache {
     }
 
     @VisibleForTesting
-    public ServerSchemaCache getSchemaMetadataManager() {
+    public ServerSchemaCache getServerSchemaCache() {
         return serverSchemaCache;
     }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java
index 251d670b6..d3b5d63c9 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java
@@ -77,7 +77,7 @@ public class AdjustIsrManager {
 
     public CompletableFuture<LeaderAndIsr> submit(
             TableBucket tableBucket, LeaderAndIsr leaderAndIsr) {
-        // TODOadd coordinatorEpoch.
+        // TODO add coordinatorEpoch.
         CompletableFuture<LeaderAndIsr> future = new CompletableFuture<>();
         AdjustIsrItem adjustIsrItem = new AdjustIsrItem(tableBucket, 
leaderAndIsr, future);
         boolean enqueued = unsentAdjustIsrMap.putIfAbsent(tableBucket, 
adjustIsrItem) == null;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index c1933bd5c..d8c88cbd7 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -1518,7 +1518,7 @@ public final class Replica {
         List<Integer> isrToSend = new ArrayList<>(isrState.isr());
         isrToSend.add(newInSyncReplicaId);
 
-        // TODOadd server epoch to isr.
+        // TODO add server epoch to isr.
 
         LeaderAndIsr newLeaderAndIsr =
                 new LeaderAndIsr(
@@ -1540,7 +1540,7 @@ public final class Replica {
         // erroneously advance the HW if the `AdjustIsr` were to fail. Hence, 
the "maximal ISR"
         // for `PendingShrinkIsr` is the current ISR.
 
-        // TODOadd server epoch to isr.
+        // TODO add server epoch to isr.
 
         LeaderAndIsr newLeaderAndIsr =
                 new LeaderAndIsr(
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherManager.java
index 06e94e9ae..9e4479b3e 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherManager.java
@@ -225,7 +225,7 @@ public class ReplicaFetcherManager {
                     fetcherThread.getLeader().leaderServerId(),
                     initialFetchStatusMap);
         } catch (InterruptedException e) {
-            LOG.error("Interrupted whileadd buckets to fetcher threads.", e);
+            LOG.error("Interrupted while add buckets to fetcher threads.", e);
         }
     }
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java
index 76195a0df..398bfb6ff 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java
@@ -80,7 +80,7 @@ final class ReplicaFetcherThread extends ShutdownableThread {
     private final LeaderEndpoint leader;
     private final int fetchBackOffMs;
 
-    // manuallyadd timout logic in here, todo remove this timeout logic if
+    // manually add timeout logic in here, todo remove this timeout logic if
     // we support global request timeout in #279
     private final int timeoutSeconds = 30;
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/RpcGatewayManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/RpcGatewayManager.java
index 062eec73d..c89fca6f7 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/RpcGatewayManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/RpcGatewayManager.java
@@ -65,7 +65,7 @@ public class RpcGatewayManager<T extends RpcGateway> 
implements AutoCloseable {
     }
 
     /**
-     * Add a server to the manager. It'll create a new gateway for the server 
andadd it to the
+     * Add a server to the manager. It'll create a new gateway for the server 
and add it to the
      * manager. If the server has already existed, it'll remove the already 
existing server before
      * adding the new one.
      */
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index 165fd23e9..815771166 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -339,7 +339,7 @@ public class ServerRpcMessageUtils {
                 .collect(Collectors.toList());
     }
 
-    private static @Nullable TableChange.ColumnPosition toColumnPosition(int 
columnPositionType) {
+    private static TableChange.ColumnPosition toColumnPosition(int 
columnPositionType) {
         ColumnPositionType opType = 
ColumnPositionType.from(columnPositionType);
         switch (opType) {
             case LAST:
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/Timer.java 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/Timer.java
index 270d334ee..ea6aead7b 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/Timer.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/Timer.java
@@ -35,7 +35,7 @@ public interface Timer {
      * Add a new task to this executor. It will be executed after the task's 
delay (beginning from
      * the time of submission).
      *
-     * @param timerTask the task toadd
+     * @param timerTask the task to add
      */
     void add(TimerTask timerTask);
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimingWheel.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimingWheel.java
index 934777c93..1dab9f229 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimingWheel.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimingWheel.java
@@ -108,8 +108,8 @@ import java.util.concurrent.atomic.AtomicInteger;
  * while priority queue based timers takes O(log N) for both insert and delete 
where N is the number
  * of items in the queue.
  *
- * <p>This class is not thread-safe. There should not be anyadd calls while 
advanceClock is
- * executing. It is caller's responsibility to enforce it. Simultaneousadd 
calls are thread-safe.
+ * <p>This class is not thread-safe. There should not be any add calls while 
advanceClock is
+ * executing. It is caller's responsibility to enforce it. Simultaneous add 
calls are thread-safe.
  */
 @NotThreadSafe
 final class TimingWheel {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
index e27ffd9c3..e80006e60 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
@@ -536,18 +536,46 @@ public class ZooKeeperClient implements AutoCloseable {
                 "tables registration");
     }
 
-    /** Get the v1 schema for given tables in ZK. */
-    public Map<TablePath, SchemaInfo> getV1Schemas(Collection<TablePath> 
tablePaths)
+    /** Get the latest schema for given tables in ZK. */
+    public Map<TablePath, SchemaInfo> getLatestSchemas(Collection<TablePath> 
tablePaths)
             throws Exception {
-        Map<String, TablePath> path2TablePathMap =
-                tablePaths.stream()
-                        .collect(toMap(p -> SchemaZNode.path(p, 
DEFAULT_SCHEMA_ID), path -> path));
+        Map<String, TablePath> schemaChildren2TablePathMap =
+                tablePaths.stream().collect(toMap(SchemasZNode::path, path -> 
path));
+        List<ZkGetChildrenResponse> childrenResponses =
+                getChildrenInBackground(schemaChildren2TablePathMap.keySet());
+        // get the schema ids for each table
+        Map<TablePath, List<String>> schemaIdsForTables =
+                processGetChildrenResponses(
+                        childrenResponses,
+                        response -> 
schemaChildren2TablePathMap.get(response.getPath()),
+                        "schema children for tables");
+
+        // get the schema info for each latest schema id
+        Map<TablePath, Integer> latestSchemaIdMap = new HashMap<>();
+        Map<String, TablePath> path2TablePathMap = new HashMap<>();
+        schemaIdsForTables.forEach(
+                (tp, schemaIds) -> {
+                    int latestSchemaId =
+                            
schemaIds.stream().map(Integer::parseInt).reduce(Math::max).orElse(0);
+                    latestSchemaIdMap.put(tp, latestSchemaId);
+                    path2TablePathMap.put(SchemaZNode.path(tp, 
latestSchemaId), tp);
+                });
+
         List<ZkGetDataResponse> responses = 
getDataInBackground(path2TablePathMap.keySet());
-        return processGetDataResponses(
-                responses,
-                resp -> path2TablePathMap.get(resp.getPath()),
-                b -> new SchemaInfo(SchemaZNode.decode(b), DEFAULT_SCHEMA_ID),
-                "schema");
+        Map<TablePath, Schema> schemasForTables =
+                processGetDataResponses(
+                        responses,
+                        resp -> path2TablePathMap.get(resp.getPath()),
+                        SchemaZNode::decode,
+                        "schema");
+
+        Map<TablePath, SchemaInfo> result = new HashMap<>();
+        schemasForTables.forEach(
+                (tp, schema) -> {
+                    int schemaId = latestSchemaIdMap.get(tp);
+                    result.put(tp, new SchemaInfo(schema, schemaId));
+                });
+        return result;
     }
 
     /** Update the table in ZK. */
@@ -810,7 +838,7 @@ public class ZooKeeperClient implements AutoCloseable {
 
     /** Register schema to ZK metadata and return the schema id. */
     public int registerSchema(TablePath tablePath, Schema schema) throws 
Exception {
-        return registerSchema(tablePath, schema, getCurrentSchemaId(tablePath) 
+ 1);
+        return registerSchema(tablePath, schema, DEFAULT_SCHEMA_ID);
     }
 
     public int registerSchema(TablePath tablePath, Schema schema, int 
schemaId) throws Exception {
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/authorizer/DefaultAuthorizerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/authorizer/DefaultAuthorizerTest.java
index abc575fd5..af69caffe 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/authorizer/DefaultAuthorizerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/authorizer/DefaultAuthorizerTest.java
@@ -487,7 +487,7 @@ public class DefaultAuthorizerTest {
         Set<AccessControlEntry> expectedAcls = new HashSet<>();
 
         for (int i = 0; i < 50; i++) {
-            // each task represent that addColumn acl to different user on 
same resource.
+            // each task represents adding an acl to a different user on the same 
resource.
             final AccessControlEntry accessControlEntry =
                     createAclEntry(String.valueOf(i), "host-1", READ);
             int finalI = i;
@@ -550,7 +550,7 @@ public class DefaultAuthorizerTest {
         // generate 50 concurrent acl operation tasks.
         List<Runnable> concurrentTasks = new ArrayList<>();
         for (int i = 0; i < 50; i++) {
-            // each task represent that addColumn acl to same user on same 
resource.
+            // each task represents adding an acl to the same user on the same resource.
             Runnable runnable =
                     () -> {
                         addAcls(
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java
index 4460cfc26..b8fec59ca 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java
@@ -99,13 +99,13 @@ class CompletedSnapshotStoreManagerTest {
         CompletedSnapshotStoreManager completedSnapshotStoreManager =
                 
createCompletedSnapshotStoreManager(maxNumberOfSnapshotsToRetain);
 
-        // addColumn snapshots for a series of buckets
+        // add snapshots for a series of buckets
         Set<TableBucket> tableBuckets = createTableBuckets(2, 3);
         int snapshotNum = 3;
 
         Map<TableBucket, CompletedSnapshot> 
tableBucketLatestCompletedSnapshots = new HashMap<>();
         for (TableBucket tableBucket : tableBuckets) {
-            // addColumn some snapshots
+            // add some snapshots
             for (int snapshot = 0; snapshot < snapshotNum; snapshot++) {
                 CompletedSnapshot completedSnapshot =
                         KvTestUtils.mockCompletedSnapshot(tempDir, 
tableBucket, snapshot);
@@ -137,7 +137,7 @@ class CompletedSnapshotStoreManagerTest {
             assertThat(completedSnapshot)
                     
.isEqualTo(tableBucketLatestCompletedSnapshots.get(tableBucket));
 
-            // addColumn a new snapshot
+            // add a new snapshot
             long snapshotId = completedSnapshot.getSnapshotID() + 1;
             completedSnapshot = KvTestUtils.mockCompletedSnapshot(tempDir, 
tableBucket, snapshotId);
             addCompletedSnapshot(completedSnapshotStoreManager, 
completedSnapshot);
@@ -169,7 +169,7 @@ class CompletedSnapshotStoreManagerTest {
         Set<TableBucket> tableBuckets = createTableBuckets(1, 2);
         int snapshotNum = 3;
         for (TableBucket tableBucket : tableBuckets) {
-            // addColumn some snapshots
+            // add some snapshots
             for (int snapshot = 0; snapshot < snapshotNum; snapshot++) {
                 CompletedSnapshot completedSnapshot =
                         KvTestUtils.mockCompletedSnapshot(tempDir, 
tableBucket, snapshot);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java
index 581edafcc..bd4467c0a 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java
@@ -69,14 +69,14 @@ class CoordinatorChannelManagerTest {
         // now, shouldn't send as we already remove the tablet server
         checkSendRequest(coordinatorChannelManager, server0.id(), false);
 
-        // test addColumn tablet server
-        // before addColumn, shouldn't send
+        // test add tablet server
+        // before add, shouldn't send
         ServerNode server1 = tabletServersNode.get(1);
         checkSendRequest(coordinatorChannelManager, server1.id(), false);
 
         coordinatorChannelManager.addTabletServer(server1);
 
-        // after addColumn the tablet server, should send
+        // after adding the tablet server, should send
         // try to send message
         checkSendRequest(coordinatorChannelManager, server1.id(), true);
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java
index b8b348bdb..3c417efe8 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java
@@ -211,8 +211,6 @@ class TableManagerTest {
         
assertThat(coordinatorContext.getAllReplicasForTable(tableId)).isEmpty();
     }
 
-    void testSchemaChange() throws Exception {}
-
     @Test
     void testResumeDeletionAfterRestart() throws Exception {
         // first, create a table
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java
index 74d145bda..b39ec39ac 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java
@@ -163,13 +163,13 @@ class KvPreWriteBufferTest {
         }
         assertThat(getValue(buffer, "key6")).isNull();
 
-        // addColumn delete records.
+        // add delete records.
         elementCount = 5;
         bufferDelete(buffer, "key4", elementCount++);
         bufferDelete(buffer, "key3", elementCount++);
         assertThat(getValue(buffer, "key3")).isNull();
 
-        // addColumn update records
+        // add update records
         bufferPut(buffer, "key2", "value2-1", elementCount++);
         bufferPut(buffer, "key1", "value1-1", elementCount++);
         assertThat(getValue(buffer, "key1")).isEqualTo("value1-1");
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java
index 87fb23259..8d45628d8 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java
@@ -161,7 +161,7 @@ class CompletedSnapshotStoreTest {
         final CompletedSnapshot ckp = getSnapshot(ckpId);
 
         assertThatThrownBy(() -> completedSnapshotStore.add(ckp))
-                .as("We should get an exception when addColumn snapshot to 
failed..")
+                .as("We should get an exception when add snapshot to failed..")
                 .hasMessageContaining(errMsg)
                 .isInstanceOf(FlussException.class);
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
index 2451e7ecc..9f9ea078d 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
@@ -247,7 +247,7 @@ class KvTabletSnapshotTargetTest {
         final String errMsg = "Add to snapshot handle failed.";
         AtomicBoolean shouldFail = new AtomicBoolean(true);
         // we use a store will fail when the variable shouldFail is true
-        // addColumn the snapshot to store will fail
+        // add the snapshot to store will fail
         CompletedSnapshotHandleStore completedSnapshotHandleStore =
                 TestCompletedSnapshotHandleStore.newBuilder()
                         .setAddFunction(
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/ZooKeeperCompletedSnapshotStoreTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/ZooKeeperCompletedSnapshotStoreTest.java
index 5b890acd6..c6e90ff63 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/ZooKeeperCompletedSnapshotStoreTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/ZooKeeperCompletedSnapshotStoreTest.java
@@ -94,8 +94,8 @@ class ZooKeeperCompletedSnapshotStoreTest {
     }
 
     /**
-     * Tests that the snapshot does not exist in the store when we fail to 
addColumn it into the
-     * store (i.e., there exists an exception thrown by the method).
+     * Tests that the snapshot does not exist in the store when we fail to add 
it into the store
+     * (i.e., there exists an exception thrown by the method).
      */
     @Test
     void testAddSnapshotWithFailedRemove(@TempDir Path tmpDir) throws 
Exception {
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java
index f4e536b86..2bbdd2a1c 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java
@@ -79,7 +79,7 @@ final class LogManagerTest extends LogTestBase {
     private TableBucket tableBucket2;
     private LogManager logManager;
 
-    // TODOadd more tests refer to kafka's LogManagerTest.
+    // TODO add more tests refer to kafka's LogManagerTest.
 
     @BeforeAll
     static void baseBeforeAll() {
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java
index 7af086823..7b88a53a5 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java
@@ -73,7 +73,7 @@ final class LogTabletTest extends LogTestBase {
     private FlussScheduler scheduler;
     private File logDir;
 
-    // TODOadd more tests refer to kafka's UnifiedLogTest.
+    // TODO add more tests refer to kafka's UnifiedLogTest.
 
     @BeforeEach
     public void setup() throws Exception {
@@ -374,7 +374,7 @@ final class LogTabletTest extends LogTestBase {
 
         log.updateHighWatermark(log.localLogEndOffset());
 
-        // TODOadd delete log segment logic.
+        // TODO add delete log segment logic.
     }
 
     @Test
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTabletTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTabletTest.java
index 5bedd921e..0d571e1ea 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTabletTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTabletTest.java
@@ -94,7 +94,7 @@ class RemoteLogTabletTest extends RemoteLogTestBase {
         assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(0);
         
assertThat(remoteLogTablet.getRemoteLogEndOffset()).isEqualTo(OptionalLong.of(30));
 
-        // delete first remote log segment and addColumn another one remote 
log segments.
+        // delete first remote log segment and add another one remote log 
segments.
         remoteLogTablet.addAndDeleteLogSegments(
                 Collections.singletonList(remoteLogSegmentList.get(3)),
                 Collections.singletonList(remoteLogSegmentList.get(0)));
@@ -102,7 +102,7 @@ class RemoteLogTabletTest extends RemoteLogTestBase {
         assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(10);
         
assertThat(remoteLogTablet.getRemoteLogEndOffset()).isEqualTo(OptionalLong.of(40));
 
-        // delete all exist and append one. we will first addColumn then 
delete.
+        // delete all exist and append one. we will first add then delete.
         remoteLogTablet.addAndDeleteLogSegments(
                 Collections.singletonList(remoteLogSegmentList.get(4)), 
remoteLogSegmentList);
         assertThat(remoteLogTablet.getIdToRemoteLogSegmentMap()).hasSize(0);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/metadata/MetadataUpdateITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/metadata/MetadataUpdateITCase.java
index 35c36d30c..b6fdd9e93 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/metadata/MetadataUpdateITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/metadata/MetadataUpdateITCase.java
@@ -157,7 +157,7 @@ class MetadataUpdateITCase {
                                 expectedTablePathById,
                                 Collections.emptyMap()));
 
-        // addColumn back tablet server2
+        // add back tablet server2
         FLUSS_CLUSTER_EXTENSION.startTabletServer(2);
         retry(
                 Duration.ofMinutes(1),
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrITCase.java
index bb2138881..933abd231 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrITCase.java
@@ -140,7 +140,7 @@ public class AdjustIsrITCase {
         isr.add(stopFollower);
         FLUSS_CLUSTER_EXTENSION.notifyLeaderAndIsr(
                 stopFollower, DATA1_TABLE_PATH, tb, newLeaderAndIsr, isr);
-        // retry until the stop follower addColumn back to ISR.
+        // retry until the stopped follower is added back to ISR.
         retry(
                 Duration.ofMinutes(1),
                 () ->
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java
index 75fb9e248..3f8623569 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java
@@ -32,7 +32,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Test for {@link AdjustIsrManager}. */
 class AdjustIsrManagerTest {
 
-    // TODO addColumn more test refer to kafka AlterPartitionManagerTest, See: 
FLUSS-56278513
+    // TODO add more test refer to kafka AlterPartitionManagerTest, See: 
FLUSS-56278513
 
     @Test
     void testSubmitShrinkIsr() throws Exception {
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/HighWatermarkPersistenceTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/HighWatermarkPersistenceTest.java
index b089efd30..b093787f0 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/HighWatermarkPersistenceTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/HighWatermarkPersistenceTest.java
@@ -92,7 +92,7 @@ final class HighWatermarkPersistenceTest extends 
ReplicaTestBase {
         assertThat(highWatermark0).isEqualTo(10L);
         assertThat(replica0.getLogTablet().getHighWatermark()).isEqualTo(10L);
 
-        // addColumn another replica and set highWatermark.
+        // add another replica and set highWatermark.
         TableBucket tableBucket1 = new TableBucket(DATA2_TABLE_ID, 0);
         replicaManager.becomeLeaderOrFollower(
                 INITIAL_COORDINATOR_EPOCH,
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/RemoveOfflineReplicaFromIsrITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/RemoveOfflineReplicaFromIsrITCase.java
index a7c65acda..9e13b1688 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/RemoveOfflineReplicaFromIsrITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/RemoveOfflineReplicaFromIsrITCase.java
@@ -130,7 +130,7 @@ class RemoveOfflineReplicaFromIsrITCase {
         FLUSS_CLUSTER_EXTENSION.startTabletServer(follower);
         isr.add(follower);
 
-        // make sure the stopped follower can addColumn back to isr after 
restart
+        // make sure the stopped follower can add back to isr after restart
         retry(
                 Duration.ofMinutes(1),
                 () ->
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
index a18d76442..c8e26ea7b 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -95,8 +95,8 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link Replica}. */
 final class ReplicaTest extends ReplicaTestBase {
-    // TODO addColumn more tests refer to kafka's PartitionTest.
-    // TODO addColumn more tests to cover partition table
+    // TODO add more tests refer to kafka's PartitionTest.
+    // TODO add more tests to cover partition table
 
     @Test
     void testMakeLeader() throws Exception {
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
index c438da529..80d998a4d 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
@@ -342,7 +342,7 @@ public class ReplicaTestBase {
     }
 
     // TODO this is only for single tablet server unit test.
-    // TODO addColumn more test cases for partition table which make leader by 
this method.
+    // TODO add more test cases for partition table which make leader by this 
method.
     protected void makeLogTableAsLeader(int bucketId) {
         makeLogTableAsLeader(new TableBucket(DATA1_TABLE_ID, bucketId), false);
     }
@@ -375,7 +375,7 @@ public class ReplicaTestBase {
     }
 
     // TODO this is only for single tablet server unit test.
-    // TODO addColumn more test cases for partition table which make leader by 
this method.
+    // TODO add more test cases for partition table which make leader by this 
method.
     protected void makeKvTableAsLeader(long tableId, TablePath tablePath, int 
bucketId) {
         makeKvTableAsLeader(
                 new TableBucket(tableId, bucketId), tablePath, 
INITIAL_LEADER_EPOCH, false);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
index 8b0b75c35..01cf2701a 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
@@ -177,7 +177,7 @@ public class TabletServiceITCase {
     }
 
     @Test
-    @Disabled("TODO: addColumn back in https://github.com/apache/fluss/issues/771")
+    @Disabled("TODO: add back in https://github.com/apache/fluss/issues/771")
     void testProduceLogResponseReturnInOrder() throws Exception {
         long tableId =
                 createTable(
@@ -334,7 +334,7 @@ public class TabletServiceITCase {
     }
 
     @Test
-    @Disabled("TODO: addColumn back in https://github.com/apache/fluss/issues/777")
+    @Disabled("TODO: add back in https://github.com/apache/fluss/issues/777")
     void testFetchLogWithMinFetchSizeAndTimeout() throws Exception {
         long tableId =
                 createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH, 
DATA1_TABLE_DESCRIPTOR);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
index 8cd737e51..a1ee83fa6 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
@@ -286,7 +286,7 @@ public final class FlussClusterExtension
     }
 
     private void startTabletServers() throws Exception {
-        // addColumn tablet server to make generate assignment for table 
possible
+        // add tablet server to make generate assignment for table possible
         for (int i = 0; i < initialNumOfTabletServers; i++) {
             startTabletServer(i);
         }
@@ -679,7 +679,7 @@ public final class FlussClusterExtension
                         int leader = leaderAndIsrOpt.get().leader();
                         TabletServer tabletServer = 
getTabletServerById(leader);
                         ServerSchemaCache serverSchemaCache =
-                                
tabletServer.getMetadataCache().getSchemaMetadataManager();
+                                
tabletServer.getMetadataCache().getServerSchemaCache();
                         Map<Long, SchemaInfo> latestSchemaByTablePath =
                                 serverSchemaCache.getLatestSchemaByTableId();
                         
assertThat(latestSchemaByTablePath).containsKey(tableId);
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index ca167c2d9..0c6f54cd5 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -164,7 +164,7 @@
                                             <excludes>
                                                 
<exclude>fluss-test-coverage/**</exclude>
                                                 
<exclude>fluss-test-utils/**</exclude>
-                                                <!-- exclude adapter classes 
to avoid Jacoco error: "Can'tadd different class with same name" -->
+                                                <!-- exclude adapter classes 
to avoid Jacoco error: "Can't add different class with same name" -->
                                                 
<exclude>fluss-flink/**/target/classes/org/apache/fluss/flink/adapter/**</exclude>
                                             </excludes>
                                         </resource>
@@ -206,7 +206,7 @@
                                             <excludes>
                                                 
<exclude>fluss-test-coverage/**</exclude>
                                                 
<exclude>fluss-test-utils/**</exclude>
-                                                <!-- exclude adapter classes 
to avoid Jacoco error: "Can'tadd different class with same name" -->
+                                                <!-- exclude adapter classes 
to avoid Jacoco error: "Can't add different class with same name" -->
                                                 
<exclude>fluss-flink/**/target/classes/org/apache/fluss/flink/adapter/**</exclude>
                                             </excludes>
                                         </resource>


Reply via email to