This is an automated email from the ASF dual-hosted git repository.

ipolyzos pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ede2fc13 [BugFix] Fix ArrayIndexOutOfBoundsException in partial updates after ADD COLUMN (#2594)
5ede2fc13 is described below

commit 5ede2fc13ebe85fe95fa4a20bdddc342911dfed7
Author: Anton Borisov <[email protected]>
AuthorDate: Thu Apr 2 14:34:03 2026 +0100

    [BugFix] Fix ArrayIndexOutOfBoundsException in partial updates after ADD COLUMN (#2594)
    
    * [TASK-2293] Server-side schema alignment for partial updates
    
    * Address comments
    
    * Add @VisibleForTesting annotation, as it's a Fluss convention
    
    ---------
    
    Co-authored-by: ipolyzos <[email protected]>
---
 .../java/org/apache/fluss/server/kv/KvTablet.java  | 187 +++++++++++--
 .../server/kv/KvTabletSchemaEvolutionTest.java     | 306 +++++++++++++++++++++
 2 files changed, 464 insertions(+), 29 deletions(-)

diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index e72428a02..31f7f90c3 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -42,9 +42,11 @@ import org.apache.fluss.record.KvRecord;
 import org.apache.fluss.record.KvRecordBatch;
 import org.apache.fluss.record.KvRecordReadContext;
 import org.apache.fluss.row.BinaryRow;
+import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.PaddingRow;
 import org.apache.fluss.row.arrow.ArrowWriterPool;
 import org.apache.fluss.row.arrow.ArrowWriterProvider;
+import org.apache.fluss.row.encode.RowEncoder;
 import org.apache.fluss.row.encode.ValueDecoder;
 import org.apache.fluss.rpc.protocol.MergeMode;
 import org.apache.fluss.server.kv.autoinc.AutoIncIDRange;
@@ -86,6 +88,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
@@ -374,23 +377,31 @@ public final class KvTablet {
                     short latestSchemaId = (short) schemaInfo.getSchemaId();
                     validateSchemaId(kvRecords.schemaId(), latestSchemaId);
 
+                    // Convert target column positions from client's schema to 
latest
+                    // schema positions using column IDs if schemas differ.
+                    int[] resolvedTargetColumns = targetColumns;
+                    if (targetColumns != null && kvRecords.schemaId() != 
latestSchemaId) {
+                        Schema clientSchema = 
schemaGetter.getSchema(kvRecords.schemaId());
+                        resolvedTargetColumns =
+                                convertTargetColumns(targetColumns, 
clientSchema, latestSchema);
+                    }
+
                     AutoIncrementUpdater currentAutoIncrementUpdater =
                             autoIncrementManager.getUpdaterForSchema(kvFormat, 
latestSchemaId);
 
                     // Validate targetColumns doesn't contain auto-increment 
column
-                    
currentAutoIncrementUpdater.validateTargetColumns(targetColumns);
+                    
currentAutoIncrementUpdater.validateTargetColumns(resolvedTargetColumns);
 
                     // Determine the row merger based on mergeMode:
                     // - DEFAULT: Use the configured merge engine (rowMerger)
                     // - OVERWRITE: Bypass merge engine, use pre-created 
overwriteRowMerger
                     //   to directly replace values (for undo recovery 
scenarios)
-                    // We only support ADD COLUMN, so targetColumns is fine to 
be used directly.
                     RowMerger currentMerger =
                             (mergeMode == MergeMode.OVERWRITE)
                                     ? 
overwriteRowMerger.configureTargetColumns(
-                                            targetColumns, latestSchemaId, 
latestSchema)
+                                            resolvedTargetColumns, 
latestSchemaId, latestSchema)
                                     : rowMerger.configureTargetColumns(
-                                            targetColumns, latestSchemaId, 
latestSchema);
+                                            resolvedTargetColumns, 
latestSchemaId, latestSchema);
 
                     RowType latestRowType = latestSchema.getRowType();
                     WalBuilder walBuilder = createWalBuilder(latestSchemaId, 
latestRowType);
@@ -406,6 +417,8 @@ public final class KvTablet {
                         processKvRecords(
                                 kvRecords,
                                 kvRecords.schemaId(),
+                                latestSchemaId,
+                                latestSchema,
                                 currentMerger,
                                 currentAutoIncrementUpdater,
                                 walBuilder,
@@ -460,6 +473,8 @@ public final class KvTablet {
     private void processKvRecords(
             KvRecordBatch kvRecords,
             short schemaIdOfNewData,
+            short latestSchemaId,
+            Schema latestSchema,
             RowMerger currentMerger,
             AutoIncrementUpdater autoIncrementUpdater,
             WalBuilder walBuilder,
@@ -473,32 +488,42 @@ public final class KvTablet {
                 KvRecordReadContext.createReadContext(kvFormat, schemaGetter);
         ValueDecoder valueDecoder = new ValueDecoder(schemaGetter, kvFormat);
 
-        for (KvRecord kvRecord : kvRecords.records(readContext)) {
-            byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
-            KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
-            BinaryRow row = kvRecord.getRow();
-            BinaryValue currentValue = row == null ? null : new 
BinaryValue(schemaIdOfNewData, row);
+        try (SchemaAlignmentContext alignmentContext =
+                new SchemaAlignmentContext(latestSchemaId, latestSchema, 
kvFormat)) {
+            for (KvRecord kvRecord : kvRecords.records(readContext)) {
+                byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
+                KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
+                BinaryRow row = kvRecord.getRow();
+                BinaryValue currentValue =
+                        row == null ? null : new 
BinaryValue(schemaIdOfNewData, row);
+
+                if (currentValue != null && schemaIdOfNewData != 
latestSchemaId) {
+                    currentValue = alignToLatestSchema(currentValue, 
alignmentContext);
+                }
 
-            if (currentValue == null) {
-                logOffset =
-                        processDeletion(
-                                key,
-                                currentMerger,
-                                valueDecoder,
-                                walBuilder,
-                                latestSchemaRow,
-                                logOffset);
-            } else {
-                logOffset =
-                        processUpsert(
-                                key,
-                                currentValue,
-                                currentMerger,
-                                autoIncrementUpdater,
-                                valueDecoder,
-                                walBuilder,
-                                latestSchemaRow,
-                                logOffset);
+                if (currentValue == null) {
+                    logOffset =
+                            processDeletion(
+                                    key,
+                                    currentMerger,
+                                    alignmentContext,
+                                    valueDecoder,
+                                    walBuilder,
+                                    latestSchemaRow,
+                                    logOffset);
+                } else {
+                    logOffset =
+                            processUpsert(
+                                    key,
+                                    currentValue,
+                                    currentMerger,
+                                    alignmentContext,
+                                    autoIncrementUpdater,
+                                    valueDecoder,
+                                    walBuilder,
+                                    latestSchemaRow,
+                                    logOffset);
+                }
             }
         }
     }
@@ -506,6 +531,7 @@ public final class KvTablet {
     private long processDeletion(
             KvPreWriteBuffer.Key key,
             RowMerger currentMerger,
+            SchemaAlignmentContext alignmentContext,
             ValueDecoder valueDecoder,
             WalBuilder walBuilder,
             PaddingRow latestSchemaRow,
@@ -530,6 +556,9 @@ public final class KvTablet {
         }
 
         BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes);
+        if (oldValue.schemaId != alignmentContext.latestSchemaId) {
+            oldValue = alignToLatestSchema(oldValue, alignmentContext);
+        }
         BinaryValue newValue = currentMerger.delete(oldValue);
 
         // if newValue is null, it means the row should be deleted
@@ -544,6 +573,7 @@ public final class KvTablet {
             KvPreWriteBuffer.Key key,
             BinaryValue currentValue,
             RowMerger currentMerger,
+            SchemaAlignmentContext alignmentContext,
             AutoIncrementUpdater autoIncrementUpdater,
             ValueDecoder valueDecoder,
             WalBuilder walBuilder,
@@ -572,6 +602,9 @@ public final class KvTablet {
         }
 
         BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes);
+        if (oldValue.schemaId != alignmentContext.latestSchemaId) {
+            oldValue = alignToLatestSchema(oldValue, alignmentContext);
+        }
         BinaryValue newValue = currentMerger.merge(oldValue, currentValue);
 
         if (newValue == oldValue) {
@@ -628,6 +661,102 @@ public final class KvTablet {
         }
     }
 
+    /** Batch-constant state for aligning rows to the latest schema. */
+    private static class SchemaAlignmentContext implements AutoCloseable {
+        final short latestSchemaId;
+        final List<Integer> targetColIds;
+        final RowEncoder encoder;
+        final Map<Short, SourceSchemaMapping> cache = new HashMap<>();
+
+        SchemaAlignmentContext(short latestSchemaId, Schema latestSchema, 
KvFormat kvFormat) {
+            this.latestSchemaId = latestSchemaId;
+            this.targetColIds = latestSchema.getColumnIds();
+            this.encoder = RowEncoder.create(kvFormat, 
latestSchema.getRowType());
+        }
+
+        @Override
+        public void close() throws Exception {
+            encoder.close();
+        }
+
+        /** Cached field getters and column-id→position map for a single 
source schema. */
+        private static class SourceSchemaMapping {
+            final Map<Integer, Integer> idToPos;
+            final InternalRow.FieldGetter[] getters;
+
+            SourceSchemaMapping(Schema sourceSchema) {
+                List<Integer> sourceColIds = sourceSchema.getColumnIds();
+                this.idToPos = new HashMap<>();
+                for (int i = 0; i < sourceColIds.size(); i++) {
+                    idToPos.put(sourceColIds.get(i), i);
+                }
+                this.getters = 
InternalRow.createFieldGetters(sourceSchema.getRowType());
+            }
+        }
+    }
+
+    /**
+     * Converts a {@link BinaryValue} from its source schema layout to the 
latest schema layout
+     * using column IDs to map positions. New columns (present in latest but 
not in source) are
+     * filled with null. Only call when {@code value.schemaId != 
latestSchemaId}.
+     */
+    private BinaryValue alignToLatestSchema(BinaryValue value, 
SchemaAlignmentContext ctx) {
+        SchemaAlignmentContext.SourceSchemaMapping mapping =
+                ctx.cache.computeIfAbsent(
+                        value.schemaId,
+                        id ->
+                                new SchemaAlignmentContext.SourceSchemaMapping(
+                                        schemaGetter.getSchema(id)));
+
+        ctx.encoder.startNewRow();
+        for (int targetPos = 0; targetPos < ctx.targetColIds.size(); 
targetPos++) {
+            Integer sourcePos = 
mapping.idToPos.get(ctx.targetColIds.get(targetPos));
+            if (sourcePos == null) {
+                // Column added after the source schema — fill with null.
+                ctx.encoder.encodeField(targetPos, null);
+            } else {
+                ctx.encoder.encodeField(
+                        targetPos, 
mapping.getters[sourcePos].getFieldOrNull(value.row));
+            }
+        }
+        // copy() is required: the encoder reuses its internal buffer, so the 
next
+        // startNewRow() would overwrite the row returned here.
+        return new BinaryValue(ctx.latestSchemaId, 
ctx.encoder.finishRow().copy());
+    }
+
+    /**
+     * Converts target column positions from the client's (source) schema to 
the latest (target)
+     * schema using column IDs. This is needed when a client sends a partial 
update using an older
+     * schema whose positions need to be remapped to the latest schema layout.
+     */
+    @VisibleForTesting
+    static int[] convertTargetColumns(int[] positions, Schema sourceSchema, 
Schema targetSchema) {
+        List<Integer> sourceColIds = sourceSchema.getColumnIds();
+        List<Integer> targetColIds = targetSchema.getColumnIds();
+
+        Map<Integer, Integer> targetIdToPos = new HashMap<>();
+        for (int i = 0; i < targetColIds.size(); i++) {
+            targetIdToPos.put(targetColIds.get(i), i);
+        }
+
+        int resultCount = 0;
+        int[] buffer = new int[positions.length];
+        for (int position : positions) {
+            int colId = sourceColIds.get(position);
+            Integer targetPos = targetIdToPos.get(colId);
+            if (targetPos != null) {
+                buffer[resultCount++] = targetPos;
+            }
+            // Column was dropped in the latest schema — skip it.
+        }
+        if (resultCount == positions.length) {
+            return buffer;
+        }
+        int[] result = new int[resultCount];
+        System.arraycopy(buffer, 0, result, 0, resultCount);
+        return result;
+    }
+
     private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws 
Exception {
         switch (logFormat) {
             case INDEXED:
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java
new file mode 100644
index 000000000..c9e5f7143
--- /dev/null
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.memory.TestingMemorySegmentPool;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaInfo;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.KvRecordBatch;
+import org.apache.fluss.record.KvRecordTestUtils;
+import org.apache.fluss.record.LogRecords;
+import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.record.TestingSchemaGetter;
+import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
+import org.apache.fluss.server.kv.autoinc.TestingSequenceGeneratorFactory;
+import org.apache.fluss.server.kv.rowmerger.RowMerger;
+import org.apache.fluss.server.log.FetchIsolation;
+import org.apache.fluss.server.log.LogTablet;
+import org.apache.fluss.server.log.LogTestUtils;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.concurrent.FlussScheduler;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
+import static 
org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
+import static org.apache.fluss.testutils.LogRecordsAssert.assertThatLogRecords;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link KvTablet} schema evolution handling.
+ *
+ * <p>Verifies that server-side row conversion (via column IDs) correctly 
handles partial updates
+ * when the incoming batch or stored KV rows use an older schema.
+ */
+class KvTabletSchemaEvolutionTest {
+
+    private static final short SCHEMA_ID_V0 = 0;
+    private static final short SCHEMA_ID_V1 = 1;
+
+    // Schema v0: {a INT PK, b STRING, c STRING}
+    private static final Schema SCHEMA_V0 =
+            Schema.newBuilder()
+                    .column("a", DataTypes.INT())
+                    .column("b", DataTypes.STRING())
+                    .column("c", DataTypes.STRING())
+                    .primaryKey("a")
+                    .build();
+
+    // Schema v1: ADD COLUMN d STRING (nullable)
+    // {a INT PK, b STRING, c STRING, d STRING}
+    private static final Schema SCHEMA_V1 =
+            Schema.newBuilder().fromSchema(SCHEMA_V0).column("d", 
DataTypes.STRING()).build();
+
+    private static final RowType ROW_TYPE_V0 = SCHEMA_V0.getRowType();
+    private static final RowType ROW_TYPE_V1 = SCHEMA_V1.getRowType();
+
+    private final Configuration conf = new Configuration();
+
+    private @TempDir File tempLogDir;
+    private @TempDir File tmpKvDir;
+
+    private TestingSchemaGetter schemaGetter;
+    private LogTablet logTablet;
+    private KvTablet kvTablet;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        TablePath tablePath = TablePath.of("testDb", "test_schema_evolution");
+        PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath);
+        // Start with schema v0
+        schemaGetter = new TestingSchemaGetter(new SchemaInfo(SCHEMA_V0, 
SCHEMA_ID_V0));
+
+        File logTabletDir =
+                LogTestUtils.makeRandomLogTabletDir(
+                        tempLogDir,
+                        physicalTablePath.getDatabaseName(),
+                        0L,
+                        physicalTablePath.getTableName());
+        logTablet =
+                LogTablet.create(
+                        physicalTablePath,
+                        logTabletDir,
+                        conf,
+                        TestingMetricGroups.TABLET_SERVER_METRICS,
+                        0,
+                        new FlussScheduler(1),
+                        LogFormat.ARROW,
+                        1,
+                        true,
+                        SystemClock.getInstance(),
+                        true);
+
+        TableBucket tableBucket = logTablet.getTableBucket();
+        TableConfig tableConf = new TableConfig(new Configuration());
+        RowMerger rowMerger = RowMerger.create(tableConf, KvFormat.COMPACTED, 
schemaGetter);
+        AutoIncrementManager autoIncrementManager =
+                new AutoIncrementManager(
+                        schemaGetter, tablePath, tableConf, new 
TestingSequenceGeneratorFactory());
+
+        kvTablet =
+                KvTablet.create(
+                        physicalTablePath,
+                        tableBucket,
+                        logTablet,
+                        tmpKvDir,
+                        conf,
+                        TestingMetricGroups.TABLET_SERVER_METRICS,
+                        new RootAllocator(Long.MAX_VALUE),
+                        new TestingMemorySegmentPool(10 * 1024),
+                        KvFormat.COMPACTED,
+                        rowMerger,
+                        DEFAULT_COMPRESSION,
+                        schemaGetter,
+                        tableConf.getChangelogImage(),
+                        KvManager.getDefaultRateLimiter(),
+                        autoIncrementManager);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (kvTablet != null) {
+            kvTablet.close();
+        }
+        if (logTablet != null) {
+            logTablet.close();
+        }
+    }
+
+    @Test
+    void testConvertTargetColumns_droppedColumnIsFiltered() {
+        // Build a target schema that doesn't contain column c (id=2).
+        // This simulates a DROP COLUMN scenario.
+        Schema schemaWithoutC =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .primaryKey("a")
+                        .build();
+
+        // Positions [0, 2] in v0 = columns a(id=0) and c(id=2)
+        // Column c(id=2) does not exist in schemaWithoutC (which has ids 0, 1)
+        // → c should be silently dropped from the result
+        int[] positions = {0, 2};
+        int[] result = KvTablet.convertTargetColumns(positions, SCHEMA_V0, 
schemaWithoutC);
+        // Only column a(id=0) survives → mapped to position 0 in 
schemaWithoutC
+        assertThat(result).containsExactly(0);
+    }
+
+    @Test
+    void testPartialUpdateAfterAddColumn() throws Exception {
+        KvRecordTestUtils.KvRecordFactory recordFactoryV0 =
+                KvRecordTestUtils.KvRecordFactory.of(ROW_TYPE_V0);
+        KvRecordTestUtils.KvRecordBatchFactory batchFactoryV0 =
+                KvRecordTestUtils.KvRecordBatchFactory.of(SCHEMA_ID_V0);
+
+        // Step 1: Insert full row with schema v0: {a=1, b="b_val", c="c_val"}
+        KvRecordBatch insertBatch =
+                batchFactoryV0.ofRecords(
+                        recordFactoryV0.ofRecord(
+                                "k1".getBytes(), new Object[] {1, "b_val", 
"c_val"}));
+        kvTablet.putAsLeader(insertBatch, null);
+
+        // Step 2: Evolve schema to v1 (ADD COLUMN d)
+        schemaGetter.updateLatestSchemaInfo(new SchemaInfo(SCHEMA_V1, 
SCHEMA_ID_V1));
+
+        long beforePartialUpdate = logTablet.localLogEndOffset();
+
+        // Step 3: Send partial update with schemaId=0, targeting columns [0, 
1] (a, b in v0)
+        // Row values: {a=1, b="new_b", c=null} (c is not targeted so its 
value doesn't matter)
+        int[] targetColumns = {0, 1};
+        KvRecordBatch partialBatch =
+                batchFactoryV0.ofRecords(
+                        recordFactoryV0.ofRecord("k1".getBytes(), new Object[] 
{1, "new_b", null}));
+        kvTablet.putAsLeader(partialBatch, targetColumns);
+
+        // Step 4: Verify result: {a=1, b="new_b", c="c_val", d=null}
+        // - b updated to "new_b"
+        // - c preserved from old row
+        // - d is null (new column, not present in old or new data)
+        LogRecords actualLogRecords = readLogRecords(beforePartialUpdate);
+        MemoryLogRecords expectedLogs =
+                logRecords(
+                        ROW_TYPE_V1,
+                        SCHEMA_ID_V1,
+                        beforePartialUpdate,
+                        Arrays.asList(ChangeType.UPDATE_BEFORE, 
ChangeType.UPDATE_AFTER),
+                        Arrays.asList(
+                                new Object[] {
+                                    1, "b_val", "c_val", null
+                                }, // before (old row aligned)
+                                new Object[] {1, "new_b", "c_val", null} // 
after (b updated)
+                                ));
+
+        assertThatLogRecords(actualLogRecords)
+                .withSchema(ROW_TYPE_V1)
+                .assertCheckSum(true)
+                .isEqualTo(expectedLogs);
+    }
+
+    @Test
+    void testDeleteAfterAddColumn() throws Exception {
+        KvRecordTestUtils.KvRecordFactory recordFactoryV0 =
+                KvRecordTestUtils.KvRecordFactory.of(ROW_TYPE_V0);
+        KvRecordTestUtils.KvRecordBatchFactory batchFactoryV0 =
+                KvRecordTestUtils.KvRecordBatchFactory.of(SCHEMA_ID_V0);
+
+        // Step 1: Insert row with schema v0
+        KvRecordBatch insertBatch =
+                batchFactoryV0.ofRecords(
+                        recordFactoryV0.ofRecord(
+                                "k1".getBytes(), new Object[] {1, "b_val", 
"c_val"}));
+        kvTablet.putAsLeader(insertBatch, null);
+
+        // Step 2: Evolve schema
+        schemaGetter.updateLatestSchemaInfo(new SchemaInfo(SCHEMA_V1, 
SCHEMA_ID_V1));
+
+        long beforeDelete = logTablet.localLogEndOffset();
+
+        // Step 3: Delete with schemaId=0
+        KvRecordBatch deleteBatch =
+                
batchFactoryV0.ofRecords(recordFactoryV0.ofRecord("k1".getBytes(), null));
+        kvTablet.putAsLeader(deleteBatch, null);
+
+        // Step 4: Verify DELETE log uses aligned (v1) row
+        LogRecords actualLogRecords = readLogRecords(beforeDelete);
+        MemoryLogRecords expectedLogs =
+                logRecords(
+                        ROW_TYPE_V1,
+                        SCHEMA_ID_V1,
+                        beforeDelete,
+                        Collections.singletonList(ChangeType.DELETE),
+                        Collections.singletonList(new Object[] {1, "b_val", 
"c_val", null}));
+
+        assertThatLogRecords(actualLogRecords)
+                .withSchema(ROW_TYPE_V1)
+                .assertCheckSum(true)
+                .isEqualTo(expectedLogs);
+    }
+
+    // ==================== Helper Methods ====================
+
+    private LogRecords readLogRecords(long startOffset) throws Exception {
+        return logTablet
+                .read(startOffset, Integer.MAX_VALUE, FetchIsolation.LOG_END, 
false, null)
+                .getRecords();
+    }
+
+    private MemoryLogRecords logRecords(
+            RowType rowType,
+            short schemaId,
+            long baseOffset,
+            List<ChangeType> changeTypes,
+            List<Object[]> rows)
+            throws Exception {
+        return createBasicMemoryLogRecords(
+                rowType,
+                schemaId,
+                baseOffset,
+                -1L,
+                CURRENT_LOG_MAGIC_VALUE,
+                NO_WRITER_ID,
+                NO_BATCH_SEQUENCE,
+                changeTypes,
+                rows,
+                LogFormat.ARROW,
+                DEFAULT_COMPRESSION);
+    }
+}

Reply via email to