This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d9a31da7d0 [core] Add withLevel method in SnapshotReader to optimize RO table scanning (#5412)
d9a31da7d0 is described below

commit d9a31da7d01e911be4830ef874e40a20121fc5cc
Author: tsreaper <[email protected]>
AuthorDate: Tue Apr 8 12:59:36 2025 +0800

    [core] Add withLevel method in SnapshotReader to optimize RO table scanning (#5412)
---
 .../apache/paimon/operation/AbstractFileStoreScan.java    | 15 ++++++++++++++-
 .../java/org/apache/paimon/operation/FileStoreScan.java   |  2 ++
 .../java/org/apache/paimon/operation/ManifestsReader.java | 15 +++++++++++++++
 .../paimon/table/source/snapshot/SnapshotReader.java      |  3 +++
 .../paimon/table/source/snapshot/SnapshotReaderImpl.java  |  6 ++++++
 .../org/apache/paimon/table/system/AuditLogTable.java     |  6 ++++++
 .../apache/paimon/table/system/ReadOptimizedTable.java    |  8 ++++----
 .../snapshot/CompactionChangelogFollowUpScannerTest.java  |  2 +-
 8 files changed, 51 insertions(+), 6 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index ffa88bb259..ec856e57b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -83,6 +83,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     private Filter<Integer> bucketFilter = null;
     private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
     protected ScanMode scanMode = ScanMode.ALL;
+    private Integer specifiedLevel = null;
     private Filter<Integer> levelFilter = null;
     private Filter<ManifestEntry> manifestEntryFilter = null;
     private Filter<String> fileNameFilter = null;
@@ -191,6 +192,13 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
         return this;
     }
 
+    @Override
+    public FileStoreScan withLevel(int level) {
+        manifestsReader.withLevel(level);
+        this.specifiedLevel = level;
+        return this;
+    }
+
     @Override
     public FileStoreScan withLevelFilter(Filter<Integer> levelFilter) {
         this.levelFilter = levelFilter;
@@ -524,7 +532,12 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
                 return false;
             }
 
-            if (levelFilter != null && !levelFilter.test(levelGetter.apply(row))) {
+            int level = levelGetter.apply(row);
+            if (specifiedLevel != null && level != specifiedLevel) {
+                return false;
+            }
+
+            if (levelFilter != null && !levelFilter.test(level)) {
                 return false;
             }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 3b77c0a1a1..e3759960f8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -72,6 +72,8 @@ public interface FileStoreScan {
 
     FileStoreScan withKind(ScanMode scanMode);
 
+    FileStoreScan withLevel(int level);
+
     FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
 
     FileStoreScan enableValueFilter();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
index afcbb82b73..d58bb797e3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -50,6 +50,7 @@ public class ManifestsReader {
 
     private boolean onlyReadRealBuckets = false;
     @Nullable private Integer specifiedBucket = null;
+    @Nullable private Integer specifiedLevel = null;
     @Nullable private PartitionPredicate partitionFilter = null;
 
     public ManifestsReader(
@@ -73,6 +74,11 @@ public class ManifestsReader {
         return this;
     }
 
+    public ManifestsReader withLevel(int level) {
+        this.specifiedLevel = level;
+        return this;
+    }
+
     public ManifestsReader withPartitionFilter(Predicate predicate) {
         this.partitionFilter = PartitionPredicate.fromPredicate(partitionType, predicate);
         return this;
@@ -147,6 +153,15 @@ public class ManifestsReader {
             }
         }
 
+        Integer minLevel = manifest.minLevel();
+        Integer maxLevel = manifest.maxLevel();
+        if (minLevel != null && maxLevel != null) {
+            if (specifiedLevel != null
+                    && (specifiedLevel < minLevel || specifiedLevel > maxLevel)) {
+                return false;
+            }
+        }
+
         if (partitionFilter == null) {
             return true;
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 0cc4208830..a8504f3db3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -80,6 +80,8 @@ public interface SnapshotReader {
 
     SnapshotReader withMode(ScanMode scanMode);
 
+    SnapshotReader withLevel(int level);
+
     SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
 
     SnapshotReader enableValueFilter();
@@ -131,6 +133,7 @@ public interface SnapshotReader {
         /** Result splits. */
         List<Split> splits();
 
+        @SuppressWarnings({"unchecked", "rawtypes"})
         default List<DataSplit> dataSplits() {
             return (List) splits();
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 931e9d4eba..93002027e3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -245,6 +245,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return this;
     }
 
+    @Override
+    public SnapshotReader withLevel(int level) {
+        scan.withLevel(level);
+        return this;
+    }
+
     @Override
     public SnapshotReader withLevelFilter(Filter<Integer> levelFilter) {
         scan.withLevelFilter(levelFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 57e18fe462..28b10bddd2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -335,6 +335,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
             return this;
         }
 
+        @Override
+        public SnapshotReader withLevel(int level) {
+            wrapped.withLevel(level);
+            return this;
+        }
+
         @Override
         public SnapshotReader withLevelFilter(Filter<Integer> levelFilter) {
             wrapped.withLevelFilter(levelFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index fc9527c195..1e252fc088 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -120,9 +120,9 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
 
     @Override
     public SnapshotReader newSnapshotReader() {
-        if (wrapped.schema().primaryKeys().size() > 0) {
+        if (!wrapped.schema().primaryKeys().isEmpty()) {
             return wrapped.newSnapshotReader()
-                    .withLevelFilter(level -> level == coreOptions().numLevels() - 1)
+                    .withLevel(coreOptions().numLevels() - 1)
                     .enableValueFilter();
         } else {
             return wrapped.newSnapshotReader();
@@ -132,7 +132,7 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
     @Override
     public DataTableBatchScan newScan() {
         return new DataTableBatchScan(
-                wrapped.schema().primaryKeys().size() > 0,
+                !wrapped.schema().primaryKeys().isEmpty(),
                 coreOptions(),
                 newSnapshotReader(),
                 DefaultValueAssigner.create(wrapped.schema()));
@@ -140,7 +140,7 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
 
     @Override
     public StreamDataTableScan newStreamScan() {
-        if (wrapped.schema().primaryKeys().size() > 0) {
+        if (!wrapped.schema().primaryKeys().isEmpty()) {
             throw new UnsupportedOperationException(
                     "Unsupported streaming scan for read optimized table");
         }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
index a51d24eaf4..6a825c9a00 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
@@ -67,7 +67,7 @@ public class CompactionChangelogFollowUpScannerTest extends ScannerTestBase {
 
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(5);
 
-        snapshotReader.withLevelFilter(level -> level == table.coreOptions().numLevels() - 1);
+        snapshotReader.withLevel(table.coreOptions().numLevels() - 1);
         TableRead read = table.newRead();
         ChangelogFollowUpScanner scanner = new ChangelogFollowUpScanner();
 

Reply via email to