This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ci-add-column in repository https://gitbox.apache.org/repos/asf/fluss.git
commit ff3db43d42e64d9d0691455ad9dbccfb7f5728e5 Author: Jark Wu <[email protected]> AuthorDate: Mon Dec 1 02:35:27 2025 +0800 WIP3 --- .../fluss/flink/sink/writer/FlinkSinkWriter.java | 2 +- .../source/reader/FlinkSourceSplitReader.java | 3 +- .../flink/utils/FlussRowToFlinkRowConverter.java | 1 - .../fluss/lake/iceberg/conf/HadoopUtils.java | 5 +- .../iceberg/maintenance/IcebergRewriteITCase.java | 2 +- .../fluss/rpc/netty/client/ServerConnection.java | 3 +- fluss-rpc/src/main/proto/FlussApi.proto | 12 +-- .../coordinator/CompletedSnapshotStoreManager.java | 2 +- .../coordinator/CoordinatorEventProcessor.java | 1 - .../fluss/server/coordinator/MetadataManager.java | 8 +- .../fluss/server/coordinator/SchemaUpdate.java | 28 +++--- .../fluss/server/coordinator/UpdateSchema.java | 28 ------ .../event/watcher/TableChangeWatcher.java | 9 -- .../server/entity/BatchRegisterLeadAndIsr.java | 2 +- .../apache/fluss/server/kv/KvRecoverHelper.java | 99 +++++++++------------- .../java/org/apache/fluss/server/kv/KvTablet.java | 11 +-- .../server/kv/snapshot/CompletedSnapshotStore.java | 4 +- .../fluss/server/kv/snapshot/SnapshotLocation.java | 2 +- ...MetadataManager.java => ServerSchemaCache.java} | 7 +- .../server/metadata/TabletServerMetadataCache.java | 15 ++-- ...ManagerTest.java => ServerSchemaCacheTest.java} | 16 ++-- .../server/testutils/FlussClusterExtension.java | 6 +- 22 files changed, 98 insertions(+), 168 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java index 97a70ce25..fe0eb28ea 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java @@ -110,7 +110,7 @@ public abstract class FlinkSinkWriter<InputT> implements SinkWriter<InputT> { connection = ConnectionFactory.createConnection(flussConfig, flinkMetricRegistry); table = connection.getTable(tablePath); LOG.info( - "Current Fluss Schema is {}, Table RowType is {}", + "Current Fluss Schema is {}, input Flink RowType is {}", table.getTableInfo().getSchema(), tableRowType); sanityCheck(table.getTableInfo()); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java index 10997e91e..a19572dc9 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java @@ -217,8 +217,7 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS (LakeSnapshotAndFlussLogSplit) sourceSplitBase; if (lakeSnapshotAndFlussLogSplit.isStreaming()) { // is streaming split which has no stopping offset, we need also - // subscribe - // change log + // subscribe change log subscribeLog( lakeSnapshotAndFlussLogSplit, lakeSnapshotAndFlussLogSplit.getStartingOffset()); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java index 4dc5287b0..0846b284b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java @@ -47,7 +47,6 @@ public class FlussRowToFlinkRowConverter { private final InternalRow.FieldGetter[] flussFieldGetters; private final int[] indexMapping; - // todo: remove public FlussRowToFlinkRowConverter(RowType rowType) { this(rowType, toFlinkRowType(rowType)); } diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/conf/HadoopUtils.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/conf/HadoopUtils.java index 9c86a92c1..5aad1d5bf 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/conf/HadoopUtils.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/conf/HadoopUtils.java @@ -79,8 +79,7 @@ public class HadoopUtils { } // Approach 3: Fluss configuration - // add all configuration key with prefix 'iceberg.hadoop.' in fluss conf to hadoop - // conf + // add all configuration key with prefix 'iceberg.hadoop.' in fluss conf to hadoop conf for (String key : flussConfiguration.keySet()) { for (String prefix : FLUSS_CONFIG_PREFIXES) { if (key.startsWith(prefix)) { @@ -109,7 +108,7 @@ public class HadoopUtils { } /** - * Search Hadoop configuration files in the given path, andadd them to the configuration if + * Search Hadoop configuration files in the given path, and add them to the configuration if * found. */ private static boolean addHadoopConfIfFound( diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java index 133c23322..4b6d9de71 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java @@ -147,7 +147,7 @@ class IcebergRewriteITCase extends FlinkIcebergTieringTestBase { // add pos-delete and trigger compaction rows = Arrays.asList(row(4, "v1"), row(4, "v2")); flussRows.add(writeIcebergTableRecords(t1, t1Bucket, 6, false, rows).get(1)); - // rewritten files should fail to commit due to conflict,add check here + // rewritten files should fail to commit due to conflict, add check here checkRecords(getIcebergRecords(t1), flussRows); // 4 data file and 1 delete file checkFileStatusInIcebergTable(t1, 4, true); diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java index 7f4d6e84e..a09a50c09 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java @@ -69,8 +69,7 @@ final class ServerConnection { private final ServerNode node; - // TODO: add max inflight requests limit like Kafka's - // "max.in.flight.requests.per.connection" + // TODO: add max inflight requests limit like Kafka's "max.in.flight.requests.per.connection" private final Map<Integer, InflightRequest> inflightRequests = MapUtils.newConcurrentHashMap(); private final CompletableFuture<Void> closeFuture = new CompletableFuture<>(); private final ConnectionMetrics connectionMetrics; diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 9feb38dc1..a08eeaba4 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -918,33 +918,33 @@ message PbAlterConfig { required int32 op_type = 3; // SET=0, DELETE=1, APPEND=2, SUBTRACT=3 } -message PbAddColumn{ +message PbAddColumn { required string column_name = 1; required bytes data_type_json = 2; optional string comment = 3; required int32 column_position_type = 4; // LAST=0,FIRST=1,AFTER=3 } -message PbDropColumn{ +message PbDropColumn { required string column_name = 1; } -message PbRenameColumn{ +message PbRenameColumn { required string old_column_name = 1; required string new_column_name = 2; } -message PbModifyColumn{ +message PbModifyColumn { required string column_name = 1; - required bytes data_type_json = 2; + optional bytes data_type_json = 2; optional string comment = 3; optional int32 column_position_type = 4; // LAST=0,FIRST=1,AFTER=3 } -message PbDescribeConfig{ +message PbDescribeConfig { required string config_key = 1; optional string config_value = 2; required string config_source = 3; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java index def57231d..b7c92289b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java @@ -148,7 +148,7 @@ public class CompletedSnapshotStoreManager { () -> getAllSnapshotSize(bucket)); } else { LOG.warn( - "Failed toadd bucketMetricGroup for tableBucket {} when creating completed snapshot.", + "Failed to add bucketMetricGroup for tableBucket {} when creating completed snapshot.", bucket); } return snapshotStore; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index fbe0986b1..d1d551121 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -556,7 +556,6 @@ public class CoordinatorEventProcessor implements EventProcessor { private void processCreateTable(CreateTableEvent createTableEvent) { long tableId = createTableEvent.getTableInfo().getTableId(); - int schemaId = createTableEvent.getTableInfo().getSchemaId(); // skip the table if it already exists if (coordinatorContext.containsTableId(tableId)) { return; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 19b13aaa9..bf2cfc9fb 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -330,11 +330,7 @@ public class MetadataManager { // validate the table column changes if (!schemaChanges.isEmpty()) { - UpdateSchema schemaUpdate = new SchemaUpdate(table); - for (TableChange schemaChange : schemaChanges) { - schemaUpdate = schemaUpdate.applySchemaChange(schemaChange); - } - Schema newSchema = schemaUpdate.getSchema(); + Schema newSchema = SchemaUpdate.applySchemaChanges(table, schemaChanges); // update the schema zookeeperClient.registerSchema(tablePath, newSchema); } @@ -485,7 +481,7 @@ public class MetadataManager { if (toEnableDataLake) { TableInfo newTableInfo = newTableRegistration.toTableInfo(tablePath, schemaInfo); - // if the table is lake table, we need toadd it to lake table tiering manager + // if the table is lake table, we need to add it to lake table tiering manager lakeTableTieringManager.addNewLakeTable(newTableInfo); } else if (toDisableDataLake) { lakeTableTieringManager.removeLakeTable(newTableRegistration.tableId); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java index 1cdc702d7..9efab2a51 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java @@ -29,12 +29,20 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; /** Schema update. */ -public class SchemaUpdate implements UpdateSchema { +public class SchemaUpdate { + + /** Apply schema changes to the given table info and return the updated schema. */ + public static Schema applySchemaChanges(TableInfo tableInfo, List<TableChange> changes) { + SchemaUpdate schemaUpdate = new SchemaUpdate(tableInfo); + for (TableChange change : changes) { + schemaUpdate = schemaUpdate.applySchemaChange(change); + } + return schemaUpdate.getSchema(); + } + private final List<Schema.Column> columns; private final AtomicInteger highestFieldId; private final List<String> primaryKeys; - private final List<String> bucketKeys; - private final List<String> partitionKeys; private final Map<String, Schema.Column> existedColumns; public SchemaUpdate(TableInfo tableInfo) { @@ -42,15 +50,12 @@ public class SchemaUpdate implements UpdateSchema { this.existedColumns = new HashMap<>(); this.highestFieldId = new AtomicInteger(tableInfo.getSchema().getHighestFieldId()); this.primaryKeys = tableInfo.getPrimaryKeys(); - this.bucketKeys = tableInfo.getBucketKeys(); - this.partitionKeys = tableInfo.getPartitionKeys(); this.columns.addAll(tableInfo.getSchema().getColumns()); for (Schema.Column column : columns) { existedColumns.put(column.getName(), column); } } - @Override public Schema getSchema() { Schema.Builder builder = Schema.newBuilder() @@ -63,8 +68,7 @@ public class SchemaUpdate implements UpdateSchema { return builder.build(); } - @Override - public UpdateSchema applySchemaChange(TableChange columnChange) { + public SchemaUpdate applySchemaChange(TableChange columnChange) { if (columnChange instanceof TableChange.AddColumn) { return addColumn((TableChange.AddColumn) columnChange); } else if (columnChange instanceof TableChange.ModifyColumn) { @@ -78,7 +82,7 @@ public class SchemaUpdate implements UpdateSchema { "Unknown column change type " + columnChange.getClass().getName()); } - private UpdateSchema addColumn(TableChange.AddColumn addColumn) { + private SchemaUpdate addColumn(TableChange.AddColumn addColumn) { if (existedColumns.containsKey(addColumn.getName())) { throw new IllegalArgumentException( "Column " + addColumn.getName() + " already exists."); @@ -105,15 +109,15 @@ public class SchemaUpdate implements UpdateSchema { return this; } - private UpdateSchema dropColumn(TableChange.DropColumn dropColumn) { + private SchemaUpdate dropColumn(TableChange.DropColumn dropColumn) { throw new SchemaChangeException("Not support drop column now."); } - private UpdateSchema modifiedColumn(TableChange.ModifyColumn modifyColumn) { + private SchemaUpdate modifiedColumn(TableChange.ModifyColumn modifyColumn) { throw new SchemaChangeException("Not support modify column now."); } - private UpdateSchema renameColumn(TableChange.RenameColumn renameColumn) { + private SchemaUpdate renameColumn(TableChange.RenameColumn renameColumn) { throw new SchemaChangeException("Not support rename column now."); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/UpdateSchema.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/UpdateSchema.java deleted file mode 100644 index 1d51704ac..000000000 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/UpdateSchema.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.server.coordinator; - -import org.apache.fluss.metadata.Schema; -import org.apache.fluss.metadata.TableChange; - -/** Schema update. */ -public interface UpdateSchema { - Schema getSchema(); - - UpdateSchema applySchemaChange(TableChange columnChange); -} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java index acfb7e689..09c273e7d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java @@ -246,16 +246,7 @@ public class TableChangeWatcher { private void processSchemaChange(TablePath tablePath, int schemaId) { try { - int currentSchemaId = zooKeeperClient.getCurrentSchemaId(tablePath); SchemaInfo schemaInfo; - if (schemaId != currentSchemaId) { - LOG.warn( - "Schema id {} is not equal to current schema id {}. Skipping schema change processing.", - schemaId, - currentSchemaId); - return; - } - Optional<SchemaInfo> optSchema = zooKeeperClient.getSchemaById(tablePath, schemaId); if (!optSchema.isPresent()) { LOG.error("No schema for table {} in zookeeper.", tablePath); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/entity/BatchRegisterLeadAndIsr.java b/fluss-server/src/main/java/org/apache/fluss/server/entity/BatchRegisterLeadAndIsr.java index fad96a596..460562bc1 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/entity/BatchRegisterLeadAndIsr.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/entity/BatchRegisterLeadAndIsr.java @@ -55,7 +55,7 @@ public class BatchRegisterLeadAndIsr { tableBucket, leaderAndIsr, partitionName, liveReplicas)); } else { throw new IllegalArgumentException( - "Try toadd a bucket with different tableId or partitionId in collection when try to batch register to Zookeeper." + "Try to add a bucket with different tableId or partitionId in collection when try to batch register to Zookeeper." + "batch tableId=" + tableId + " batch partitionId:" diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java index 1f1c4192d..96bb0eba8 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java @@ -66,15 +66,6 @@ public class KvRecoverHelper { private InternalRow.FieldGetter[] currentFieldGetters; - public KvRecoverHelper( - KvTablet kvTablet, - LogTablet logTablet, - long recoverPointOffset, - KvRecoverContext recoverContext, - KvFormat kvFormat) { - throw new UnsupportedOperationException(); - } - public KvRecoverHelper( KvTablet kvTablet, LogTablet logTablet, @@ -136,64 +127,52 @@ public class KvRecoverHelper { FetchIsolation fetchIsolation, ThrowingConsumer<KeyValueAndLogOffset, Exception> resumeRecordConsumer) throws Exception { - long nextFetchOffset = startFetchOffset; - while (true) { - LogRecords logRecords = - logTablet - .read( - nextFetchOffset, - recoverContext.maxFetchLogSizeInRecoverKv, - fetchIsolation, - true, - null) - .getRecords(); - if (logRecords == MemoryLogRecords.EMPTY) { - break; - } - - for (LogRecordBatch logRecordBatch : logRecords.batches()) { - // short schemaId = logRecordBatch.schemaId(); - // if (currentSchemaId == null) { - // initSchema(schemaId); - // } else if (currentSchemaId != schemaId) { - // throw new KvStorageException( - // String.format( - // "Can't recover kv tablet for table bucket from - // log %s since the schema changes from schema id %d to schema id %d. " - // + "Currently, schema change is not - // supported.", - // recoverContext.tableBucket, currentSchemaId, - // schemaId)); - // } + try (LogRecordReadContext readContext = + LogRecordReadContext.createArrowReadContext( + currentRowType, currentSchemaId, schemaGetter)) { + long nextFetchOffset = startFetchOffset; + while (true) { + LogRecords logRecords = + logTablet + .read( + nextFetchOffset, + recoverContext.maxFetchLogSizeInRecoverKv, + fetchIsolation, + true, + null) + .getRecords(); + if (logRecords == MemoryLogRecords.EMPTY) { + break; + } - // todo: currentRowType和currentSchemaId无法对齐 - try (LogRecordReadContext readContext = - LogRecordReadContext.createArrowReadContext( - currentRowType, currentSchemaId, schemaGetter); - CloseableIterator<LogRecord> logRecordIter = - logRecordBatch.records(readContext)) { - while (logRecordIter.hasNext()) { - LogRecord logRecord = logRecordIter.next(); - if (logRecord.getChangeType() != ChangeType.UPDATE_BEFORE) { - InternalRow logRow = logRecord.getRow(); - byte[] key = keyEncoder.encodeKey(logRow); - byte[] value = null; - if (logRecord.getChangeType() != ChangeType.DELETE) { - // the log row format may not compatible with kv row format, - // e.g, arrow vs. compacted, thus needs a conversion here. - BinaryRow row = toKvRow(logRecord.getRow()); - // todo: short value是否会有问题,感觉可以先check一下 - value = ValueEncoder.encodeValue(currentSchemaId.shortValue(), row); + for (LogRecordBatch logRecordBatch : logRecords.batches()) { + try (CloseableIterator<LogRecord> logRecordIter = + logRecordBatch.records(readContext)) { + while (logRecordIter.hasNext()) { + LogRecord logRecord = logRecordIter.next(); + if (logRecord.getChangeType() != ChangeType.UPDATE_BEFORE) { + InternalRow logRow = logRecord.getRow(); + byte[] key = keyEncoder.encodeKey(logRow); + byte[] value = null; + if (logRecord.getChangeType() != ChangeType.DELETE) { + // the log row format may not compatible with kv row format, + // e.g, arrow vs. compacted, thus needs a conversion here. + BinaryRow row = toKvRow(logRecord.getRow()); + value = + ValueEncoder.encodeValue( + currentSchemaId.shortValue(), row); + } + resumeRecordConsumer.accept( + new KeyValueAndLogOffset( + key, value, logRecord.logOffset())); } - resumeRecordConsumer.accept( - new KeyValueAndLogOffset(key, value, logRecord.logOffset())); } } + nextFetchOffset = logRecordBatch.nextLogOffset(); } - nextFetchOffset = logRecordBatch.nextLogOffset(); } + return nextFetchOffset; } - return nextFetchOffset; } // TODO: this is very in-efficient, because the conversion is CPU heavy. Should be optimized in diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index 176e749f5..70099878e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -83,7 +83,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static org.apache.fluss.utils.SchemaUtil.getTargetColumns; import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock; import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; @@ -271,7 +270,7 @@ public final class KvTablet { // schema1: old data's schema -> valueDecoder // schema2: new data's schema // schema3: current schema (also the latest schema) - // schema1 <= schema2 <= current schema id + // (schema1 <= or >= schema2) <= current schema id short latestSchemaId = (short) schemaInfo.getSchemaId(); short schemaIdOfNewData = kvRecords.schemaId(); if (schemaIdOfNewData > latestSchemaId || schemaIdOfNewData < 0) { @@ -282,14 +281,10 @@ public final class KvTablet { + latestSchemaId); } Schema schemaOfNewData = schemaGetter.getSchema(schemaIdOfNewData); + // we only support ADD COLUMN, so targetColumns is fine to be used directly RowMerger currentMerger = rowMerger.configureTargetColumns( - schemaIdOfNewData == latestSchemaId - ? targetColumns - : getTargetColumns( - targetColumns, schemaOfNewData, latestSchema), - latestSchemaId, - latestSchema); + targetColumns, latestSchemaId, latestSchema); RowType currentRowType = latestSchema.getRowType(); WalBuilder walBuilder = createWalBuilder(latestSchemaId, currentRowType); walBuilder.setWriterState(kvRecords.writerId(), kvRecords.batchSequence()); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java index b8d941221..3e93cc46a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java @@ -99,7 +99,7 @@ public class CompletedSnapshotStore { * Synchronously writes the new snapshots to snapshot handle store and asynchronously removes * older ones. * - * @param snapshot Completed snapshot toadd. + * @param snapshot Completed snapshot to add. */ @VisibleForTesting CompletedSnapshot addSnapshotAndSubsumeOldestOne( @@ -116,7 +116,7 @@ public class CompletedSnapshotStore { completedSnapshotHandleStore.add( snapshot.getTableBucket(), snapshot.getSnapshotID(), completedSnapshotHandle); - // Nowadd the new one. If it fails, we don't want to lose existing data. + // Now add the new one. If it fails, we don't want to lose existing data. completedSnapshots.addLast(snapshot); // Remove completed snapshot from queue and snapshotStateHandleStore, not discard. diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotLocation.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotLocation.java index e09c93b22..fe517f268 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotLocation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotLocation.java @@ -328,7 +328,7 @@ public class SnapshotLocation { Exception latestException = null; for (int attempt = 0; attempt < 10; attempt++) { try { - // todo: mayadd entropy injection? + // todo: may add entropy injection? this.kvFilePath = createFilePath(); this.outStream = fs.create(kvFilePath, FileSystem.WriteMode.NO_OVERWRITE); return; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/SchemaMetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java similarity index 97% rename from fluss-server/src/main/java/org/apache/fluss/server/metadata/SchemaMetadataManager.java rename to fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java index 6bed8dfe3..0f98b092d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/SchemaMetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java @@ -41,15 +41,14 @@ import java.util.concurrent.ExecutionException; * 1. latest schema of each subscribed table, updated by UpdateMetadata request. 2. history schemas * of each table, updated by lookup from tablet server. */ -// TODO SchemaManager -public class SchemaMetadataManager { +public class ServerSchemaCache { - private MetadataManager metadataManager; + private final MetadataManager metadataManager; private final Map<TablePath, SchemaInfo> latestSchemaByTablePath; private final Map<TablePath, Integer> subscriberCounters; private final Cache<TableSchemaKey, Schema> schemaCache; - public SchemaMetadataManager(MetadataManager metadataManager) { + public ServerSchemaCache(MetadataManager metadataManager) { this.metadataManager = metadataManager; // thread safe is guaranteed by subscriberCounters. this.subscriberCounters = MapUtils.newConcurrentHashMap(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java index 6a11a53ae..9a73975ce 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java @@ -68,13 +68,13 @@ public class TabletServerMetadataCache implements ServerMetadataCache { private final MetadataManager metadataManager; - private final SchemaMetadataManager schemaMetadataManager; + private final ServerSchemaCache serverSchemaCache; // todo: replace this in test with schemaMetadataManager. public TabletServerMetadataCache(MetadataManager metadataManager) { this.serverMetadataSnapshot = ServerMetadataSnapshot.empty(); this.metadataManager = metadataManager; - this.schemaMetadataManager = new SchemaMetadataManager(metadataManager); + this.serverSchemaCache = new ServerSchemaCache(metadataManager); } @Override @@ -142,12 +142,12 @@ public class TabletServerMetadataCache implements ServerMetadataCache { public SchemaGetter subscribeWithInitialSchema( TablePath tablePath, int initialSchemaId, Schema initialSchema) { - return schemaMetadataManager.subscribeWithInitialSchema( + return serverSchemaCache.subscribeWithInitialSchema( tablePath, initialSchemaId, initialSchema); } public void updateLatestSchema(TablePath tablePath, SchemaInfo schemaInfo) { - schemaMetadataManager.updateLatestSchema( + serverSchemaCache.updateLatestSchema( tablePath, (short) schemaInfo.getSchemaId(), schemaInfo.getSchema()); } @@ -207,8 +207,7 @@ public class TabletServerMetadataCache implements ServerMetadataCache { // todo: apply schema id and schema info if needs int schemaId = tableInfo.getSchemaId(); Schema schema = tableInfo.getSchema(); - schemaMetadataManager.updateLatestSchema( - tablePath, (short) schemaId, schema); + serverSchemaCache.updateLatestSchema(tablePath, (short) schemaId, schema); if (tableId == DELETED_TABLE_ID) { Long removedTableId = tableIdByPath.remove(tablePath); @@ -437,7 +436,7 @@ public class TabletServerMetadataCache implements ServerMetadataCache { } @VisibleForTesting - public SchemaMetadataManager getSchemaMetadataManager() { - return schemaMetadataManager; + public ServerSchemaCache getSchemaMetadataManager() { + return serverSchemaCache; } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/SchemaMetadataManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/ServerSchemaCacheTest.java similarity index 94% rename from fluss-server/src/test/java/org/apache/fluss/server/metadata/SchemaMetadataManagerTest.java rename to fluss-server/src/test/java/org/apache/fluss/server/metadata/ServerSchemaCacheTest.java index 672910b04..f72277ba8 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/metadata/SchemaMetadataManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/ServerSchemaCacheTest.java @@ -44,13 +44,13 @@ import static org.apache.fluss.record.TestData.DATA2_TABLE_DESCRIPTOR; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Test for {@link SchemaMetadataManager}. */ -public class SchemaMetadataManagerTest { +/** Test for {@link ServerSchemaCache}. */ +public class ServerSchemaCacheTest { @Test void testPublishAndsubscribeSchemaChange() { - SchemaMetadataManager manager = - new SchemaMetadataManager(new TestingMetadataManager(Collections.emptyList())); + ServerSchemaCache manager = + new ServerSchemaCache(new TestingMetadataManager(Collections.emptyList())); SchemaGetter subscriber1 = manager.subscribeWithInitialSchema( @@ -94,8 +94,8 @@ public class SchemaMetadataManagerTest { DATA2_TABLE_DESCRIPTOR, System.currentTimeMillis(), System.currentTimeMillis()); - SchemaMetadataManager manager = - new SchemaMetadataManager( + ServerSchemaCache manager = + new ServerSchemaCache( new TestingMetadataManager(Arrays.asList(DATA1_TABLE_INFO, tableInfo2))); SchemaGetter schemaGetter = manager.subscribeWithInitialSchema(DATA1_TABLE_PATH, (short) 2, DATA2_SCHEMA); @@ -107,8 +107,8 @@ public class SchemaMetadataManagerTest { @Test void testUnsubscribeSchemaChange() { - SchemaMetadataManager manager = - new SchemaMetadataManager(new TestingMetadataManager(Collections.emptyList())); + ServerSchemaCache manager = + new ServerSchemaCache(new TestingMetadataManager(Collections.emptyList())); SchemaGetter subscriber1 = manager.subscribeWithInitialSchema( new TablePath("test_tb", "test_table_1"), (short) 1, DATA1_SCHEMA); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index 37f3dd6cb..ef1cb92ba 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -51,8 +51,8 @@ import org.apache.fluss.server.coordinator.MetadataManager; import org.apache.fluss.server.entity.NotifyLeaderAndIsrData; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandle; -import org.apache.fluss.server.metadata.SchemaMetadataManager; import org.apache.fluss.server.metadata.ServerInfo; +import org.apache.fluss.server.metadata.ServerSchemaCache; import org.apache.fluss.server.metadata.TabletServerMetadataCache; import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.replica.ReplicaManager; @@ -678,10 +678,10 @@ public final class FlussClusterExtension assertThat(leaderAndIsrOpt).isPresent(); int leader = leaderAndIsrOpt.get().leader(); TabletServer tabletServer = getTabletServerById(leader); - SchemaMetadataManager schemaMetadataManager = + ServerSchemaCache serverSchemaCache = tabletServer.getMetadataCache().getSchemaMetadataManager(); Map<TablePath, SchemaInfo> latestSchemaByTablePath = - schemaMetadataManager.getLatestSchemaByTablePath(); + serverSchemaCache.getLatestSchemaByTablePath(); assertThat(latestSchemaByTablePath).containsKey(tablePath); assertThat(latestSchemaByTablePath.get(tablePath).getSchemaId()) .isEqualTo(schemaId);
