This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 82b14f2ea4f87ed54f0898b0d2a9e925926cc302 Author: Jark Wu <[email protected]> AuthorDate: Sat Apr 4 21:16:52 2026 +0800 Revert "[BugFix] Fix ArrayIndexOutOfBoundsException in partial updates after ADD COLUMN (#2594)" This reverts commit 5ede2fc13ebe85fe95fa4a20bdddc342911dfed7. --- .../java/org/apache/fluss/server/kv/KvTablet.java | 187 ++----------- .../server/kv/KvTabletSchemaEvolutionTest.java | 306 --------------------- 2 files changed, 29 insertions(+), 464 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index 31f7f90c3..e72428a02 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -42,11 +42,9 @@ import org.apache.fluss.record.KvRecord; import org.apache.fluss.record.KvRecordBatch; import org.apache.fluss.record.KvRecordReadContext; import org.apache.fluss.row.BinaryRow; -import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.PaddingRow; import org.apache.fluss.row.arrow.ArrowWriterPool; import org.apache.fluss.row.arrow.ArrowWriterProvider; -import org.apache.fluss.row.encode.RowEncoder; import org.apache.fluss.row.encode.ValueDecoder; import org.apache.fluss.rpc.protocol.MergeMode; import org.apache.fluss.server.kv.autoinc.AutoIncIDRange; @@ -88,7 +86,6 @@ import javax.annotation.concurrent.ThreadSafe; import java.io.File; import java.io.IOException; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -377,31 +374,23 @@ public final class KvTablet { short latestSchemaId = (short) schemaInfo.getSchemaId(); validateSchemaId(kvRecords.schemaId(), latestSchemaId); - // Convert target column positions from client's schema to latest - // schema positions using column IDs if schemas differ. - int[] resolvedTargetColumns = targetColumns; - if (targetColumns != null && kvRecords.schemaId() != latestSchemaId) { - Schema clientSchema = schemaGetter.getSchema(kvRecords.schemaId()); - resolvedTargetColumns = - convertTargetColumns(targetColumns, clientSchema, latestSchema); - } - AutoIncrementUpdater currentAutoIncrementUpdater = autoIncrementManager.getUpdaterForSchema(kvFormat, latestSchemaId); // Validate targetColumns doesn't contain auto-increment column - currentAutoIncrementUpdater.validateTargetColumns(resolvedTargetColumns); + currentAutoIncrementUpdater.validateTargetColumns(targetColumns); // Determine the row merger based on mergeMode: // - DEFAULT: Use the configured merge engine (rowMerger) // - OVERWRITE: Bypass merge engine, use pre-created overwriteRowMerger // to directly replace values (for undo recovery scenarios) + // We only support ADD COLUMN, so targetColumns is fine to be used directly. RowMerger currentMerger = (mergeMode == MergeMode.OVERWRITE) ? overwriteRowMerger.configureTargetColumns( - resolvedTargetColumns, latestSchemaId, latestSchema) + targetColumns, latestSchemaId, latestSchema) : rowMerger.configureTargetColumns( - resolvedTargetColumns, latestSchemaId, latestSchema); + targetColumns, latestSchemaId, latestSchema); RowType latestRowType = latestSchema.getRowType(); WalBuilder walBuilder = createWalBuilder(latestSchemaId, latestRowType); @@ -417,8 +406,6 @@ public final class KvTablet { processKvRecords( kvRecords, kvRecords.schemaId(), - latestSchemaId, - latestSchema, currentMerger, currentAutoIncrementUpdater, walBuilder, @@ -473,8 +460,6 @@ public final class KvTablet { private void processKvRecords( KvRecordBatch kvRecords, short schemaIdOfNewData, - short latestSchemaId, - Schema latestSchema, RowMerger currentMerger, AutoIncrementUpdater autoIncrementUpdater, WalBuilder walBuilder, @@ -488,42 +473,32 @@ public final class KvTablet { KvRecordReadContext.createReadContext(kvFormat, schemaGetter); ValueDecoder valueDecoder = new ValueDecoder(schemaGetter, kvFormat); - try (SchemaAlignmentContext alignmentContext = - new SchemaAlignmentContext(latestSchemaId, latestSchema, kvFormat)) { - for (KvRecord kvRecord : kvRecords.records(readContext)) { - byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey()); - KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes); - BinaryRow row = kvRecord.getRow(); - BinaryValue currentValue = - row == null ? null : new BinaryValue(schemaIdOfNewData, row); - - if (currentValue != null && schemaIdOfNewData != latestSchemaId) { - currentValue = alignToLatestSchema(currentValue, alignmentContext); - } + for (KvRecord kvRecord : kvRecords.records(readContext)) { + byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey()); + KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes); + BinaryRow row = kvRecord.getRow(); + BinaryValue currentValue = row == null ? null : new BinaryValue(schemaIdOfNewData, row); - if (currentValue == null) { - logOffset = - processDeletion( - key, - currentMerger, - alignmentContext, - valueDecoder, - walBuilder, - latestSchemaRow, - logOffset); - } else { - logOffset = - processUpsert( - key, - currentValue, - currentMerger, - alignmentContext, - autoIncrementUpdater, - valueDecoder, - walBuilder, - latestSchemaRow, - logOffset); - } + if (currentValue == null) { + logOffset = + processDeletion( + key, + currentMerger, + valueDecoder, + walBuilder, + latestSchemaRow, + logOffset); + } else { + logOffset = + processUpsert( + key, + currentValue, + currentMerger, + autoIncrementUpdater, + valueDecoder, + walBuilder, + latestSchemaRow, + logOffset); } } } @@ -531,7 +506,6 @@ public final class KvTablet { private long processDeletion( KvPreWriteBuffer.Key key, RowMerger currentMerger, - SchemaAlignmentContext alignmentContext, ValueDecoder valueDecoder, WalBuilder walBuilder, PaddingRow latestSchemaRow, @@ -556,9 +530,6 @@ public final class KvTablet { } BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes); - if (oldValue.schemaId != alignmentContext.latestSchemaId) { - oldValue = alignToLatestSchema(oldValue, alignmentContext); - } BinaryValue newValue = currentMerger.delete(oldValue); // if newValue is null, it means the row should be deleted @@ -573,7 +544,6 @@ public final class KvTablet { KvPreWriteBuffer.Key key, BinaryValue currentValue, RowMerger currentMerger, - SchemaAlignmentContext alignmentContext, AutoIncrementUpdater autoIncrementUpdater, ValueDecoder valueDecoder, WalBuilder walBuilder, @@ -602,9 +572,6 @@ public final class KvTablet { } BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes); - if (oldValue.schemaId != alignmentContext.latestSchemaId) { - oldValue = alignToLatestSchema(oldValue, alignmentContext); - } BinaryValue newValue = currentMerger.merge(oldValue, currentValue); if (newValue == oldValue) { @@ -661,102 +628,6 @@ public final class KvTablet { } } - /** Batch-constant state for aligning rows to the latest schema. */ - private static class SchemaAlignmentContext implements AutoCloseable { - final short latestSchemaId; - final List<Integer> targetColIds; - final RowEncoder encoder; - final Map<Short, SourceSchemaMapping> cache = new HashMap<>(); - - SchemaAlignmentContext(short latestSchemaId, Schema latestSchema, KvFormat kvFormat) { - this.latestSchemaId = latestSchemaId; - this.targetColIds = latestSchema.getColumnIds(); - this.encoder = RowEncoder.create(kvFormat, latestSchema.getRowType()); - } - - @Override - public void close() throws Exception { - encoder.close(); - } - - /** Cached field getters and column-id→position map for a single source schema. */ - private static class SourceSchemaMapping { - final Map<Integer, Integer> idToPos; - final InternalRow.FieldGetter[] getters; - - SourceSchemaMapping(Schema sourceSchema) { - List<Integer> sourceColIds = sourceSchema.getColumnIds(); - this.idToPos = new HashMap<>(); - for (int i = 0; i < sourceColIds.size(); i++) { - idToPos.put(sourceColIds.get(i), i); - } - this.getters = InternalRow.createFieldGetters(sourceSchema.getRowType()); - } - } - } - - /** - * Converts a {@link BinaryValue} from its source schema layout to the latest schema layout - * using column IDs to map positions. New columns (present in latest but not in source) are - * filled with null. Only call when {@code value.schemaId != latestSchemaId}. - */ - private BinaryValue alignToLatestSchema(BinaryValue value, SchemaAlignmentContext ctx) { - SchemaAlignmentContext.SourceSchemaMapping mapping = - ctx.cache.computeIfAbsent( - value.schemaId, - id -> - new SchemaAlignmentContext.SourceSchemaMapping( - schemaGetter.getSchema(id))); - - ctx.encoder.startNewRow(); - for (int targetPos = 0; targetPos < ctx.targetColIds.size(); targetPos++) { - Integer sourcePos = mapping.idToPos.get(ctx.targetColIds.get(targetPos)); - if (sourcePos == null) { - // Column added after the source schema — fill with null. - ctx.encoder.encodeField(targetPos, null); - } else { - ctx.encoder.encodeField( - targetPos, mapping.getters[sourcePos].getFieldOrNull(value.row)); - } - } - // copy() is required: the encoder reuses its internal buffer, so the next - // startNewRow() would overwrite the row returned here. - return new BinaryValue(ctx.latestSchemaId, ctx.encoder.finishRow().copy()); - } - - /** - * Converts target column positions from the client's (source) schema to the latest (target) - * schema using column IDs. This is needed when a client sends a partial update using an older - * schema whose positions need to be remapped to the latest schema layout. - */ - @VisibleForTesting - static int[] convertTargetColumns(int[] positions, Schema sourceSchema, Schema targetSchema) { - List<Integer> sourceColIds = sourceSchema.getColumnIds(); - List<Integer> targetColIds = targetSchema.getColumnIds(); - - Map<Integer, Integer> targetIdToPos = new HashMap<>(); - for (int i = 0; i < targetColIds.size(); i++) { - targetIdToPos.put(targetColIds.get(i), i); - } - - int resultCount = 0; - int[] buffer = new int[positions.length]; - for (int position : positions) { - int colId = sourceColIds.get(position); - Integer targetPos = targetIdToPos.get(colId); - if (targetPos != null) { - buffer[resultCount++] = targetPos; - } - // Column was dropped in the latest schema — skip it. - } - if (resultCount == positions.length) { - return buffer; - } - int[] result = new int[resultCount]; - System.arraycopy(buffer, 0, result, 0, resultCount); - return result; - } - private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws Exception { switch (logFormat) { case INDEXED: diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java deleted file mode 100644 index c9e5f7143..000000000 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java +++ /dev/null @@ -1,306 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.server.kv; - -import org.apache.fluss.config.Configuration; -import org.apache.fluss.config.TableConfig; -import org.apache.fluss.memory.TestingMemorySegmentPool; -import org.apache.fluss.metadata.KvFormat; -import org.apache.fluss.metadata.LogFormat; -import org.apache.fluss.metadata.PhysicalTablePath; -import org.apache.fluss.metadata.Schema; -import org.apache.fluss.metadata.SchemaInfo; -import org.apache.fluss.metadata.TableBucket; -import org.apache.fluss.metadata.TablePath; -import org.apache.fluss.record.ChangeType; -import org.apache.fluss.record.KvRecordBatch; -import org.apache.fluss.record.KvRecordTestUtils; -import org.apache.fluss.record.LogRecords; -import org.apache.fluss.record.MemoryLogRecords; -import org.apache.fluss.record.TestingSchemaGetter; -import org.apache.fluss.server.kv.autoinc.AutoIncrementManager; -import org.apache.fluss.server.kv.autoinc.TestingSequenceGeneratorFactory; -import org.apache.fluss.server.kv.rowmerger.RowMerger; -import org.apache.fluss.server.log.FetchIsolation; -import org.apache.fluss.server.log.LogTablet; -import org.apache.fluss.server.log.LogTestUtils; -import org.apache.fluss.server.metrics.group.TestingMetricGroups; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; -import org.apache.fluss.types.DataTypes; -import org.apache.fluss.types.RowType; -import org.apache.fluss.utils.clock.SystemClock; -import org.apache.fluss.utils.concurrent.FlussScheduler; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -import java.io.File; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION; -import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; -import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; -import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; -import static org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords; -import static org.apache.fluss.testutils.LogRecordsAssert.assertThatLogRecords; -import static org.assertj.core.api.Assertions.assertThat; - -/** - * Tests for {@link KvTablet} schema evolution handling. - * - * <p>Verifies that server-side row conversion (via column IDs) correctly handles partial updates - * when the incoming batch or stored KV rows use an older schema. - */ -class KvTabletSchemaEvolutionTest { - - private static final short SCHEMA_ID_V0 = 0; - private static final short SCHEMA_ID_V1 = 1; - - // Schema v0: {a INT PK, b STRING, c STRING} - private static final Schema SCHEMA_V0 = - Schema.newBuilder() - .column("a", DataTypes.INT()) - .column("b", DataTypes.STRING()) - .column("c", DataTypes.STRING()) - .primaryKey("a") - .build(); - - // Schema v1: ADD COLUMN d STRING (nullable) - // {a INT PK, b STRING, c STRING, d STRING} - private static final Schema SCHEMA_V1 = - Schema.newBuilder().fromSchema(SCHEMA_V0).column("d", DataTypes.STRING()).build(); - - private static final RowType ROW_TYPE_V0 = SCHEMA_V0.getRowType(); - private static final RowType ROW_TYPE_V1 = SCHEMA_V1.getRowType(); - - private final Configuration conf = new Configuration(); - - private @TempDir File tempLogDir; - private @TempDir File tmpKvDir; - - private TestingSchemaGetter schemaGetter; - private LogTablet logTablet; - private KvTablet kvTablet; - - @BeforeEach - void setUp() throws Exception { - TablePath tablePath = TablePath.of("testDb", "test_schema_evolution"); - PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath); - // Start with schema v0 - schemaGetter = new TestingSchemaGetter(new SchemaInfo(SCHEMA_V0, SCHEMA_ID_V0)); - - File logTabletDir = - LogTestUtils.makeRandomLogTabletDir( - tempLogDir, - physicalTablePath.getDatabaseName(), - 0L, - physicalTablePath.getTableName()); - logTablet = - LogTablet.create( - physicalTablePath, - logTabletDir, - conf, - TestingMetricGroups.TABLET_SERVER_METRICS, - 0, - new FlussScheduler(1), - LogFormat.ARROW, - 1, - true, - SystemClock.getInstance(), - true); - - TableBucket tableBucket = logTablet.getTableBucket(); - TableConfig tableConf = new TableConfig(new Configuration()); - RowMerger rowMerger = RowMerger.create(tableConf, KvFormat.COMPACTED, schemaGetter); - AutoIncrementManager autoIncrementManager = - new AutoIncrementManager( - schemaGetter, tablePath, tableConf, new TestingSequenceGeneratorFactory()); - - kvTablet = - KvTablet.create( - physicalTablePath, - tableBucket, - logTablet, - tmpKvDir, - conf, - TestingMetricGroups.TABLET_SERVER_METRICS, - new RootAllocator(Long.MAX_VALUE), - new TestingMemorySegmentPool(10 * 1024), - KvFormat.COMPACTED, - rowMerger, - DEFAULT_COMPRESSION, - schemaGetter, - tableConf.getChangelogImage(), - KvManager.getDefaultRateLimiter(), - autoIncrementManager); - } - - @AfterEach - void tearDown() throws Exception { - if (kvTablet != null) { - kvTablet.close(); - } - if (logTablet != null) { - logTablet.close(); - } - } - - @Test - void testConvertTargetColumns_droppedColumnIsFiltered() { - // Build a target schema that doesn't contain column c (id=2). - // This simulates a DROP COLUMN scenario. - Schema schemaWithoutC = - Schema.newBuilder() - .column("a", DataTypes.INT()) - .column("b", DataTypes.STRING()) - .primaryKey("a") - .build(); - - // Positions [0, 2] in v0 = columns a(id=0) and c(id=2) - // Column c(id=2) does not exist in schemaWithoutC (which has ids 0, 1) - // → c should be silently dropped from the result - int[] positions = {0, 2}; - int[] result = KvTablet.convertTargetColumns(positions, SCHEMA_V0, schemaWithoutC); - // Only column a(id=0) survives → mapped to position 0 in schemaWithoutC - assertThat(result).containsExactly(0); - } - - @Test - void testPartialUpdateAfterAddColumn() throws Exception { - KvRecordTestUtils.KvRecordFactory recordFactoryV0 = - KvRecordTestUtils.KvRecordFactory.of(ROW_TYPE_V0); - KvRecordTestUtils.KvRecordBatchFactory batchFactoryV0 = - KvRecordTestUtils.KvRecordBatchFactory.of(SCHEMA_ID_V0); - - // Step 1: Insert full row with schema v0: {a=1, b="b_val", c="c_val"} - KvRecordBatch insertBatch = - batchFactoryV0.ofRecords( - recordFactoryV0.ofRecord( - "k1".getBytes(), new Object[] {1, "b_val", "c_val"})); - kvTablet.putAsLeader(insertBatch, null); - - // Step 2: Evolve schema to v1 (ADD COLUMN d) - schemaGetter.updateLatestSchemaInfo(new SchemaInfo(SCHEMA_V1, SCHEMA_ID_V1)); - - long beforePartialUpdate = logTablet.localLogEndOffset(); - - // Step 3: Send partial update with schemaId=0, targeting columns [0, 1] (a, b in v0) - // Row values: {a=1, b="new_b", c=null} (c is not targeted so its value doesn't matter) - int[] targetColumns = {0, 1}; - KvRecordBatch partialBatch = - batchFactoryV0.ofRecords( - recordFactoryV0.ofRecord("k1".getBytes(), new Object[] {1, "new_b", null})); - kvTablet.putAsLeader(partialBatch, targetColumns); - - // Step 4: Verify result: {a=1, b="new_b", c="c_val", d=null} - // - b updated to "new_b" - // - c preserved from old row - // - d is null (new column, not present in old or new data) - LogRecords actualLogRecords = readLogRecords(beforePartialUpdate); - MemoryLogRecords expectedLogs = - logRecords( - ROW_TYPE_V1, - SCHEMA_ID_V1, - beforePartialUpdate, - Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER), - Arrays.asList( - new Object[] { - 1, "b_val", "c_val", null - }, // before (old row aligned) - new Object[] {1, "new_b", "c_val", null} // after (b updated) - )); - - assertThatLogRecords(actualLogRecords) - .withSchema(ROW_TYPE_V1) - .assertCheckSum(true) - .isEqualTo(expectedLogs); - } - - @Test - void testDeleteAfterAddColumn() throws Exception { - KvRecordTestUtils.KvRecordFactory recordFactoryV0 = - KvRecordTestUtils.KvRecordFactory.of(ROW_TYPE_V0); - KvRecordTestUtils.KvRecordBatchFactory batchFactoryV0 = - KvRecordTestUtils.KvRecordBatchFactory.of(SCHEMA_ID_V0); - - // Step 1: Insert row with schema v0 - KvRecordBatch insertBatch = - batchFactoryV0.ofRecords( - recordFactoryV0.ofRecord( - "k1".getBytes(), new Object[] {1, "b_val", "c_val"})); - kvTablet.putAsLeader(insertBatch, null); - - // Step 2: Evolve schema - schemaGetter.updateLatestSchemaInfo(new SchemaInfo(SCHEMA_V1, SCHEMA_ID_V1)); - - long beforeDelete = logTablet.localLogEndOffset(); - - // Step 3: Delete with schemaId=0 - KvRecordBatch deleteBatch = - batchFactoryV0.ofRecords(recordFactoryV0.ofRecord("k1".getBytes(), null)); - kvTablet.putAsLeader(deleteBatch, null); - - // Step 4: Verify DELETE log uses aligned (v1) row - LogRecords actualLogRecords = readLogRecords(beforeDelete); - MemoryLogRecords expectedLogs = - logRecords( - ROW_TYPE_V1, - SCHEMA_ID_V1, - beforeDelete, - Collections.singletonList(ChangeType.DELETE), - Collections.singletonList(new Object[] {1, "b_val", "c_val", null})); - - assertThatLogRecords(actualLogRecords) - .withSchema(ROW_TYPE_V1) - .assertCheckSum(true) - .isEqualTo(expectedLogs); - } - - // ==================== Helper Methods ==================== - - private LogRecords readLogRecords(long startOffset) throws Exception { - return logTablet - .read(startOffset, Integer.MAX_VALUE, FetchIsolation.LOG_END, false, null) - .getRecords(); - } - - private MemoryLogRecords logRecords( - RowType rowType, - short schemaId, - long baseOffset, - List<ChangeType> changeTypes, - List<Object[]> rows) - throws Exception { - return createBasicMemoryLogRecords( - rowType, - schemaId, - baseOffset, - -1L, - CURRENT_LOG_MAGIC_VALUE, - NO_WRITER_ID, - NO_BATCH_SEQUENCE, - changeTypes, - rows, - LogFormat.ARROW, - DEFAULT_COMPRESSION); - } -}
