This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 82b14f2ea4f87ed54f0898b0d2a9e925926cc302
Author: Jark Wu <[email protected]>
AuthorDate: Sat Apr 4 21:16:52 2026 +0800

    Revert "[BugFix] Fix ArrayIndexOutOfBoundsException in partial updates 
after ADD COLUMN (#2594)"
    
    This reverts commit 5ede2fc13ebe85fe95fa4a20bdddc342911dfed7.
---
 .../java/org/apache/fluss/server/kv/KvTablet.java  | 187 ++-----------
 .../server/kv/KvTabletSchemaEvolutionTest.java     | 306 ---------------------
 2 files changed, 29 insertions(+), 464 deletions(-)

diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 31f7f90c3..e72428a02 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -42,11 +42,9 @@ import org.apache.fluss.record.KvRecord;
 import org.apache.fluss.record.KvRecordBatch;
 import org.apache.fluss.record.KvRecordReadContext;
 import org.apache.fluss.row.BinaryRow;
-import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.PaddingRow;
 import org.apache.fluss.row.arrow.ArrowWriterPool;
 import org.apache.fluss.row.arrow.ArrowWriterProvider;
-import org.apache.fluss.row.encode.RowEncoder;
 import org.apache.fluss.row.encode.ValueDecoder;
 import org.apache.fluss.rpc.protocol.MergeMode;
 import org.apache.fluss.server.kv.autoinc.AutoIncIDRange;
@@ -88,7 +86,6 @@ import javax.annotation.concurrent.ThreadSafe;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
@@ -377,31 +374,23 @@ public final class KvTablet {
                     short latestSchemaId = (short) schemaInfo.getSchemaId();
                     validateSchemaId(kvRecords.schemaId(), latestSchemaId);
 
-                    // Convert target column positions from client's schema to 
latest
-                    // schema positions using column IDs if schemas differ.
-                    int[] resolvedTargetColumns = targetColumns;
-                    if (targetColumns != null && kvRecords.schemaId() != 
latestSchemaId) {
-                        Schema clientSchema = 
schemaGetter.getSchema(kvRecords.schemaId());
-                        resolvedTargetColumns =
-                                convertTargetColumns(targetColumns, 
clientSchema, latestSchema);
-                    }
-
                     AutoIncrementUpdater currentAutoIncrementUpdater =
                             autoIncrementManager.getUpdaterForSchema(kvFormat, 
latestSchemaId);
 
                     // Validate targetColumns doesn't contain auto-increment 
column
-                    
currentAutoIncrementUpdater.validateTargetColumns(resolvedTargetColumns);
+                    
currentAutoIncrementUpdater.validateTargetColumns(targetColumns);
 
                     // Determine the row merger based on mergeMode:
                     // - DEFAULT: Use the configured merge engine (rowMerger)
                     // - OVERWRITE: Bypass merge engine, use pre-created 
overwriteRowMerger
                     //   to directly replace values (for undo recovery 
scenarios)
+                    // We only support ADD COLUMN, so targetColumns is fine to 
be used directly.
                     RowMerger currentMerger =
                             (mergeMode == MergeMode.OVERWRITE)
                                     ? 
overwriteRowMerger.configureTargetColumns(
-                                            resolvedTargetColumns, 
latestSchemaId, latestSchema)
+                                            targetColumns, latestSchemaId, 
latestSchema)
                                     : rowMerger.configureTargetColumns(
-                                            resolvedTargetColumns, 
latestSchemaId, latestSchema);
+                                            targetColumns, latestSchemaId, 
latestSchema);
 
                     RowType latestRowType = latestSchema.getRowType();
                     WalBuilder walBuilder = createWalBuilder(latestSchemaId, 
latestRowType);
@@ -417,8 +406,6 @@ public final class KvTablet {
                         processKvRecords(
                                 kvRecords,
                                 kvRecords.schemaId(),
-                                latestSchemaId,
-                                latestSchema,
                                 currentMerger,
                                 currentAutoIncrementUpdater,
                                 walBuilder,
@@ -473,8 +460,6 @@ public final class KvTablet {
     private void processKvRecords(
             KvRecordBatch kvRecords,
             short schemaIdOfNewData,
-            short latestSchemaId,
-            Schema latestSchema,
             RowMerger currentMerger,
             AutoIncrementUpdater autoIncrementUpdater,
             WalBuilder walBuilder,
@@ -488,42 +473,32 @@ public final class KvTablet {
                 KvRecordReadContext.createReadContext(kvFormat, schemaGetter);
         ValueDecoder valueDecoder = new ValueDecoder(schemaGetter, kvFormat);
 
-        try (SchemaAlignmentContext alignmentContext =
-                new SchemaAlignmentContext(latestSchemaId, latestSchema, 
kvFormat)) {
-            for (KvRecord kvRecord : kvRecords.records(readContext)) {
-                byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
-                KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
-                BinaryRow row = kvRecord.getRow();
-                BinaryValue currentValue =
-                        row == null ? null : new 
BinaryValue(schemaIdOfNewData, row);
-
-                if (currentValue != null && schemaIdOfNewData != 
latestSchemaId) {
-                    currentValue = alignToLatestSchema(currentValue, 
alignmentContext);
-                }
+        for (KvRecord kvRecord : kvRecords.records(readContext)) {
+            byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
+            KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
+            BinaryRow row = kvRecord.getRow();
+            BinaryValue currentValue = row == null ? null : new 
BinaryValue(schemaIdOfNewData, row);
 
-                if (currentValue == null) {
-                    logOffset =
-                            processDeletion(
-                                    key,
-                                    currentMerger,
-                                    alignmentContext,
-                                    valueDecoder,
-                                    walBuilder,
-                                    latestSchemaRow,
-                                    logOffset);
-                } else {
-                    logOffset =
-                            processUpsert(
-                                    key,
-                                    currentValue,
-                                    currentMerger,
-                                    alignmentContext,
-                                    autoIncrementUpdater,
-                                    valueDecoder,
-                                    walBuilder,
-                                    latestSchemaRow,
-                                    logOffset);
-                }
+            if (currentValue == null) {
+                logOffset =
+                        processDeletion(
+                                key,
+                                currentMerger,
+                                valueDecoder,
+                                walBuilder,
+                                latestSchemaRow,
+                                logOffset);
+            } else {
+                logOffset =
+                        processUpsert(
+                                key,
+                                currentValue,
+                                currentMerger,
+                                autoIncrementUpdater,
+                                valueDecoder,
+                                walBuilder,
+                                latestSchemaRow,
+                                logOffset);
             }
         }
     }
@@ -531,7 +506,6 @@ public final class KvTablet {
     private long processDeletion(
             KvPreWriteBuffer.Key key,
             RowMerger currentMerger,
-            SchemaAlignmentContext alignmentContext,
             ValueDecoder valueDecoder,
             WalBuilder walBuilder,
             PaddingRow latestSchemaRow,
@@ -556,9 +530,6 @@ public final class KvTablet {
         }
 
         BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes);
-        if (oldValue.schemaId != alignmentContext.latestSchemaId) {
-            oldValue = alignToLatestSchema(oldValue, alignmentContext);
-        }
         BinaryValue newValue = currentMerger.delete(oldValue);
 
         // if newValue is null, it means the row should be deleted
@@ -573,7 +544,6 @@ public final class KvTablet {
             KvPreWriteBuffer.Key key,
             BinaryValue currentValue,
             RowMerger currentMerger,
-            SchemaAlignmentContext alignmentContext,
             AutoIncrementUpdater autoIncrementUpdater,
             ValueDecoder valueDecoder,
             WalBuilder walBuilder,
@@ -602,9 +572,6 @@ public final class KvTablet {
         }
 
         BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes);
-        if (oldValue.schemaId != alignmentContext.latestSchemaId) {
-            oldValue = alignToLatestSchema(oldValue, alignmentContext);
-        }
         BinaryValue newValue = currentMerger.merge(oldValue, currentValue);
 
         if (newValue == oldValue) {
@@ -661,102 +628,6 @@ public final class KvTablet {
         }
     }
 
-    /** Batch-constant state for aligning rows to the latest schema. */
-    private static class SchemaAlignmentContext implements AutoCloseable {
-        final short latestSchemaId;
-        final List<Integer> targetColIds;
-        final RowEncoder encoder;
-        final Map<Short, SourceSchemaMapping> cache = new HashMap<>();
-
-        SchemaAlignmentContext(short latestSchemaId, Schema latestSchema, 
KvFormat kvFormat) {
-            this.latestSchemaId = latestSchemaId;
-            this.targetColIds = latestSchema.getColumnIds();
-            this.encoder = RowEncoder.create(kvFormat, 
latestSchema.getRowType());
-        }
-
-        @Override
-        public void close() throws Exception {
-            encoder.close();
-        }
-
-        /** Cached field getters and column-id→position map for a single 
source schema. */
-        private static class SourceSchemaMapping {
-            final Map<Integer, Integer> idToPos;
-            final InternalRow.FieldGetter[] getters;
-
-            SourceSchemaMapping(Schema sourceSchema) {
-                List<Integer> sourceColIds = sourceSchema.getColumnIds();
-                this.idToPos = new HashMap<>();
-                for (int i = 0; i < sourceColIds.size(); i++) {
-                    idToPos.put(sourceColIds.get(i), i);
-                }
-                this.getters = 
InternalRow.createFieldGetters(sourceSchema.getRowType());
-            }
-        }
-    }
-
-    /**
-     * Converts a {@link BinaryValue} from its source schema layout to the 
latest schema layout
-     * using column IDs to map positions. New columns (present in latest but 
not in source) are
-     * filled with null. Only call when {@code value.schemaId != 
latestSchemaId}.
-     */
-    private BinaryValue alignToLatestSchema(BinaryValue value, 
SchemaAlignmentContext ctx) {
-        SchemaAlignmentContext.SourceSchemaMapping mapping =
-                ctx.cache.computeIfAbsent(
-                        value.schemaId,
-                        id ->
-                                new SchemaAlignmentContext.SourceSchemaMapping(
-                                        schemaGetter.getSchema(id)));
-
-        ctx.encoder.startNewRow();
-        for (int targetPos = 0; targetPos < ctx.targetColIds.size(); 
targetPos++) {
-            Integer sourcePos = 
mapping.idToPos.get(ctx.targetColIds.get(targetPos));
-            if (sourcePos == null) {
-                // Column added after the source schema — fill with null.
-                ctx.encoder.encodeField(targetPos, null);
-            } else {
-                ctx.encoder.encodeField(
-                        targetPos, 
mapping.getters[sourcePos].getFieldOrNull(value.row));
-            }
-        }
-        // copy() is required: the encoder reuses its internal buffer, so the 
next
-        // startNewRow() would overwrite the row returned here.
-        return new BinaryValue(ctx.latestSchemaId, 
ctx.encoder.finishRow().copy());
-    }
-
-    /**
-     * Converts target column positions from the client's (source) schema to 
the latest (target)
-     * schema using column IDs. This is needed when a client sends a partial 
update using an older
-     * schema whose positions need to be remapped to the latest schema layout.
-     */
-    @VisibleForTesting
-    static int[] convertTargetColumns(int[] positions, Schema sourceSchema, 
Schema targetSchema) {
-        List<Integer> sourceColIds = sourceSchema.getColumnIds();
-        List<Integer> targetColIds = targetSchema.getColumnIds();
-
-        Map<Integer, Integer> targetIdToPos = new HashMap<>();
-        for (int i = 0; i < targetColIds.size(); i++) {
-            targetIdToPos.put(targetColIds.get(i), i);
-        }
-
-        int resultCount = 0;
-        int[] buffer = new int[positions.length];
-        for (int position : positions) {
-            int colId = sourceColIds.get(position);
-            Integer targetPos = targetIdToPos.get(colId);
-            if (targetPos != null) {
-                buffer[resultCount++] = targetPos;
-            }
-            // Column was dropped in the latest schema — skip it.
-        }
-        if (resultCount == positions.length) {
-            return buffer;
-        }
-        int[] result = new int[resultCount];
-        System.arraycopy(buffer, 0, result, 0, resultCount);
-        return result;
-    }
-
     private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws 
Exception {
         switch (logFormat) {
             case INDEXED:
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java
deleted file mode 100644
index c9e5f7143..000000000
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.server.kv;
-
-import org.apache.fluss.config.Configuration;
-import org.apache.fluss.config.TableConfig;
-import org.apache.fluss.memory.TestingMemorySegmentPool;
-import org.apache.fluss.metadata.KvFormat;
-import org.apache.fluss.metadata.LogFormat;
-import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.metadata.Schema;
-import org.apache.fluss.metadata.SchemaInfo;
-import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.record.ChangeType;
-import org.apache.fluss.record.KvRecordBatch;
-import org.apache.fluss.record.KvRecordTestUtils;
-import org.apache.fluss.record.LogRecords;
-import org.apache.fluss.record.MemoryLogRecords;
-import org.apache.fluss.record.TestingSchemaGetter;
-import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
-import org.apache.fluss.server.kv.autoinc.TestingSequenceGeneratorFactory;
-import org.apache.fluss.server.kv.rowmerger.RowMerger;
-import org.apache.fluss.server.log.FetchIsolation;
-import org.apache.fluss.server.log.LogTablet;
-import org.apache.fluss.server.log.LogTestUtils;
-import org.apache.fluss.server.metrics.group.TestingMetricGroups;
-import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
-import org.apache.fluss.types.DataTypes;
-import org.apache.fluss.types.RowType;
-import org.apache.fluss.utils.clock.SystemClock;
-import org.apache.fluss.utils.concurrent.FlussScheduler;
-
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static 
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
-import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
-import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
-import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
-import static 
org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
-import static org.apache.fluss.testutils.LogRecordsAssert.assertThatLogRecords;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/**
- * Tests for {@link KvTablet} schema evolution handling.
- *
- * <p>Verifies that server-side row conversion (via column IDs) correctly 
handles partial updates
- * when the incoming batch or stored KV rows use an older schema.
- */
-class KvTabletSchemaEvolutionTest {
-
-    private static final short SCHEMA_ID_V0 = 0;
-    private static final short SCHEMA_ID_V1 = 1;
-
-    // Schema v0: {a INT PK, b STRING, c STRING}
-    private static final Schema SCHEMA_V0 =
-            Schema.newBuilder()
-                    .column("a", DataTypes.INT())
-                    .column("b", DataTypes.STRING())
-                    .column("c", DataTypes.STRING())
-                    .primaryKey("a")
-                    .build();
-
-    // Schema v1: ADD COLUMN d STRING (nullable)
-    // {a INT PK, b STRING, c STRING, d STRING}
-    private static final Schema SCHEMA_V1 =
-            Schema.newBuilder().fromSchema(SCHEMA_V0).column("d", 
DataTypes.STRING()).build();
-
-    private static final RowType ROW_TYPE_V0 = SCHEMA_V0.getRowType();
-    private static final RowType ROW_TYPE_V1 = SCHEMA_V1.getRowType();
-
-    private final Configuration conf = new Configuration();
-
-    private @TempDir File tempLogDir;
-    private @TempDir File tmpKvDir;
-
-    private TestingSchemaGetter schemaGetter;
-    private LogTablet logTablet;
-    private KvTablet kvTablet;
-
-    @BeforeEach
-    void setUp() throws Exception {
-        TablePath tablePath = TablePath.of("testDb", "test_schema_evolution");
-        PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath);
-        // Start with schema v0
-        schemaGetter = new TestingSchemaGetter(new SchemaInfo(SCHEMA_V0, 
SCHEMA_ID_V0));
-
-        File logTabletDir =
-                LogTestUtils.makeRandomLogTabletDir(
-                        tempLogDir,
-                        physicalTablePath.getDatabaseName(),
-                        0L,
-                        physicalTablePath.getTableName());
-        logTablet =
-                LogTablet.create(
-                        physicalTablePath,
-                        logTabletDir,
-                        conf,
-                        TestingMetricGroups.TABLET_SERVER_METRICS,
-                        0,
-                        new FlussScheduler(1),
-                        LogFormat.ARROW,
-                        1,
-                        true,
-                        SystemClock.getInstance(),
-                        true);
-
-        TableBucket tableBucket = logTablet.getTableBucket();
-        TableConfig tableConf = new TableConfig(new Configuration());
-        RowMerger rowMerger = RowMerger.create(tableConf, KvFormat.COMPACTED, 
schemaGetter);
-        AutoIncrementManager autoIncrementManager =
-                new AutoIncrementManager(
-                        schemaGetter, tablePath, tableConf, new 
TestingSequenceGeneratorFactory());
-
-        kvTablet =
-                KvTablet.create(
-                        physicalTablePath,
-                        tableBucket,
-                        logTablet,
-                        tmpKvDir,
-                        conf,
-                        TestingMetricGroups.TABLET_SERVER_METRICS,
-                        new RootAllocator(Long.MAX_VALUE),
-                        new TestingMemorySegmentPool(10 * 1024),
-                        KvFormat.COMPACTED,
-                        rowMerger,
-                        DEFAULT_COMPRESSION,
-                        schemaGetter,
-                        tableConf.getChangelogImage(),
-                        KvManager.getDefaultRateLimiter(),
-                        autoIncrementManager);
-    }
-
-    @AfterEach
-    void tearDown() throws Exception {
-        if (kvTablet != null) {
-            kvTablet.close();
-        }
-        if (logTablet != null) {
-            logTablet.close();
-        }
-    }
-
-    @Test
-    void testConvertTargetColumns_droppedColumnIsFiltered() {
-        // Build a target schema that doesn't contain column c (id=2).
-        // This simulates a DROP COLUMN scenario.
-        Schema schemaWithoutC =
-                Schema.newBuilder()
-                        .column("a", DataTypes.INT())
-                        .column("b", DataTypes.STRING())
-                        .primaryKey("a")
-                        .build();
-
-        // Positions [0, 2] in v0 = columns a(id=0) and c(id=2)
-        // Column c(id=2) does not exist in schemaWithoutC (which has ids 0, 1)
-        // → c should be silently dropped from the result
-        int[] positions = {0, 2};
-        int[] result = KvTablet.convertTargetColumns(positions, SCHEMA_V0, 
schemaWithoutC);
-        // Only column a(id=0) survives → mapped to position 0 in 
schemaWithoutC
-        assertThat(result).containsExactly(0);
-    }
-
-    @Test
-    void testPartialUpdateAfterAddColumn() throws Exception {
-        KvRecordTestUtils.KvRecordFactory recordFactoryV0 =
-                KvRecordTestUtils.KvRecordFactory.of(ROW_TYPE_V0);
-        KvRecordTestUtils.KvRecordBatchFactory batchFactoryV0 =
-                KvRecordTestUtils.KvRecordBatchFactory.of(SCHEMA_ID_V0);
-
-        // Step 1: Insert full row with schema v0: {a=1, b="b_val", c="c_val"}
-        KvRecordBatch insertBatch =
-                batchFactoryV0.ofRecords(
-                        recordFactoryV0.ofRecord(
-                                "k1".getBytes(), new Object[] {1, "b_val", 
"c_val"}));
-        kvTablet.putAsLeader(insertBatch, null);
-
-        // Step 2: Evolve schema to v1 (ADD COLUMN d)
-        schemaGetter.updateLatestSchemaInfo(new SchemaInfo(SCHEMA_V1, 
SCHEMA_ID_V1));
-
-        long beforePartialUpdate = logTablet.localLogEndOffset();
-
-        // Step 3: Send partial update with schemaId=0, targeting columns [0, 
1] (a, b in v0)
-        // Row values: {a=1, b="new_b", c=null} (c is not targeted so its 
value doesn't matter)
-        int[] targetColumns = {0, 1};
-        KvRecordBatch partialBatch =
-                batchFactoryV0.ofRecords(
-                        recordFactoryV0.ofRecord("k1".getBytes(), new Object[] 
{1, "new_b", null}));
-        kvTablet.putAsLeader(partialBatch, targetColumns);
-
-        // Step 4: Verify result: {a=1, b="new_b", c="c_val", d=null}
-        // - b updated to "new_b"
-        // - c preserved from old row
-        // - d is null (new column, not present in old or new data)
-        LogRecords actualLogRecords = readLogRecords(beforePartialUpdate);
-        MemoryLogRecords expectedLogs =
-                logRecords(
-                        ROW_TYPE_V1,
-                        SCHEMA_ID_V1,
-                        beforePartialUpdate,
-                        Arrays.asList(ChangeType.UPDATE_BEFORE, 
ChangeType.UPDATE_AFTER),
-                        Arrays.asList(
-                                new Object[] {
-                                    1, "b_val", "c_val", null
-                                }, // before (old row aligned)
-                                new Object[] {1, "new_b", "c_val", null} // 
after (b updated)
-                                ));
-
-        assertThatLogRecords(actualLogRecords)
-                .withSchema(ROW_TYPE_V1)
-                .assertCheckSum(true)
-                .isEqualTo(expectedLogs);
-    }
-
-    @Test
-    void testDeleteAfterAddColumn() throws Exception {
-        KvRecordTestUtils.KvRecordFactory recordFactoryV0 =
-                KvRecordTestUtils.KvRecordFactory.of(ROW_TYPE_V0);
-        KvRecordTestUtils.KvRecordBatchFactory batchFactoryV0 =
-                KvRecordTestUtils.KvRecordBatchFactory.of(SCHEMA_ID_V0);
-
-        // Step 1: Insert row with schema v0
-        KvRecordBatch insertBatch =
-                batchFactoryV0.ofRecords(
-                        recordFactoryV0.ofRecord(
-                                "k1".getBytes(), new Object[] {1, "b_val", 
"c_val"}));
-        kvTablet.putAsLeader(insertBatch, null);
-
-        // Step 2: Evolve schema
-        schemaGetter.updateLatestSchemaInfo(new SchemaInfo(SCHEMA_V1, 
SCHEMA_ID_V1));
-
-        long beforeDelete = logTablet.localLogEndOffset();
-
-        // Step 3: Delete with schemaId=0
-        KvRecordBatch deleteBatch =
-                
batchFactoryV0.ofRecords(recordFactoryV0.ofRecord("k1".getBytes(), null));
-        kvTablet.putAsLeader(deleteBatch, null);
-
-        // Step 4: Verify DELETE log uses aligned (v1) row
-        LogRecords actualLogRecords = readLogRecords(beforeDelete);
-        MemoryLogRecords expectedLogs =
-                logRecords(
-                        ROW_TYPE_V1,
-                        SCHEMA_ID_V1,
-                        beforeDelete,
-                        Collections.singletonList(ChangeType.DELETE),
-                        Collections.singletonList(new Object[] {1, "b_val", 
"c_val", null}));
-
-        assertThatLogRecords(actualLogRecords)
-                .withSchema(ROW_TYPE_V1)
-                .assertCheckSum(true)
-                .isEqualTo(expectedLogs);
-    }
-
-    // ==================== Helper Methods ====================
-
-    private LogRecords readLogRecords(long startOffset) throws Exception {
-        return logTablet
-                .read(startOffset, Integer.MAX_VALUE, FetchIsolation.LOG_END, 
false, null)
-                .getRecords();
-    }
-
-    private MemoryLogRecords logRecords(
-            RowType rowType,
-            short schemaId,
-            long baseOffset,
-            List<ChangeType> changeTypes,
-            List<Object[]> rows)
-            throws Exception {
-        return createBasicMemoryLogRecords(
-                rowType,
-                schemaId,
-                baseOffset,
-                -1L,
-                CURRENT_LOG_MAGIC_VALUE,
-                NO_WRITER_ID,
-                NO_BATCH_SEQUENCE,
-                changeTypes,
-                rows,
-                LogFormat.ARROW,
-                DEFAULT_COMPRESSION);
-    }
-}

Reply via email to