This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit ff3db43d42e64d9d0691455ad9dbccfb7f5728e5
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 1 02:35:27 2025 +0800

    WIP3
---
 .../fluss/flink/sink/writer/FlinkSinkWriter.java   |  2 +-
 .../source/reader/FlinkSourceSplitReader.java      |  3 +-
 .../flink/utils/FlussRowToFlinkRowConverter.java   |  1 -
 .../fluss/lake/iceberg/conf/HadoopUtils.java       |  5 +-
 .../iceberg/maintenance/IcebergRewriteITCase.java  |  2 +-
 .../fluss/rpc/netty/client/ServerConnection.java   |  3 +-
 fluss-rpc/src/main/proto/FlussApi.proto            | 12 +--
 .../coordinator/CompletedSnapshotStoreManager.java |  2 +-
 .../coordinator/CoordinatorEventProcessor.java     |  1 -
 .../fluss/server/coordinator/MetadataManager.java  |  8 +-
 .../fluss/server/coordinator/SchemaUpdate.java     | 28 +++---
 .../fluss/server/coordinator/UpdateSchema.java     | 28 ------
 .../event/watcher/TableChangeWatcher.java          |  9 --
 .../server/entity/BatchRegisterLeadAndIsr.java     |  2 +-
 .../apache/fluss/server/kv/KvRecoverHelper.java    | 99 +++++++++-------------
 .../java/org/apache/fluss/server/kv/KvTablet.java  | 11 +--
 .../server/kv/snapshot/CompletedSnapshotStore.java |  4 +-
 .../fluss/server/kv/snapshot/SnapshotLocation.java |  2 +-
 ...MetadataManager.java => ServerSchemaCache.java} |  7 +-
 .../server/metadata/TabletServerMetadataCache.java | 15 ++--
 ...ManagerTest.java => ServerSchemaCacheTest.java} | 16 ++--
 .../server/testutils/FlussClusterExtension.java    |  6 +-
 22 files changed, 98 insertions(+), 168 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java
index 97a70ce25..fe0eb28ea 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java
@@ -110,7 +110,7 @@ public abstract class FlinkSinkWriter<InputT> implements 
SinkWriter<InputT> {
         connection = ConnectionFactory.createConnection(flussConfig, 
flinkMetricRegistry);
         table = connection.getTable(tablePath);
         LOG.info(
-                "Current Fluss Schema is {}, Table RowType is {}",
+                "Current Fluss Schema is {}, input Flink RowType is {}",
                 table.getTableInfo().getSchema(),
                 tableRowType);
         sanityCheck(table.getTableInfo());
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
index 10997e91e..a19572dc9 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -217,8 +217,7 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
                             (LakeSnapshotAndFlussLogSplit) sourceSplitBase;
                     if (lakeSnapshotAndFlussLogSplit.isStreaming()) {
                         // is streaming split which has no stopping offset, we 
need also
-                        // subscribe
-                        // change log
+                        // subscribe change log
                         subscribeLog(
                                 lakeSnapshotAndFlussLogSplit,
                                 
lakeSnapshotAndFlussLogSplit.getStartingOffset());
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java
index 4dc5287b0..0846b284b 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java
@@ -47,7 +47,6 @@ public class FlussRowToFlinkRowConverter {
     private final InternalRow.FieldGetter[] flussFieldGetters;
     private final int[] indexMapping;
 
-    // todo: remove
     public FlussRowToFlinkRowConverter(RowType rowType) {
         this(rowType, toFlinkRowType(rowType));
     }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/conf/HadoopUtils.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/conf/HadoopUtils.java
index 9c86a92c1..5aad1d5bf 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/conf/HadoopUtils.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/conf/HadoopUtils.java
@@ -79,8 +79,7 @@ public class HadoopUtils {
         }
 
         // Approach 3: Fluss configuration
-        // add all configuration key with prefix 'iceberg.hadoop.' in fluss 
conf to hadoop
-        // conf
+        // add all configuration key with prefix 'iceberg.hadoop.' in fluss 
conf to hadoop conf
         for (String key : flussConfiguration.keySet()) {
             for (String prefix : FLUSS_CONFIG_PREFIXES) {
                 if (key.startsWith(prefix)) {
@@ -109,7 +108,7 @@ public class HadoopUtils {
     }
 
     /**
-     * Search Hadoop configuration files in the given path, andadd them to the 
configuration if
+     * Search Hadoop configuration files in the given path, and add them to 
the configuration if
      * found.
      */
     private static boolean addHadoopConfIfFound(
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
index 133c23322..4b6d9de71 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
@@ -147,7 +147,7 @@ class IcebergRewriteITCase extends 
FlinkIcebergTieringTestBase {
             // add pos-delete and trigger compaction
             rows = Arrays.asList(row(4, "v1"), row(4, "v2"));
             flussRows.add(writeIcebergTableRecords(t1, t1Bucket, 6, false, 
rows).get(1));
-            // rewritten files should fail to commit due to conflict,add check 
here
+            // rewritten files should fail to commit due to conflict, add 
check here
             checkRecords(getIcebergRecords(t1), flussRows);
             // 4 data file and 1 delete file
             checkFileStatusInIcebergTable(t1, 4, true);
diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
index 7f4d6e84e..a09a50c09 100644
--- 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
+++ 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
@@ -69,8 +69,7 @@ final class ServerConnection {
 
     private final ServerNode node;
 
-    // TODO: add max inflight requests limit like Kafka's
-    // "max.in.flight.requests.per.connection"
+    // TODO: add max inflight requests limit like Kafka's 
"max.in.flight.requests.per.connection"
     private final Map<Integer, InflightRequest> inflightRequests = 
MapUtils.newConcurrentHashMap();
     private final CompletableFuture<Void> closeFuture = new 
CompletableFuture<>();
     private final ConnectionMetrics connectionMetrics;
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto 
b/fluss-rpc/src/main/proto/FlussApi.proto
index 9feb38dc1..a08eeaba4 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -918,33 +918,33 @@ message PbAlterConfig {
   required int32 op_type = 3; // SET=0, DELETE=1, APPEND=2, SUBTRACT=3
 }
 
-message PbAddColumn{
+message PbAddColumn {
   required string column_name = 1;
   required bytes data_type_json = 2;
   optional string comment = 3;
   required int32 column_position_type = 4; // LAST=0,FIRST=1,AFTER=3
 }
 
-message PbDropColumn{
+message PbDropColumn {
   required string column_name = 1;
 }
 
-message PbRenameColumn{
+message PbRenameColumn {
   required string old_column_name = 1;
   required string new_column_name = 2;
 }
 
 
-message PbModifyColumn{
+message PbModifyColumn {
   required string column_name = 1;
-  required bytes data_type_json = 2;
+  optional bytes data_type_json = 2;
   optional string comment = 3;
   optional int32 column_position_type = 4; // LAST=0,FIRST=1,AFTER=3
 }
 
 
 
-message PbDescribeConfig{
+message PbDescribeConfig {
   required string config_key = 1;
   optional string config_value = 2;
   required string config_source = 3;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
index def57231d..b7c92289b 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
@@ -148,7 +148,7 @@ public class CompletedSnapshotStoreManager {
                                     () -> getAllSnapshotSize(bucket));
                         } else {
                             LOG.warn(
-                                    "Failed toadd bucketMetricGroup for 
tableBucket {} when creating completed snapshot.",
+                                    "Failed to add bucketMetricGroup for 
tableBucket {} when creating completed snapshot.",
                                     bucket);
                         }
                         return snapshotStore;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index fbe0986b1..d1d551121 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -556,7 +556,6 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
 
     private void processCreateTable(CreateTableEvent createTableEvent) {
         long tableId = createTableEvent.getTableInfo().getTableId();
-        int schemaId = createTableEvent.getTableInfo().getSchemaId();
         // skip the table if it already exists
         if (coordinatorContext.containsTableId(tableId)) {
             return;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index 19b13aaa9..bf2cfc9fb 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -330,11 +330,7 @@ public class MetadataManager {
 
             // validate the table column changes
             if (!schemaChanges.isEmpty()) {
-                UpdateSchema schemaUpdate = new SchemaUpdate(table);
-                for (TableChange schemaChange : schemaChanges) {
-                    schemaUpdate = 
schemaUpdate.applySchemaChange(schemaChange);
-                }
-                Schema newSchema = schemaUpdate.getSchema();
+                Schema newSchema = SchemaUpdate.applySchemaChanges(table, 
schemaChanges);
                 // update the schema
                 zookeeperClient.registerSchema(tablePath, newSchema);
             }
@@ -485,7 +481,7 @@ public class MetadataManager {
 
         if (toEnableDataLake) {
             TableInfo newTableInfo = 
newTableRegistration.toTableInfo(tablePath, schemaInfo);
-            // if the table is lake table, we need toadd it to lake table 
tiering manager
+            // if the table is lake table, we need to add it to lake table 
tiering manager
             lakeTableTieringManager.addNewLakeTable(newTableInfo);
         } else if (toDisableDataLake) {
             
lakeTableTieringManager.removeLakeTable(newTableRegistration.tableId);
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
index 1cdc702d7..9efab2a51 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
@@ -29,12 +29,20 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /** Schema update. */
-public class SchemaUpdate implements UpdateSchema {
+public class SchemaUpdate {
+
+    /** Apply schema changes to the given table info and return the updated 
schema. */
+    public static Schema applySchemaChanges(TableInfo tableInfo, 
List<TableChange> changes) {
+        SchemaUpdate schemaUpdate = new SchemaUpdate(tableInfo);
+        for (TableChange change : changes) {
+            schemaUpdate = schemaUpdate.applySchemaChange(change);
+        }
+        return schemaUpdate.getSchema();
+    }
+
     private final List<Schema.Column> columns;
     private final AtomicInteger highestFieldId;
     private final List<String> primaryKeys;
-    private final List<String> bucketKeys;
-    private final List<String> partitionKeys;
     private final Map<String, Schema.Column> existedColumns;
 
     public SchemaUpdate(TableInfo tableInfo) {
@@ -42,15 +50,12 @@ public class SchemaUpdate implements UpdateSchema {
         this.existedColumns = new HashMap<>();
         this.highestFieldId = new 
AtomicInteger(tableInfo.getSchema().getHighestFieldId());
         this.primaryKeys = tableInfo.getPrimaryKeys();
-        this.bucketKeys = tableInfo.getBucketKeys();
-        this.partitionKeys = tableInfo.getPartitionKeys();
         this.columns.addAll(tableInfo.getSchema().getColumns());
         for (Schema.Column column : columns) {
             existedColumns.put(column.getName(), column);
         }
     }
 
-    @Override
     public Schema getSchema() {
         Schema.Builder builder =
                 Schema.newBuilder()
@@ -63,8 +68,7 @@ public class SchemaUpdate implements UpdateSchema {
         return builder.build();
     }
 
-    @Override
-    public UpdateSchema applySchemaChange(TableChange columnChange) {
+    public SchemaUpdate applySchemaChange(TableChange columnChange) {
         if (columnChange instanceof TableChange.AddColumn) {
             return addColumn((TableChange.AddColumn) columnChange);
         } else if (columnChange instanceof TableChange.ModifyColumn) {
@@ -78,7 +82,7 @@ public class SchemaUpdate implements UpdateSchema {
                 "Unknown column change type " + 
columnChange.getClass().getName());
     }
 
-    private UpdateSchema addColumn(TableChange.AddColumn addColumn) {
+    private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
         if (existedColumns.containsKey(addColumn.getName())) {
             throw new IllegalArgumentException(
                     "Column " + addColumn.getName() + " already exists.");
@@ -105,15 +109,15 @@ public class SchemaUpdate implements UpdateSchema {
         return this;
     }
 
-    private UpdateSchema dropColumn(TableChange.DropColumn dropColumn) {
+    private SchemaUpdate dropColumn(TableChange.DropColumn dropColumn) {
         throw new SchemaChangeException("Not support drop column now.");
     }
 
-    private UpdateSchema modifiedColumn(TableChange.ModifyColumn modifyColumn) 
{
+    private SchemaUpdate modifiedColumn(TableChange.ModifyColumn modifyColumn) 
{
         throw new SchemaChangeException("Not support modify column now.");
     }
 
-    private UpdateSchema renameColumn(TableChange.RenameColumn renameColumn) {
+    private SchemaUpdate renameColumn(TableChange.RenameColumn renameColumn) {
         throw new SchemaChangeException("Not support rename column now.");
     }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/UpdateSchema.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/UpdateSchema.java
deleted file mode 100644
index 1d51704ac..000000000
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/UpdateSchema.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.server.coordinator;
-
-import org.apache.fluss.metadata.Schema;
-import org.apache.fluss.metadata.TableChange;
-
-/** Schema update. */
-public interface UpdateSchema {
-    Schema getSchema();
-
-    UpdateSchema applySchemaChange(TableChange columnChange);
-}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java
index acfb7e689..09c273e7d 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java
@@ -246,16 +246,7 @@ public class TableChangeWatcher {
     private void processSchemaChange(TablePath tablePath, int schemaId) {
 
         try {
-            int currentSchemaId = 
zooKeeperClient.getCurrentSchemaId(tablePath);
             SchemaInfo schemaInfo;
-            if (schemaId != currentSchemaId) {
-                LOG.warn(
-                        "Schema id {} is not equal to current schema id {}. 
Skipping schema change processing.",
-                        schemaId,
-                        currentSchemaId);
-                return;
-            }
-
             Optional<SchemaInfo> optSchema = 
zooKeeperClient.getSchemaById(tablePath, schemaId);
             if (!optSchema.isPresent()) {
                 LOG.error("No schema for table {} in zookeeper.", tablePath);
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/entity/BatchRegisterLeadAndIsr.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/entity/BatchRegisterLeadAndIsr.java
index fad96a596..460562bc1 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/entity/BatchRegisterLeadAndIsr.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/entity/BatchRegisterLeadAndIsr.java
@@ -55,7 +55,7 @@ public class BatchRegisterLeadAndIsr {
                             tableBucket, leaderAndIsr, partitionName, 
liveReplicas));
         } else {
             throw new IllegalArgumentException(
-                    "Try toadd a bucket with different tableId or partitionId 
in collection when try to batch register to Zookeeper."
+                    "Try to add a bucket with different tableId or partitionId 
in collection when try to batch register to Zookeeper."
                             + "batch tableId="
                             + tableId
                             + " batch partitionId:"
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java
index 1f1c4192d..96bb0eba8 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java
@@ -66,15 +66,6 @@ public class KvRecoverHelper {
 
     private InternalRow.FieldGetter[] currentFieldGetters;
 
-    public KvRecoverHelper(
-            KvTablet kvTablet,
-            LogTablet logTablet,
-            long recoverPointOffset,
-            KvRecoverContext recoverContext,
-            KvFormat kvFormat) {
-        throw new UnsupportedOperationException();
-    }
-
     public KvRecoverHelper(
             KvTablet kvTablet,
             LogTablet logTablet,
@@ -136,64 +127,52 @@ public class KvRecoverHelper {
             FetchIsolation fetchIsolation,
             ThrowingConsumer<KeyValueAndLogOffset, Exception> 
resumeRecordConsumer)
             throws Exception {
-        long nextFetchOffset = startFetchOffset;
-        while (true) {
-            LogRecords logRecords =
-                    logTablet
-                            .read(
-                                    nextFetchOffset,
-                                    recoverContext.maxFetchLogSizeInRecoverKv,
-                                    fetchIsolation,
-                                    true,
-                                    null)
-                            .getRecords();
-            if (logRecords == MemoryLogRecords.EMPTY) {
-                break;
-            }
-
-            for (LogRecordBatch logRecordBatch : logRecords.batches()) {
-                //                short schemaId = logRecordBatch.schemaId();
-                //                if (currentSchemaId == null) {
-                //                    initSchema(schemaId);
-                //                } else if (currentSchemaId != schemaId) {
-                //                    throw new KvStorageException(
-                //                            String.format(
-                //                                    "Can't recover kv tablet 
for table bucket from
-                // log %s since the schema changes from schema id %d to schema 
id %d. "
-                //                                            + "Currently, 
schema change is not
-                // supported.",
-                //                                    
recoverContext.tableBucket, currentSchemaId,
-                // schemaId));
-                //                }
+        try (LogRecordReadContext readContext =
+                LogRecordReadContext.createArrowReadContext(
+                        currentRowType, currentSchemaId, schemaGetter)) {
+            long nextFetchOffset = startFetchOffset;
+            while (true) {
+                LogRecords logRecords =
+                        logTablet
+                                .read(
+                                        nextFetchOffset,
+                                        
recoverContext.maxFetchLogSizeInRecoverKv,
+                                        fetchIsolation,
+                                        true,
+                                        null)
+                                .getRecords();
+                if (logRecords == MemoryLogRecords.EMPTY) {
+                    break;
+                }
 
-                // todo: currentRowType和currentSchemaId无法对齐
-                try (LogRecordReadContext readContext =
-                                LogRecordReadContext.createArrowReadContext(
-                                        currentRowType, currentSchemaId, 
schemaGetter);
-                        CloseableIterator<LogRecord> logRecordIter =
-                                logRecordBatch.records(readContext)) {
-                    while (logRecordIter.hasNext()) {
-                        LogRecord logRecord = logRecordIter.next();
-                        if (logRecord.getChangeType() != 
ChangeType.UPDATE_BEFORE) {
-                            InternalRow logRow = logRecord.getRow();
-                            byte[] key = keyEncoder.encodeKey(logRow);
-                            byte[] value = null;
-                            if (logRecord.getChangeType() != 
ChangeType.DELETE) {
-                                // the log row format may not compatible with 
kv row format,
-                                // e.g, arrow vs. compacted, thus needs a 
conversion here.
-                                BinaryRow row = toKvRow(logRecord.getRow());
-                                // todo: short value是否会有问题,感觉可以先check一下
-                                value = 
ValueEncoder.encodeValue(currentSchemaId.shortValue(), row);
+                for (LogRecordBatch logRecordBatch : logRecords.batches()) {
+                    try (CloseableIterator<LogRecord> logRecordIter =
+                            logRecordBatch.records(readContext)) {
+                        while (logRecordIter.hasNext()) {
+                            LogRecord logRecord = logRecordIter.next();
+                            if (logRecord.getChangeType() != 
ChangeType.UPDATE_BEFORE) {
+                                InternalRow logRow = logRecord.getRow();
+                                byte[] key = keyEncoder.encodeKey(logRow);
+                                byte[] value = null;
+                                if (logRecord.getChangeType() != 
ChangeType.DELETE) {
+                                    // the log row format may not compatible 
with kv row format,
+                                    // e.g, arrow vs. compacted, thus needs a 
conversion here.
+                                    BinaryRow row = 
toKvRow(logRecord.getRow());
+                                    value =
+                                            ValueEncoder.encodeValue(
+                                                    
currentSchemaId.shortValue(), row);
+                                }
+                                resumeRecordConsumer.accept(
+                                        new KeyValueAndLogOffset(
+                                                key, value, 
logRecord.logOffset()));
                             }
-                            resumeRecordConsumer.accept(
-                                    new KeyValueAndLogOffset(key, value, 
logRecord.logOffset()));
                         }
                     }
+                    nextFetchOffset = logRecordBatch.nextLogOffset();
                 }
-                nextFetchOffset = logRecordBatch.nextLogOffset();
             }
+            return nextFetchOffset;
         }
-        return nextFetchOffset;
     }
 
     // TODO: this is very in-efficient, because the conversion is CPU heavy. 
Should be optimized in
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 176e749f5..70099878e 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -83,7 +83,6 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static org.apache.fluss.utils.SchemaUtil.getTargetColumns;
 import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
 import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
 
@@ -271,7 +270,7 @@ public final class KvTablet {
                     // schema1: old data's schema -> valueDecoder
                     // schema2: new data's schema
                     // schema3: current schema (also the latest schema)
-                    // schema1 <= schema2 <= current schema id
+                    // (schema1 <= or >= schema2) <= current schema id
                     short latestSchemaId = (short) schemaInfo.getSchemaId();
                     short schemaIdOfNewData = kvRecords.schemaId();
                     if (schemaIdOfNewData > latestSchemaId || 
schemaIdOfNewData < 0) {
@@ -282,14 +281,10 @@ public final class KvTablet {
                                         + latestSchemaId);
                     }
                     Schema schemaOfNewData = 
schemaGetter.getSchema(schemaIdOfNewData);
+                    // we only support ADD COLUMN, so targetColumns is fine to 
be used directly
                     RowMerger currentMerger =
                             rowMerger.configureTargetColumns(
-                                    schemaIdOfNewData == latestSchemaId
-                                            ? targetColumns
-                                            : getTargetColumns(
-                                                    targetColumns, 
schemaOfNewData, latestSchema),
-                                    latestSchemaId,
-                                    latestSchema);
+                                    targetColumns, latestSchemaId, 
latestSchema);
                     RowType currentRowType = latestSchema.getRowType();
                     WalBuilder walBuilder = createWalBuilder(latestSchemaId, 
currentRowType);
                     walBuilder.setWriterState(kvRecords.writerId(), 
kvRecords.batchSequence());
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
index b8d941221..3e93cc46a 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
@@ -99,7 +99,7 @@ public class CompletedSnapshotStore {
      * Synchronously writes the new snapshots to snapshot handle store and 
asynchronously removes
      * older ones.
      *
-     * @param snapshot Completed snapshot toadd.
+     * @param snapshot Completed snapshot to add.
      */
     @VisibleForTesting
     CompletedSnapshot addSnapshotAndSubsumeOldestOne(
@@ -116,7 +116,7 @@ public class CompletedSnapshotStore {
         completedSnapshotHandleStore.add(
                 snapshot.getTableBucket(), snapshot.getSnapshotID(), 
completedSnapshotHandle);
 
-        // Nowadd the new one. If it fails, we don't want to lose existing 
data.
+        // Now add the new one. If it fails, we don't want to lose existing 
data.
         completedSnapshots.addLast(snapshot);
 
         // Remove completed snapshot from queue and snapshotStateHandleStore, 
not discard.
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotLocation.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotLocation.java
index e09c93b22..fe517f268 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotLocation.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotLocation.java
@@ -328,7 +328,7 @@ public class SnapshotLocation {
             Exception latestException = null;
             for (int attempt = 0; attempt < 10; attempt++) {
                 try {
-                    // todo: mayadd entropy injection?
+                    // todo: may add entropy injection?
                     this.kvFilePath = createFilePath();
                     this.outStream = fs.create(kvFilePath, 
FileSystem.WriteMode.NO_OVERWRITE);
                     return;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metadata/SchemaMetadataManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java
similarity index 97%
rename from 
fluss-server/src/main/java/org/apache/fluss/server/metadata/SchemaMetadataManager.java
rename to 
fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java
index 6bed8dfe3..0f98b092d 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metadata/SchemaMetadataManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java
@@ -41,15 +41,14 @@ import java.util.concurrent.ExecutionException;
  * 1. latest schema of each subscribed table, updated by UpdateMetadata 
request. 2. history schemas
  * of each table, updated by lookup from tablet server.
  */
-// TODO SchemaManager
-public class SchemaMetadataManager {
+public class ServerSchemaCache {
 
-    private MetadataManager metadataManager;
+    private final MetadataManager metadataManager;
     private final Map<TablePath, SchemaInfo> latestSchemaByTablePath;
     private final Map<TablePath, Integer> subscriberCounters;
     private final Cache<TableSchemaKey, Schema> schemaCache;
 
-    public SchemaMetadataManager(MetadataManager metadataManager) {
+    public ServerSchemaCache(MetadataManager metadataManager) {
         this.metadataManager = metadataManager;
         // thread safe is guaranteed by subscriberCounters.
         this.subscriberCounters = MapUtils.newConcurrentHashMap();
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
index 6a11a53ae..9a73975ce 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java
@@ -68,13 +68,13 @@ public class TabletServerMetadataCache implements 
ServerMetadataCache {
 
     private final MetadataManager metadataManager;
 
-    private final SchemaMetadataManager schemaMetadataManager;
+    private final ServerSchemaCache serverSchemaCache;
 
     // todo: replace this in test with schemaMetadataManager.
     public TabletServerMetadataCache(MetadataManager metadataManager) {
         this.serverMetadataSnapshot = ServerMetadataSnapshot.empty();
         this.metadataManager = metadataManager;
-        this.schemaMetadataManager = new 
SchemaMetadataManager(metadataManager);
+        this.serverSchemaCache = new ServerSchemaCache(metadataManager);
     }
 
     @Override
@@ -142,12 +142,12 @@ public class TabletServerMetadataCache implements 
ServerMetadataCache {
 
     public SchemaGetter subscribeWithInitialSchema(
             TablePath tablePath, int initialSchemaId, Schema initialSchema) {
-        return schemaMetadataManager.subscribeWithInitialSchema(
+        return serverSchemaCache.subscribeWithInitialSchema(
                 tablePath, initialSchemaId, initialSchema);
     }
 
     public void updateLatestSchema(TablePath tablePath, SchemaInfo schemaInfo) 
{
-        schemaMetadataManager.updateLatestSchema(
+        serverSchemaCache.updateLatestSchema(
                 tablePath, (short) schemaInfo.getSchemaId(), 
schemaInfo.getSchema());
     }
 
@@ -207,8 +207,7 @@ public class TabletServerMetadataCache implements 
ServerMetadataCache {
                         // todo: apply schema id and schema info if needs
                         int schemaId = tableInfo.getSchemaId();
                         Schema schema = tableInfo.getSchema();
-                        schemaMetadataManager.updateLatestSchema(
-                                tablePath, (short) schemaId, schema);
+                        serverSchemaCache.updateLatestSchema(tablePath, 
(short) schemaId, schema);
 
                         if (tableId == DELETED_TABLE_ID) {
                             Long removedTableId = 
tableIdByPath.remove(tablePath);
@@ -437,7 +436,7 @@ public class TabletServerMetadataCache implements 
ServerMetadataCache {
     }
 
     @VisibleForTesting
-    public SchemaMetadataManager getSchemaMetadataManager() {
-        return schemaMetadataManager;
+    public ServerSchemaCache getSchemaMetadataManager() {
+        return serverSchemaCache;
     }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/metadata/SchemaMetadataManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/metadata/ServerSchemaCacheTest.java
similarity index 94%
rename from 
fluss-server/src/test/java/org/apache/fluss/server/metadata/SchemaMetadataManagerTest.java
rename to 
fluss-server/src/test/java/org/apache/fluss/server/metadata/ServerSchemaCacheTest.java
index 672910b04..f72277ba8 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/metadata/SchemaMetadataManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/metadata/ServerSchemaCacheTest.java
@@ -44,13 +44,13 @@ import static 
org.apache.fluss.record.TestData.DATA2_TABLE_DESCRIPTOR;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Test for {@link SchemaMetadataManager}. */
-public class SchemaMetadataManagerTest {
+/** Test for {@link ServerSchemaCache}. */
+public class ServerSchemaCacheTest {
 
     @Test
     void testPublishAndsubscribeSchemaChange() {
-        SchemaMetadataManager manager =
-                new SchemaMetadataManager(new 
TestingMetadataManager(Collections.emptyList()));
+        ServerSchemaCache manager =
+                new ServerSchemaCache(new 
TestingMetadataManager(Collections.emptyList()));
 
         SchemaGetter subscriber1 =
                 manager.subscribeWithInitialSchema(
@@ -94,8 +94,8 @@ public class SchemaMetadataManagerTest {
                         DATA2_TABLE_DESCRIPTOR,
                         System.currentTimeMillis(),
                         System.currentTimeMillis());
-        SchemaMetadataManager manager =
-                new SchemaMetadataManager(
+        ServerSchemaCache manager =
+                new ServerSchemaCache(
                         new 
TestingMetadataManager(Arrays.asList(DATA1_TABLE_INFO, tableInfo2)));
         SchemaGetter schemaGetter =
                 manager.subscribeWithInitialSchema(DATA1_TABLE_PATH, (short) 
2, DATA2_SCHEMA);
@@ -107,8 +107,8 @@ public class SchemaMetadataManagerTest {
 
     @Test
     void testUnsubscribeSchemaChange() {
-        SchemaMetadataManager manager =
-                new SchemaMetadataManager(new 
TestingMetadataManager(Collections.emptyList()));
+        ServerSchemaCache manager =
+                new ServerSchemaCache(new 
TestingMetadataManager(Collections.emptyList()));
         SchemaGetter subscriber1 =
                 manager.subscribeWithInitialSchema(
                         new TablePath("test_tb", "test_table_1"), (short) 1, 
DATA1_SCHEMA);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
index 37f3dd6cb..ef1cb92ba 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
@@ -51,8 +51,8 @@ import org.apache.fluss.server.coordinator.MetadataManager;
 import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandle;
-import org.apache.fluss.server.metadata.SchemaMetadataManager;
 import org.apache.fluss.server.metadata.ServerInfo;
+import org.apache.fluss.server.metadata.ServerSchemaCache;
 import org.apache.fluss.server.metadata.TabletServerMetadataCache;
 import org.apache.fluss.server.replica.Replica;
 import org.apache.fluss.server.replica.ReplicaManager;
@@ -678,10 +678,10 @@ public final class FlussClusterExtension
                         assertThat(leaderAndIsrOpt).isPresent();
                         int leader = leaderAndIsrOpt.get().leader();
                         TabletServer tabletServer = 
getTabletServerById(leader);
-                        SchemaMetadataManager schemaMetadataManager =
+                        ServerSchemaCache serverSchemaCache =
                                 
tabletServer.getMetadataCache().getSchemaMetadataManager();
                         Map<TablePath, SchemaInfo> latestSchemaByTablePath =
-                                
schemaMetadataManager.getLatestSchemaByTablePath();
+                                serverSchemaCache.getLatestSchemaByTablePath();
                         
assertThat(latestSchemaByTablePath).containsKey(tablePath);
                         
assertThat(latestSchemaByTablePath.get(tablePath).getSchemaId())
                                 .isEqualTo(schemaId);


Reply via email to