This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7461e3b1b1 [core] filter irrelevant column files in DataEvolutionFileStoreScan (#6647)
7461e3b1b1 is described below

commit 7461e3b1b1eecddeb3462252eb3787424036728d
Author: Faiz <[email protected]>
AuthorDate: Mon Nov 24 21:39:13 2025 +0800

    [core] filter irrelevant column files in DataEvolutionFileStoreScan (#6647)
---
 .../paimon/operation/AbstractFileStoreScan.java    |  6 ++++
 .../operation/DataEvolutionFileStoreScan.java      | 33 ++++++++++++++++++++--
 .../org/apache/paimon/operation/FileStoreScan.java |  3 ++
 .../paimon/table/source/AbstractDataTableScan.java |  1 +
 .../table/source/snapshot/SnapshotReader.java      |  3 ++
 .../table/source/snapshot/SnapshotReaderImpl.java  |  7 +++++
 .../apache/paimon/table/system/AuditLogTable.java  |  6 ++++
 .../paimon/table/DataEvolutionTableTest.java       | 32 ++++++++++++++++++++-
 8 files changed, 88 insertions(+), 3 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index cd1b86c740..065c8471bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -38,6 +38,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BiFilter;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Pair;
@@ -244,6 +245,11 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
         return this;
     }
 
+    @Override
+    public FileStoreScan withReadType(RowType readType) {
+        return this;
+    }
+
     @Nullable
     @Override
     public Integer parallelism() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index 0e2ab2ab19..97812d8ffb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -33,9 +33,12 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.RangeHelper;
 import org.apache.paimon.utils.SnapshotManager;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
@@ -50,6 +53,7 @@ import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
 public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
 
     private boolean dropStats = false;
+    @Nullable private RowType readType;
 
     public DataEvolutionFileStoreScan(
             ManifestsReader manifestsReader,
@@ -121,6 +125,14 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
         return false;
     }
 
+    @Override
+    public FileStoreScan withReadType(RowType readType) {
+        if (readType != null && !readType.getFields().isEmpty()) {
+            this.readType = readType;
+        }
+        return this;
+    }
+
     @Override
     protected List<ManifestEntry> 
postFilterManifestEntries(List<ManifestEntry> entries) {
         if (inputFilter == null) {
@@ -239,19 +251,36 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
     /** Note: Keep this thread-safe. */
     @Override
     protected boolean filterByStats(ManifestEntry entry) {
+        DataFileMeta file = entry.file();
+
+        if (readType != null) {
+            boolean containsReadCol = false;
+            RowType fileType =
+                    
scanTableSchema(file.schemaId()).project(file.writeCols()).logicalRowType();
+            for (String field : readType.getFieldNames()) {
+                if (fileType.containsField(field)) {
+                    containsReadCol = true;
+                    break;
+                }
+            }
+            if (!containsReadCol) {
+                return false;
+            }
+        }
+
         // If indices is null, all entries should be kept
         if (this.rowIdList == null) {
             return true;
         }
 
         // If entry.firstRowId does not exist, keep the entry
-        Long firstRowId = entry.file().firstRowId();
+        Long firstRowId = file.firstRowId();
         if (firstRowId == null) {
             return true;
         }
 
         // Check if any value in indices is in the range [firstRowId, 
firstRowId + rowCount)
-        long rowCount = entry.file().rowCount();
+        long rowCount = file.rowCount();
         long endRowId = firstRowId + rowCount;
 
         for (Long index : this.rowIdList) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index eb52fb645f..d57dacaff7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -30,6 +30,7 @@ import org.apache.paimon.operation.metrics.ScanMetrics;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BiFilter;
 import org.apache.paimon.utils.Filter;
 
@@ -91,6 +92,8 @@ public interface FileStoreScan {
 
     FileStoreScan withRowIds(List<Long> indices);
 
+    FileStoreScan withReadType(RowType readType);
+
     @Nullable
     Integer parallelism();
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index cdbb5bc3c4..7270e1ed4c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -119,6 +119,7 @@ abstract class AbstractDataTableScan implements DataTableScan {
     @Override
     public InnerTableScan withReadType(@Nullable RowType readType) {
         this.readType = readType;
+        snapshotReader.withReadType(readType);
         return this;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 2e9b43c9fc..0ae4fe28e6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BiFilter;
 import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -112,6 +113,8 @@ public interface SnapshotReader {
 
     SnapshotReader withRowIds(List<Long> indices);
 
+    SnapshotReader withReadType(RowType readType);
+
     /** Get splits plan from snapshot. */
     Plan read();
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index df3435589d..d92a7537e3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -49,6 +49,7 @@ import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.table.source.PlanImpl;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BiFilter;
 import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.DVMetaCache;
@@ -312,6 +313,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return this;
     }
 
+    @Override
+    public SnapshotReader withReadType(RowType readType) {
+        scan.withReadType(readType);
+        return this;
+    }
+
     @Override
     public SnapshotReader withDataFileNameFilter(Filter<String> 
fileNameFilter) {
         scan.withDataFileNameFilter(fileNameFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 018b6add66..63561ba0de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -431,6 +431,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
             return this;
         }
 
+        @Override
+        public SnapshotReader withReadType(RowType readType) {
+            wrapped.withReadType(readType);
+            return this;
+        }
+
         @Override
         public Plan read() {
             return wrapped.read();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 1a9613860c..b094dc2604 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -646,6 +646,37 @@ public class DataEvolutionTableTest extends TableTestBase {
                     i.getAndIncrement();
                 });
         assertThat(i.get()).isEqualTo(2);
+
+        RowType writeType1 = 
schema.rowType().project(Collections.singletonList("f2"));
+        try (BatchTableWrite write1 = 
builder.newWrite().withWriteType(writeType1)) {
+            write1.write(GenericRow.of(BinaryString.fromString("a2")));
+            write1.write(GenericRow.of(BinaryString.fromString("b2")));
+
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write1.prepareCommit();
+            setFirstRowId(commitables, 0L);
+            commit.commit(commitables);
+        }
+
+        List<Long> rowIds10 = Collections.singletonList(0L);
+        List<Split> split10 = 
readBuilder.withRowIds(rowIds10).newScan().plan().splits();
+
+        // without projection, all datafiles needed to assemble a row should 
be scanned out
+        List<DataFileMeta> fileMetas10 = ((DataSplit) 
split10.get(0)).dataFiles();
+        assertThat(fileMetas10.size()).isEqualTo(2);
+
+        List<Long> rowIds11 = Collections.singletonList(0L);
+        List<Split> split11 =
+                readBuilder
+                        .withRowIds(rowIds11)
+                        .withProjection(new int[] {0})
+                        .newScan()
+                        .plan()
+                        .splits();
+
+        // with projection, irrelevant datafiles should be filtered
+        List<DataFileMeta> fileMetas11 = ((DataSplit) 
split11.get(0)).dataFiles();
+        assertThat(fileMetas11.size()).isEqualTo(1);
     }
 
     @Test
@@ -796,7 +827,6 @@ public class DataEvolutionTableTest extends TableTestBase {
                                         table.rowType().project("f1")));
         RecordReader<InternalRow> reader =
                 
readBuilder.newRead().createReader(readBuilder.newScan().plan());
-        
Assertions.assertThat(reader).isInstanceOf(DataEvolutionFileReader.class);
 
         GlobalIndexFileReadWrite indexFileReadWrite =
                 new GlobalIndexFileReadWrite(

Reply via email to