This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 654c130b78 [core] Add bucket & level statistics in ManifestFileMeta (#5345)
654c130b78 is described below

commit 654c130b78d590d8cb2c137d9e540038716315d6
Author: tsreaper <[email protected]>
AuthorDate: Thu Mar 27 15:53:07 2025 +0800

    [core] Add bucket & level statistics in ManifestFileMeta (#5345)
---
 .../org/apache/paimon/manifest/ManifestFile.java   | 14 +++-
 .../apache/paimon/manifest/ManifestFileMeta.java   | 69 ++++++++++++++--
 .../manifest/ManifestFileMetaSerializer.java       | 13 ++-
 .../org/apache/paimon/manifest/ManifestList.java   |  7 +-
 .../paimon/operation/AbstractFileStoreScan.java    | 15 +++-
 .../apache/paimon/operation/ManifestsReader.java   | 24 ++++++
 .../paimon/table/source/AbstractDataTableScan.java |  3 +-
 .../apache/paimon/table/source/InnerTableScan.java |  4 +
 .../apache/paimon/table/source/ReadBuilder.java    |  2 +
 .../paimon/table/source/ReadBuilderImpl.java       | 10 +++
 .../LegacyManifestFileMetaSerializerPaimon10.java} | 34 ++++++--
 .../apache/paimon/manifest/ManifestListTest.java   | 96 ++++++++++++++++++----
 .../paimon/manifest/ManifestTestDataGenerator.java | 16 +++-
 .../apache/paimon/flink/action/CompactAction.java  | 34 ++------
 14 files changed, 273 insertions(+), 68 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 3bb1ddf480..5453fbc107 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -129,6 +129,10 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
         private long numAddedFiles = 0;
         private long numDeletedFiles = 0;
         private long schemaId = Long.MIN_VALUE;
+        private int minBucket = Integer.MAX_VALUE;
+        private int maxBucket = Integer.MIN_VALUE;
+        private int minLevel = Integer.MAX_VALUE;
+        private int maxLevel = Integer.MIN_VALUE;
 
         ManifestEntryWriter(FormatWriterFactory factory, Path path, String fileCompression) {
             super(
@@ -157,6 +161,10 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
                     throw new UnsupportedOperationException("Unknown entry kind: " + entry.kind());
             }
             schemaId = Math.max(schemaId, entry.file().schemaId());
+            minBucket = Math.min(minBucket, entry.bucket());
+            maxBucket = Math.max(maxBucket, entry.bucket());
+            minLevel = Math.min(minLevel, entry.level());
+            maxLevel = Math.max(maxLevel, entry.level());
 
             partitionStatsCollector.collect(entry.partition());
         }
@@ -171,7 +179,11 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
                     partitionStatsSerializer.toBinaryAllMode(partitionStatsCollector.extract()),
                     numAddedFiles + numDeletedFiles > 0
                             ? schemaId
-                            : schemaManager.latest().get().id());
+                            : schemaManager.latest().get().id(),
+                    minBucket,
+                    maxBucket,
+                    minLevel,
+                    maxLevel);
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index 242abb1897..aa0156360a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -22,9 +22,12 @@ import org.apache.paimon.annotation.Public;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Objects;
@@ -47,7 +50,11 @@ public class ManifestFileMeta {
                             new DataField(2, "_NUM_ADDED_FILES", new BigIntType(false)),
                             new DataField(3, "_NUM_DELETED_FILES", new BigIntType(false)),
                             new DataField(4, "_PARTITION_STATS", SimpleStats.SCHEMA),
-                            new DataField(5, "_SCHEMA_ID", new BigIntType(false))));
+                            new DataField(5, "_SCHEMA_ID", new BigIntType(false)),
+                            new DataField(6, "_MIN_BUCKET", new IntType(true)),
+                            new DataField(7, "_MAX_BUCKET", new IntType(true)),
+                            new DataField(8, "_MIN_LEVEL", new IntType(true)),
+                            new DataField(9, "_MAX_LEVEL", new IntType(true))));
 
     private final String fileName;
     private final long fileSize;
@@ -55,6 +62,10 @@ public class ManifestFileMeta {
     private final long numDeletedFiles;
     private final SimpleStats partitionStats;
     private final long schemaId;
+    private final @Nullable Integer minBucket;
+    private final @Nullable Integer maxBucket;
+    private final @Nullable Integer minLevel;
+    private final @Nullable Integer maxLevel;
 
     public ManifestFileMeta(
             String fileName,
@@ -62,13 +73,21 @@ public class ManifestFileMeta {
             long numAddedFiles,
             long numDeletedFiles,
             SimpleStats partitionStats,
-            long schemaId) {
+            long schemaId,
+            @Nullable Integer minBucket,
+            @Nullable Integer maxBucket,
+            @Nullable Integer minLevel,
+            @Nullable Integer maxLevel) {
         this.fileName = fileName;
         this.fileSize = fileSize;
         this.numAddedFiles = numAddedFiles;
         this.numDeletedFiles = numDeletedFiles;
         this.partitionStats = partitionStats;
         this.schemaId = schemaId;
+        this.minBucket = minBucket;
+        this.maxBucket = maxBucket;
+        this.minLevel = minLevel;
+        this.maxLevel = maxLevel;
     }
 
     public String fileName() {
@@ -95,6 +114,22 @@ public class ManifestFileMeta {
         return schemaId;
     }
 
+    public @Nullable Integer minBucket() {
+        return minBucket;
+    }
+
+    public @Nullable Integer maxBucket() {
+        return maxBucket;
+    }
+
+    public @Nullable Integer minLevel() {
+        return minLevel;
+    }
+
+    public @Nullable Integer maxLevel() {
+        return maxLevel;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof ManifestFileMeta)) {
@@ -106,20 +141,42 @@ public class ManifestFileMeta {
                 && numAddedFiles == that.numAddedFiles
                 && numDeletedFiles == that.numDeletedFiles
                 && Objects.equals(partitionStats, that.partitionStats)
-                && schemaId == that.schemaId;
+                && schemaId == that.schemaId
+                && Objects.equals(minBucket, that.minBucket)
+                && Objects.equals(maxBucket, that.maxBucket)
+                && Objects.equals(minLevel, that.minLevel)
+                && Objects.equals(maxLevel, that.maxLevel);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(
-                fileName, fileSize, numAddedFiles, numDeletedFiles, partitionStats, schemaId);
+                fileName,
+                fileSize,
+                numAddedFiles,
+                numDeletedFiles,
+                partitionStats,
+                schemaId,
+                minBucket,
+                maxBucket,
+                minLevel,
+                maxLevel);
     }
 
     @Override
     public String toString() {
         return String.format(
-                "{%s, %d, %d, %d, %s, %d}",
-                fileName, fileSize, numAddedFiles, numDeletedFiles, partitionStats, schemaId);
+                "{%s, %d, %d, %d, %s, %d, %s, %s, %s, %s}",
+                fileName,
+                fileSize,
+                numAddedFiles,
+                numDeletedFiles,
+                partitionStats,
+                schemaId,
+                minBucket,
+                maxBucket,
+                minLevel,
+                maxLevel);
     }
 
     // ----------------------- Serialization -----------------------------
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
index fe791e5638..b3819e1277 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
@@ -46,7 +46,11 @@ public class ManifestFileMetaSerializer extends VersionedObjectSerializer<Manife
                 meta.numAddedFiles(),
                 meta.numDeletedFiles(),
                 meta.partitionStats().toRow(),
-                meta.schemaId());
+                meta.schemaId(),
+                meta.minBucket(),
+                meta.maxBucket(),
+                meta.minLevel(),
+                meta.maxLevel());
     }
 
     @Override
@@ -60,12 +64,17 @@ public class ManifestFileMetaSerializer extends VersionedObjectSerializer<Manife
             }
             throw new IllegalArgumentException("Unsupported version: " + version);
         }
+
         return new ManifestFileMeta(
                 row.getString(0).toString(),
                 row.getLong(1),
                 row.getLong(2),
                 row.getLong(3),
                 SimpleStats.fromRow(row.getRow(4, 3)),
-                row.getLong(5));
+                row.getLong(5),
+                row.isNullAt(6) ? null : row.getInt(6),
+                row.isNullAt(7) ? null : row.getInt(7),
+                row.isNullAt(8) ? null : row.getInt(8),
+                row.isNullAt(9) ? null : row.getInt(9));
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
index ab2751ab70..1f2b235dfa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.manifest;
 
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.FormatWriterFactory;
@@ -26,6 +27,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.ObjectSerializer;
 import org.apache.paimon.utils.ObjectsFile;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.PathFactory;
@@ -44,9 +46,10 @@ import java.util.List;
  */
 public class ManifestList extends ObjectsFile<ManifestFileMeta> {
 
-    private ManifestList(
+    @VisibleForTesting
+    public ManifestList(
             FileIO fileIO,
-            ManifestFileMetaSerializer serializer,
+            ObjectSerializer<ManifestFileMeta> serializer,
             RowType schema,
             FormatReaderFactory readerFactory,
             FormatWriterFactory writerFactory,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 861128155d..ffa88bb259 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -79,6 +79,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     private Snapshot specifiedSnapshot = null;
     private boolean onlyReadRealBuckets = false;
+    private Integer specifiedBucket = null;
     private Filter<Integer> bucketFilter = null;
     private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
     protected ScanMode scanMode = ScanMode.ALL;
@@ -132,14 +133,16 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     }
 
     @Override
-    public FileStoreScan withBucket(int bucket) {
-        this.bucketFilter = i -> i == bucket;
+    public FileStoreScan onlyReadRealBuckets() {
+        manifestsReader.onlyReadRealBuckets();
+        this.onlyReadRealBuckets = true;
         return this;
     }
 
     @Override
-    public FileStoreScan onlyReadRealBuckets() {
-        this.onlyReadRealBuckets = true;
+    public FileStoreScan withBucket(int bucket) {
+        manifestsReader.withBucket(bucket);
+        specifiedBucket = bucket;
         return this;
     }
 
@@ -508,6 +511,10 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
                 return false;
             }
 
+            if (specifiedBucket != null && bucket != specifiedBucket) {
+                return false;
+            }
+
             if (bucketFilter != null && !bucketFilter.test(bucket)) {
                 return false;
             }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
index 2eaa3646f7..afcbb82b73 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -48,6 +48,8 @@ public class ManifestsReader {
     private final SnapshotManager snapshotManager;
     private final ManifestList.Factory manifestListFactory;
 
+    private boolean onlyReadRealBuckets = false;
+    @Nullable private Integer specifiedBucket = null;
     @Nullable private PartitionPredicate partitionFilter = null;
 
     public ManifestsReader(
@@ -61,6 +63,16 @@ public class ManifestsReader {
         this.manifestListFactory = manifestListFactory;
     }
 
+    public ManifestsReader onlyReadRealBuckets() {
+        this.onlyReadRealBuckets = true;
+        return this;
+    }
+
+    public ManifestsReader withBucket(int bucket) {
+        this.specifiedBucket = bucket;
+        return this;
+    }
+
     public ManifestsReader withPartitionFilter(Predicate predicate) {
         this.partitionFilter = PartitionPredicate.fromPredicate(partitionType, 
predicate);
         return this;
@@ -123,6 +135,18 @@ public class ManifestsReader {
 
     /** Note: Keep this thread-safe. */
     private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
+        Integer minBucket = manifest.minBucket();
+        Integer maxBucket = manifest.maxBucket();
+        if (minBucket != null && maxBucket != null) {
+            if (onlyReadRealBuckets && maxBucket < 0) {
+                return false;
+            }
+            if (specifiedBucket != null
+                    && (specifiedBucket < minBucket || specifiedBucket > 
maxBucket)) {
+                return false;
+            }
+        }
+
         if (partitionFilter == null) {
             return true;
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 17a5b161b0..c6de102581 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -21,7 +21,6 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.consumer.Consumer;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
@@ -78,7 +77,7 @@ abstract class AbstractDataTableScan implements DataTableScan 
{
         this.snapshotReader = snapshotReader;
     }
 
-    @VisibleForTesting
+    @Override
     public AbstractDataTableScan withBucket(int bucket) {
         snapshotReader.withBucket(bucket);
         return this;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index f7d609187d..1c7b153184 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -47,6 +47,10 @@ public interface InnerTableScan extends TableScan {
         return this;
     }
 
+    default InnerTableScan withBucket(int bucket) {
+        return this;
+    }
+
     default InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
         return this;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index d12de1211b..212d08d907 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -100,6 +100,8 @@ public interface ReadBuilder extends Serializable {
     /** Push partition filter. */
     ReadBuilder withPartitionFilter(Map<String, String> partitionSpec);
 
+    ReadBuilder withBucket(int bucket);
+
     /**
      * Push bucket filter. Note that this method cannot be used simultaneously 
with {@link
      * #withShard(int, int)}.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 95bfe6f24b..f86e742269 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -47,6 +47,7 @@ public class ReadBuilderImpl implements ReadBuilder {
 
     private Map<String, String> partitionSpec;
 
+    private @Nullable Integer specifiedBucket = null;
     private Filter<Integer> bucketFilter;
 
     private @Nullable RowType readType;
@@ -120,6 +121,12 @@ public class ReadBuilderImpl implements ReadBuilder {
         return this;
     }
 
+    @Override
+    public ReadBuilder withBucket(int bucket) {
+        this.specifiedBucket = bucket;
+        return this;
+    }
+
     @Override
     public ReadBuilder withBucketFilter(Filter<Integer> bucketFilter) {
         this.bucketFilter = bucketFilter;
@@ -161,6 +168,9 @@ public class ReadBuilderImpl implements ReadBuilder {
                         "Unsupported table scan type for shard configuring, 
the scan is: " + scan);
             }
         }
+        if (specifiedBucket != null) {
+            scan.withBucket(specifiedBucket);
+        }
         if (bucketFilter != null) {
             scan.withBucketFilter(bucketFilter);
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/LegacyManifestFileMetaSerializerPaimon10.java
similarity index 64%
copy from 
paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
copy to 
paimon-core/src/test/java/org/apache/paimon/manifest/LegacyManifestFileMetaSerializerPaimon10.java
index fe791e5638..10a31bd67b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/LegacyManifestFileMetaSerializerPaimon10.java
@@ -22,15 +22,34 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.VersionedObjectSerializer;
 
-/** Serializer for {@link ManifestFileMeta}. */
-public class ManifestFileMetaSerializer extends 
VersionedObjectSerializer<ManifestFileMeta> {
+import java.util.Arrays;
+
+/** Legacy serializer for {@link ManifestFileMeta} in Paimon 1.0. */
+public class LegacyManifestFileMetaSerializerPaimon10
+        extends VersionedObjectSerializer<ManifestFileMeta> {
 
     private static final long serialVersionUID = 1L;
 
-    public ManifestFileMetaSerializer() {
-        super(ManifestFileMeta.SCHEMA);
+    public static final RowType SCHEMA =
+            new RowType(
+                    false,
+                    Arrays.asList(
+                            new DataField(
+                                    0, "_FILE_NAME", new VarCharType(false, 
Integer.MAX_VALUE)),
+                            new DataField(1, "_FILE_SIZE", new 
BigIntType(false)),
+                            new DataField(2, "_NUM_ADDED_FILES", new 
BigIntType(false)),
+                            new DataField(3, "_NUM_DELETED_FILES", new 
BigIntType(false)),
+                            new DataField(4, "_PARTITION_STATS", 
SimpleStats.SCHEMA),
+                            new DataField(5, "_SCHEMA_ID", new 
BigIntType(false))));
+
+    public LegacyManifestFileMetaSerializerPaimon10() {
+        super(SCHEMA);
     }
 
     @Override
@@ -60,12 +79,17 @@ public class ManifestFileMetaSerializer extends 
VersionedObjectSerializer<Manife
             }
             throw new IllegalArgumentException("Unsupported version: " + 
version);
         }
+
         return new ManifestFileMeta(
                 row.getString(0).toString(),
                 row.getLong(1),
                 row.getLong(2),
                 row.getLong(3),
                 SimpleStats.fromRow(row.getRow(4, 3)),
-                row.getLong(5));
+                row.getLong(5),
+                null,
+                null,
+                null,
+                null);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
index a6aaf3530f..875aa7a2b4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
@@ -25,8 +25,10 @@ import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FailingFileIO;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.VersionedObjectSerializer;
 
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
@@ -84,6 +86,67 @@ public class ManifestListTest {
         assertThat(manifestListName.startsWith("manifest-list-")).isTrue();
     }
 
+    // ============================ Compatibility tests 
===================================
+
+    @Test
+    public void testCanReadOldMetaPaimon10() throws Exception {
+        ManifestList legacyManifestList = createLegacyManifestListPaimon10();
+        List<ManifestFileMeta> metas = generateData();
+        String manifestListName = legacyManifestList.write(metas).getKey();
+
+        ManifestList manifestList = createManifestList(tempDir.toString());
+        List<ManifestFileMeta> actualMetas = 
manifestList.read(manifestListName);
+        assertThat(actualMetas).isEqualTo(getLegacyMetaPaimon10(metas));
+    }
+
+    @Test
+    public void testOldReaderCanReadNewMetaPaimon10() throws Exception {
+        ManifestList manifestList = createManifestList(tempDir.toString());
+        List<ManifestFileMeta> metas = generateData();
+        String manifestListName = manifestList.write(metas).getKey();
+
+        ManifestList legacyManifestList = createLegacyManifestListPaimon10();
+        List<ManifestFileMeta> actualMetas = 
legacyManifestList.read(manifestListName);
+        assertThat(actualMetas).isEqualTo(getLegacyMetaPaimon10(metas));
+    }
+
+    private ManifestList createLegacyManifestListPaimon10() {
+        FileStorePathFactory pathFactory = 
createPathFactory(tempDir.toString());
+        RowType legacyMetaType =
+                VersionedObjectSerializer.versionType(
+                        LegacyManifestFileMetaSerializerPaimon10.SCHEMA);
+        return new ManifestList(
+                LocalFileIO.create(),
+                new LegacyManifestFileMetaSerializerPaimon10(),
+                legacyMetaType,
+                avro.createReaderFactory(legacyMetaType),
+                avro.createWriterFactory(legacyMetaType),
+                "zstd",
+                pathFactory.manifestListFactory(),
+                null);
+    }
+
+    private List<ManifestFileMeta> 
getLegacyMetaPaimon10(List<ManifestFileMeta> metas) {
+        List<ManifestFileMeta> result = new ArrayList<>();
+        for (ManifestFileMeta meta : metas) {
+            result.add(
+                    new ManifestFileMeta(
+                            meta.fileName(),
+                            meta.fileSize(),
+                            meta.numAddedFiles(),
+                            meta.numDeletedFiles(),
+                            meta.partitionStats(),
+                            meta.schemaId(),
+                            null,
+                            null,
+                            null,
+                            null));
+        }
+        return result;
+    }
+
+    // ============================ Test utils 
===================================
+
     private List<ManifestFileMeta> generateData() {
         Random random = new Random();
         List<ManifestFileMeta> metas = new ArrayList<>();
@@ -97,22 +160,25 @@ public class ManifestListTest {
         return metas;
     }
 
+    private FileStorePathFactory createPathFactory(String pathStr) {
+        return new FileStorePathFactory(
+                new Path(pathStr),
+                TestKeyValueGenerator.DEFAULT_PART_TYPE,
+                "default",
+                CoreOptions.FILE_FORMAT.defaultValue().toString(),
+                CoreOptions.DATA_FILE_PREFIX.defaultValue(),
+                CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+                CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+                CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                CoreOptions.FILE_COMPRESSION.defaultValue(),
+                null,
+                null);
+    }
+
     private ManifestList createManifestList(String pathStr) {
-        Path path = new Path(pathStr);
-        FileStorePathFactory pathFactory =
-                new FileStorePathFactory(
-                        path,
-                        TestKeyValueGenerator.DEFAULT_PART_TYPE,
-                        "default",
-                        CoreOptions.FILE_FORMAT.defaultValue().toString(),
-                        CoreOptions.DATA_FILE_PREFIX.defaultValue(),
-                        CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-                        
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
-                        
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
-                        CoreOptions.FILE_COMPRESSION.defaultValue(),
-                        null,
-                        null);
-        return new ManifestList.Factory(FileIOFinder.find(path), avro, "zstd", 
pathFactory, null)
+        FileStorePathFactory pathFactory = createPathFactory(pathStr);
+        return new ManifestList.Factory(
+                        FileIOFinder.find(new Path(pathStr)), avro, "zstd", 
pathFactory, null)
                 .create();
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
index 0283eda853..959d3f9e3e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
@@ -63,7 +63,7 @@ public class ManifestTestDataGenerator {
     }
 
     public ManifestEntry next() {
-        if (bufferedResults.size() > 0) {
+        if (!bufferedResults.isEmpty()) {
             return bufferedResults.poll();
         }
 
@@ -99,6 +99,10 @@ public class ManifestTestDataGenerator {
 
         long numAddedFiles = 0;
         long numDeletedFiles = 0;
+        int minBucket = Integer.MAX_VALUE;
+        int maxBucket = Integer.MIN_VALUE;
+        int minLevel = Integer.MAX_VALUE;
+        int maxLevel = Integer.MIN_VALUE;
         for (ManifestEntry entry : entries) {
             collector.collect(entry.partition());
             if (entry.kind() == FileKind.ADD) {
@@ -106,6 +110,10 @@ public class ManifestTestDataGenerator {
             } else {
                 numDeletedFiles++;
             }
+            minBucket = Math.min(minBucket, entry.bucket());
+            maxBucket = Math.max(maxBucket, entry.bucket());
+            minLevel = Math.min(minLevel, entry.level());
+            maxLevel = Math.max(maxLevel, entry.level());
         }
 
         return new ManifestFileMeta(
@@ -114,7 +122,11 @@ public class ManifestTestDataGenerator {
                 numAddedFiles,
                 numDeletedFiles,
                 serializer.toBinaryAllMode(collector.extract()),
-                0);
+                0,
+                minBucket,
+                maxBucket,
+                minLevel,
+                maxLevel);
     }
 
     private void mergeLevelsIfNeeded(BinaryRow partition, int bucket) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 9722847852..91bb30b45b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -41,7 +41,6 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
@@ -58,7 +57,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.Serializable;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
@@ -248,9 +246,9 @@ public class CompactAction extends TableActionBase {
 
         List<BinaryRow> partitions =
                 fileStoreTable
-                        .newScan()
-                        .withBucketFilter(new PostponeBucketFilter())
-                        .listPartitions();
+                        .newSnapshotReader()
+                        .withBucket(BucketMode.POSTPONE_BUCKET)
+                        .partitions();
         if (partitions.isEmpty()) {
             return false;
         }
@@ -266,10 +264,8 @@ public class CompactAction extends TableActionBase {
             int bucketNum = 
options.get(FlinkConnectorOptions.POSTPONE_DEFAULT_BUCKET_NUM);
 
             Iterator<ManifestEntry> it =
-                    fileStoreTable
-                            .newSnapshotReader()
+                    table.newSnapshotReader()
                             
.withPartitionFilter(Collections.singletonList(partition))
-                            .withBucketFilter(new NormalBucketFilter())
                             .readFileIterator();
             if (it.hasNext()) {
                 bucketNum = it.next().totalBuckets();
@@ -289,7 +285,7 @@ public class CompactAction extends TableActionBase {
                             realTable
                                     .newReadBuilder()
                                     .withPartitionFilter(partitionSpec)
-                                    .withBucketFilter(new 
PostponeBucketFilter()),
+                                    .withBucket(BucketMode.POSTPONE_BUCKET),
                             
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
 
             DataStream<InternalRow> partitioned =
@@ -314,26 +310,6 @@ public class CompactAction extends TableActionBase {
         return true;
     }
 
-    private static class PostponeBucketFilter implements Filter<Integer>, 
Serializable {
-
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public boolean test(Integer bucket) {
-            return bucket == BucketMode.POSTPONE_BUCKET;
-        }
-    }
-
-    private static class NormalBucketFilter implements Filter<Integer>, 
Serializable {
-
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public boolean test(Integer bucket) {
-            return bucket >= 0;
-        }
-    }
-
     @Override
     public void run() throws Exception {
         if (buildImpl()) {

Reply via email to