This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch bucket-stats
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 4772101a86fc34be8cd0a1b0e3c6a90112d721fb
Author: tsreaper <[email protected]>
AuthorDate: Wed Mar 26 11:14:10 2025 +0800

    [fix] Fix comments
---
 .../org/apache/paimon/manifest/ManifestFile.java   |  8 ++++-
 .../apache/paimon/manifest/ManifestFileMeta.java   | 34 ++++++++++++++++++----
 .../manifest/ManifestFileMetaSerializer.java       |  8 +++--
 .../paimon/operation/AbstractFileStoreScan.java    | 19 ++++++------
 .../apache/paimon/operation/ManifestsReader.java   | 13 +++++----
 .../manifest/ManifestFileMetaSerializerTest.java   | 11 ++++++-
 .../paimon/manifest/ManifestTestDataGenerator.java | 14 +++++++--
 .../apache/paimon/flink/action/CompactAction.java  | 14 +--------
 8 files changed, 81 insertions(+), 40 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 2977b01b19..5453fbc107 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -131,6 +131,8 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
         private long schemaId = Long.MIN_VALUE;
         private int minBucket = Integer.MAX_VALUE;
         private int maxBucket = Integer.MIN_VALUE;
+        private int minLevel = Integer.MAX_VALUE;
+        private int maxLevel = Integer.MIN_VALUE;
 
         ManifestEntryWriter(FormatWriterFactory factory, Path path, String 
fileCompression) {
             super(
@@ -161,6 +163,8 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
             schemaId = Math.max(schemaId, entry.file().schemaId());
             minBucket = Math.min(minBucket, entry.bucket());
             maxBucket = Math.max(maxBucket, entry.bucket());
+            minLevel = Math.min(minLevel, entry.level());
+            maxLevel = Math.max(maxLevel, entry.level());
 
             partitionStatsCollector.collect(entry.partition());
         }
@@ -177,7 +181,9 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
                             ? schemaId
                             : schemaManager.latest().get().id(),
                     minBucket,
-                    maxBucket);
+                    maxBucket,
+                    minLevel,
+                    maxLevel);
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index 59fa0d2127..aa0156360a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -52,7 +52,9 @@ public class ManifestFileMeta {
                             new DataField(4, "_PARTITION_STATS", 
SimpleStats.SCHEMA),
                             new DataField(5, "_SCHEMA_ID", new 
BigIntType(false)),
                             new DataField(6, "_MIN_BUCKET", new IntType(true)),
-                            new DataField(7, "_MAX_BUCKET", new 
IntType(true))));
+                            new DataField(7, "_MAX_BUCKET", new IntType(true)),
+                            new DataField(8, "_MIN_LEVEL", new IntType(true)),
+                            new DataField(9, "_MAX_LEVEL", new 
IntType(true))));
 
     private final String fileName;
     private final long fileSize;
@@ -62,6 +64,8 @@ public class ManifestFileMeta {
     private final long schemaId;
     private final @Nullable Integer minBucket;
     private final @Nullable Integer maxBucket;
+    private final @Nullable Integer minLevel;
+    private final @Nullable Integer maxLevel;
 
     public ManifestFileMeta(
             String fileName,
@@ -71,7 +75,9 @@ public class ManifestFileMeta {
             SimpleStats partitionStats,
             long schemaId,
             @Nullable Integer minBucket,
-            @Nullable Integer maxBucket) {
+            @Nullable Integer maxBucket,
+            @Nullable Integer minLevel,
+            @Nullable Integer maxLevel) {
         this.fileName = fileName;
         this.fileSize = fileSize;
         this.numAddedFiles = numAddedFiles;
@@ -80,6 +86,8 @@ public class ManifestFileMeta {
         this.schemaId = schemaId;
         this.minBucket = minBucket;
         this.maxBucket = maxBucket;
+        this.minLevel = minLevel;
+        this.maxLevel = maxLevel;
     }
 
     public String fileName() {
@@ -114,6 +122,14 @@ public class ManifestFileMeta {
         return maxBucket;
     }
 
+    public @Nullable Integer minLevel() {
+        return minLevel;
+    }
+
+    public @Nullable Integer maxLevel() {
+        return maxLevel;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof ManifestFileMeta)) {
@@ -127,7 +143,9 @@ public class ManifestFileMeta {
                 && Objects.equals(partitionStats, that.partitionStats)
                 && schemaId == that.schemaId
                 && Objects.equals(minBucket, that.minBucket)
-                && Objects.equals(maxBucket, that.maxBucket);
+                && Objects.equals(maxBucket, that.maxBucket)
+                && Objects.equals(minLevel, that.minLevel)
+                && Objects.equals(maxLevel, that.maxLevel);
     }
 
     @Override
@@ -140,13 +158,15 @@ public class ManifestFileMeta {
                 partitionStats,
                 schemaId,
                 minBucket,
-                maxBucket);
+                maxBucket,
+                minLevel,
+                maxLevel);
     }
 
     @Override
     public String toString() {
         return String.format(
-                "{%s, %d, %d, %d, %s, %d, %s, %s}",
+                "{%s, %d, %d, %d, %s, %d, %s, %s, %s, %s}",
                 fileName,
                 fileSize,
                 numAddedFiles,
@@ -154,7 +174,9 @@ public class ManifestFileMeta {
                 partitionStats,
                 schemaId,
                 minBucket,
-                maxBucket);
+                maxBucket,
+                minLevel,
+                maxLevel);
     }
 
     // ----------------------- Serialization -----------------------------
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
index 3bd242ebf1..b4c8ac2d9b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
@@ -48,7 +48,9 @@ public class ManifestFileMetaSerializer extends VersionedObjectSerializer<Manife
                 meta.partitionStats().toRow(),
                 meta.schemaId(),
                 meta.minBucket(),
-                meta.maxBucket());
+                meta.maxBucket(),
+                meta.minLevel(),
+                meta.maxLevel());
     }
 
     @Override
@@ -68,6 +70,8 @@ public class ManifestFileMetaSerializer extends VersionedObjectSerializer<Manife
                 SimpleStats.fromRow(row.getRow(4, 3)),
                 row.getLong(5),
                 version >= 3 ? row.getInt(6) : null,
-                version >= 3 ? row.getInt(7) : null);
+                version >= 3 ? row.getInt(7) : null,
+                version >= 3 ? row.getInt(8) : null,
+                version >= 3 ? row.getInt(9) : null);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 534d1c237b..47a46518f7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -78,8 +78,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     private final TableSchema schema;
 
     private Snapshot specifiedSnapshot = null;
-    private boolean onlyReadRealBuckets = false;
-    private Integer specifiedBucket = null;
+    private final Pair<Integer, Integer> specifiedBucketRange =
+            Pair.of(Integer.MIN_VALUE, Integer.MAX_VALUE);
     private Filter<Integer> bucketFilter = null;
     private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
     protected ScanMode scanMode = ScanMode.ALL;
@@ -134,14 +134,16 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
 
     @Override
     public FileStoreScan onlyReadRealBuckets() {
-        this.onlyReadRealBuckets = true;
+        specifiedBucketRange.setLeft(Math.max(specifiedBucketRange.getLeft(), 
0));
+        manifestsReader.withBucketRange(specifiedBucketRange);
         return this;
     }
 
     @Override
     public FileStoreScan withBucket(int bucket) {
-        manifestsReader.withBucket(bucket);
-        specifiedBucket = bucket;
+        specifiedBucketRange.setLeft(Math.max(specifiedBucketRange.getLeft(), 
bucket));
+        
specifiedBucketRange.setRight(Math.min(specifiedBucketRange.getRight(), 
bucket));
+        manifestsReader.withBucketRange(specifiedBucketRange);
         return this;
     }
 
@@ -506,11 +508,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
             }
 
             int bucket = bucketGetter.apply(row);
-            if (onlyReadRealBuckets && bucket < 0) {
-                return false;
-            }
-
-            if (specifiedBucket != null && bucket != specifiedBucket) {
+            if (bucket < specifiedBucketRange.getLeft()
+                    || bucket > specifiedBucketRange.getRight()) {
                 return false;
             }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
index 8a5cdf50e3..faf3f1362f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -27,6 +27,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
@@ -49,7 +50,8 @@ public class ManifestsReader {
     private final ManifestList.Factory manifestListFactory;
 
     @Nullable private PartitionPredicate partitionFilter = null;
-    @Nullable private Integer specifiedBucket = null;
+    private Pair<Integer, Integer> specifiedBucketRange =
+            Pair.of(Integer.MIN_VALUE, Integer.MAX_VALUE);
 
     public ManifestsReader(
             RowType partitionType,
@@ -62,8 +64,8 @@ public class ManifestsReader {
         this.manifestListFactory = manifestListFactory;
     }
 
-    public ManifestsReader withBucket(int bucket) {
-        this.specifiedBucket = bucket;
+    public ManifestsReader withBucketRange(Pair<Integer, Integer> bucketRange) 
{
+        this.specifiedBucketRange = bucketRange;
         return this;
     }
 
@@ -131,8 +133,9 @@ public class ManifestsReader {
     private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
         Integer minBucket = manifest.minBucket();
         Integer maxBucket = manifest.maxBucket();
-        if (specifiedBucket != null && minBucket != null && maxBucket != null) 
{
-            if (specifiedBucket < minBucket || specifiedBucket > maxBucket) {
+        if (minBucket != null && maxBucket != null) {
+            if (maxBucket < specifiedBucketRange.getLeft()
+                    || minBucket > specifiedBucketRange.getRight()) {
                 return false;
             }
         }
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaSerializerTest.java
index ae1a610be0..cfaf788d2c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaSerializerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaSerializerTest.java
@@ -71,7 +71,16 @@ public class ManifestFileMetaSerializerTest extends ObjectSerializerTestBase<Man
         SimpleStats partitionStats = new SimpleStats(minFields, maxFields, 
nullCounts);
         ManifestFileMeta meta =
                 new ManifestFileMeta(
-                        "test-manifest-file", 1024, 5, 2, partitionStats, 1, 
null, null);
+                        "test-manifest-file",
+                        1024,
+                        5,
+                        2,
+                        partitionStats,
+                        1,
+                        null,
+                        null,
+                        null,
+                        null);
 
         byte[] v2Bytes =
                 IOUtils.readFully(
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
index 83f82869ff..959d3f9e3e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
@@ -99,6 +99,10 @@ public class ManifestTestDataGenerator {
 
         long numAddedFiles = 0;
         long numDeletedFiles = 0;
+        int minBucket = Integer.MAX_VALUE;
+        int maxBucket = Integer.MIN_VALUE;
+        int minLevel = Integer.MAX_VALUE;
+        int maxLevel = Integer.MIN_VALUE;
         for (ManifestEntry entry : entries) {
             collector.collect(entry.partition());
             if (entry.kind() == FileKind.ADD) {
@@ -106,6 +110,10 @@ public class ManifestTestDataGenerator {
             } else {
                 numDeletedFiles++;
             }
+            minBucket = Math.min(minBucket, entry.bucket());
+            maxBucket = Math.max(maxBucket, entry.bucket());
+            minLevel = Math.min(minLevel, entry.level());
+            maxLevel = Math.max(maxLevel, entry.level());
         }
 
         return new ManifestFileMeta(
@@ -115,8 +123,10 @@ public class ManifestTestDataGenerator {
                 numDeletedFiles,
                 serializer.toBinaryAllMode(collector.extract()),
                 0,
-                0,
-                0);
+                minBucket,
+                maxBucket,
+                minLevel,
+                maxLevel);
     }
 
     private void mergeLevelsIfNeeded(BinaryRow partition, int bucket) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 79961e4340..900543cb5e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -266,10 +266,8 @@ public class CompactAction extends TableActionBase {
             int bucketNum = 
options.get(FlinkConnectorOptions.POSTPONE_DEFAULT_BUCKET_NUM);
 
             Iterator<ManifestEntry> it =
-                    fileStoreTable
-                            .newSnapshotReader()
+                    table.newSnapshotReader()
                             
.withPartitionFilter(Collections.singletonList(partition))
-                            .withBucketFilter(new NormalBucketFilter())
                             .readFileIterator();
             if (it.hasNext()) {
                 bucketNum = it.next().totalBuckets();
@@ -314,16 +312,6 @@ public class CompactAction extends TableActionBase {
         return true;
     }
 
-    private static class NormalBucketFilter implements Filter<Integer>, 
Serializable {
-
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public boolean test(Integer bucket) {
-            return bucket >= 0;
-        }
-    }
-
     @Override
     public void run() throws Exception {
         if (buildImpl()) {

Reply via email to