This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d9a31da7d0 [core] Add withLevel method in SnapshotReader to optimize RO table scanning (#5412)
d9a31da7d0 is described below
commit d9a31da7d01e911be4830ef874e40a20121fc5cc
Author: tsreaper <[email protected]>
AuthorDate: Tue Apr 8 12:59:36 2025 +0800
[core] Add withLevel method in SnapshotReader to optimize RO table scanning (#5412)
---
.../apache/paimon/operation/AbstractFileStoreScan.java | 15 ++++++++++++++-
.../java/org/apache/paimon/operation/FileStoreScan.java | 2 ++
.../java/org/apache/paimon/operation/ManifestsReader.java | 15 +++++++++++++++
.../paimon/table/source/snapshot/SnapshotReader.java | 3 +++
.../paimon/table/source/snapshot/SnapshotReaderImpl.java | 6 ++++++
.../org/apache/paimon/table/system/AuditLogTable.java | 6 ++++++
.../apache/paimon/table/system/ReadOptimizedTable.java | 8 ++++----
.../snapshot/CompactionChangelogFollowUpScannerTest.java | 2 +-
8 files changed, 51 insertions(+), 6 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index ffa88bb259..ec856e57b5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -83,6 +83,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private Filter<Integer> bucketFilter = null;
private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
protected ScanMode scanMode = ScanMode.ALL;
+ private Integer specifiedLevel = null;
private Filter<Integer> levelFilter = null;
private Filter<ManifestEntry> manifestEntryFilter = null;
private Filter<String> fileNameFilter = null;
@@ -191,6 +192,13 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return this;
}
+ @Override
+ public FileStoreScan withLevel(int level) {
+ manifestsReader.withLevel(level);
+ this.specifiedLevel = level;
+ return this;
+ }
+
@Override
public FileStoreScan withLevelFilter(Filter<Integer> levelFilter) {
this.levelFilter = levelFilter;
@@ -524,7 +532,12 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return false;
}
- if (levelFilter != null && !levelFilter.test(levelGetter.apply(row))) {
+ int level = levelGetter.apply(row);
+ if (specifiedLevel != null && level != specifiedLevel) {
+ return false;
+ }
+
+ if (levelFilter != null && !levelFilter.test(level)) {
return false;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 3b77c0a1a1..e3759960f8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -72,6 +72,8 @@ public interface FileStoreScan {
FileStoreScan withKind(ScanMode scanMode);
+ FileStoreScan withLevel(int level);
+
FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
FileStoreScan enableValueFilter();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
index afcbb82b73..d58bb797e3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -50,6 +50,7 @@ public class ManifestsReader {
private boolean onlyReadRealBuckets = false;
@Nullable private Integer specifiedBucket = null;
+ @Nullable private Integer specifiedLevel = null;
@Nullable private PartitionPredicate partitionFilter = null;
public ManifestsReader(
@@ -73,6 +74,11 @@ public class ManifestsReader {
return this;
}
+ public ManifestsReader withLevel(int level) {
+ this.specifiedLevel = level;
+ return this;
+ }
+
public ManifestsReader withPartitionFilter(Predicate predicate) {
this.partitionFilter = PartitionPredicate.fromPredicate(partitionType,
predicate);
return this;
@@ -147,6 +153,15 @@ public class ManifestsReader {
}
}
+ Integer minLevel = manifest.minLevel();
+ Integer maxLevel = manifest.maxLevel();
+ if (minLevel != null && maxLevel != null) {
+ if (specifiedLevel != null
+ && (specifiedLevel < minLevel || specifiedLevel > maxLevel)) {
+ return false;
+ }
+ }
+
if (partitionFilter == null) {
return true;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 0cc4208830..a8504f3db3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -80,6 +80,8 @@ public interface SnapshotReader {
SnapshotReader withMode(ScanMode scanMode);
+ SnapshotReader withLevel(int level);
+
SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
SnapshotReader enableValueFilter();
@@ -131,6 +133,7 @@ public interface SnapshotReader {
/** Result splits. */
List<Split> splits();
+ @SuppressWarnings({"unchecked", "rawtypes"})
default List<DataSplit> dataSplits() {
return (List) splits();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 931e9d4eba..93002027e3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -245,6 +245,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
return this;
}
+ @Override
+ public SnapshotReader withLevel(int level) {
+ scan.withLevel(level);
+ return this;
+ }
+
@Override
public SnapshotReader withLevelFilter(Filter<Integer> levelFilter) {
scan.withLevelFilter(levelFilter);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 57e18fe462..28b10bddd2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -335,6 +335,12 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return this;
}
+ @Override
+ public SnapshotReader withLevel(int level) {
+ wrapped.withLevel(level);
+ return this;
+ }
+
@Override
public SnapshotReader withLevelFilter(Filter<Integer> levelFilter) {
wrapped.withLevelFilter(levelFilter);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index fc9527c195..1e252fc088 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -120,9 +120,9 @@ public class ReadOptimizedTable implements DataTable,
ReadonlyTable {
@Override
public SnapshotReader newSnapshotReader() {
- if (wrapped.schema().primaryKeys().size() > 0) {
+ if (!wrapped.schema().primaryKeys().isEmpty()) {
return wrapped.newSnapshotReader()
- .withLevelFilter(level -> level == coreOptions().numLevels() - 1)
+ .withLevel(coreOptions().numLevels() - 1)
.enableValueFilter();
} else {
return wrapped.newSnapshotReader();
@@ -132,7 +132,7 @@ public class ReadOptimizedTable implements DataTable,
ReadonlyTable {
@Override
public DataTableBatchScan newScan() {
return new DataTableBatchScan(
- wrapped.schema().primaryKeys().size() > 0,
+ !wrapped.schema().primaryKeys().isEmpty(),
coreOptions(),
newSnapshotReader(),
DefaultValueAssigner.create(wrapped.schema()));
@@ -140,7 +140,7 @@ public class ReadOptimizedTable implements DataTable,
ReadonlyTable {
@Override
public StreamDataTableScan newStreamScan() {
- if (wrapped.schema().primaryKeys().size() > 0) {
+ if (!wrapped.schema().primaryKeys().isEmpty()) {
throw new UnsupportedOperationException(
"Unsupported streaming scan for read optimized table");
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
index a51d24eaf4..6a825c9a00 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
@@ -67,7 +67,7 @@ public class CompactionChangelogFollowUpScannerTest extends
ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(5);
- snapshotReader.withLevelFilter(level -> level == table.coreOptions().numLevels() - 1);
+ snapshotReader.withLevel(table.coreOptions().numLevels() - 1);
TableRead read = table.newRead();
ChangelogFollowUpScanner scanner = new ChangelogFollowUpScanner();