This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/ci-add-column by this push:
     new 8660164b0 KvTablet
8660164b0 is described below

commit 8660164b0a830f3edf01fb18c5abdb62a271a5ee
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 1 17:33:44 2025 +0800

    KvTablet
---
 .../table/scanner/batch/SnapshotFilesReader.java   |   3 +-
 .../java/org/apache/fluss/record/BinaryValue.java  |  67 +++++++++
 .../org/apache/fluss/row/encode/ValueDecoder.java  |   5 +-
 .../java/org/apache/fluss/server/kv/KvTablet.java  | 151 +++++++++------------
 .../server/kv/partialupdate/PartialUpdater.java    |  45 +++---
 .../kv/partialupdate/PartialUpdaterCache.java      |   9 +-
 .../server/kv/rowmerger/DefaultRowMerger.java      |  14 +-
 .../server/kv/rowmerger/FirstRowRowMerger.java     |   8 +-
 .../fluss/server/kv/rowmerger/RowMerger.java       |  24 ++--
 .../server/kv/rowmerger/VersionedRowMerger.java    |   7 +-
 .../server/kv/rowmerger/DefaultRowMergerTest.java  |  57 ++++----
 .../kv/rowmerger/VersionedRowMergerTest.java       |  41 +++---
 .../fluss/server/tablet/TabletServiceITCase.java   |   2 +-
 13 files changed, 249 insertions(+), 184 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java
index a7c5e2969..8a52814f3 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java
@@ -22,6 +22,7 @@ import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.record.BinaryValue;
 import org.apache.fluss.rocksdb.RocksDBHandle;
 import org.apache.fluss.rocksdb.RocksIteratorWrapper;
 import org.apache.fluss.row.InternalRow;
@@ -164,7 +165,7 @@ class SnapshotFilesReader implements 
CloseableIterator<InternalRow> {
         byte[] value = rocksIteratorWrapper.value();
         rocksIteratorWrapper.next();
 
-        ValueDecoder.Value originValue = valueDecoder.decodeValue(value);
+        BinaryValue originValue = valueDecoder.decodeValue(value);
         InternalRow originRow = originValue.row;
         if (targetSchemaId != originValue.schemaId) {
             int[] indexMapping =
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/BinaryValue.java 
b/fluss-common/src/main/java/org/apache/fluss/record/BinaryValue.java
new file mode 100644
index 000000000..d02bff81b
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/record/BinaryValue.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.row.BinaryRow;
+import org.apache.fluss.utils.UnsafeUtils;
+
+import java.util.Objects;
+
+/** A value of key-value pair that contains schema id and binary row. */
+public class BinaryValue {
+
+    public static final int SCHEMA_ID_LENGTH = 2;
+
+    public final short schemaId;
+    public final BinaryRow row;
+
+    public BinaryValue(short schemaId, BinaryRow row) {
+        this.schemaId = schemaId;
+        this.row = row;
+    }
+
+    /**
+     * Encode the value (consisting of {@code row} and a {@code schemaId}) into 
a byte array that is
+     * expected to be persisted to the kv store.
+     */
+    public byte[] encodeValue() {
+        byte[] values = new byte[SCHEMA_ID_LENGTH + row.getSizeInBytes()];
+        UnsafeUtils.putShort(values, 0, schemaId);
+        row.copyTo(values, SCHEMA_ID_LENGTH);
+        return values;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        BinaryValue that = (BinaryValue) o;
+        return schemaId == that.schemaId && Objects.equals(row, that.row);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(schemaId, row);
+    }
+
+    @Override
+    public String toString() {
+        return "BinaryValue{" + "schemaId=" + schemaId + ", row=" + row + '}';
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java 
b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java
index 5f90ddaef..c7ac792e4 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java
@@ -21,6 +21,7 @@ import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.record.BinaryValue;
 import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.decode.RowDecoder;
 import org.apache.fluss.types.DataType;
@@ -48,7 +49,7 @@ public class ValueDecoder {
     }
 
     /** Decode the value bytes and return the schema id and the row encoded in 
the value bytes. */
-    public Value decodeValue(byte[] valueBytes) {
+    public BinaryValue decodeValue(byte[] valueBytes) {
         MemorySegment memorySegment = MemorySegment.wrap(valueBytes);
         short schemaId = memorySegment.getShort(0);
 
@@ -65,7 +66,7 @@ public class ValueDecoder {
         BinaryRow row =
                 rowDecoder.decode(
                         memorySegment, SCHEMA_ID_LENGTH, valueBytes.length - 
SCHEMA_ID_LENGTH);
-        return new Value(schemaId, row);
+        return new BinaryValue(schemaId, row);
     }
 
     /** The schema id and {@link BinaryRow} stored as the value of kv store. */
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 70099878e..b6957c5dc 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -34,16 +34,16 @@ import org.apache.fluss.metadata.SchemaGetter;
 import org.apache.fluss.metadata.SchemaInfo;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.BinaryValue;
 import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.record.KvRecord;
 import org.apache.fluss.record.KvRecordBatch;
 import org.apache.fluss.record.KvRecordReadContext;
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.ProjectedRow;
+import org.apache.fluss.row.BinaryRow;
+import org.apache.fluss.row.PaddingRow;
 import org.apache.fluss.row.arrow.ArrowWriterPool;
 import org.apache.fluss.row.arrow.ArrowWriterProvider;
 import org.apache.fluss.row.encode.ValueDecoder;
-import org.apache.fluss.row.encode.ValueEncoder;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason;
 import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
@@ -254,6 +254,20 @@ public final class KvTablet {
     /**
      * Put the KvRecordBatch into the kv storage, and return the appended wal 
log info.
      *
+     * <p>Schema Evolution Handling:
+     *
+     * <p>We don't allow the schema id of input kv records to be larger than the 
latest schema id known by
+     * the tablet. Besides, we currently only support ADD COLUMN LAST 
operation, so the input row or
+     * old row must have same or fewer columns than latest schema. This helps 
to simplify the schema
+     * change handling.
+     *
+     * <p>1. We write the kv records into KvStore without converting them into 
the latest schema, for
+     * performance reasons. There are mechanisms by which the writer client 
dynamically uses the latest
+     * schema for writing records.
+     *
+     * <p>2. We always use the latest schema for writing WAL logs, because 
deserialization and
+     * serialization happen anyway when converting the compacted format into 
Arrow format.
+     *
      * @param kvRecords the kv records to put into
      * @param targetColumns the target columns to put, null if put all columns
      */
@@ -266,28 +280,28 @@ public final class KvTablet {
 
                     SchemaInfo schemaInfo = schemaGetter.getLatestSchemaInfo();
                     Schema latestSchema = schemaInfo.getSchema();
-                    // In the following scenario, we need to support schema 
migration:
-                    // schema1: old data's schema -> valueDecoder
-                    // schema2: new data's schema
-                    // schema3: current schema (also the latest schema)
-                    // (schema1 <= or >= schema2) <= current schema id
                     short latestSchemaId = (short) schemaInfo.getSchemaId();
                     short schemaIdOfNewData = kvRecords.schemaId();
                     if (schemaIdOfNewData > latestSchemaId || 
schemaIdOfNewData < 0) {
+                        // TODO: we may need to support retriable exception 
here
                         throw new SchemaNotExistException(
                                 "Invalid schema id: "
                                         + schemaIdOfNewData
                                         + ", latest schema id: "
                                         + latestSchemaId);
                     }
-                    Schema schemaOfNewData = 
schemaGetter.getSchema(schemaIdOfNewData);
+
                     // we only support ADD COLUMN, so targetColumns is fine to 
be used directly
                     RowMerger currentMerger =
                             rowMerger.configureTargetColumns(
                                     targetColumns, latestSchemaId, 
latestSchema);
-                    RowType currentRowType = latestSchema.getRowType();
-                    WalBuilder walBuilder = createWalBuilder(latestSchemaId, 
currentRowType);
+                    RowType latestRowType = latestSchema.getRowType();
+                    WalBuilder walBuilder = createWalBuilder(latestSchemaId, 
latestRowType);
                     walBuilder.setWriterState(kvRecords.writerId(), 
kvRecords.batchSequence());
+                    // we only support ADD COLUMN LAST, so the BinaryRow after 
RowMerger can
+                    // only have fewer trailing columns than the latest schema, so we 
pad nulls to
+                    // the end of the BinaryRow to get the latest schema row.
+                    PaddingRow latestSchemaRow = new 
PaddingRow(latestRowType.getFieldCount());
                     // get offset to track the offset corresponded to the kv 
record
                     long logEndOffsetOfPrevBatch = 
logTablet.localLogEndOffset();
                     try {
@@ -296,22 +310,15 @@ public final class KvTablet {
                         // TODO: reuse the read context and decoder
                         KvRecordBatch.ReadContext readContext =
                                 
KvRecordReadContext.createReadContext(kvFormat, schemaGetter);
-
-                        // replace new row with current schema if the schema 
id is older than now
-                        ProjectedRow projectedRow =
-                                ProjectedRow.from(schemaOfNewData, 
latestSchema);
                         ValueDecoder valueDecoder = new 
ValueDecoder(schemaGetter, kvFormat);
                         for (KvRecord kvRecord : 
kvRecords.records(readContext)) {
-
                             byte[] keyBytes = 
BytesUtils.toArray(kvRecord.getKey());
                             KvPreWriteBuffer.Key key = 
KvPreWriteBuffer.Key.of(keyBytes);
-                            InternalRow row = kvRecord.getRow();
-                            if (row != null && schemaIdOfNewData < 
latestSchemaId) {
-                                row = projectedRow.replaceRow(row);
-                            }
-
-                            if (row == null) {
-                                DeleteBehavior deleteBehavior = 
rowMerger.deleteBehavior();
+                            BinaryRow row = kvRecord.getRow();
+                            BinaryValue currentValue =
+                                    row == null ? null : new 
BinaryValue(schemaIdOfNewData, row);
+                            if (currentValue == null) {
+                                DeleteBehavior deleteBehavior = 
currentMerger.deleteBehavior();
                                 if (deleteBehavior == DeleteBehavior.IGNORE) {
                                     // skip delete rows if the merger doesn't 
support yet
                                     continue;
@@ -321,99 +328,69 @@ public final class KvTablet {
                                                     + "The 
table.delete.behavior is set to 'disable'.");
                                 }
                                 // it's for deletion
-                                byte[] oldValue = getFromBufferOrKv(key);
+                                byte[] oldValueBytes = getFromBufferOrKv(key);
 
-                                if (oldValue == null) {
+                                if (oldValueBytes == null) {
                                     // there might be large amount of such 
deletion, so we don't log
                                     LOG.debug(
                                             "The specific key can't be found 
in kv tablet although the kv record is for deletion, "
                                                     + "ignore it directly as 
it doesn't exist in the kv tablet yet.");
                                 } else {
-                                    ValueDecoder.Value oldRowAndSchemaId =
-                                            valueDecoder.decodeValue(oldValue);
-                                    InternalRow oldRow = oldRowAndSchemaId.row;
-                                    if (oldRow != null
-                                            && oldRowAndSchemaId.schemaId != 
latestSchemaId) {
-                                        // todo: 后续想办法复用对应的projected row.
-                                        Schema schemaOfOldData =
-                                                
schemaGetter.getSchema(oldRowAndSchemaId.schemaId);
-                                        oldRow =
-                                                
ProjectedRow.from(schemaOfOldData, latestSchema)
-                                                        .replaceRow(oldRow);
-                                    }
-                                    InternalRow newRow = 
currentMerger.delete(oldRow);
+                                    BinaryValue oldValue = 
valueDecoder.decodeValue(oldValueBytes);
+                                    BinaryValue newValue = 
currentMerger.delete(oldValue);
                                     // if newRow is null, it means the row 
should be deleted
-                                    if (newRow == null) {
-                                        walBuilder.append(ChangeType.DELETE, 
oldRow);
+                                    if (newValue == null) {
+                                        walBuilder.append(
+                                                ChangeType.DELETE,
+                                                
latestSchemaRow.replaceRow(oldValue.row));
                                         kvPreWriteBuffer.delete(key, 
logOffset++);
                                     } else {
                                         // otherwise, it's a partial update, 
should produce -U,+U
-                                        
walBuilder.append(ChangeType.UPDATE_BEFORE, oldRow);
-                                        
walBuilder.append(ChangeType.UPDATE_AFTER, newRow);
+                                        walBuilder.append(
+                                                ChangeType.UPDATE_BEFORE,
+                                                
latestSchemaRow.replaceRow(oldValue.row));
+                                        walBuilder.append(
+                                                ChangeType.UPDATE_AFTER,
+                                                
latestSchemaRow.replaceRow(newValue.row));
                                         kvPreWriteBuffer.put(
-                                                key,
-                                                ValueEncoder.encodeRow(
-                                                        latestSchemaId,
-                                                        kvFormat,
-                                                        currentRowType,
-                                                        newRow),
-                                                logOffset + 1);
+                                                key, newValue.encodeValue(), 
logOffset + 1);
                                         logOffset += 2;
                                     }
                                 }
                             } else {
                                 // upsert operation
-                                byte[] oldValue = getFromBufferOrKv(key);
+                                byte[] oldValueBytes = getFromBufferOrKv(key);
                                 // it's update
-                                if (oldValue != null) {
-                                    ValueDecoder.Value oldRowAndSchemaId =
-                                            valueDecoder.decodeValue(oldValue);
-                                    InternalRow oldRow = oldRowAndSchemaId.row;
-                                    if (oldRow != null
-                                            && oldRowAndSchemaId.schemaId != 
latestSchemaId) {
-                                        // todo: 后续想办法复用对应的projected row.
-                                        Schema schemaOfOldData =
-                                                
schemaGetter.getSchema(oldRowAndSchemaId.schemaId);
-                                        oldRow =
-                                                
ProjectedRow.from(schemaOfOldData, latestSchema)
-                                                        .replaceRow(oldRow);
-                                    }
-
-                                    InternalRow newRow = 
currentMerger.merge(oldRow, row);
-                                    if (newRow == oldRow) {
-                                        // newRow is the same to oldRow, means 
nothing
+                                if (oldValueBytes != null) {
+                                    BinaryValue oldValue = 
valueDecoder.decodeValue(oldValueBytes);
+                                    BinaryValue newValue =
+                                            currentMerger.merge(oldValue, 
currentValue);
+                                    if (newValue == oldValue) {
+                                        // newValue is the same as oldValue, 
means nothing
                                         // happens (no update/delete), and 
input should be ignored
                                         continue;
                                     }
 
-                                    
walBuilder.append(ChangeType.UPDATE_BEFORE, oldRow);
-                                    walBuilder.append(ChangeType.UPDATE_AFTER, 
newRow);
+                                    walBuilder.append(
+                                            ChangeType.UPDATE_BEFORE,
+                                            
latestSchemaRow.replaceRow(oldValue.row));
+                                    walBuilder.append(
+                                            ChangeType.UPDATE_AFTER,
+                                            
latestSchemaRow.replaceRow(newValue.row));
                                     // logOffset is for -U, logOffset + 1 is 
for +U, we need to use
                                     // the log offset for +U
                                     kvPreWriteBuffer.put(
-                                            key,
-                                            ValueEncoder.encodeRow(
-                                                    latestSchemaId,
-                                                    kvFormat,
-                                                    currentRowType,
-                                                    newRow),
-                                            logOffset + 1);
+                                            key, newValue.encodeValue(), 
logOffset + 1);
                                     logOffset += 2;
                                 } else {
                                     // it's insert
-                                    // TODO: we shouldadd guarantees that all 
non-specified
-                                    // columns
+                                    // TODO: we should add guarantees that all 
non-specified columns
                                     //  of the input row are set to null.
-                                    InternalRow newRow = row;
-                                    walBuilder.append(ChangeType.INSERT, 
newRow);
+                                    walBuilder.append(
+                                            ChangeType.INSERT,
+                                            
latestSchemaRow.replaceRow(currentValue.row));
                                     kvPreWriteBuffer.put(
-                                            key,
-                                            ValueEncoder.encodeRow(
-                                                    latestSchemaId,
-                                                    kvFormat,
-                                                    currentRowType,
-                                                    newRow),
-                                            logOffset++);
+                                            key, currentValue.encodeValue(), 
logOffset++);
                                 }
                             }
                         }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdater.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdater.java
index 688ac8dab..d8741e010 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdater.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdater.java
@@ -20,7 +20,7 @@ package org.apache.fluss.server.kv.partialupdate;
 import org.apache.fluss.exception.InvalidTargetColumnException;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.Schema;
-import org.apache.fluss.row.BinaryRow;
+import org.apache.fluss.record.BinaryValue;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.encode.RowEncoder;
 import org.apache.fluss.types.DataType;
@@ -34,6 +34,7 @@ import java.util.BitSet;
 @NotThreadSafe
 public class PartialUpdater {
 
+    private final short targetSchemaId;
     private final InternalRow.FieldGetter[] flussFieldGetters;
 
     private final RowEncoder rowEncoder;
@@ -42,7 +43,8 @@ public class PartialUpdater {
     private final BitSet primaryKeyCols = new BitSet();
     private final DataType[] fieldDataTypes;
 
-    public PartialUpdater(KvFormat kvFormat, Schema schema, int[] 
targetColumns) {
+    public PartialUpdater(KvFormat kvFormat, short schemaId, Schema schema, 
int[] targetColumns) {
+        this.targetSchemaId = schemaId;
         for (int targetColumn : targetColumns) {
             partialUpdateCols.set(targetColumn);
         }
@@ -89,43 +91,44 @@ public class PartialUpdater {
     }
 
     /**
-     * Partial update the {@code oldRow} with the given new row {@code 
partialRow}. The {@code
-     * oldRow} may be null, in this case, the field don't exist in the {@code 
partialRow} will be
+     * Partially update the {@code oldValue} with the given new value {@code 
partialValue}. The {@code
+     * oldValue} may be null; in this case, the fields that don't exist in the 
{@code partialValue} will be
      * set to null.
      *
-     * @param oldRow the old row to be updated
-     * @param partialRow the new row to be updated.
-     * @return the updated row
+     * @param oldValue the old value to be updated
+     * @param partialValue the new value to be updated.
+     * @return the updated value (schema id + row bytes)
      */
-    public BinaryRow updateRow(@Nullable InternalRow oldRow, InternalRow 
partialRow) {
+    public BinaryValue updateRow(@Nullable BinaryValue oldValue, BinaryValue 
partialValue) {
         rowEncoder.startNewRow();
         // write each field
         for (int i = 0; i < fieldDataTypes.length; i++) {
             // use the partial row value
             if (partialUpdateCols.get(i)) {
-                rowEncoder.encodeField(i, 
flussFieldGetters[i].getFieldOrNull(partialRow));
+                rowEncoder.encodeField(i, 
flussFieldGetters[i].getFieldOrNull(partialValue.row));
             } else {
                 // use the old row value
-                if (oldRow == null) {
+                if (oldValue == null) {
                     rowEncoder.encodeField(i, null);
                 } else {
-                    rowEncoder.encodeField(i, 
flussFieldGetters[i].getFieldOrNull(oldRow));
+                    rowEncoder.encodeField(i, 
flussFieldGetters[i].getFieldOrNull(oldValue.row));
                 }
             }
         }
-        return rowEncoder.finishRow();
+        return new BinaryValue(targetSchemaId, rowEncoder.finishRow());
     }
 
     /**
-     * Partial delete the given {@code row}. If all the fields except for 
{@link #partialUpdateCols}
-     * in {@code row} are null, return null. Otherwise, update all the {@link 
#partialUpdateCols} in
-     * the {@code row} except for the primary key columns to null values, 
return the updated row.
+     * Partial delete the given {@code value}. If all the fields except for 
{@link
+     * #partialUpdateCols} in {@code value.row} are null, return null. 
Otherwise, update all the
+     * {@link #partialUpdateCols} in the {@code value.row} except for the 
primary key columns to
+     * null values, return the updated value.
      *
-     * @param row the row to be deleted
-     * @return the row after partial deleted
+     * @param value the value to be deleted
+     * @return the value after partial deleted
      */
-    public @Nullable BinaryRow deleteRow(InternalRow row) {
-        if (isFieldsNull(row, partialUpdateCols)) {
+    public @Nullable BinaryValue deleteRow(BinaryValue value) {
+        if (isFieldsNull(value.row, partialUpdateCols)) {
             return null;
         } else {
             rowEncoder.startNewRow();
@@ -137,10 +140,10 @@ public class PartialUpdater {
                     rowEncoder.encodeField(i, null);
                 } else {
                     // use the old row value
-                    rowEncoder.encodeField(i, 
flussFieldGetters[i].getFieldOrNull(row));
+                    rowEncoder.encodeField(i, 
flussFieldGetters[i].getFieldOrNull(value.row));
                 }
             }
-            return rowEncoder.finishRow();
+            return new BinaryValue(targetSchemaId, rowEncoder.finishRow());
         }
     }
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdaterCache.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdaterCache.java
index 3ce22854c..0c227aaf2 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdaterCache.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdaterCache.java
@@ -25,6 +25,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 
 import javax.annotation.concurrent.ThreadSafe;
 
+import java.time.Duration;
 import java.util.Arrays;
 
 /** The cache for {@link PartialUpdater}. */
@@ -37,7 +38,11 @@ public class PartialUpdaterCache {
         // currently, the cache is used per-bucket, so we limit the cache size 
to 5 to have a
         // maximal 5 parallel partial updaters. This is a temporary solution 
and should be
         // shared across all buckets in the future.
-        this.rowPartialUpdaters = Caffeine.newBuilder().maximumSize(5).build();
+        this.rowPartialUpdaters =
+                Caffeine.newBuilder()
+                        .maximumSize(5)
+                        .expireAfterAccess(Duration.ofMinutes(5))
+                        .build();
     }
 
     // TODO: extend to tableId and schemaId when the cache is shared across 
all tables
@@ -45,7 +50,7 @@ public class PartialUpdaterCache {
             KvFormat kvFormat, short schemaId, Schema schema, int[] 
targetColumns) {
         return rowPartialUpdaters.get(
                 getPartialUpdaterKey(targetColumns, schemaId),
-                k -> new PartialUpdater(kvFormat, schema, targetColumns));
+                k -> new PartialUpdater(kvFormat, schemaId, schema, 
targetColumns));
     }
 
     private String getPartialUpdaterKey(int[] targetColumns, int schemaId) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
index af3500c59..2db73c819 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
@@ -20,7 +20,7 @@ package org.apache.fluss.server.kv.rowmerger;
 import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.Schema;
-import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.record.BinaryValue;
 import org.apache.fluss.server.kv.partialupdate.PartialUpdater;
 import org.apache.fluss.server.kv.partialupdate.PartialUpdaterCache;
 
@@ -46,14 +46,14 @@ public class DefaultRowMerger implements RowMerger {
 
     @Nullable
     @Override
-    public InternalRow merge(InternalRow oldRow, InternalRow newRow) {
+    public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
         // always retain the new row (latest row)
-        return newRow;
+        return newValue;
     }
 
     @Nullable
     @Override
-    public InternalRow delete(InternalRow oldRow) {
+    public BinaryValue delete(BinaryValue oldRow) {
         // returns null to indicate the row is deleted
         return null;
     }
@@ -98,13 +98,13 @@ public class DefaultRowMerger implements RowMerger {
 
         @Nullable
         @Override
-        public InternalRow merge(InternalRow oldRow, InternalRow newRow) {
-            return partialUpdater.updateRow(oldRow, newRow);
+        public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
+            return partialUpdater.updateRow(oldValue, newValue);
         }
 
         @Nullable
         @Override
-        public InternalRow delete(InternalRow oldRow) {
+        public BinaryValue delete(BinaryValue oldRow) {
             return partialUpdater.deleteRow(oldRow);
         }
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java
index 79592e177..33bd6ee1f 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java
@@ -20,7 +20,7 @@ package org.apache.fluss.server.kv.rowmerger;
 import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.metadata.Schema;
-import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.record.BinaryValue;
 
 import javax.annotation.Nullable;
 
@@ -44,14 +44,14 @@ public class FirstRowRowMerger implements RowMerger {
 
     @Nullable
     @Override
-    public InternalRow merge(InternalRow oldRow, InternalRow newRow) {
+    public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
         // always retain the old row (first row)
-        return oldRow;
+        return oldValue;
     }
 
     @Nullable
     @Override
-    public InternalRow delete(InternalRow oldRow) {
+    public BinaryValue delete(BinaryValue oldRow) {
         throw new UnsupportedOperationException(
                 "DELETE is not supported for the first_row merge engine.");
     }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java
index 5f3414574..89bff8c0a 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java
@@ -23,7 +23,7 @@ import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.metadata.Schema;
-import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.record.BinaryValue;
 
 import javax.annotation.Nullable;
 
@@ -33,14 +33,14 @@ import java.util.Optional;
 public interface RowMerger {
 
     /**
-     * Merge the old row with the new row.
+     * Merge the old value with the new value.
      *
-     * @param oldRow the old row
-     * @param newRow the new row
-     * @return the merged row, if the returned row is the same to the old row, 
then nothing happens
-     *     to the row (no update, no delete).
+     * @param oldValue the old value
+     * @param newValue the new value
+     * @return the merged value, if the returned value is the same as the old 
value, then nothing
+     *     happens to the row (no update, no delete).
      */
-    InternalRow merge(InternalRow oldRow, InternalRow newRow);
+    BinaryValue merge(BinaryValue oldValue, BinaryValue newValue);
 
     /**
      * Merge the old row with a delete row.
@@ -52,7 +52,7 @@ public interface RowMerger {
      * @return the merged row, or null if the row is deleted.
      */
     @Nullable
-    InternalRow delete(InternalRow oldRow);
+    BinaryValue delete(BinaryValue oldRow);
 
     /**
      * The behavior of delete operations on primary key tables.
@@ -61,7 +61,13 @@ public interface RowMerger {
      */
     DeleteBehavior deleteBehavior();
 
-    /** Dynamically configure the target columns to merge and return the 
effective merger. */
+    /**
+     * Dynamically configure the target columns to merge and return the 
effective merger.
+     *
+     * @param targetColumns the partial update target column positions, null 
means full update
+     * @param schemaId the schema id used to generate new rows
+     * @param schema the schema used to generate new rows
+     */
     RowMerger configureTargetColumns(@Nullable int[] targetColumns, short 
schemaId, Schema schema);
 
     /** Create a row merger based on the given configuration. */
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java
index 9706230b5..4b555602e 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java
@@ -20,6 +20,7 @@ package org.apache.fluss.server.kv.rowmerger;
 import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.record.BinaryValue;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
@@ -64,14 +65,14 @@ public class VersionedRowMerger implements RowMerger {
 
     @Nullable
     @Override
-    public InternalRow merge(InternalRow oldRow, InternalRow newRow) {
+    public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
         // return newRow if newRow's version is larger or equal than oldRow's 
version
-        return versionComparator.compare(oldRow, newRow) <= 0 ? newRow : 
oldRow;
+        return versionComparator.compare(oldValue.row, newValue.row) <= 0 ? 
newValue : oldValue;
     }
 
     @Nullable
     @Override
-    public InternalRow delete(InternalRow oldRow) {
+    public BinaryValue delete(BinaryValue oldRow) {
         throw new UnsupportedOperationException(
                 "DELETE is not supported for the versioned merge engine.");
     }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java
index 95afb907d..3b5fc88c4 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java
@@ -20,9 +20,7 @@ package org.apache.fluss.server.kv.rowmerger;
 import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.Schema;
-import org.apache.fluss.row.BinaryRow;
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.ProjectedRow;
+import org.apache.fluss.record.BinaryValue;
 import org.apache.fluss.types.DataTypes;
 
 import org.junit.jupiter.params.ParameterizedTest;
@@ -47,18 +45,21 @@ class DefaultRowMergerTest {
             Schema.newBuilder()
                     .fromColumns(
                             Arrays.asList(
-                                    new Schema.Column("age", 
DataTypes.STRING(), null, (short) 2),
                                     new Schema.Column("id", DataTypes.INT(), 
null, (short) 0),
-                                    new Schema.Column("name", 
DataTypes.STRING(), null, (short) 1)))
+                                    new Schema.Column("name", 
DataTypes.STRING(), null, (short) 1),
+                                    // add new column at end
+                                    new Schema.Column("age", 
DataTypes.STRING(), null, (short) 2)))
                     .primaryKey("id")
                     .build();
 
-    private BinaryRow createBinaryRow(int id, String name) {
-        return compactedRow(SCHEMA.getRowType(), new Object[] {id, name});
+    private BinaryValue createBinaryValue(int id, String name) {
+        return new BinaryValue(
+                (short) 1, compactedRow(SCHEMA.getRowType(), new Object[] {id, 
name}));
     }
 
-    private BinaryRow createBinaryRow(String age, int id, String name) {
-        return compactedRow(SCHEMA_2.getRowType(), new Object[] {age, id, 
name});
+    private BinaryValue createBinaryValue(int id, String name, String age) {
+        return new BinaryValue(
+                (short) 2, compactedRow(SCHEMA_2.getRowType(), new Object[] 
{id, name, age}));
     }
 
     @ParameterizedTest
@@ -67,26 +68,25 @@ class DefaultRowMergerTest {
         DefaultRowMerger merger = new DefaultRowMerger(KvFormat.COMPACTED, 
deleteBehavior);
         merger.configureTargetColumns(null, (byte) 1, SCHEMA);
 
-        InternalRow oldRow = createBinaryRow(1, "old");
-        BinaryRow newRow = createBinaryRow(1, "new");
+        BinaryValue oldValue = createBinaryValue(1, "old");
+        BinaryValue newValue = createBinaryValue(1, "new");
 
         // Test merge operation - should return new row
-        InternalRow mergedRow = merger.merge(oldRow, newRow);
-        assertThat(mergedRow).isSameAs(newRow);
+        BinaryValue mergedValue = merger.merge(oldValue, newValue);
+        assertThat(mergedValue).isSameAs(newValue);
 
         // Test delete operation - should return null (deleted)
-        InternalRow deletedRow = merger.delete(oldRow);
-        assertThat(deletedRow).isNull();
+        BinaryValue deletedValue = merger.delete(oldValue);
+        assertThat(deletedValue).isNull();
 
         // Test supportsDelete - should return true
         assertThat(merger.deleteBehavior()).isEqualTo(deleteBehavior);
 
         // Test schema change.
         merger.configureTargetColumns(null, (byte) 2, SCHEMA_2);
-        oldRow = ProjectedRow.from(SCHEMA, SCHEMA_2).replaceRow(oldRow);
-        newRow = createBinaryRow("20", 1, "new2");
-        assertThat(merger.merge(oldRow, newRow)).isSameAs(newRow);
-        assertThat(merger.delete(newRow)).isNull();
+        newValue = createBinaryValue(1, "new2", "20");
+        assertThat(merger.merge(oldValue, newValue)).isSameAs(newValue);
+        assertThat(merger.delete(newValue)).isNull();
     }
 
     @ParameterizedTest
@@ -98,20 +98,19 @@ class DefaultRowMergerTest {
         RowMerger partialMerger =
                 merger.configureTargetColumns(new int[] {0, 1}, (byte) 1, 
SCHEMA); // id + name
 
-        InternalRow oldRow = createBinaryRow(1, "old");
+        BinaryValue oldValue = createBinaryValue(1, "old");
 
-        InternalRow ignoredRow = partialMerger.delete(oldRow);
-        assertThat(ignoredRow).isNull();
+        BinaryValue ignoredValue = partialMerger.delete(oldValue);
+        assertThat(ignoredValue).isNull();
         assertThat(partialMerger.deleteBehavior()).isEqualTo(deleteBehavior);
 
-        assertThat(partialMerger.merge(null, oldRow)).isEqualTo(oldRow);
+        assertThat(partialMerger.merge(null, oldValue)).isEqualTo(oldValue);
 
         // schema change then partial update (except name column).
-        partialMerger = merger.configureTargetColumns(new int[] {0, 1}, (byte) 
2, SCHEMA_2);
-        oldRow = ProjectedRow.from(SCHEMA, SCHEMA_2).replaceRow(oldRow);
-        BinaryRow newRow = createBinaryRow("20", 1, null);
-        BinaryRow mergeRow = createBinaryRow("20", 1, "old");
-        assertThat(partialMerger.merge(oldRow, newRow)).isEqualTo(mergeRow);
-        
assertThat(partialMerger.delete(mergeRow)).isEqualTo(createBinaryRow(null, 1, 
"old"));
+        partialMerger = merger.configureTargetColumns(new int[] {0, 2}, (byte) 
2, SCHEMA_2);
+        BinaryValue newValue = createBinaryValue(1, null, "20");
+        BinaryValue mergeValue = createBinaryValue(1, "old", "20");
+        assertThat(partialMerger.merge(oldValue, 
newValue)).isEqualTo(mergeValue);
+        
assertThat(partialMerger.delete(mergeValue)).isEqualTo(createBinaryValue(1, 
"old", null));
     }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java
index 008c2fafa..9163b23b0 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java
@@ -19,9 +19,7 @@ package org.apache.fluss.server.kv.rowmerger;
 
 import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.Schema;
-import org.apache.fluss.row.BinaryRow;
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.ProjectedRow;
+import org.apache.fluss.record.BinaryValue;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
 import org.apache.fluss.types.DataType;
@@ -132,13 +130,13 @@ class VersionedRowMergerTest {
         merger.configureTargetColumns(null, (short) 1, schema);
 
         for (TestSpec testSpec : testSpecs) {
-            BinaryRow oldRow = compactedRow(rowType, new Object[] 
{testSpec.oldValue, "dummy"});
-            BinaryRow newRow = compactedRow(rowType, new Object[] 
{testSpec.newValue, "dummy"});
-            InternalRow mergedRow = merger.merge(oldRow, newRow);
+            BinaryValue oldValue = binaryValue(rowType, new Object[] 
{testSpec.oldValue, "dummy"});
+            BinaryValue newValue = binaryValue(rowType, new Object[] 
{testSpec.newValue, "dummy"});
+            BinaryValue mergedValue = merger.merge(oldValue, newValue);
             if (testSpec.expected.equals("old")) {
-                assertThat(mergedRow).isSameAs(oldRow);
+                assertThat(mergedValue).isSameAs(oldValue);
             } else if (testSpec.expected.equals("new")) {
-                assertThat(mergedRow).isSameAs(newRow);
+                assertThat(mergedValue).isSameAs(newValue);
             } else {
                 throw new IllegalArgumentException("Unknown expected value: " 
+ testSpec.expected);
             }
@@ -169,27 +167,29 @@ class VersionedRowMergerTest {
         RowType rowType = schema.getRowType();
         VersionedRowMerger merger = new VersionedRowMerger("a", 
DeleteBehavior.DISABLE);
         merger.configureTargetColumns(null, (short) 1, schema);
-        InternalRow oldRow = compactedRow(rowType, new Object[] {11, "dummy"});
-        BinaryRow newRow = compactedRow(rowType, new Object[] {2, "dummy"});
-        assertThat(merger.merge(oldRow, newRow)).isSameAs(oldRow);
+        BinaryValue oldValue = binaryValue(rowType, new Object[] {11, 
"dummy"});
+        BinaryValue newValue = binaryValue(rowType, new Object[] {2, "dummy"});
+        assertThat(merger.merge(oldValue, newValue)).isSameAs(oldValue);
 
         Schema schema2 =
                 Schema.newBuilder()
                         .fromColumns(
                                 Arrays.asList(
-                                        new Schema.Column("c", 
DataTypes.STRING(), null, (short) 2),
                                         new Schema.Column("a", 
DataTypes.INT(), null, (short) 0),
+                                        new Schema.Column("b", 
DataTypes.STRING(), null, (short) 1),
+                                        // add new column at end
                                         new Schema.Column(
-                                                "b", DataTypes.STRING(), null, 
(short) 1)))
+                                                "c", DataTypes.STRING(), null, 
(short) 2)))
                         .build();
         rowType = schema2.getRowType();
         merger.configureTargetColumns(null, (short) 2, schema2);
 
-        oldRow = ProjectedRow.from(schema, schema2).replaceRow(oldRow);
-        newRow = compactedRow(rowType, new Object[] {"a", 2, "dummy"});
-        assertThat(merger.merge(oldRow, newRow)).isSameAs(oldRow);
-        newRow = compactedRow(rowType, new Object[] {"b", 20, "dummy"});
-        assertThat(merger.merge(oldRow, newRow)).isSameAs(newRow);
+        newValue = binaryValue(rowType, new Object[] {2, "dummy", "a"});
+        // version is smaller than old value
+        assertThat(merger.merge(oldValue, newValue)).isSameAs(oldValue);
+        newValue = binaryValue(rowType, new Object[] {20, "dummy", "b"});
+        // version is greater than old value
+        assertThat(merger.merge(oldValue, newValue)).isSameAs(newValue);
     }
 
     private static TimestampNtz timestampNtz(String timestamp) {
@@ -201,6 +201,11 @@ class VersionedRowMergerTest {
         return TimestampLtz.fromInstant(instant);
     }
 
+    private static BinaryValue binaryValue(RowType rowType, Object[] objects) {
+        int schemaId = rowType.getFieldCount() == 2 ? 1 : 2;
+        return new BinaryValue((short) schemaId, compactedRow(rowType, 
objects));
+    }
+
     /** Test specification for {@link VersionedRowMerger}. */
     private static class TestSpec {
         private final Object oldValue;
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
index 5a5910d1e..8b0b75c35 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
@@ -330,7 +330,7 @@ public class TabletServiceITCase {
                 tableId,
                 0,
                 Errors.INVALID_COLUMN_PROJECTION.code(),
-                "Projected field id 2 is not contains in [0, 1]");
+                "Projected field id 2 is not contained in [0, 1]");
     }
 
     @Test


Reply via email to