This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ci-add-column in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 57166d866e427c79a4e133095442fb261e6fbdd1 Author: Jark Wu <[email protected]> AuthorDate: Mon Dec 1 19:19:48 2025 +0800 ServerSchemaCache TablePath -> tableId --- .../org/apache/fluss/server/log/FetchParams.java | 2 +- .../java/org/apache/fluss/server/log/LocalLog.java | 2 - .../server/log/remote/FsRemoteLogOutputStream.java | 2 +- .../fluss/server/log/remote/LogSegmentFiles.java | 2 +- .../fluss/server/log/remote/LogTieringTask.java | 4 +- .../fluss/server/metadata/ServerSchemaCache.java | 74 ++++++++++++---------- .../server/metadata/TabletServerMetadataCache.java | 10 +-- .../org/apache/fluss/server/replica/Replica.java | 1 + .../server/metadata/ServerSchemaCacheTest.java | 46 ++++++-------- .../fluss/server/replica/ReplicaManagerTest.java | 16 +++-- .../apache/fluss/server/replica/ReplicaTest.java | 6 +- .../server/testutils/FlussClusterExtension.java | 12 ++-- 12 files changed, 89 insertions(+), 88 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java b/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java index b08ad67f8..0ec42a090 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java @@ -67,7 +67,7 @@ public final class FetchParams { private final int minFetchBytes; private final long maxWaitMs; - // TODO:add more params like epoch etc. + // TODO: add more params like epoch etc. public FetchParams(int replicaId, int maxFetchBytes) { this(replicaId, true, maxFetchBytes, DEFAULT_MIN_FETCH_BYTES, DEFAULT_MAX_WAIT_MS); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java index 086e23b0e..130b3855d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java @@ -405,8 +405,6 @@ public final class LocalLog { (maxOffsetMetadata.getSegmentBaseOffset() == segment.getBaseOffset()) ? maxOffsetMetadata.getRelativePositionInSegment() : segment.getSizeInBytes(); - - // segment需要查询 fetchDataInfo = segment.read(readOffset, maxLength, maxPosition, minOneMessage, projection); if (fetchDataInfo == null) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/FsRemoteLogOutputStream.java b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/FsRemoteLogOutputStream.java index 05233e0ff..835a94ff6 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/FsRemoteLogOutputStream.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/FsRemoteLogOutputStream.java @@ -203,7 +203,7 @@ public class FsRemoteLogOutputStream extends FSDataOutputStream { Exception latestException = null; for (int attempt = 0; attempt < 10; attempt++) { try { - // todo: mayadd entropy injection? + // todo: may add entropy injection? this.outStream = fs.create(remoteLogFilePath, FileSystem.WriteMode.NO_OVERWRITE); return; } catch (Exception e) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogSegmentFiles.java b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogSegmentFiles.java index 86a6bbf9e..73aee521e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogSegmentFiles.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogSegmentFiles.java @@ -40,7 +40,7 @@ public class LogSegmentFiles { private final Path offsetIndex; private final Path timeIndex; private final @Nullable Path writerIdIndex; - // TODOadd leader epoch index after introduce leader epoch. + // TODO add leader epoch index after introduce leader epoch. public LogSegmentFiles( Path logSegment, Path offsetIndex, Path timeIndex, @Nullable Path writerIdIndex) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java index 0b425b8fc..cdde3842d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java @@ -227,7 +227,7 @@ public class LogTieringTask implements Runnable { } /** - * Copy the given log segments to remote andadd the successfully copied segment to the {@code + * Copy the given log segments to remote and add the successfully copied segment to the {@code * copiedSegments} parameter. * * @return the end offset of the last segment copied to remote. @@ -347,7 +347,7 @@ public class LogTieringTask implements Runnable { // the commit failed, it means the commit snapshot is invalid or register zk // failed, we will revert this commit and delete the remote log manifest // file. - // TODO:add the fail reason in the future. + // TODO: add the fail reason in the future. LOG.error( "Commit remote log manifest failed for table bucket {}. We will delete the" + " written remote log manifest file", diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java index 0f98b092d..374e20501 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java @@ -29,6 +29,8 @@ import org.apache.fluss.shaded.guava32.com.google.common.cache.CacheBuilder; import org.apache.fluss.shaded.guava32.com.google.common.util.concurrent.UncheckedExecutionException; import org.apache.fluss.utils.MapUtils; +import javax.annotation.concurrent.ThreadSafe; + import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -41,74 +43,77 @@ import java.util.concurrent.ExecutionException; * 1. latest schema of each subscribed table, updated by UpdateMetadata request. 2. history schemas * of each table, updated by lookup from tablet server. */ +@ThreadSafe public class ServerSchemaCache { private final MetadataManager metadataManager; - private final Map<TablePath, SchemaInfo> latestSchemaByTablePath; - private final Map<TablePath, Integer> subscriberCounters; + private final Map<Long, SchemaInfo> latestSchemaByTableId; + // table_id -> subscriber count + private final Map<Long, Integer> subscriberCounters; private final Cache<TableSchemaKey, Schema> schemaCache; public ServerSchemaCache(MetadataManager metadataManager) { this.metadataManager = metadataManager; // thread safe is guaranteed by subscriberCounters. this.subscriberCounters = MapUtils.newConcurrentHashMap(); - this.latestSchemaByTablePath = MapUtils.newConcurrentHashMap(); + this.latestSchemaByTableId = MapUtils.newConcurrentHashMap(); this.schemaCache = CacheBuilder.newBuilder().maximumSize(100).build(); } public SchemaGetter subscribeWithInitialSchema( - TablePath tablePath, int initialSchemaId, Schema initialSchema) { + long tableId, TablePath tablePath, int initialSchemaId, Schema initialSchema) { subscriberCounters.compute( - tablePath, + tableId, (key, oldValue) -> { - updateSchema(tablePath, initialSchemaId, initialSchema); + updateSchema(tableId, initialSchemaId, initialSchema); if (oldValue == null) { return 1; } else { return oldValue + 1; } }); - return new SchemaMetadataSubscriberImpl(tablePath); + return new SchemaMetadataSubscriberImpl(tableId, tablePath); } - public void updateLatestSchema(TablePath tablePath, short schemaId, Schema schema) { + public void updateLatestSchema(long tableId, short schemaId, Schema schema) { // only update if tablePath is subscribed. subscriberCounters.computeIfPresent( - tablePath, + tableId, (key, oldValue) -> { - updateSchema(tablePath, schemaId, schema); + updateSchema(tableId, schemaId, schema); return oldValue; }); } - private void unsubscribe(TablePath tablePath) { + private void unsubscribe(long tableId) { subscriberCounters.compute( - tablePath, + tableId, (key, oldValue) -> { if (oldValue != null && oldValue > 1) { return oldValue - 1; } - latestSchemaByTablePath.remove(tablePath); + latestSchemaByTableId.remove(tableId); return null; }); } - private Schema getFlussSchema(TablePath tablePath, int schemaId) throws ExecutionException { + private Schema getFlussSchema(long tableId, TablePath tablePath, int schemaId) + throws ExecutionException { return schemaCache.get( - new TableSchemaKey(tablePath, schemaId), + new TableSchemaKey(tableId, schemaId), () -> { - SchemaInfo schemaInfo = latestSchemaByTablePath.get(tablePath); + SchemaInfo schemaInfo = latestSchemaByTableId.get(tableId); if (schemaInfo != null && schemaInfo.getSchemaId() == schemaId) { - return latestSchemaByTablePath.get(tablePath).getSchema(); + return latestSchemaByTableId.get(tableId).getSchema(); } else { return metadataManager.getSchemaById(tablePath, schemaId).getSchema(); } }); } - private void updateSchema(TablePath tablePath, int schemaId, Schema schema) { - latestSchemaByTablePath.compute( - tablePath, + private void updateSchema(long tableId, int schemaId, Schema schema) { + latestSchemaByTableId.compute( + tableId, (key, oldValue) -> { if (oldValue == null || oldValue.getSchemaId() < schemaId) { return new SchemaInfo(schema, schemaId); @@ -117,25 +122,25 @@ public class ServerSchemaCache { } }); - schemaCache.put(new TableSchemaKey(tablePath, schemaId), schema); + schemaCache.put(new TableSchemaKey(tableId, schemaId), schema); } @VisibleForTesting - public Map<TablePath, SchemaInfo> getLatestSchemaByTablePath() { - return latestSchemaByTablePath; + public Map<Long, SchemaInfo> getLatestSchemaByTableId() { + return latestSchemaByTableId; } @VisibleForTesting - Map<TablePath, Integer> getSubscriberCounters() { + Map<Long, Integer> getSubscriberCounters() { return subscriberCounters; } private static class TableSchemaKey { - private final TablePath tablePath; + private final long tableId; private final int schemaId; - TableSchemaKey(TablePath tablePath, int schemaId) { - this.tablePath = tablePath; + TableSchemaKey(long tableId, int schemaId) { + this.tableId = tableId; this.schemaId = schemaId; } @@ -145,34 +150,35 @@ public class ServerSchemaCache { return false; } TableSchemaKey that = (TableSchemaKey) o; - return schemaId == that.schemaId && Objects.equals(tablePath, that.tablePath); + return schemaId == that.schemaId && Objects.equals(tableId, that.tableId); } @Override public int hashCode() { - return Objects.hash(tablePath, schemaId); + return Objects.hash(tableId, schemaId); } } private class SchemaMetadataSubscriberImpl implements SchemaGetter { + private final long tableId; private final TablePath tablePath; private volatile boolean released; - public SchemaMetadataSubscriberImpl(TablePath tablePath) { + public SchemaMetadataSubscriberImpl(long tableId, TablePath tablePath) { + this.tableId = tableId; this.tablePath = tablePath; this.released = false; } @Override public SchemaInfo getLatestSchemaInfo() { - return latestSchemaByTablePath.get(tablePath); + return latestSchemaByTableId.get(tableId); } @Override public Schema getSchema(int schemaId) { - // todo: support with get table. try { - return getFlussSchema(tablePath, schemaId); + return getFlussSchema(tableId, tablePath, schemaId); } catch (ExecutionException | UncheckedExecutionException executionException) { Throwable cause = executionException.getCause(); if (cause instanceof SchemaNotExistException) { @@ -192,7 +198,7 @@ public class ServerSchemaCache { @Override public void release() { if (!released) { - unsubscribe(tablePath); + unsubscribe(tableId); released = true; } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java index 9a73975ce..666a74aae 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java @@ -141,14 +141,14 @@ public class TabletServerMetadataCache implements ServerMetadataCache { } public SchemaGetter subscribeWithInitialSchema( - TablePath tablePath, int initialSchemaId, Schema initialSchema) { + TablePath tablePath, long tableId, int initialSchemaId, Schema initialSchema) { return serverSchemaCache.subscribeWithInitialSchema( - tablePath, initialSchemaId, initialSchema); + tableId, tablePath, initialSchemaId, initialSchema); } - public void updateLatestSchema(TablePath tablePath, SchemaInfo schemaInfo) { + public void updateLatestSchema(long tableId, SchemaInfo schemaInfo) { serverSchemaCache.updateLatestSchema( - tablePath, (short) schemaInfo.getSchemaId(), schemaInfo.getSchema()); + tableId, (short) schemaInfo.getSchemaId(), schemaInfo.getSchema()); } public Optional<PartitionMetadata> getPartitionMetadata(PhysicalTablePath partitionPath) { @@ -207,7 +207,7 @@ public class TabletServerMetadataCache implements ServerMetadataCache { // todo: apply schema id and schema info if needs int schemaId = tableInfo.getSchemaId(); Schema schema = tableInfo.getSchema(); - serverSchemaCache.updateLatestSchema(tablePath, (short) schemaId, schema); + serverSchemaCache.updateLatestSchema(tableId, (short) schemaId, schema); if (tableId == DELETED_TABLE_ID) { Long removedTableId = tableIdByPath.remove(tablePath); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index ad2dcb24d..c1933bd5c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -237,6 +237,7 @@ public final class Replica { this.schemaGetter = metadataCache.subscribeWithInitialSchema( physicalPath.getTablePath(), + tableInfo.getTableId(), tableInfo.getSchemaId(), tableInfo.getSchema()); this.tableConfig = tableInfo.getTableConfig(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/ServerSchemaCacheTest.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/ServerSchemaCacheTest.java index f72277ba8..c42c7b296 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/metadata/ServerSchemaCacheTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/ServerSchemaCacheTest.java @@ -48,36 +48,33 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; public class ServerSchemaCacheTest { @Test - void testPublishAndsubscribeSchemaChange() { + void testPublishAndSubscribeSchemaChange() { ServerSchemaCache manager = new ServerSchemaCache(new TestingMetadataManager(Collections.emptyList())); SchemaGetter subscriber1 = manager.subscribeWithInitialSchema( - new TablePath("test_tb", "test_table_1"), (short) 1, DATA1_SCHEMA); + 1L, new TablePath("test_tb", "test_table_1"), (short) 1, DATA1_SCHEMA); SchemaGetter subscriber2 = manager.subscribeWithInitialSchema( - new TablePath("test_tb", "test_table_2"), (short) 1, DATA1_SCHEMA); + 2L, new TablePath("test_tb", "test_table_2"), (short) 1, DATA1_SCHEMA); assertThat(subscriber1.getLatestSchemaInfo()).isEqualTo(new SchemaInfo(DATA1_SCHEMA, 1)); assertThat(subscriber1.getLatestSchemaInfo()).isEqualTo(new SchemaInfo(DATA1_SCHEMA, 1)); - manager.updateLatestSchema( - new TablePath("test_tb", "test_table_2"), (short) 2, DATA2_SCHEMA); + manager.updateLatestSchema(2L, (short) 2, DATA2_SCHEMA); assertThat(subscriber1.getLatestSchemaInfo()).isNotNull(); assertThat(subscriber1.getLatestSchemaInfo()).isEqualTo(new SchemaInfo(DATA1_SCHEMA, 1)); assertThat(subscriber2.getLatestSchemaInfo()).isNotNull(); assertThat(subscriber2.getLatestSchemaInfo()).isEqualTo(new SchemaInfo(DATA2_SCHEMA, 2)); - manager.updateLatestSchema( - new TablePath("test_tb", "test_table_1"), (short) 2, DATA2_SCHEMA); + manager.updateLatestSchema(1L, (short) 2, DATA2_SCHEMA); assertThat(subscriber1.getLatestSchemaInfo()).isNotNull(); assertThat(subscriber1.getLatestSchemaInfo()).isEqualTo(new SchemaInfo(DATA2_SCHEMA, 2)); assertThat(subscriber2.getLatestSchemaInfo()).isNotNull(); assertThat(subscriber2.getLatestSchemaInfo()).isEqualTo(new SchemaInfo(DATA2_SCHEMA, 2)); // one more redundant schema change. - manager.updateLatestSchema( - new TablePath("test_tb", "test_table_1"), (short) 1, DATA2_SCHEMA); + manager.updateLatestSchema(1L, (short) 1, DATA2_SCHEMA); assertThat(subscriber1.getLatestSchemaInfo()).isNotNull(); assertThat(subscriber1.getLatestSchemaInfo()).isEqualTo(new SchemaInfo(DATA2_SCHEMA, 2)); assertThat(subscriber2.getLatestSchemaInfo()).isNotNull(); @@ -98,7 +95,8 @@ public class ServerSchemaCacheTest { new ServerSchemaCache( new TestingMetadataManager(Arrays.asList(DATA1_TABLE_INFO, tableInfo2))); SchemaGetter schemaGetter = - manager.subscribeWithInitialSchema(DATA1_TABLE_PATH, (short) 2, DATA2_SCHEMA); + manager.subscribeWithInitialSchema( + DATA1_TABLE_ID, DATA1_TABLE_PATH, (short) 2, DATA2_SCHEMA); assertThat(schemaGetter.getSchema(2)).isEqualTo(DATA2_SCHEMA); assertThat(schemaGetter.getSchema(1)).isEqualTo(DATA1_SCHEMA); assertThatThrownBy(() -> schemaGetter.getSchema(3)) @@ -111,39 +109,35 @@ public class ServerSchemaCacheTest { new ServerSchemaCache(new TestingMetadataManager(Collections.emptyList())); SchemaGetter subscriber1 = manager.subscribeWithInitialSchema( - new TablePath("test_tb", "test_table_1"), (short) 1, DATA1_SCHEMA); + 1L, new TablePath("test_tb", "test_table_1"), (short) 1, DATA1_SCHEMA); SchemaGetter subscriber2 = manager.subscribeWithInitialSchema( - new TablePath("test_tb", "test_table_1"), (short) 1, DATA1_SCHEMA); + 1L, new TablePath("test_tb", "test_table_1"), (short) 1, DATA1_SCHEMA); SchemaGetter subscriber3 = manager.subscribeWithInitialSchema( - new TablePath("test_tb", "test_table_2"), (short) 1, DATA1_SCHEMA); - assertThat(manager.getLatestSchemaByTablePath()).hasSize(2); + 2L, new TablePath("test_tb", "test_table_2"), (short) 1, DATA1_SCHEMA); + assertThat(manager.getLatestSchemaByTableId()).hasSize(2); assertThat(manager.getSubscriberCounters()).hasSize(2); - assertThat(manager.getSubscriberCounters().get(new TablePath("test_tb", "test_table_1"))) - .isEqualTo(2); + assertThat(manager.getSubscriberCounters().get(1L)).isEqualTo(2); subscriber1.release(); - assertThat(manager.getLatestSchemaByTablePath()).hasSize(2); + assertThat(manager.getLatestSchemaByTableId()).hasSize(2); assertThat(manager.getSubscriberCounters()).hasSize(2); - assertThat(manager.getSubscriberCounters().get(new TablePath("test_tb", "test_table_1"))) - .isEqualTo(1); + assertThat(manager.getSubscriberCounters().get(1L)).isEqualTo(1); // one more redundant unsubscribe. subscriber1.release(); - assertThat(manager.getLatestSchemaByTablePath()).hasSize(2); + assertThat(manager.getLatestSchemaByTableId()).hasSize(2); assertThat(manager.getSubscriberCounters()).hasSize(2); - assertThat(manager.getSubscriberCounters().get(new TablePath("test_tb", "test_table_1"))) - .isEqualTo(1); + assertThat(manager.getSubscriberCounters().get(1L)).isEqualTo(1); subscriber2.release(); - assertThat(manager.getLatestSchemaByTablePath()).hasSize(1); + assertThat(manager.getLatestSchemaByTableId()).hasSize(1); assertThat(manager.getSubscriberCounters()).hasSize(1); - assertThat(manager.getSubscriberCounters().get(new TablePath("test_tb", "test_table_1"))) - .isNull(); + assertThat(manager.getSubscriberCounters().get(1L)).isNull(); subscriber3.release(); - assertThat(manager.getLatestSchemaByTablePath()).isEmpty(); + assertThat(manager.getLatestSchemaByTableId()).isEmpty(); assertThat(manager.getSubscriberCounters()).isEmpty(); subscriber3.release(); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java index 92c24f871..a669af2c1 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java @@ -193,7 +193,7 @@ class ReplicaManagerTest extends ReplicaTestBase { void testFetchLog() throws Exception { SchemaGetter schemaGetter = serverMetadataCache.subscribeWithInitialSchema( - DATA1_TABLE_PATH, DEFAULT_SCHEMA_ID, DATA1_SCHEMA); + DATA1_TABLE_PATH, DATA1_TABLE_ID, DEFAULT_SCHEMA_ID, DATA1_SCHEMA); TableBucket tb = new TableBucket(DATA1_TABLE_ID, 1); makeLogTableAsLeader(tb.getBucket()); @@ -309,7 +309,7 @@ class ReplicaManagerTest extends ReplicaTestBase { void testFetchLogWithMaxBytesLimit() throws Exception { SchemaGetter schemaGetter = serverMetadataCache.subscribeWithInitialSchema( - DATA1_TABLE_PATH, DEFAULT_SCHEMA_ID, DATA1_SCHEMA); + DATA1_TABLE_PATH, DATA1_TABLE_ID, DEFAULT_SCHEMA_ID, DATA1_SCHEMA); TableBucket tb = new TableBucket(DATA1_TABLE_ID, 1); makeLogTableAsLeader(tb.getBucket()); @@ -432,7 +432,8 @@ class ReplicaManagerTest extends ReplicaTestBase { assertThat(records1).isNotNull(); assertThat(records2).isNotNull(); SchemaGetter schemaGetter = - serverMetadataCache.subscribeWithInitialSchema(DATA1_TABLE_PATH, 1, DATA1_SCHEMA); + serverMetadataCache.subscribeWithInitialSchema( + DATA1_TABLE_PATH, DATA1_TABLE_ID, 1, DATA1_SCHEMA); if (records1.sizeInBytes() == 0) { assertThat(records2.sizeInBytes() > 0).isTrue(); assertMemoryRecordsEquals( @@ -498,7 +499,7 @@ class ReplicaManagerTest extends ReplicaTestBase { makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, tb.getBucket()); SchemaGetter schemaGetter = serverMetadataCache.subscribeWithInitialSchema( - DATA1_TABLE_PATH_PK, 1, DATA1_SCHEMA_PK); + DATA1_TABLE_PATH_PK, DATA1_TABLE_ID_PK, 1, DATA1_SCHEMA_PK); // 1. put kv records to kv store. List<Tuple2<Object[], Object[]>> data1 = @@ -628,7 +629,7 @@ class ReplicaManagerTest extends ReplicaTestBase { makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, tb.getBucket()); SchemaGetter schemaGetter = serverMetadataCache.subscribeWithInitialSchema( - DATA1_TABLE_PATH_PK, 1, DATA1_SCHEMA_PK); + DATA1_TABLE_PATH_PK, DATA1_TABLE_ID_PK, 1, DATA1_SCHEMA_PK); // put 10 batches delete non-exists key batch to kv store. CompletableFuture<List<PutKvResultForBucket>> future; @@ -906,7 +907,8 @@ class ReplicaManagerTest extends ReplicaTestBase { CompletableFuture<LimitScanResultForBucket> limitFuture = new CompletableFuture<>(); replicaManager.limitScan(tb, 10, limitFuture::complete); SchemaGetter schemaGetter = - serverMetadataCache.subscribeWithInitialSchema(DATA1_TABLE_PATH, 1, DATA1_SCHEMA); + serverMetadataCache.subscribeWithInitialSchema( + DATA1_TABLE_PATH, DATA1_TABLE_ID, 1, DATA1_SCHEMA); assertMemoryRecordsEquals( DATA1_ROW_TYPE, schemaGetter, @@ -1463,7 +1465,7 @@ class ReplicaManagerTest extends ReplicaTestBase { FetchLogResultForBucket r = r1.getValue(); SchemaGetter schemaGetter = serverMetadataCache.subscribeWithInitialSchema( - DATA1_TABLE_PATH, 1, DATA1_SCHEMA); + DATA1_TABLE_PATH, DATA1_TABLE_ID, 1, DATA1_SCHEMA); assertLogRecordsEqualsWithRowKind( DEFAULT_SCHEMA_ID, DATA1_ROW_TYPE, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java index fcb4ec88c..a18d76442 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java @@ -153,7 +153,7 @@ final class ReplicaTest extends ReplicaTestBase { // schema evolution. serverMetadataCache.updateLatestSchema( - DATA1_TABLE_PATH, new SchemaInfo(DATA2_SCHEMA, (short) 2)); + DATA1_TABLE_ID, new SchemaInfo(DATA2_SCHEMA, (short) 2)); mr = createRecordsWithoutBaseLogOffset( DATA2_ROW_TYPE, @@ -402,7 +402,7 @@ final class ReplicaTest extends ReplicaTestBase { currentOffset += 2; short newSchemaId = 2; serverMetadataCache.updateLatestSchema( - DATA1_TABLE_PATH_PK, new SchemaInfo(DATA2_SCHEMA, newSchemaId)); + DATA1_TABLE_ID, new SchemaInfo(DATA2_SCHEMA, newSchemaId)); KvRecordTestUtils.KvRecordBatchFactory kvRecordBatchFactory2 = KvRecordTestUtils.KvRecordBatchFactory.of(newSchemaId); KvRecordTestUtils.KvRecordFactory kvRecordFactory2 = @@ -727,7 +727,7 @@ final class ReplicaTest extends ReplicaTestBase { // update schema. zkClient.registerSchema(DATA1_TABLE_PATH_PK, DATA2_SCHEMA, newSchemaId); serverMetadataCache.updateLatestSchema( - DATA1_TABLE_PATH_PK, new SchemaInfo(DATA2_SCHEMA, newSchemaId)); + DATA1_TABLE_ID, new SchemaInfo(DATA2_SCHEMA, newSchemaId)); // write data with new schema putRecordsToLeader( kvReplica, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index ef1cb92ba..8cd737e51 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -670,9 +670,9 @@ public final class FlussClusterExtension () -> { TableRegistration tableRegistration = zkClient.getTable(tablePath).get(); int bucketCount = tableRegistration.bucketCount; + long tableId = tableRegistration.tableId; for (int bucketId = 0; bucketId < bucketCount; bucketId++) { - TableBucket tableBucket = - new TableBucket(tableRegistration.tableId, bucketId); + TableBucket tableBucket = new TableBucket(tableId, bucketId); Optional<LeaderAndIsr> leaderAndIsrOpt = zkClient.getLeaderAndIsr(tableBucket); assertThat(leaderAndIsrOpt).isPresent(); @@ -680,10 +680,10 @@ public final class FlussClusterExtension TabletServer tabletServer = getTabletServerById(leader); ServerSchemaCache serverSchemaCache = tabletServer.getMetadataCache().getSchemaMetadataManager(); - Map<TablePath, SchemaInfo> latestSchemaByTablePath = - serverSchemaCache.getLatestSchemaByTablePath(); - assertThat(latestSchemaByTablePath).containsKey(tablePath); - assertThat(latestSchemaByTablePath.get(tablePath).getSchemaId()) + Map<Long, SchemaInfo> latestSchemaByTablePath = + serverSchemaCache.getLatestSchemaByTableId(); + assertThat(latestSchemaByTablePath).containsKey(tableId); + assertThat(latestSchemaByTablePath.get(tableId).getSchemaId()) .isEqualTo(schemaId); } });
