This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit df0abce847b5d0dffbe54c670bd0e4bc4a5b5b6d
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 1 00:00:31 2025 +0800

    server projection cache
---
 .../scanner/log/DefaultCompletedFetchTest.java     |   3 +-
 .../org/apache/fluss/record/FileLogProjection.java | 105 ++++++++-------------
 .../fluss/record/ProjectionPushdownCache.java      |  86 +++++++++++++++++
 .../apache/fluss/record/FileLogProjectionTest.java |  55 ++++-------
 .../org/apache/fluss/server/log/FetchParams.java   |   6 +-
 .../fluss/server/replica/ReplicaManager.java       |   5 +-
 .../org/apache/fluss/server/kv/KvTabletTest.java   |  11 ++-
 .../apache/fluss/server/log/FetchParamsTest.java   |  14 ++-
 .../apache/fluss/server/replica/ReplicaTest.java   |  16 +++-
 9 files changed, 182 insertions(+), 119 deletions(-)

diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
index bfd686dc1..d487a2d46 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
@@ -29,6 +29,7 @@ import org.apache.fluss.record.FileLogProjection;
 import org.apache.fluss.record.FileLogRecords;
 import org.apache.fluss.record.LogRecordReadContext;
 import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.record.ProjectionPushdownCache;
 import org.apache.fluss.record.TestingSchemaGetter;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
@@ -265,7 +266,7 @@ public class DefaultCompletedFetchTest {
                         rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, magic, objects, 
LogFormat.ARROW));
         fileLogRecords.flush();
 
-        FileLogProjection fileLogProjection = new FileLogProjection();
+        FileLogProjection fileLogProjection = new FileLogProjection(new 
ProjectionPushdownCache());
         fileLogProjection.setCurrentProjection(
                 DATA2_TABLE_ID,
                 testingSchemaGetter,
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java 
b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
index 11c50bb3d..546275f55 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
@@ -54,7 +54,6 @@ import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 
 import static 
org.apache.fluss.record.DefaultLogRecordBatch.APPEND_ONLY_FLAG_MASK;
 import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET;
@@ -71,7 +70,6 @@ import static 
org.apache.fluss.record.LogRecordBatchFormat.recordsCountOffset;
 import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset;
 import static org.apache.fluss.utils.FileUtils.readFully;
 import static org.apache.fluss.utils.FileUtils.readFullyOrFail;
-import static org.apache.fluss.utils.Preconditions.checkNotNull;
 import static org.apache.fluss.utils.Preconditions.checkState;
 
 /** Column projection util on Arrow format {@link FileLogRecords}. */
@@ -85,8 +83,8 @@ public class FileLogProjection {
     private static final int ARROW_HEADER_SIZE =
             ARROW_IPC_CONTINUATION_LENGTH + ARROW_IPC_METADATA_SIZE_LENGTH;
 
-    final Map<ProjectionKey, ProjectionInfo> projectionsCache = new 
HashMap<>();
-    ProjectionInfo currentProjection;
+    // the projection cache shared in the TabletServer
+    private final ProjectionPushdownCache projectionsCache;
 
     // shared resources for multiple projections
     private final ByteArrayOutputStream outputStream;
@@ -103,9 +101,10 @@ public class FileLogProjection {
     private SchemaGetter schemaGetter;
     private long tableId;
     private ArrowCompressionInfo compressionInfo;
-    private int[] selectedFieldPositions;
+    private int[] selectedFieldIds;
 
-    public FileLogProjection() {
+    public FileLogProjection(ProjectionPushdownCache projectionsCache) {
+        this.projectionsCache = projectionsCache;
         this.outputStream = new ByteArrayOutputStream();
         this.writeChannel = new 
WriteChannel(Channels.newChannel(outputStream));
         // fluss use little endian for encoding log records batch
@@ -122,14 +121,7 @@ public class FileLogProjection {
         this.tableId = tableId;
         this.schemaGetter = schemaGetter;
         this.compressionInfo = compressionInfo;
-
-        // Currently, only add last column is supported.Thus 
selectedFieldPositions is always same
-        // from same selectedFieldIds.
-        // TODO: if support drop column or add column in middle, this 
selectedFieldPositions should
-        // be re-calculated for each schema.
-        this.selectedFieldPositions =
-                selectedFieldPositions(
-                        schemaGetter.getLatestSchemaInfo().getSchema(), 
selectedFieldIds);
+        this.selectedFieldIds = selectedFieldIds;
     }
 
     /**
@@ -144,6 +136,8 @@ public class FileLogProjection {
         MultiBytesView.Builder builder = MultiBytesView.builder();
         int position = start;
 
+        ProjectionInfo currentProjection = null;
+        short prevSchemaId = -1;
         // The condition is an optimization to avoid read log header when 
there is no enough bytes,
         // So we use V0 header size here for a conservative judgment. In the 
end, the condition
         // of (position >= end - recordBatchHeaderSize) will ensure the final 
correctness.
@@ -162,8 +156,12 @@ public class FileLogProjection {
             int recordBatchHeaderSize = recordBatchHeaderSize(magic);
             int batchSizeInBytes = LOG_OVERHEAD + 
logHeaderBuffer.getInt(LENGTH_OFFSET);
             short schemaId = logHeaderBuffer.getShort(schemaIdOffset(magic));
-            setCurrentSchema(schemaId);
-            checkNotNull(currentProjection, "There is no projection registered 
yet.");
+
+            // reuse projection in the current log file
+            if (currentProjection == null || prevSchemaId != schemaId) {
+                prevSchemaId = schemaId;
+                currentProjection = getOrCreateProjectionInfo(schemaId);
+            }
 
             if (position > end - batchSizeInBytes) {
                 // the remaining bytes in the file are not enough to read a 
full batch
@@ -405,20 +403,22 @@ public class FileLogProjection {
         return logHeaderBuffer;
     }
 
-    private void setCurrentSchema(short schemaId) {
+    private ProjectionInfo getOrCreateProjectionInfo(short schemaId) {
+        ProjectionInfo cachedProjection =
+                projectionsCache.getProjectionInfo(tableId, schemaId, 
selectedFieldIds);
+        if (cachedProjection == null) {
+            cachedProjection = createProjectionInfo(schemaId, 
selectedFieldIds);
+            projectionsCache.setProjectionInfo(
+                    tableId, schemaId, selectedFieldIds, cachedProjection);
+        }
+        return cachedProjection;
+    }
+
+    private ProjectionInfo createProjectionInfo(short schemaId, int[] 
selectedFieldIds) {
         org.apache.fluss.metadata.Schema schema = 
schemaGetter.getSchema(schemaId);
+        int[] selectedFieldPositions =
+                selectedFieldPositions(schemaGetter.getSchema(schemaId), 
selectedFieldIds);
         RowType rowType = schema.getRowType();
-        ProjectionKey projectionKey = new ProjectionKey(tableId, schemaId);
-        if (projectionsCache.containsKey(projectionKey)) {
-            // the schema and projection should identical for the same table 
id.
-            currentProjection = projectionsCache.get(projectionKey);
-            if (!Arrays.equals(currentProjection.selectedFieldPositions, 
selectedFieldPositions)
-                    || !currentProjection.schema.equals(rowType)) {
-                throw new InvalidColumnProjectionException(
-                        "The schema and projection should be identical for the 
same table id.");
-            }
-            return;
-        }
 
         // initialize the projection util information
         Schema arrowSchema = ArrowUtils.toArrowSchema(rowType);
@@ -451,16 +451,14 @@ public class FileLogProjection {
                 
CompressionUtil.createBodyCompression(compressionInfo.createCompressionCodec());
         int metadataLength =
                 ArrowUtils.estimateArrowMetadataLength(projectedArrowSchema, 
bodyCompression);
-        currentProjection =
-                new ProjectionInfo(
-                        nodesProjection,
-                        buffersProjection,
-                        bufferIndex,
-                        rowType,
-                        metadataLength,
-                        bodyCompression,
-                        selectedFieldPositions);
-        projectionsCache.put(projectionKey, currentProjection);
+        return new ProjectionInfo(
+                nodesProjection,
+                buffersProjection,
+                bufferIndex,
+                rowType,
+                metadataLength,
+                bodyCompression,
+                selectedFieldPositions);
     }
 
     int[] selectedFieldPositions(org.apache.fluss.metadata.Schema schema, 
int[] projectedFields) {
@@ -478,7 +476,8 @@ public class FileLogProjection {
             if (position == null) {
                 throw new InvalidColumnProjectionException(
                         String.format(
-                                "Projected field id %s is not contains in %s", 
fieldId, columnIds));
+                                "Projected field id %s is not contained in %s",
+                                fieldId, columnIds));
             }
 
             selectedFieldPositions[i] = position;
@@ -493,31 +492,7 @@ public class FileLogProjection {
         return selectedFieldPositions;
     }
 
-    static final class ProjectionKey {
-        private final long tableId;
-        private final short schemaId;
-
-        ProjectionKey(long tableId, short schemaId) {
-            this.tableId = tableId;
-            this.schemaId = schemaId;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (!(o instanceof ProjectionKey)) {
-                return false;
-            }
-            ProjectionKey that = (ProjectionKey) o;
-            return tableId == that.tableId && schemaId == that.schemaId;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(tableId, schemaId);
-        }
-    }
-
-    static final class ProjectionInfo {
+    public static final class ProjectionInfo {
         final BitSet nodesProjection;
         final BitSet buffersProjection;
         final int bufferCount;
@@ -545,7 +520,7 @@ public class FileLogProjection {
     }
 
     /** Metadata of a projected arrow record batch. */
-    public static final class ProjectedArrowBatch {
+    static final class ProjectedArrowBatch {
         /** Number of records. */
         final long numRecords;
 
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java
 
b/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java
new file mode 100644
index 000000000..dc5821e70
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.record.FileLogProjection.ProjectionInfo;
+import org.apache.fluss.shaded.guava32.com.google.common.cache.Cache;
+import org.apache.fluss.shaded.guava32.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Objects;
+
+@ThreadSafe
+public class ProjectionPushdownCache {
+
+    final Cache<ProjectionKey, ProjectionInfo> projectionCache;
+
+    public ProjectionPushdownCache() {
+        // currently, the cache is shared at TabletServer level, so we give a 
large max size, but
+        // give a short expiration time.
+        // TODO: make the cache parameter configurable
+        this.projectionCache =
+                CacheBuilder.newBuilder()
+                        .maximumSize(1000)
+                        .expireAfterAccess(Duration.ofMinutes(3))
+                        .build();
+    }
+
+    @Nullable
+    public ProjectionInfo getProjectionInfo(long tableId, short schemaId, 
int[] selectedColumnIds) {
+        ProjectionKey key = new ProjectionKey(tableId, schemaId, 
selectedColumnIds);
+        return projectionCache.getIfPresent(key);
+    }
+
+    public void setProjectionInfo(
+            long tableId, short schemaId, int[] selectedColumnIds, 
ProjectionInfo projectionInfo) {
+        ProjectionKey key = new ProjectionKey(tableId, schemaId, 
selectedColumnIds);
+        projectionCache.put(key, projectionInfo);
+    }
+
+    static final class ProjectionKey {
+        private final long tableId;
+        private final short schemaId;
+        private final int[] selectedColumnIds;
+
+        ProjectionKey(long tableId, short schemaId, int[] selectedColumnIds) {
+            this.tableId = tableId;
+            this.schemaId = schemaId;
+            this.selectedColumnIds = selectedColumnIds;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof ProjectionKey)) {
+                return false;
+            }
+            ProjectionKey that = (ProjectionKey) o;
+            return tableId == that.tableId
+                    && schemaId == that.schemaId
+                    && Arrays.equals(selectedColumnIds, 
that.selectedColumnIds);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tableId, schemaId, 
Arrays.hashCode(selectedColumnIds));
+        }
+    }
+}
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java 
b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java
index 2c3ff36ef..9a27089ac 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java
@@ -78,7 +78,8 @@ class FileLogProjectionTest {
                         TestData.DATA2,
                         TestData.DATA2);
 
-        FileLogProjection projection = new FileLogProjection();
+        ProjectionPushdownCache cache = new ProjectionPushdownCache();
+        FileLogProjection projection = new FileLogProjection(cache);
         // get schema during running.
         doProjection(
                 1L,
@@ -87,16 +88,13 @@ class FileLogProjectionTest {
                 recordsOfData2RowType,
                 new int[] {0, 2},
                 recordsOfData2RowType.sizeInBytes());
-        FileLogProjection.ProjectionInfo info1 = projection.currentProjection;
+        assertThat(cache.projectionCache.size()).isEqualTo(1);
+        FileLogProjection.ProjectionInfo info1 =
+                cache.getProjectionInfo(1L, schemaId, new int[] {0, 2});
         assertThat(info1).isNotNull();
         assertThat(info1.nodesProjection.stream().toArray()).isEqualTo(new 
int[] {0, 2});
         // a int: [0,1] ; b string: [2,3,4] ; c string: [5,6,7]
         assertThat(info1.buffersProjection.stream().toArray()).isEqualTo(new 
int[] {0, 1, 5, 6, 7});
-        assertThat(projection.projectionsCache).hasSize(1);
-        assertThat(
-                        projection.projectionsCache.get(
-                                new FileLogProjection.ProjectionKey(1L, 
schemaId)))
-                .isSameAs(info1);
 
         doProjection(
                 2L,
@@ -105,37 +103,13 @@ class FileLogProjectionTest {
                 recordsOfData2RowType,
                 new int[] {1},
                 recordsOfData2RowType.sizeInBytes());
-        FileLogProjection.ProjectionInfo info2 = projection.currentProjection;
+        assertThat(cache.projectionCache.size()).isEqualTo(2);
+        FileLogProjection.ProjectionInfo info2 =
+                cache.getProjectionInfo(2L, schemaId, new int[] {1});
         assertThat(info2).isNotNull();
         assertThat(info2.nodesProjection.stream().toArray()).isEqualTo(new 
int[] {1});
         // a int: [0,1] ; b string: [2,3,4] ; c string: [5,6,7]
         assertThat(info2.buffersProjection.stream().toArray()).isEqualTo(new 
int[] {2, 3, 4});
-        assertThat(projection.projectionsCache).hasSize(2);
-        assertThat(
-                        projection.projectionsCache.get(
-                                new FileLogProjection.ProjectionKey(2L, 
schemaId)))
-                .isSameAs(info2);
-
-        doProjection(
-                1L,
-                schemaId,
-                projection,
-                recordsOfData2RowType,
-                new int[] {0, 2},
-                recordsOfData2RowType.sizeInBytes());
-        assertThat(projection.currentProjection).isNotNull().isSameAs(info1);
-
-        assertThatThrownBy(
-                        () ->
-                                doProjection(
-                                        1L,
-                                        schemaId,
-                                        projection,
-                                        recordsOfData2RowType,
-                                        new int[] {1},
-                                        recordsOfData2RowType.sizeInBytes()))
-                .isInstanceOf(InvalidColumnProjectionException.class)
-                .hasMessage("The schema and projection should be identical for 
the same table id.");
     }
 
     @Test
@@ -149,7 +123,7 @@ class FileLogProjectionTest {
                         TestData.DATA2_ROW_TYPE,
                         TestData.DATA2,
                         TestData.DATA2);
-        FileLogProjection projection = new FileLogProjection();
+        FileLogProjection projection = new FileLogProjection(new 
ProjectionPushdownCache());
         assertThatThrownBy(
                         () ->
                                 doProjection(
@@ -160,7 +134,7 @@ class FileLogProjectionTest {
                                         new int[] {3},
                                         recordsOfData2RowType.sizeInBytes()))
                 .isInstanceOf(InvalidColumnProjectionException.class)
-                .hasMessage("Projected field id 3 is not contains in [0, 1, 
2]");
+                .hasMessage("Projected field id 3 is not contained in [0, 1, 
2]");
 
         assertThatThrownBy(
                         () ->
@@ -220,7 +194,7 @@ class FileLogProjectionTest {
                 doProjection(
                         1L,
                         schemaId,
-                        new FileLogProjection(),
+                        new FileLogProjection(new ProjectionPushdownCache()),
                         fileLogRecords,
                         projectedFields,
                         Integer.MAX_VALUE);
@@ -248,7 +222,7 @@ class FileLogProjectionTest {
                         TestData.DATA1_ROW_TYPE,
                         TestData.DATA1,
                         TestData.ANOTHER_DATA1);
-        FileLogProjection projection = new FileLogProjection();
+        FileLogProjection projection = new FileLogProjection(new 
ProjectionPushdownCache());
         // overwrite the wrong decoding byte order endian
         projection.getLogHeaderBuffer().order(ByteOrder.BIG_ENDIAN);
         assertThatThrownBy(
@@ -305,7 +279,10 @@ class FileLogProjectionTest {
             int maxBytes = totalSize / i;
             List<Object[]> results =
                     doProjection(
-                            new FileLogProjection(), fileLogRecords, new int[] 
{0, 1}, maxBytes);
+                            new FileLogProjection(new 
ProjectionPushdownCache()),
+                            fileLogRecords,
+                            new int[] {0, 1},
+                            maxBytes);
             if (results.isEmpty()) {
                 hasEmpty = true;
             } else if (results.size() == TestData.DATA1.size()) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java
index 847bfa883..b08ad67f8 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java
@@ -21,6 +21,7 @@ import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.compression.ArrowCompressionInfo;
 import org.apache.fluss.metadata.SchemaGetter;
 import org.apache.fluss.record.FileLogProjection;
+import org.apache.fluss.record.ProjectionPushdownCache;
 import org.apache.fluss.rpc.messages.FetchLogRequest;
 
 import javax.annotation.Nullable;
@@ -99,13 +100,14 @@ public final class FetchParams {
             int maxFetchBytes,
             SchemaGetter schemaGetter,
             ArrowCompressionInfo compressionInfo,
-            @Nullable int[] projectedFields) {
+            @Nullable int[] projectedFields,
+            ProjectionPushdownCache projectionCache) {
         this.fetchOffset = fetchOffset;
         this.maxFetchBytes = maxFetchBytes;
         if (projectedFields != null) {
             projectionEnabled = true;
             if (fileLogProjection == null) {
-                fileLogProjection = new FileLogProjection();
+                fileLogProjection = new FileLogProjection(projectionCache);
             }
 
             fileLogProjection.setCurrentProjection(
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 9f6e4d8ad..206586bf1 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -39,6 +39,7 @@ import org.apache.fluss.metrics.MetricNames;
 import org.apache.fluss.metrics.groups.MetricGroup;
 import org.apache.fluss.record.KvRecordBatch;
 import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.record.ProjectionPushdownCache;
 import org.apache.fluss.remote.RemoteLogFetchInfo;
 import org.apache.fluss.remote.RemoteLogSegment;
 import org.apache.fluss.rpc.RpcClient;
@@ -149,6 +150,7 @@ public class ReplicaManager {
     private final Map<TableBucket, HostedReplica> allReplicas = 
MapUtils.newConcurrentHashMap();
 
     private final TabletServerMetadataCache metadataCache;
+    private final ProjectionPushdownCache projectionsCache = new 
ProjectionPushdownCache();
     private final Lock replicaStateChangeLock = new ReentrantLock();
 
     /**
@@ -1083,7 +1085,8 @@ public class ReplicaManager {
                         adjustedMaxBytes,
                         replica.getSchemaGetter(),
                         replica.getArrowCompressionInfo(),
-                        fetchReqInfo.getProjectFields());
+                        fetchReqInfo.getProjectFields(),
+                        projectionsCache);
                 LogReadInfo readInfo = replica.fetchRecords(fetchParams);
 
                 // Once we read from a non-empty bucket, we stop ignoring 
request and bucket
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
index 87b768608..f2013a91a 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
@@ -38,6 +38,7 @@ import org.apache.fluss.record.KvRecordTestUtils;
 import org.apache.fluss.record.LogRecords;
 import org.apache.fluss.record.LogTestBase;
 import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.record.ProjectionPushdownCache;
 import org.apache.fluss.record.TestData;
 import org.apache.fluss.record.TestingSchemaGetter;
 import org.apache.fluss.record.bytesview.MultiBytesView;
@@ -651,9 +652,10 @@ class KvTabletTest {
         TablePath tablePath = TablePath.of("testDb", tableName);
         initLogTabletAndKvTablet(tablePath, DATA1_SCHEMA_PK, config);
         RowType rowType = DATA1_SCHEMA_PK.getRowType();
+        ProjectionPushdownCache projectionCache = new 
ProjectionPushdownCache();
         FileLogProjection logProjection = null;
         if (doProjection) {
-            logProjection = new FileLogProjection();
+            logProjection = new FileLogProjection(projectionCache);
             logProjection.setCurrentProjection(
                     0L, schemaGetter, DEFAULT_COMPRESSION, new int[] {0});
         }
@@ -749,7 +751,7 @@ class KvTabletTest {
         schemaGetter.updateLatestSchemaInfo(new SchemaInfo(DATA2_SCHEMA, 2));
         readLogRowType = doProjection ? DATA2_ROW_TYPE.project(new int[] {0}) 
: DATA2_ROW_TYPE;
         if (doProjection) {
-            logProjection = new FileLogProjection();
+            logProjection = new FileLogProjection(projectionCache);
             logProjection.setCurrentProjection(
                     0L, schemaGetter, DEFAULT_COMPRESSION, new int[] {0});
         }
@@ -834,9 +836,10 @@ class KvTabletTest {
         KvRecordTestUtils.KvRecordFactory kvRecordFactory =
                 KvRecordTestUtils.KvRecordFactory.of(rowType);
 
+        ProjectionPushdownCache projectionCache = new 
ProjectionPushdownCache();
         FileLogProjection logProjection = null;
         if (doProjection) {
-            logProjection = new FileLogProjection();
+            logProjection = new FileLogProjection(projectionCache);
             logProjection.setCurrentProjection(
                     0L, schemaGetter, DEFAULT_COMPRESSION, new int[] {0});
         }
@@ -956,7 +959,7 @@ class KvTabletTest {
                         ? newSchema.getRowType().project(new int[] {0})
                         : newSchema.getRowType();
         if (doProjection) {
-            logProjection = new FileLogProjection();
+            logProjection = new FileLogProjection(projectionCache);
             logProjection.setCurrentProjection(
                     0L, schemaGetter, DEFAULT_COMPRESSION, new int[] {0});
         }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/FetchParamsTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/log/FetchParamsTest.java
index 132de4482..d4c2be57b 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/log/FetchParamsTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/log/FetchParamsTest.java
@@ -19,6 +19,7 @@ package org.apache.fluss.server.log;
 
 import org.apache.fluss.metadata.SchemaInfo;
 import org.apache.fluss.record.FileLogProjection;
+import org.apache.fluss.record.ProjectionPushdownCache;
 import org.apache.fluss.record.TestData;
 import org.apache.fluss.record.TestingSchemaGetter;
 
@@ -33,13 +34,15 @@ class FetchParamsTest {
     @Test
     void testSetCurrentFetch() {
         FetchParams fetchParams = new FetchParams(1, 100);
+        ProjectionPushdownCache projectionCache = new 
ProjectionPushdownCache();
         fetchParams.setCurrentFetch(
                 1L,
                 20L,
                 1024,
                 new TestingSchemaGetter(new SchemaInfo(TestData.DATA1_SCHEMA, 
(short) 1)),
                 DEFAULT_COMPRESSION,
-                null);
+                null,
+                projectionCache);
         assertThat(fetchParams.fetchOffset()).isEqualTo(20L);
         assertThat(fetchParams.maxFetchBytes()).isEqualTo(1024);
         assertThat(fetchParams.projection()).isNull();
@@ -50,7 +53,8 @@ class FetchParamsTest {
                 512,
                 new TestingSchemaGetter(new SchemaInfo(TestData.DATA2_SCHEMA, 
(short) 1)),
                 DEFAULT_COMPRESSION,
-                new int[] {0, 2});
+                new int[] {0, 2},
+                projectionCache);
         assertThat(fetchParams.fetchOffset()).isEqualTo(30L);
         assertThat(fetchParams.maxFetchBytes()).isEqualTo(512);
         assertThat(fetchParams.projection()).isNotNull();
@@ -63,7 +67,8 @@ class FetchParamsTest {
                 256,
                 new TestingSchemaGetter(new SchemaInfo(TestData.DATA1_SCHEMA, 
(short) 1)),
                 DEFAULT_COMPRESSION,
-                null);
+                null,
+                projectionCache);
         assertThat(fetchParams.projection()).isNull();
 
         fetchParams.setCurrentFetch(
@@ -72,7 +77,8 @@ class FetchParamsTest {
                 512,
                 new TestingSchemaGetter(new SchemaInfo(TestData.DATA2_SCHEMA, 
(short) 1)),
                 DEFAULT_COMPRESSION,
-                new int[] {0, 2});
+                new int[] {0, 2},
+                projectionCache);
         // the FileLogProjection should be cached
         
assertThat(fetchParams.projection()).isNotNull().isSameAs(prevProjection);
     }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
index e45a5557c..fcb4ec88c 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -31,6 +31,7 @@ import org.apache.fluss.record.KvRecordTestUtils;
 import org.apache.fluss.record.LogRecordBatch;
 import org.apache.fluss.record.LogRecords;
 import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.record.ProjectionPushdownCache;
 import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
 import org.apache.fluss.server.kv.KvTablet;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
@@ -137,8 +138,15 @@ final class ReplicaTest extends ReplicaTestBase {
                         (int)
                                 
conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES)
                                         .getBytes());
+        ProjectionPushdownCache projectionCache = new 
ProjectionPushdownCache();
         fetchParams.setCurrentFetch(
-                DATA1_TABLE_ID, 0, Integer.MAX_VALUE, schemaGetter, 
DEFAULT_COMPRESSION, null);
+                DATA1_TABLE_ID,
+                0,
+                Integer.MAX_VALUE,
+                schemaGetter,
+                DEFAULT_COMPRESSION,
+                null,
+                projectionCache);
         LogReadInfo logReadInfo = logReplica.fetchRecords(fetchParams);
         assertLogRecordsEquals(
                 DATA1_ROW_TYPE, logReadInfo.getFetchedData().getRecords(), 
DATA1, schemaGetter);
@@ -163,7 +171,8 @@ final class ReplicaTest extends ReplicaTestBase {
                 Integer.MAX_VALUE,
                 schemaGetter,
                 DEFAULT_COMPRESSION,
-                null);
+                null,
+                projectionCache);
         logReadInfo = logReplica.fetchRecords(fetchParams);
         assertLogRecordsEquals(
                 2, DATA2_ROW_TYPE, logReadInfo.getFetchedData().getRecords(), 
DATA2, schemaGetter);
@@ -808,7 +817,8 @@ final class ReplicaTest extends ReplicaTestBase {
                 Integer.MAX_VALUE,
                 replica.getSchemaGetter(),
                 DEFAULT_COMPRESSION,
-                null);
+                null,
+                new ProjectionPushdownCache());
         LogReadInfo logReadInfo = replica.fetchRecords(fetchParams);
         return logReadInfo.getFetchedData().getRecords();
     }

Reply via email to