This is an automated email from the ASF dual-hosted git repository.
ipolyzos pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 5ede2fc13 [BugFix] Fix ArrayIndexOutOfBoundsException in partial
updates after ADD COLUMN (#2594)
5ede2fc13 is described below
commit 5ede2fc13ebe85fe95fa4a20bdddc342911dfed7
Author: Anton Borisov <[email protected]>
AuthorDate: Thu Apr 2 14:34:03 2026 +0100
[BugFix] Fix ArrayIndexOutOfBoundsException in partial updates after ADD
COLUMN (#2594)
* [TASK-2293] Server-side schema alignment for partial updates
* Address comments
* Add @VisibleForTesting annotation as it's a Fluss convention
---------
Co-authored-by: ipolyzos <[email protected]>
---
.../java/org/apache/fluss/server/kv/KvTablet.java | 187 +++++++++++--
.../server/kv/KvTabletSchemaEvolutionTest.java | 306 +++++++++++++++++++++
2 files changed, 464 insertions(+), 29 deletions(-)
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index e72428a02..31f7f90c3 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -42,9 +42,11 @@ import org.apache.fluss.record.KvRecord;
import org.apache.fluss.record.KvRecordBatch;
import org.apache.fluss.record.KvRecordReadContext;
import org.apache.fluss.row.BinaryRow;
+import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.PaddingRow;
import org.apache.fluss.row.arrow.ArrowWriterPool;
import org.apache.fluss.row.arrow.ArrowWriterProvider;
+import org.apache.fluss.row.encode.RowEncoder;
import org.apache.fluss.row.encode.ValueDecoder;
import org.apache.fluss.rpc.protocol.MergeMode;
import org.apache.fluss.server.kv.autoinc.AutoIncIDRange;
@@ -86,6 +88,7 @@ import javax.annotation.concurrent.ThreadSafe;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -374,23 +377,31 @@ public final class KvTablet {
short latestSchemaId = (short) schemaInfo.getSchemaId();
validateSchemaId(kvRecords.schemaId(), latestSchemaId);
+ // Convert target column positions from client's schema to
latest
+ // schema positions using column IDs if schemas differ.
+ int[] resolvedTargetColumns = targetColumns;
+ if (targetColumns != null && kvRecords.schemaId() !=
latestSchemaId) {
+ Schema clientSchema =
schemaGetter.getSchema(kvRecords.schemaId());
+ resolvedTargetColumns =
+ convertTargetColumns(targetColumns,
clientSchema, latestSchema);
+ }
+
AutoIncrementUpdater currentAutoIncrementUpdater =
autoIncrementManager.getUpdaterForSchema(kvFormat,
latestSchemaId);
// Validate targetColumns doesn't contain auto-increment
column
-
currentAutoIncrementUpdater.validateTargetColumns(targetColumns);
+
currentAutoIncrementUpdater.validateTargetColumns(resolvedTargetColumns);
// Determine the row merger based on mergeMode:
// - DEFAULT: Use the configured merge engine (rowMerger)
// - OVERWRITE: Bypass merge engine, use pre-created
overwriteRowMerger
// to directly replace values (for undo recovery
scenarios)
- // We only support ADD COLUMN, so targetColumns is fine to
be used directly.
RowMerger currentMerger =
(mergeMode == MergeMode.OVERWRITE)
?
overwriteRowMerger.configureTargetColumns(
- targetColumns, latestSchemaId,
latestSchema)
+ resolvedTargetColumns,
latestSchemaId, latestSchema)
: rowMerger.configureTargetColumns(
- targetColumns, latestSchemaId,
latestSchema);
+ resolvedTargetColumns,
latestSchemaId, latestSchema);
RowType latestRowType = latestSchema.getRowType();
WalBuilder walBuilder = createWalBuilder(latestSchemaId,
latestRowType);
@@ -406,6 +417,8 @@ public final class KvTablet {
processKvRecords(
kvRecords,
kvRecords.schemaId(),
+ latestSchemaId,
+ latestSchema,
currentMerger,
currentAutoIncrementUpdater,
walBuilder,
@@ -460,6 +473,8 @@ public final class KvTablet {
private void processKvRecords(
KvRecordBatch kvRecords,
short schemaIdOfNewData,
+ short latestSchemaId,
+ Schema latestSchema,
RowMerger currentMerger,
AutoIncrementUpdater autoIncrementUpdater,
WalBuilder walBuilder,
@@ -473,32 +488,42 @@ public final class KvTablet {
KvRecordReadContext.createReadContext(kvFormat, schemaGetter);
ValueDecoder valueDecoder = new ValueDecoder(schemaGetter, kvFormat);
- for (KvRecord kvRecord : kvRecords.records(readContext)) {
- byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
- KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
- BinaryRow row = kvRecord.getRow();
- BinaryValue currentValue = row == null ? null : new
BinaryValue(schemaIdOfNewData, row);
+ try (SchemaAlignmentContext alignmentContext =
+ new SchemaAlignmentContext(latestSchemaId, latestSchema,
kvFormat)) {
+ for (KvRecord kvRecord : kvRecords.records(readContext)) {
+ byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
+ KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
+ BinaryRow row = kvRecord.getRow();
+ BinaryValue currentValue =
+ row == null ? null : new
BinaryValue(schemaIdOfNewData, row);
+
+ if (currentValue != null && schemaIdOfNewData !=
latestSchemaId) {
+ currentValue = alignToLatestSchema(currentValue,
alignmentContext);
+ }
- if (currentValue == null) {
- logOffset =
- processDeletion(
- key,
- currentMerger,
- valueDecoder,
- walBuilder,
- latestSchemaRow,
- logOffset);
- } else {
- logOffset =
- processUpsert(
- key,
- currentValue,
- currentMerger,
- autoIncrementUpdater,
- valueDecoder,
- walBuilder,
- latestSchemaRow,
- logOffset);
+ if (currentValue == null) {
+ logOffset =
+ processDeletion(
+ key,
+ currentMerger,
+ alignmentContext,
+ valueDecoder,
+ walBuilder,
+ latestSchemaRow,
+ logOffset);
+ } else {
+ logOffset =
+ processUpsert(
+ key,
+ currentValue,
+ currentMerger,
+ alignmentContext,
+ autoIncrementUpdater,
+ valueDecoder,
+ walBuilder,
+ latestSchemaRow,
+ logOffset);
+ }
}
}
}
@@ -506,6 +531,7 @@ public final class KvTablet {
private long processDeletion(
KvPreWriteBuffer.Key key,
RowMerger currentMerger,
+ SchemaAlignmentContext alignmentContext,
ValueDecoder valueDecoder,
WalBuilder walBuilder,
PaddingRow latestSchemaRow,
@@ -530,6 +556,9 @@ public final class KvTablet {
}
BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes);
+ if (oldValue.schemaId != alignmentContext.latestSchemaId) {
+ oldValue = alignToLatestSchema(oldValue, alignmentContext);
+ }
BinaryValue newValue = currentMerger.delete(oldValue);
// if newValue is null, it means the row should be deleted
@@ -544,6 +573,7 @@ public final class KvTablet {
KvPreWriteBuffer.Key key,
BinaryValue currentValue,
RowMerger currentMerger,
+ SchemaAlignmentContext alignmentContext,
AutoIncrementUpdater autoIncrementUpdater,
ValueDecoder valueDecoder,
WalBuilder walBuilder,
@@ -572,6 +602,9 @@ public final class KvTablet {
}
BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes);
+ if (oldValue.schemaId != alignmentContext.latestSchemaId) {
+ oldValue = alignToLatestSchema(oldValue, alignmentContext);
+ }
BinaryValue newValue = currentMerger.merge(oldValue, currentValue);
if (newValue == oldValue) {
@@ -628,6 +661,102 @@ public final class KvTablet {
}
}
+ /** Batch-constant state for aligning rows to the latest schema. */
+ private static class SchemaAlignmentContext implements AutoCloseable {
+ final short latestSchemaId;
+ final List<Integer> targetColIds;
+ final RowEncoder encoder;
+ final Map<Short, SourceSchemaMapping> cache = new HashMap<>();
+
+ SchemaAlignmentContext(short latestSchemaId, Schema latestSchema,
KvFormat kvFormat) {
+ this.latestSchemaId = latestSchemaId;
+ this.targetColIds = latestSchema.getColumnIds();
+ this.encoder = RowEncoder.create(kvFormat,
latestSchema.getRowType());
+ }
+
+ @Override
+ public void close() throws Exception {
+ encoder.close();
+ }
+
+ /** Cached field getters and column-id→position map for a single
source schema. */
+ private static class SourceSchemaMapping {
+ final Map<Integer, Integer> idToPos;
+ final InternalRow.FieldGetter[] getters;
+
+ SourceSchemaMapping(Schema sourceSchema) {
+ List<Integer> sourceColIds = sourceSchema.getColumnIds();
+ this.idToPos = new HashMap<>();
+ for (int i = 0; i < sourceColIds.size(); i++) {
+ idToPos.put(sourceColIds.get(i), i);
+ }
+ this.getters =
InternalRow.createFieldGetters(sourceSchema.getRowType());
+ }
+ }
+ }
+
+ /**
+ * Converts a {@link BinaryValue} from its source schema layout to the
latest schema layout
+ * using column IDs to map positions. New columns (present in latest but
not in source) are
+ * filled with null. Only call when {@code value.schemaId !=
latestSchemaId}.
+ */
+ private BinaryValue alignToLatestSchema(BinaryValue value,
SchemaAlignmentContext ctx) {
+ SchemaAlignmentContext.SourceSchemaMapping mapping =
+ ctx.cache.computeIfAbsent(
+ value.schemaId,
+ id ->
+ new SchemaAlignmentContext.SourceSchemaMapping(
+ schemaGetter.getSchema(id)));
+
+ ctx.encoder.startNewRow();
+ for (int targetPos = 0; targetPos < ctx.targetColIds.size();
targetPos++) {
+ Integer sourcePos =
mapping.idToPos.get(ctx.targetColIds.get(targetPos));
+ if (sourcePos == null) {
+ // Column added after the source schema — fill with null.
+ ctx.encoder.encodeField(targetPos, null);
+ } else {
+ ctx.encoder.encodeField(
+ targetPos,
mapping.getters[sourcePos].getFieldOrNull(value.row));
+ }
+ }
+ // copy() is required: the encoder reuses its internal buffer, so the
next
+ // startNewRow() would overwrite the row returned here.
+ return new BinaryValue(ctx.latestSchemaId,
ctx.encoder.finishRow().copy());
+ }
+
+ /**
+ * Converts target column positions from the client's (source) schema to
the latest (target)
+ * schema using column IDs. This is needed when a client sends a partial
update using an older
+ * schema whose positions need to be remapped to the latest schema layout.
+ */
+ @VisibleForTesting
+ static int[] convertTargetColumns(int[] positions, Schema sourceSchema,
Schema targetSchema) {
+ List<Integer> sourceColIds = sourceSchema.getColumnIds();
+ List<Integer> targetColIds = targetSchema.getColumnIds();
+
+ Map<Integer, Integer> targetIdToPos = new HashMap<>();
+ for (int i = 0; i < targetColIds.size(); i++) {
+ targetIdToPos.put(targetColIds.get(i), i);
+ }
+
+ int resultCount = 0;
+ int[] buffer = new int[positions.length];
+ for (int position : positions) {
+ int colId = sourceColIds.get(position);
+ Integer targetPos = targetIdToPos.get(colId);
+ if (targetPos != null) {
+ buffer[resultCount++] = targetPos;
+ }
+ // Column was dropped in the latest schema — skip it.
+ }
+ if (resultCount == positions.length) {
+ return buffer;
+ }
+ int[] result = new int[resultCount];
+ System.arraycopy(buffer, 0, result, 0, resultCount);
+ return result;
+ }
+
private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws
Exception {
switch (logFormat) {
case INDEXED:
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java
new file mode 100644
index 000000000..c9e5f7143
--- /dev/null
+++
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.memory.TestingMemorySegmentPool;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaInfo;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.KvRecordBatch;
+import org.apache.fluss.record.KvRecordTestUtils;
+import org.apache.fluss.record.LogRecords;
+import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.record.TestingSchemaGetter;
+import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
+import org.apache.fluss.server.kv.autoinc.TestingSequenceGeneratorFactory;
+import org.apache.fluss.server.kv.rowmerger.RowMerger;
+import org.apache.fluss.server.log.FetchIsolation;
+import org.apache.fluss.server.log.LogTablet;
+import org.apache.fluss.server.log.LogTestUtils;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.concurrent.FlussScheduler;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
+import static
org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
+import static org.apache.fluss.testutils.LogRecordsAssert.assertThatLogRecords;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link KvTablet} schema evolution handling.
+ *
+ * <p>Verifies that server-side row conversion (via column IDs) correctly
handles partial updates
+ * when the incoming batch or stored KV rows use an older schema.
+ */
+class KvTabletSchemaEvolutionTest {
+
+ private static final short SCHEMA_ID_V0 = 0;
+ private static final short SCHEMA_ID_V1 = 1;
+
+ // Schema v0: {a INT PK, b STRING, c STRING}
+ private static final Schema SCHEMA_V0 =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.STRING())
+ .column("c", DataTypes.STRING())
+ .primaryKey("a")
+ .build();
+
+ // Schema v1: ADD COLUMN d STRING (nullable)
+ // {a INT PK, b STRING, c STRING, d STRING}
+ private static final Schema SCHEMA_V1 =
+ Schema.newBuilder().fromSchema(SCHEMA_V0).column("d",
DataTypes.STRING()).build();
+
+ private static final RowType ROW_TYPE_V0 = SCHEMA_V0.getRowType();
+ private static final RowType ROW_TYPE_V1 = SCHEMA_V1.getRowType();
+
+ private final Configuration conf = new Configuration();
+
+ private @TempDir File tempLogDir;
+ private @TempDir File tmpKvDir;
+
+ private TestingSchemaGetter schemaGetter;
+ private LogTablet logTablet;
+ private KvTablet kvTablet;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ TablePath tablePath = TablePath.of("testDb", "test_schema_evolution");
+ PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath);
+ // Start with schema v0
+ schemaGetter = new TestingSchemaGetter(new SchemaInfo(SCHEMA_V0,
SCHEMA_ID_V0));
+
+ File logTabletDir =
+ LogTestUtils.makeRandomLogTabletDir(
+ tempLogDir,
+ physicalTablePath.getDatabaseName(),
+ 0L,
+ physicalTablePath.getTableName());
+ logTablet =
+ LogTablet.create(
+ physicalTablePath,
+ logTabletDir,
+ conf,
+ TestingMetricGroups.TABLET_SERVER_METRICS,
+ 0,
+ new FlussScheduler(1),
+ LogFormat.ARROW,
+ 1,
+ true,
+ SystemClock.getInstance(),
+ true);
+
+ TableBucket tableBucket = logTablet.getTableBucket();
+ TableConfig tableConf = new TableConfig(new Configuration());
+ RowMerger rowMerger = RowMerger.create(tableConf, KvFormat.COMPACTED,
schemaGetter);
+ AutoIncrementManager autoIncrementManager =
+ new AutoIncrementManager(
+ schemaGetter, tablePath, tableConf, new
TestingSequenceGeneratorFactory());
+
+ kvTablet =
+ KvTablet.create(
+ physicalTablePath,
+ tableBucket,
+ logTablet,
+ tmpKvDir,
+ conf,
+ TestingMetricGroups.TABLET_SERVER_METRICS,
+ new RootAllocator(Long.MAX_VALUE),
+ new TestingMemorySegmentPool(10 * 1024),
+ KvFormat.COMPACTED,
+ rowMerger,
+ DEFAULT_COMPRESSION,
+ schemaGetter,
+ tableConf.getChangelogImage(),
+ KvManager.getDefaultRateLimiter(),
+ autoIncrementManager);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (kvTablet != null) {
+ kvTablet.close();
+ }
+ if (logTablet != null) {
+ logTablet.close();
+ }
+ }
+
+ @Test
+ void testConvertTargetColumns_droppedColumnIsFiltered() {
+ // Build a target schema that doesn't contain column c (id=2).
+ // This simulates a DROP COLUMN scenario.
+ Schema schemaWithoutC =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.STRING())
+ .primaryKey("a")
+ .build();
+
+ // Positions [0, 2] in v0 = columns a(id=0) and c(id=2)
+ // Column c(id=2) does not exist in schemaWithoutC (which has ids 0, 1)
+ // → c should be silently dropped from the result
+ int[] positions = {0, 2};
+ int[] result = KvTablet.convertTargetColumns(positions, SCHEMA_V0,
schemaWithoutC);
+ // Only column a(id=0) survives → mapped to position 0 in
schemaWithoutC
+ assertThat(result).containsExactly(0);
+ }
+
+ @Test
+ void testPartialUpdateAfterAddColumn() throws Exception {
+ KvRecordTestUtils.KvRecordFactory recordFactoryV0 =
+ KvRecordTestUtils.KvRecordFactory.of(ROW_TYPE_V0);
+ KvRecordTestUtils.KvRecordBatchFactory batchFactoryV0 =
+ KvRecordTestUtils.KvRecordBatchFactory.of(SCHEMA_ID_V0);
+
+ // Step 1: Insert full row with schema v0: {a=1, b="b_val", c="c_val"}
+ KvRecordBatch insertBatch =
+ batchFactoryV0.ofRecords(
+ recordFactoryV0.ofRecord(
+ "k1".getBytes(), new Object[] {1, "b_val",
"c_val"}));
+ kvTablet.putAsLeader(insertBatch, null);
+
+ // Step 2: Evolve schema to v1 (ADD COLUMN d)
+ schemaGetter.updateLatestSchemaInfo(new SchemaInfo(SCHEMA_V1,
SCHEMA_ID_V1));
+
+ long beforePartialUpdate = logTablet.localLogEndOffset();
+
+ // Step 3: Send partial update with schemaId=0, targeting columns [0,
1] (a, b in v0)
+ // Row values: {a=1, b="new_b", c=null} (c is not targeted so its
value doesn't matter)
+ int[] targetColumns = {0, 1};
+ KvRecordBatch partialBatch =
+ batchFactoryV0.ofRecords(
+ recordFactoryV0.ofRecord("k1".getBytes(), new Object[]
{1, "new_b", null}));
+ kvTablet.putAsLeader(partialBatch, targetColumns);
+
+ // Step 4: Verify result: {a=1, b="new_b", c="c_val", d=null}
+ // - b updated to "new_b"
+ // - c preserved from old row
+ // - d is null (new column, not present in old or new data)
+ LogRecords actualLogRecords = readLogRecords(beforePartialUpdate);
+ MemoryLogRecords expectedLogs =
+ logRecords(
+ ROW_TYPE_V1,
+ SCHEMA_ID_V1,
+ beforePartialUpdate,
+ Arrays.asList(ChangeType.UPDATE_BEFORE,
ChangeType.UPDATE_AFTER),
+ Arrays.asList(
+ new Object[] {
+ 1, "b_val", "c_val", null
+ }, // before (old row aligned)
+ new Object[] {1, "new_b", "c_val", null} //
after (b updated)
+ ));
+
+ assertThatLogRecords(actualLogRecords)
+ .withSchema(ROW_TYPE_V1)
+ .assertCheckSum(true)
+ .isEqualTo(expectedLogs);
+ }
+
+ @Test
+ void testDeleteAfterAddColumn() throws Exception {
+ KvRecordTestUtils.KvRecordFactory recordFactoryV0 =
+ KvRecordTestUtils.KvRecordFactory.of(ROW_TYPE_V0);
+ KvRecordTestUtils.KvRecordBatchFactory batchFactoryV0 =
+ KvRecordTestUtils.KvRecordBatchFactory.of(SCHEMA_ID_V0);
+
+ // Step 1: Insert row with schema v0
+ KvRecordBatch insertBatch =
+ batchFactoryV0.ofRecords(
+ recordFactoryV0.ofRecord(
+ "k1".getBytes(), new Object[] {1, "b_val",
"c_val"}));
+ kvTablet.putAsLeader(insertBatch, null);
+
+ // Step 2: Evolve schema
+ schemaGetter.updateLatestSchemaInfo(new SchemaInfo(SCHEMA_V1,
SCHEMA_ID_V1));
+
+ long beforeDelete = logTablet.localLogEndOffset();
+
+ // Step 3: Delete with schemaId=0
+ KvRecordBatch deleteBatch =
+
batchFactoryV0.ofRecords(recordFactoryV0.ofRecord("k1".getBytes(), null));
+ kvTablet.putAsLeader(deleteBatch, null);
+
+ // Step 4: Verify DELETE log uses aligned (v1) row
+ LogRecords actualLogRecords = readLogRecords(beforeDelete);
+ MemoryLogRecords expectedLogs =
+ logRecords(
+ ROW_TYPE_V1,
+ SCHEMA_ID_V1,
+ beforeDelete,
+ Collections.singletonList(ChangeType.DELETE),
+ Collections.singletonList(new Object[] {1, "b_val",
"c_val", null}));
+
+ assertThatLogRecords(actualLogRecords)
+ .withSchema(ROW_TYPE_V1)
+ .assertCheckSum(true)
+ .isEqualTo(expectedLogs);
+ }
+
+ // ==================== Helper Methods ====================
+
+ private LogRecords readLogRecords(long startOffset) throws Exception {
+ return logTablet
+ .read(startOffset, Integer.MAX_VALUE, FetchIsolation.LOG_END,
false, null)
+ .getRecords();
+ }
+
+ private MemoryLogRecords logRecords(
+ RowType rowType,
+ short schemaId,
+ long baseOffset,
+ List<ChangeType> changeTypes,
+ List<Object[]> rows)
+ throws Exception {
+ return createBasicMemoryLogRecords(
+ rowType,
+ schemaId,
+ baseOffset,
+ -1L,
+ CURRENT_LOG_MAGIC_VALUE,
+ NO_WRITER_ID,
+ NO_BATCH_SEQUENCE,
+ changeTypes,
+ rows,
+ LogFormat.ARROW,
+ DEFAULT_COMPRESSION);
+ }
+}