This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ci-add-column in repository https://gitbox.apache.org/repos/asf/fluss.git
commit df0abce847b5d0dffbe54c670bd0e4bc4a5b5b6d Author: Jark Wu <[email protected]> AuthorDate: Mon Dec 1 00:00:31 2025 +0800 server projection cache --- .../scanner/log/DefaultCompletedFetchTest.java | 3 +- .../org/apache/fluss/record/FileLogProjection.java | 105 ++++++++------------- .../fluss/record/ProjectionPushdownCache.java | 86 +++++++++++++++++ .../apache/fluss/record/FileLogProjectionTest.java | 55 ++++------- .../org/apache/fluss/server/log/FetchParams.java | 6 +- .../fluss/server/replica/ReplicaManager.java | 5 +- .../org/apache/fluss/server/kv/KvTabletTest.java | 11 ++- .../apache/fluss/server/log/FetchParamsTest.java | 14 ++- .../apache/fluss/server/replica/ReplicaTest.java | 16 +++- 9 files changed, 182 insertions(+), 119 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java index bfd686dc1..d487a2d46 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java @@ -29,6 +29,7 @@ import org.apache.fluss.record.FileLogProjection; import org.apache.fluss.record.FileLogRecords; import org.apache.fluss.record.LogRecordReadContext; import org.apache.fluss.record.MemoryLogRecords; +import org.apache.fluss.record.ProjectionPushdownCache; import org.apache.fluss.record.TestingSchemaGetter; import org.apache.fluss.row.InternalRow; import org.apache.fluss.rpc.entity.FetchLogResultForBucket; @@ -265,7 +266,7 @@ public class DefaultCompletedFetchTest { rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, magic, objects, LogFormat.ARROW)); fileLogRecords.flush(); - FileLogProjection fileLogProjection = new FileLogProjection(); + FileLogProjection fileLogProjection = new FileLogProjection(new ProjectionPushdownCache()); fileLogProjection.setCurrentProjection( DATA2_TABLE_ID, testingSchemaGetter, diff --git a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java index 11c50bb3d..546275f55 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java @@ -54,7 +54,6 @@ import java.util.BitSet; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import static org.apache.fluss.record.DefaultLogRecordBatch.APPEND_ONLY_FLAG_MASK; import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET; @@ -71,7 +70,6 @@ import static org.apache.fluss.record.LogRecordBatchFormat.recordsCountOffset; import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset; import static org.apache.fluss.utils.FileUtils.readFully; import static org.apache.fluss.utils.FileUtils.readFullyOrFail; -import static org.apache.fluss.utils.Preconditions.checkNotNull; import static org.apache.fluss.utils.Preconditions.checkState; /** Column projection util on Arrow format {@link FileLogRecords}. */ @@ -85,8 +83,8 @@ public class FileLogProjection { private static final int ARROW_HEADER_SIZE = ARROW_IPC_CONTINUATION_LENGTH + ARROW_IPC_METADATA_SIZE_LENGTH; - final Map<ProjectionKey, ProjectionInfo> projectionsCache = new HashMap<>(); - ProjectionInfo currentProjection; + // the projection cache shared in the TabletServer + private final ProjectionPushdownCache projectionsCache; // shared resources for multiple projections private final ByteArrayOutputStream outputStream; @@ -103,9 +101,10 @@ public class FileLogProjection { private SchemaGetter schemaGetter; private long tableId; private ArrowCompressionInfo compressionInfo; - private int[] selectedFieldPositions; + private int[] selectedFieldIds; - public FileLogProjection() { + public FileLogProjection(ProjectionPushdownCache projectionsCache) { + this.projectionsCache = projectionsCache; this.outputStream = new ByteArrayOutputStream(); this.writeChannel = new WriteChannel(Channels.newChannel(outputStream)); // fluss use little endian for encoding log records batch @@ -122,14 +121,7 @@ public class FileLogProjection { this.tableId = tableId; this.schemaGetter = schemaGetter; this.compressionInfo = compressionInfo; - - // Currently, only add last column is supported.Thus selectedFieldPositions is always same - // from same selectedFieldIds. - // TODO: if support drop column or add column in middle, this selectedFieldPositions should - // be re-calculated for each schema. - this.selectedFieldPositions = - selectedFieldPositions( - schemaGetter.getLatestSchemaInfo().getSchema(), selectedFieldIds); + this.selectedFieldIds = selectedFieldIds; } /** @@ -144,6 +136,8 @@ public class FileLogProjection { MultiBytesView.Builder builder = MultiBytesView.builder(); int position = start; + ProjectionInfo currentProjection = null; + short prevSchemaId = -1; // The condition is an optimization to avoid read log header when there is no enough bytes, // So we use V0 header size here for a conservative judgment. In the end, the condition // of (position >= end - recordBatchHeaderSize) will ensure the final correctness. @@ -162,8 +156,12 @@ public class FileLogProjection { int recordBatchHeaderSize = recordBatchHeaderSize(magic); int batchSizeInBytes = LOG_OVERHEAD + logHeaderBuffer.getInt(LENGTH_OFFSET); short schemaId = logHeaderBuffer.getShort(schemaIdOffset(magic)); - setCurrentSchema(schemaId); - checkNotNull(currentProjection, "There is no projection registered yet."); + + // reuse projection in the current log file + if (currentProjection == null || prevSchemaId != schemaId) { + prevSchemaId = schemaId; + currentProjection = getOrCreateProjectionInfo(schemaId); + } if (position > end - batchSizeInBytes) { // the remaining bytes in the file are not enough to read a full batch @@ -405,20 +403,22 @@ public class FileLogProjection { return logHeaderBuffer; } - private void setCurrentSchema(short schemaId) { + private ProjectionInfo getOrCreateProjectionInfo(short schemaId) { + ProjectionInfo cachedProjection = + projectionsCache.getProjectionInfo(tableId, schemaId, selectedFieldIds); + if (cachedProjection == null) { + cachedProjection = createProjectionInfo(schemaId, selectedFieldIds); + projectionsCache.setProjectionInfo( + tableId, schemaId, selectedFieldIds, cachedProjection); + } + return cachedProjection; + } + + private ProjectionInfo createProjectionInfo(short schemaId, int[] selectedFieldIds) { org.apache.fluss.metadata.Schema schema = schemaGetter.getSchema(schemaId); + int[] selectedFieldPositions = + selectedFieldPositions(schemaGetter.getSchema(schemaId), selectedFieldIds); RowType rowType = schema.getRowType(); - ProjectionKey projectionKey = new ProjectionKey(tableId, schemaId); - if (projectionsCache.containsKey(projectionKey)) { - // the schema and projection should identical for the same table id. - currentProjection = projectionsCache.get(projectionKey); - if (!Arrays.equals(currentProjection.selectedFieldPositions, selectedFieldPositions) - || !currentProjection.schema.equals(rowType)) { - throw new InvalidColumnProjectionException( - "The schema and projection should be identical for the same table id."); - } - return; - } // initialize the projection util information Schema arrowSchema = ArrowUtils.toArrowSchema(rowType); @@ -451,16 +451,14 @@ public class FileLogProjection { CompressionUtil.createBodyCompression(compressionInfo.createCompressionCodec()); int metadataLength = ArrowUtils.estimateArrowMetadataLength(projectedArrowSchema, bodyCompression); - currentProjection = - new ProjectionInfo( - nodesProjection, - buffersProjection, - bufferIndex, - rowType, - metadataLength, - bodyCompression, - selectedFieldPositions); - projectionsCache.put(projectionKey, currentProjection); + return new ProjectionInfo( + nodesProjection, + buffersProjection, + bufferIndex, + rowType, + metadataLength, + bodyCompression, + selectedFieldPositions); } int[] selectedFieldPositions(org.apache.fluss.metadata.Schema schema, int[] projectedFields) { @@ -478,7 +476,8 @@ public class FileLogProjection { if (position == null) { throw new InvalidColumnProjectionException( String.format( - "Projected field id %s is not contains in %s", fieldId, columnIds)); + "Projected field id %s is not contained in %s", + fieldId, columnIds)); } selectedFieldPositions[i] = position; @@ -493,31 +492,7 @@ public class FileLogProjection { return selectedFieldPositions; } - static final class ProjectionKey { - private final long tableId; - private final short schemaId; - - ProjectionKey(long tableId, short schemaId) { - this.tableId = tableId; - this.schemaId = schemaId; - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof ProjectionKey)) { - return false; - } - ProjectionKey that = (ProjectionKey) o; - return tableId == that.tableId && schemaId == that.schemaId; - } - - @Override - public int hashCode() { - return Objects.hash(tableId, schemaId); - } - } - - static final class ProjectionInfo { + public static final class ProjectionInfo { final BitSet nodesProjection; final BitSet buffersProjection; final int bufferCount; @@ -545,7 +520,7 @@ public class FileLogProjection { } /** Metadata of a projected arrow record batch. */ - public static final class ProjectedArrowBatch { + static final class ProjectedArrowBatch { /** Number of records. */ final long numRecords; diff --git a/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java b/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java new file mode 100644 index 000000000..dc5821e70 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.record; + +import org.apache.fluss.record.FileLogProjection.ProjectionInfo; +import org.apache.fluss.shaded.guava32.com.google.common.cache.Cache; +import org.apache.fluss.shaded.guava32.com.google.common.cache.CacheBuilder; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Objects; + +@ThreadSafe +public class ProjectionPushdownCache { + + final Cache<ProjectionKey, ProjectionInfo> projectionCache; + + public ProjectionPushdownCache() { + // currently, the cache is shared at TabletServer level, so we give a large max size, but + // give a short expiration time. + // TODO: make the cache parameter configurable + this.projectionCache = + CacheBuilder.newBuilder() + .maximumSize(1000) + .expireAfterAccess(Duration.ofMinutes(3)) + .build(); + } + + @Nullable + public ProjectionInfo getProjectionInfo(long tableId, short schemaId, int[] selectedColumnIds) { + ProjectionKey key = new ProjectionKey(tableId, schemaId, selectedColumnIds); + return projectionCache.getIfPresent(key); + } + + public void setProjectionInfo( + long tableId, short schemaId, int[] selectedColumnIds, ProjectionInfo projectionInfo) { + ProjectionKey key = new ProjectionKey(tableId, schemaId, selectedColumnIds); + projectionCache.put(key, projectionInfo); + } + + static final class ProjectionKey { + private final long tableId; + private final short schemaId; + private final int[] selectedColumnIds; + + ProjectionKey(long tableId, short schemaId, int[] selectedColumnIds) { + this.tableId = tableId; + this.schemaId = schemaId; + this.selectedColumnIds = selectedColumnIds; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ProjectionKey)) { + return false; + } + ProjectionKey that = (ProjectionKey) o; + return tableId == that.tableId + && schemaId == that.schemaId + && Arrays.equals(selectedColumnIds, that.selectedColumnIds); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, schemaId, Arrays.hashCode(selectedColumnIds)); + } + } +} diff --git a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java index 2c3ff36ef..9a27089ac 100644 --- a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java @@ -78,7 +78,8 @@ class FileLogProjectionTest { TestData.DATA2, TestData.DATA2); - FileLogProjection projection = new FileLogProjection(); + ProjectionPushdownCache cache = new ProjectionPushdownCache(); + FileLogProjection projection = new FileLogProjection(cache); // get schema during running. doProjection( 1L, @@ -87,16 +88,13 @@ class FileLogProjectionTest { recordsOfData2RowType, new int[] {0, 2}, recordsOfData2RowType.sizeInBytes()); - FileLogProjection.ProjectionInfo info1 = projection.currentProjection; + assertThat(cache.projectionCache.size()).isEqualTo(1); + FileLogProjection.ProjectionInfo info1 = + cache.getProjectionInfo(1L, schemaId, new int[] {0, 2}); assertThat(info1).isNotNull(); assertThat(info1.nodesProjection.stream().toArray()).isEqualTo(new int[] {0, 2}); // a int: [0,1] ; b string: [2,3,4] ; c string: [5,6,7] assertThat(info1.buffersProjection.stream().toArray()).isEqualTo(new int[] {0, 1, 5, 6, 7}); - assertThat(projection.projectionsCache).hasSize(1); - assertThat( - projection.projectionsCache.get( - new FileLogProjection.ProjectionKey(1L, schemaId))) - .isSameAs(info1); doProjection( 2L, @@ -105,37 +103,13 @@ class FileLogProjectionTest { recordsOfData2RowType, new int[] {1}, recordsOfData2RowType.sizeInBytes()); - FileLogProjection.ProjectionInfo info2 = projection.currentProjection; + assertThat(cache.projectionCache.size()).isEqualTo(2); + FileLogProjection.ProjectionInfo info2 = + cache.getProjectionInfo(2L, schemaId, new int[] {1}); assertThat(info2).isNotNull(); assertThat(info2.nodesProjection.stream().toArray()).isEqualTo(new int[] {1}); // a int: [0,1] ; b string: [2,3,4] ; c string: [5,6,7] assertThat(info2.buffersProjection.stream().toArray()).isEqualTo(new int[] {2, 3, 4}); - assertThat(projection.projectionsCache).hasSize(2); - assertThat( - projection.projectionsCache.get( - new FileLogProjection.ProjectionKey(2L, schemaId))) - .isSameAs(info2); - - doProjection( - 1L, - schemaId, - projection, - recordsOfData2RowType, - new int[] {0, 2}, - recordsOfData2RowType.sizeInBytes()); - assertThat(projection.currentProjection).isNotNull().isSameAs(info1); - - assertThatThrownBy( - () -> - doProjection( - 1L, - schemaId, - projection, - recordsOfData2RowType, - new int[] {1}, - recordsOfData2RowType.sizeInBytes())) - .isInstanceOf(InvalidColumnProjectionException.class) - .hasMessage("The schema and projection should be identical for the same table id."); } @Test @@ -149,7 +123,7 @@ class FileLogProjectionTest { TestData.DATA2_ROW_TYPE, TestData.DATA2, TestData.DATA2); - FileLogProjection projection = new FileLogProjection(); + FileLogProjection projection = new FileLogProjection(new ProjectionPushdownCache()); assertThatThrownBy( () -> doProjection( @@ -160,7 +134,7 @@ class FileLogProjectionTest { new int[] {3}, recordsOfData2RowType.sizeInBytes())) .isInstanceOf(InvalidColumnProjectionException.class) - .hasMessage("Projected field id 3 is not contains in [0, 1, 2]"); + .hasMessage("Projected field id 3 is not contained in [0, 1, 2]"); assertThatThrownBy( () -> @@ -220,7 +194,7 @@ class FileLogProjectionTest { doProjection( 1L, schemaId, - new FileLogProjection(), + new FileLogProjection(new ProjectionPushdownCache()), fileLogRecords, projectedFields, Integer.MAX_VALUE); @@ -248,7 +222,7 @@ class FileLogProjectionTest { TestData.DATA1_ROW_TYPE, TestData.DATA1, TestData.ANOTHER_DATA1); - FileLogProjection projection = new FileLogProjection(); + FileLogProjection projection = new FileLogProjection(new ProjectionPushdownCache()); // overwrite the wrong decoding byte order endian projection.getLogHeaderBuffer().order(ByteOrder.BIG_ENDIAN); assertThatThrownBy( @@ -305,7 +279,10 @@ class FileLogProjectionTest { int maxBytes = totalSize / i; List<Object[]> results = doProjection( - new FileLogProjection(), fileLogRecords, new int[] {0, 1}, maxBytes); + new FileLogProjection(new ProjectionPushdownCache()), + fileLogRecords, + new int[] {0, 1}, + maxBytes); if (results.isEmpty()) { hasEmpty = true; } else if (results.size() == TestData.DATA1.size()) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java b/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java index 847bfa883..b08ad67f8 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java @@ -21,6 +21,7 @@ import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.compression.ArrowCompressionInfo; import org.apache.fluss.metadata.SchemaGetter; import org.apache.fluss.record.FileLogProjection; +import org.apache.fluss.record.ProjectionPushdownCache; import org.apache.fluss.rpc.messages.FetchLogRequest; import javax.annotation.Nullable; @@ -99,13 +100,14 @@ public final class FetchParams { int maxFetchBytes, SchemaGetter schemaGetter, ArrowCompressionInfo compressionInfo, - @Nullable int[] projectedFields) { + @Nullable int[] projectedFields, + ProjectionPushdownCache projectionCache) { this.fetchOffset = fetchOffset; this.maxFetchBytes = maxFetchBytes; if (projectedFields != null) { projectionEnabled = true; if (fileLogProjection == null) { - fileLogProjection = new FileLogProjection(); + fileLogProjection = new FileLogProjection(projectionCache); } fileLogProjection.setCurrentProjection( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 9f6e4d8ad..206586bf1 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -39,6 +39,7 @@ import org.apache.fluss.metrics.MetricNames; import org.apache.fluss.metrics.groups.MetricGroup; import org.apache.fluss.record.KvRecordBatch; import org.apache.fluss.record.MemoryLogRecords; +import org.apache.fluss.record.ProjectionPushdownCache; import org.apache.fluss.remote.RemoteLogFetchInfo; import org.apache.fluss.remote.RemoteLogSegment; import org.apache.fluss.rpc.RpcClient; @@ -149,6 +150,7 @@ public class ReplicaManager { private final Map<TableBucket, HostedReplica> allReplicas = MapUtils.newConcurrentHashMap(); private final TabletServerMetadataCache metadataCache; + private final ProjectionPushdownCache projectionsCache = new ProjectionPushdownCache(); private final Lock replicaStateChangeLock = new ReentrantLock(); /** @@ -1083,7 +1085,8 @@ public class ReplicaManager { adjustedMaxBytes, replica.getSchemaGetter(), replica.getArrowCompressionInfo(), - fetchReqInfo.getProjectFields()); + fetchReqInfo.getProjectFields(), + projectionsCache); LogReadInfo readInfo = replica.fetchRecords(fetchParams); // Once we read from a non-empty bucket, we stop ignoring request and bucket diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index 87b768608..f2013a91a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -38,6 +38,7 @@ import org.apache.fluss.record.KvRecordTestUtils; import org.apache.fluss.record.LogRecords; import org.apache.fluss.record.LogTestBase; import org.apache.fluss.record.MemoryLogRecords; +import org.apache.fluss.record.ProjectionPushdownCache; import org.apache.fluss.record.TestData; import org.apache.fluss.record.TestingSchemaGetter; import org.apache.fluss.record.bytesview.MultiBytesView; @@ -651,9 +652,10 @@ class KvTabletTest { TablePath tablePath = TablePath.of("testDb", tableName); initLogTabletAndKvTablet(tablePath, DATA1_SCHEMA_PK, config); RowType rowType = DATA1_SCHEMA_PK.getRowType(); + ProjectionPushdownCache projectionCache = new ProjectionPushdownCache(); FileLogProjection logProjection = null; if (doProjection) { - logProjection = new FileLogProjection(); + logProjection = new FileLogProjection(projectionCache); logProjection.setCurrentProjection( 0L, schemaGetter, DEFAULT_COMPRESSION, new int[] {0}); } @@ -749,7 +751,7 @@ class KvTabletTest { schemaGetter.updateLatestSchemaInfo(new SchemaInfo(DATA2_SCHEMA, 2)); readLogRowType = doProjection ? DATA2_ROW_TYPE.project(new int[] {0}) : DATA2_ROW_TYPE; if (doProjection) { - logProjection = new FileLogProjection(); + logProjection = new FileLogProjection(projectionCache); logProjection.setCurrentProjection( 0L, schemaGetter, DEFAULT_COMPRESSION, new int[] {0}); } @@ -834,9 +836,10 @@ class KvTabletTest { KvRecordTestUtils.KvRecordFactory kvRecordFactory = KvRecordTestUtils.KvRecordFactory.of(rowType); + ProjectionPushdownCache projectionCache = new ProjectionPushdownCache(); FileLogProjection logProjection = null; if (doProjection) { - logProjection = new FileLogProjection(); + logProjection = new FileLogProjection(projectionCache); logProjection.setCurrentProjection( 0L, schemaGetter, DEFAULT_COMPRESSION, new int[] {0}); } @@ -956,7 +959,7 @@ class KvTabletTest { ? newSchema.getRowType().project(new int[] {0}) : newSchema.getRowType(); if (doProjection) { - logProjection = new FileLogProjection(); + logProjection = new FileLogProjection(projectionCache); logProjection.setCurrentProjection( 0L, schemaGetter, DEFAULT_COMPRESSION, new int[] {0}); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/FetchParamsTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/FetchParamsTest.java index 132de4482..d4c2be57b 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/FetchParamsTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/FetchParamsTest.java @@ -19,6 +19,7 @@ package org.apache.fluss.server.log; import org.apache.fluss.metadata.SchemaInfo; import org.apache.fluss.record.FileLogProjection; +import org.apache.fluss.record.ProjectionPushdownCache; import org.apache.fluss.record.TestData; import org.apache.fluss.record.TestingSchemaGetter; @@ -33,13 +34,15 @@ class FetchParamsTest { @Test void testSetCurrentFetch() { FetchParams fetchParams = new FetchParams(1, 100); + ProjectionPushdownCache projectionCache = new ProjectionPushdownCache(); fetchParams.setCurrentFetch( 1L, 20L, 1024, new TestingSchemaGetter(new SchemaInfo(TestData.DATA1_SCHEMA, (short) 1)), DEFAULT_COMPRESSION, - null); + null, + projectionCache); assertThat(fetchParams.fetchOffset()).isEqualTo(20L); assertThat(fetchParams.maxFetchBytes()).isEqualTo(1024); assertThat(fetchParams.projection()).isNull(); @@ -50,7 +53,8 @@ class FetchParamsTest { 512, new TestingSchemaGetter(new SchemaInfo(TestData.DATA2_SCHEMA, (short) 1)), DEFAULT_COMPRESSION, - new int[] {0, 2}); + new int[] {0, 2}, + projectionCache); assertThat(fetchParams.fetchOffset()).isEqualTo(30L); assertThat(fetchParams.maxFetchBytes()).isEqualTo(512); assertThat(fetchParams.projection()).isNotNull(); @@ -63,7 +67,8 @@ class FetchParamsTest { 256, new TestingSchemaGetter(new SchemaInfo(TestData.DATA1_SCHEMA, (short) 1)), DEFAULT_COMPRESSION, - null); + null, + projectionCache); assertThat(fetchParams.projection()).isNull(); fetchParams.setCurrentFetch( @@ -72,7 +77,8 @@ class FetchParamsTest { 512, new TestingSchemaGetter(new SchemaInfo(TestData.DATA2_SCHEMA, (short) 1)), DEFAULT_COMPRESSION, - new int[] {0, 2}); + new int[] {0, 2}, + projectionCache); // the FileLogProjection should be cached assertThat(fetchParams.projection()).isNotNull().isSameAs(prevProjection); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java index e45a5557c..fcb4ec88c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java @@ -31,6 +31,7 @@ import org.apache.fluss.record.KvRecordTestUtils; import org.apache.fluss.record.LogRecordBatch; import org.apache.fluss.record.LogRecords; import org.apache.fluss.record.MemoryLogRecords; +import org.apache.fluss.record.ProjectionPushdownCache; import org.apache.fluss.server.entity.NotifyLeaderAndIsrData; import org.apache.fluss.server.kv.KvTablet; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; @@ -137,8 +138,15 @@ final class ReplicaTest extends ReplicaTestBase { (int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES) .getBytes()); + ProjectionPushdownCache projectionCache = new ProjectionPushdownCache(); fetchParams.setCurrentFetch( - DATA1_TABLE_ID, 0, Integer.MAX_VALUE, schemaGetter, DEFAULT_COMPRESSION, null); + DATA1_TABLE_ID, + 0, + Integer.MAX_VALUE, + schemaGetter, + DEFAULT_COMPRESSION, + null, + projectionCache); LogReadInfo logReadInfo = logReplica.fetchRecords(fetchParams); assertLogRecordsEquals( DATA1_ROW_TYPE, logReadInfo.getFetchedData().getRecords(), DATA1, schemaGetter); @@ -163,7 +171,8 @@ final class ReplicaTest extends ReplicaTestBase { Integer.MAX_VALUE, schemaGetter, DEFAULT_COMPRESSION, - null); + null, + projectionCache); logReadInfo = logReplica.fetchRecords(fetchParams); assertLogRecordsEquals( 2, DATA2_ROW_TYPE, logReadInfo.getFetchedData().getRecords(), DATA2, schemaGetter); @@ -808,7 +817,8 @@ final class ReplicaTest extends ReplicaTestBase { Integer.MAX_VALUE, replica.getSchemaGetter(), DEFAULT_COMPRESSION, - null); + null, + new ProjectionPushdownCache()); LogReadInfo logReadInfo = replica.fetchRecords(fetchParams); return logReadInfo.getFetchedData().getRecords(); }
