This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7461e3b1b1 [core] filter irrelevant column files in
DataEvolutionFileStoreScan (#6647)
7461e3b1b1 is described below
commit 7461e3b1b1eecddeb3462252eb3787424036728d
Author: Faiz <[email protected]>
AuthorDate: Mon Nov 24 21:39:13 2025 +0800
[core] filter irrelevant column files in DataEvolutionFileStoreScan (#6647)
---
.../paimon/operation/AbstractFileStoreScan.java | 6 ++++
.../operation/DataEvolutionFileStoreScan.java | 33 ++++++++++++++++++++--
.../org/apache/paimon/operation/FileStoreScan.java | 3 ++
.../paimon/table/source/AbstractDataTableScan.java | 1 +
.../table/source/snapshot/SnapshotReader.java | 3 ++
.../table/source/snapshot/SnapshotReaderImpl.java | 7 +++++
.../apache/paimon/table/system/AuditLogTable.java | 6 ++++
.../paimon/table/DataEvolutionTableTest.java | 32 ++++++++++++++++++++-
8 files changed, 88 insertions(+), 3 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index cd1b86c740..065c8471bf 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -38,6 +38,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BiFilter;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
@@ -244,6 +245,11 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return this;
}
+ @Override
+ public FileStoreScan withReadType(RowType readType) {
+ return this;
+ }
+
@Nullable
@Override
public Integer parallelism() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index 0e2ab2ab19..97812d8ffb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -33,9 +33,12 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.RangeHelper;
import org.apache.paimon.utils.SnapshotManager;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
@@ -50,6 +53,7 @@ import static
org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
private boolean dropStats = false;
+ @Nullable private RowType readType;
public DataEvolutionFileStoreScan(
ManifestsReader manifestsReader,
@@ -121,6 +125,14 @@ public class DataEvolutionFileStoreScan extends
AppendOnlyFileStoreScan {
return false;
}
+ @Override
+ public FileStoreScan withReadType(RowType readType) {
+ if (readType != null && !readType.getFields().isEmpty()) {
+ this.readType = readType;
+ }
+ return this;
+ }
+
@Override
protected List<ManifestEntry>
postFilterManifestEntries(List<ManifestEntry> entries) {
if (inputFilter == null) {
@@ -239,19 +251,36 @@ public class DataEvolutionFileStoreScan extends
AppendOnlyFileStoreScan {
/** Note: Keep this thread-safe. */
@Override
protected boolean filterByStats(ManifestEntry entry) {
+ DataFileMeta file = entry.file();
+
+ if (readType != null) {
+ boolean containsReadCol = false;
+ RowType fileType =
+
scanTableSchema(file.schemaId()).project(file.writeCols()).logicalRowType();
+ for (String field : readType.getFieldNames()) {
+ if (fileType.containsField(field)) {
+ containsReadCol = true;
+ break;
+ }
+ }
+ if (!containsReadCol) {
+ return false;
+ }
+ }
+
// If indices is null, all entries should be kept
if (this.rowIdList == null) {
return true;
}
// If entry.firstRowId does not exist, keep the entry
- Long firstRowId = entry.file().firstRowId();
+ Long firstRowId = file.firstRowId();
if (firstRowId == null) {
return true;
}
// Check if any value in indices is in the range [firstRowId, firstRowId + rowCount)
- long rowCount = entry.file().rowCount();
+ long rowCount = file.rowCount();
long endRowId = firstRowId + rowCount;
for (Long index : this.rowIdList) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index eb52fb645f..d57dacaff7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -30,6 +30,7 @@ import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BiFilter;
import org.apache.paimon.utils.Filter;
@@ -91,6 +92,8 @@ public interface FileStoreScan {
FileStoreScan withRowIds(List<Long> indices);
+ FileStoreScan withReadType(RowType readType);
+
@Nullable
Integer parallelism();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index cdbb5bc3c4..7270e1ed4c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -119,6 +119,7 @@ abstract class AbstractDataTableScan implements
DataTableScan {
@Override
public InnerTableScan withReadType(@Nullable RowType readType) {
this.readType = readType;
+ snapshotReader.withReadType(readType);
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 2e9b43c9fc..0ae4fe28e6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BiFilter;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -112,6 +113,8 @@ public interface SnapshotReader {
SnapshotReader withRowIds(List<Long> indices);
+ SnapshotReader withReadType(RowType readType);
+
/** Get splits plan from snapshot. */
Plan read();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index df3435589d..d92a7537e3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -49,6 +49,7 @@ import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.PlanImpl;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BiFilter;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.DVMetaCache;
@@ -312,6 +313,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
return this;
}
+ @Override
+ public SnapshotReader withReadType(RowType readType) {
+ scan.withReadType(readType);
+ return this;
+ }
+
@Override
public SnapshotReader withDataFileNameFilter(Filter<String>
fileNameFilter) {
scan.withDataFileNameFilter(fileNameFilter);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 018b6add66..63561ba0de 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -431,6 +431,12 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return this;
}
+ @Override
+ public SnapshotReader withReadType(RowType readType) {
+ wrapped.withReadType(readType);
+ return this;
+ }
+
@Override
public Plan read() {
return wrapped.read();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 1a9613860c..b094dc2604 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -646,6 +646,37 @@ public class DataEvolutionTableTest extends TableTestBase {
i.getAndIncrement();
});
assertThat(i.get()).isEqualTo(2);
+
+ RowType writeType1 =
schema.rowType().project(Collections.singletonList("f2"));
+ try (BatchTableWrite write1 =
builder.newWrite().withWriteType(writeType1)) {
+ write1.write(GenericRow.of(BinaryString.fromString("a2")));
+ write1.write(GenericRow.of(BinaryString.fromString("b2")));
+
+ BatchTableCommit commit = builder.newCommit();
+ List<CommitMessage> commitables = write1.prepareCommit();
+ setFirstRowId(commitables, 0L);
+ commit.commit(commitables);
+ }
+
+ List<Long> rowIds10 = Collections.singletonList(0L);
+ List<Split> split10 =
readBuilder.withRowIds(rowIds10).newScan().plan().splits();
+
+ // without projection, all datafiles needed to assemble a row should be scanned out
+ List<DataFileMeta> fileMetas10 = ((DataSplit)
split10.get(0)).dataFiles();
+ assertThat(fileMetas10.size()).isEqualTo(2);
+
+ List<Long> rowIds11 = Collections.singletonList(0L);
+ List<Split> split11 =
+ readBuilder
+ .withRowIds(rowIds11)
+ .withProjection(new int[] {0})
+ .newScan()
+ .plan()
+ .splits();
+
+ // with projection, irrelevant datafiles should be filtered
+ List<DataFileMeta> fileMetas11 = ((DataSplit)
split11.get(0)).dataFiles();
+ assertThat(fileMetas11.size()).isEqualTo(1);
}
@Test
@@ -796,7 +827,6 @@ public class DataEvolutionTableTest extends TableTestBase {
table.rowType().project("f1")));
RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(readBuilder.newScan().plan());
-
Assertions.assertThat(reader).isInstanceOf(DataEvolutionFileReader.class);
GlobalIndexFileReadWrite indexFileReadWrite =
new GlobalIndexFileReadWrite(