This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6210708b09 [core] Add totalBuckets to CommitMessage and DataSplit (#5343)
6210708b09 is described below

commit 6210708b096a98ad4af67724e75e008f8c31b843
Author: tsreaper <[email protected]>
AuthorDate: Thu Mar 27 15:55:54 2025 +0800

    [core] Add totalBuckets to CommitMessage and DataSplit (#5343)
---
 .../paimon/append/UnawareAppendCompactionTask.java |   6 +-
 .../paimon/iceberg/migrate/IcebergMigrator.java    |   3 +-
 .../org/apache/paimon/migrate/FileMetaUtils.java   |   4 +-
 .../paimon/operation/AbstractFileStoreWrite.java   |  51 +++++--
 .../paimon/operation/FileStoreCommitImpl.java      |   7 +-
 .../org/apache/paimon/operation/FileStoreScan.java |   7 +-
 .../apache/paimon/operation/FileStoreWrite.java    |  16 ++-
 .../apache/paimon/table/sink/CommitMessage.java    |   6 +
 .../paimon/table/sink/CommitMessageImpl.java       |  20 ++-
 .../sink/CommitMessageLegacyV2Serializer.java      |   1 +
 .../paimon/table/sink/CommitMessageSerializer.java |  13 +-
 .../apache/paimon/table/sink/TableCommitImpl.java  |   2 +-
 .../org/apache/paimon/table/source/DataSplit.java  |  23 +++-
 .../snapshot/IncrementalDeltaStartingScanner.java  |  24 ++--
 .../table/source/snapshot/SnapshotReaderImpl.java  |  66 ++++++---
 .../org/apache/paimon/TestAppendFileStore.java     |   2 +
 .../test/java/org/apache/paimon/TestFileStore.java |   1 +
 .../DeletionVectorsMaintainerTest.java             |   2 +
 .../paimon/index/HashBucketAssignerTest.java       |  21 +--
 ...festCommittableSerializerCompatibilityTest.java |  78 +++++++++++
 .../ManifestCommittableSerializerTest.java         |  17 +--
 .../paimon/operation/FileStoreTestUtils.java       |   1 +
 .../paimon/operation/PartitionExpireTest.java      |   1 +
 .../apache/paimon/operation/TestCommitThread.java  |  22 ++-
 .../table/sink/CommitMessageSerializerTest.java    |  10 +-
 .../org/apache/paimon/table/source/SplitTest.java  |  68 ++++++++++
 .../src/test/resources/compatibility/datasplit-v6  | Bin 0 -> 976 bytes
 .../compatibility/manifest-committable-v7          | Bin 0 -> 3454 bytes
 .../apache/paimon/flink/action/CompactAction.java  |  22 ++-
 .../flink/action/RemoveUnexistingFilesAction.java  |   1 +
 ...ucketNewFilesCompactionCoordinatorOperator.java |   4 +
 ...wareBucketNewFilesCompactionWorkerOperator.java |   1 +
 .../ChangelogCompactCoordinateOperator.java        |   2 +
 .../compact/changelog/ChangelogCompactTask.java    |   9 ++
 .../changelog/ChangelogCompactTaskSerializer.java  |   5 +-
 .../postpone/PostponeBucketCompactSplitSource.java |   1 +
 .../RemovePostponeBucketFilesOperator.java         |   1 +
 .../RewritePostponeBucketCommittableOperator.java  |   9 +-
 .../paimon/flink/sink/RewriteFileIndexSink.java    |  10 +-
 .../paimon/flink/PostponeBucketTableITCase.java    |  36 ++---
 ...tNewFilesCompactionCoordinatorOperatorTest.java |   1 +
 .../ChangelogCompactTaskSerializerTest.java        |   3 +-
 .../flink/sink/CommittableSerializerTest.java      |   2 +-
 .../paimon/flink/sink/CommitterOperatorTest.java   |   6 +
 .../sink/MultiTableCommittableSerializerTest.java  |   4 +-
 .../WrappedManifestCommittableSerializerTest.java  |  13 +-
 .../sink/partition/PartitionMarkDoneTest.java      |   2 +
 .../format/orc/filter/OrcSimpleStatsExtractor.java | 148 +++++++++------------
 .../apache/paimon/hive/migrate/HiveMigrator.java   |   3 +-
 .../scala/org/apache/paimon/spark/ScanHelper.scala |   1 +
 .../paimon/spark/commands/PaimonCommand.scala      |   1 +
 .../paimon/spark/commands/PaimonSparkWriter.scala  |   2 +
 .../spark/commands/SparkDeletionVectors.scala      |   7 +-
 .../procedure/SparkRemoveUnexistingFiles.scala     |   1 +
 54 files changed, 549 insertions(+), 218 deletions(-)
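
The central API change in this patch is a new, nullable totalBuckets argument on CommitMessageImpl, with a matching totalBuckets() accessor on the CommitMessage interface. A minimal sketch of the new constructor shape, based on the hunks below; the surrounding variables (partition, bucket, table) are assumed to be in scope for illustration only:

    // Sketch only: the third argument is new in this commit and may be null
    // for legacy messages that do not carry a bucket count.
    CommitMessage message =
            new CommitMessageImpl(
                    partition,                      // BinaryRow
                    bucket,                         // int
                    table.coreOptions().bucket(),   // @Nullable Integer totalBuckets (new)
                    DataIncrement.emptyIncrement(),
                    CompactIncrement.emptyIncrement(),
                    new IndexIncrement(Collections.emptyList()));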

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java
index b510f8346e..37c3b78af0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java
@@ -108,8 +108,10 @@ public class UnawareAppendCompactionTask {
                 new CompactIncrement(compactBefore, compactAfter, 
Collections.emptyList());
         return new CommitMessageImpl(
                 partition,
-                0, // bucket 0 is bucket for unaware-bucket table for 
compatibility with the old
-                // design
+                // bucket 0 is bucket for unaware-bucket table
+                // for compatibility with the old design
+                0,
+                table.coreOptions().bucket(),
                 DataIncrement.emptyIncrement(),
                 compactIncrement,
                 indexIncrement);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
index 8fa08287d8..29ae1bdde4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
@@ -446,7 +446,8 @@ public class IcebergMigrator implements Migrator {
             }
             List<DataFileMeta> fileMetas =
                     construct(icebergDataFileMetas, fileIO, paimonTable, 
newDir, rollback);
-            return FileMetaUtils.commitFile(partitionRow, fileMetas);
+            return FileMetaUtils.commitFile(
+                    partitionRow, paimonTable.coreOptions().bucket(), 
fileMetas);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 0057b2b432..e72120abac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -83,10 +83,12 @@ public class FileMetaUtils {
                 .collect(Collectors.toList());
     }
 
-    public static CommitMessage commitFile(BinaryRow partition, 
List<DataFileMeta> dataFileMetas) {
+    public static CommitMessage commitFile(
+            BinaryRow partition, int totalBuckets, List<DataFileMeta> 
dataFileMetas) {
         return new CommitMessageImpl(
                 partition,
                 0,
+                totalBuckets,
                 new DataIncrement(dataFileMetas, Collections.emptyList(), 
Collections.emptyList()),
                 new CompactIncrement(
                         Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList()));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index f045a056c9..37647bc9e7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -74,7 +74,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
     private final int writerNumberMax;
     @Nullable private final IndexMaintainer.Factory<T> indexFactory;
     @Nullable private final DeletionVectorsMaintainer.Factory 
dvMaintainerFactory;
-    private final int totalBuckets;
+    private final int numBuckets;
     private final RowType partitionType;
 
     @Nullable protected IOManager ioManager;
@@ -84,12 +84,13 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
     private ExecutorService lazyCompactExecutor;
     private boolean closeCompactExecutorWhenLeaving = true;
     private boolean ignorePreviousFiles = false;
+    private boolean ignoreNumBucketCheck = false;
     protected boolean isStreamingMode = false;
 
     protected CompactionMetrics compactionMetrics = null;
     protected final String tableName;
     private boolean isInsertOnly;
-    private boolean legacyPartitionName;
+    private final boolean legacyPartitionName;
 
     protected AbstractFileStoreWrite(
             SnapshotManager snapshotManager,
@@ -98,7 +99,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
             @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
             String tableName,
             CoreOptions options,
-            int totalBuckets,
+            int numBuckets,
             RowType partitionType,
             int writerNumberMax,
             boolean legacyPartitionName) {
@@ -112,7 +113,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
         }
         this.indexFactory = indexFactory;
         this.dvMaintainerFactory = dvMaintainerFactory;
-        this.totalBuckets = totalBuckets;
+        this.numBuckets = numBuckets;
         this.partitionType = partitionType;
         this.writers = new HashMap<>();
         this.tableName = tableName;
@@ -136,6 +137,11 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
         this.ignorePreviousFiles = ignorePreviousFiles;
     }
 
+    @Override
+    public void withIgnoreNumBucketCheck(boolean ignoreNumBucketCheck) {
+        this.ignoreNumBucketCheck = ignoreNumBucketCheck;
+    }
+
     @Override
     public void withCompactExecutor(ExecutorService compactExecutor) {
         this.lazyCompactExecutor = compactExecutor;
@@ -228,6 +234,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
                         new CommitMessageImpl(
                                 partition,
                                 bucket,
+                                writerContainer.totalBuckets,
                                 increment.newFilesIncrement(),
                                 increment.compactIncrement(),
                                 new IndexIncrement(newIndexFiles));
@@ -348,6 +355,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
                         new State<>(
                                 partition,
                                 bucket,
+                                writerContainer.totalBuckets,
                                 writerContainer.baseSnapshotId,
                                 writerContainer.lastModifiedCommitIdentifier,
                                 dataFiles,
@@ -380,6 +388,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
             WriterContainer<T> writerContainer =
                     new WriterContainer<>(
                             writer,
+                            state.totalBuckets,
                             state.indexMaintainer,
                             state.deletionVectorsMaintainer,
                             state.baseSnapshotId);
@@ -409,7 +418,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
     }
 
     private long writerNumber() {
-        return writers.values().stream().mapToLong(e -> 
e.values().size()).sum();
+        return writers.values().stream().mapToLong(Map::size).sum();
     }
 
     @VisibleForTesting
@@ -429,9 +438,13 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
 
         Snapshot latestSnapshot = snapshotManager.latestSnapshot();
         List<DataFileMeta> restoreFiles = new ArrayList<>();
+        int totalBuckets;
         if (!ignorePreviousFiles && latestSnapshot != null) {
-            restoreFiles = scanExistingFileMetas(latestSnapshot, partition, 
bucket);
+            totalBuckets = scanExistingFileMetas(latestSnapshot, partition, 
bucket, restoreFiles);
+        } else {
+            totalBuckets = getDefaultBucketNum(partition);
         }
+
         IndexMaintainer<T> indexMaintainer =
                 indexFactory == null
                         ? null
@@ -455,6 +468,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
         notifyNewWriter(writer);
         return new WriterContainer<>(
                 writer,
+                totalBuckets,
                 indexMaintainer,
                 deletionVectorsMaintainer,
                 latestSnapshot == null ? null : latestSnapshot.id());
@@ -471,13 +485,16 @@ public abstract class AbstractFileStoreWrite<T> 
implements FileStoreWrite<T> {
         return this;
     }
 
-    private List<DataFileMeta> scanExistingFileMetas(
-            Snapshot snapshot, BinaryRow partition, int bucket) {
-        List<DataFileMeta> existingFileMetas = new ArrayList<>();
+    private int scanExistingFileMetas(
+            Snapshot snapshot,
+            BinaryRow partition,
+            int bucket,
+            List<DataFileMeta> existingFileMetas) {
         List<ManifestEntry> files =
                 scan.withSnapshot(snapshot).withPartitionBucket(partition, 
bucket).plan().files();
+        int totalBuckets = getDefaultBucketNum(partition);
         for (ManifestEntry entry : files) {
-            if (entry.totalBuckets() != totalBuckets) {
+            if (!ignoreNumBucketCheck && entry.totalBuckets() != numBuckets) {
                 String partInfo =
                         partitionType.getFieldCount() > 0
                                 ? "partition "
@@ -491,11 +508,18 @@ public abstract class AbstractFileStoreWrite<T> 
implements FileStoreWrite<T> {
                         String.format(
                                 "Try to write %s with a new bucket num %d, but 
the previous bucket num is %d. "
                                         + "Please switch to batch mode, and 
perform INSERT OVERWRITE to rescale current data layout first.",
-                                partInfo, totalBuckets, entry.totalBuckets()));
+                                partInfo, numBuckets, entry.totalBuckets()));
             }
+            totalBuckets = entry.totalBuckets();
             existingFileMetas.add(entry.file());
         }
-        return existingFileMetas;
+        return totalBuckets;
+    }
+
+    // TODO see comments on FileStoreWrite#withIgnoreNumBucketCheck for what 
is needed to support
+    //  writing partitions with different buckets
+    public int getDefaultBucketNum(BinaryRow partition) {
+        return numBuckets;
     }
 
     private ExecutorService compactExecutor() {
@@ -534,6 +558,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
     @VisibleForTesting
     public static class WriterContainer<T> {
         public final RecordWriter<T> writer;
+        public final int totalBuckets;
         @Nullable public final IndexMaintainer<T> indexMaintainer;
         @Nullable public final DeletionVectorsMaintainer 
deletionVectorsMaintainer;
         protected final long baseSnapshotId;
@@ -541,10 +566,12 @@ public abstract class AbstractFileStoreWrite<T> 
implements FileStoreWrite<T> {
 
         protected WriterContainer(
                 RecordWriter<T> writer,
+                int totalBuckets,
                 @Nullable IndexMaintainer<T> indexMaintainer,
                 @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer,
                 Long baseSnapshotId) {
             this.writer = writer;
+            this.totalBuckets = totalBuckets;
             this.indexMaintainer = indexMaintainer;
             this.deletionVectorsMaintainer = deletionVectorsMaintainer;
             this.baseSnapshotId =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index ba66294262..ee672f4ac6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -704,8 +704,13 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
     }
 
     private ManifestEntry makeEntry(FileKind kind, CommitMessage 
commitMessage, DataFileMeta file) {
+        Integer totalBuckets = commitMessage.totalBuckets();
+        if (totalBuckets == null) {
+            totalBuckets = numBucket;
+        }
+
         return new ManifestEntry(
-                kind, commitMessage.partition(), commitMessage.bucket(), 
numBucket, file);
+                kind, commitMessage.partition(), commitMessage.bucket(), 
totalBuckets, file);
     }
 
     private int tryCommit(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 3e0bd25475..3b77c0a1a1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -20,7 +20,6 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.BucketEntry;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestCacheFilter;
@@ -148,13 +147,13 @@ public interface FileStoreScan {
         }
 
         /** Return a map group by partition and bucket. */
-        static Map<BinaryRow, Map<Integer, List<DataFileMeta>>> 
groupByPartFiles(
+        static Map<BinaryRow, Map<Integer, List<ManifestEntry>>> 
groupByPartFiles(
                 List<ManifestEntry> files) {
-            Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupBy = new 
LinkedHashMap<>();
+            Map<BinaryRow, Map<Integer, List<ManifestEntry>>> groupBy = new 
LinkedHashMap<>();
             for (ManifestEntry entry : files) {
                 groupBy.computeIfAbsent(entry.partition(), k -> new 
LinkedHashMap<>())
                         .computeIfAbsent(entry.bucket(), k -> new 
ArrayList<>())
-                        .add(entry.file());
+                        .add(entry);
             }
             return groupBy;
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index 993e07d532..f10f9fb62c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -73,6 +73,16 @@ public interface FileStoreWrite<T> extends 
Restorable<List<FileStoreWrite.State<
      */
     void withIgnorePreviousFiles(boolean ignorePreviousFiles);
 
+    /**
+     * Ignores the check that the written partition must have the same number 
of buckets with the
+     * table option.
+     *
+     * <p>TODO: to support writing partitions with different total buckets, 
we'll also need a
+     * special {@link org.apache.paimon.table.sink.ChannelComputer} and {@link
+     * org.apache.paimon.table.sink.KeyAndBucketExtractor} to deal with 
different bucket numbers.
+     */
+    void withIgnoreNumBucketCheck(boolean ignoreNumBucketCheck);
+
     /**
      * We detect whether it is in batch mode, if so, we do some optimization.
      *
@@ -151,6 +161,7 @@ public interface FileStoreWrite<T> extends 
Restorable<List<FileStoreWrite.State<
 
         protected final BinaryRow partition;
         protected final int bucket;
+        protected final int totalBuckets;
 
         protected final long baseSnapshotId;
         protected final long lastModifiedCommitIdentifier;
@@ -163,6 +174,7 @@ public interface FileStoreWrite<T> extends 
Restorable<List<FileStoreWrite.State<
         protected State(
                 BinaryRow partition,
                 int bucket,
+                int totalBuckets,
                 long baseSnapshotId,
                 long lastModifiedCommitIdentifier,
                 Collection<DataFileMeta> dataFiles,
@@ -172,6 +184,7 @@ public interface FileStoreWrite<T> extends 
Restorable<List<FileStoreWrite.State<
                 CommitIncrement commitIncrement) {
             this.partition = partition;
             this.bucket = bucket;
+            this.totalBuckets = totalBuckets;
             this.baseSnapshotId = baseSnapshotId;
             this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier;
             this.dataFiles = new ArrayList<>(dataFiles);
@@ -184,9 +197,10 @@ public interface FileStoreWrite<T> extends 
Restorable<List<FileStoreWrite.State<
         @Override
         public String toString() {
             return String.format(
-                    "{%s, %d, %d, %d, %s, %d, %s, %s, %s}",
+                    "{%s, %d, %d, %d, %d, %s, %d, %s, %s, %s}",
                     partition,
                     bucket,
+                    totalBuckets,
                     baseSnapshotId,
                     lastModifiedCommitIdentifier,
                     dataFiles,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessage.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessage.java
index 32a8d66aa5..47f82bcfe8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessage.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessage.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.sink;
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.BinaryRow;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 
 /**
@@ -36,4 +38,8 @@ public interface CommitMessage extends Serializable {
 
     /** Bucket of this commit message. */
     int bucket();
+
+    /** Total number of buckets in this partition. */
+    @Nullable
+    Integer totalBuckets();
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
index b95e96ac1f..5831066816 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
@@ -26,6 +26,8 @@ import org.apache.paimon.io.DataInputViewStreamWrapper;
 import org.apache.paimon.io.DataOutputViewStreamWrapper;
 import org.apache.paimon.io.IndexIncrement;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -45,6 +47,7 @@ public class CommitMessageImpl implements CommitMessage {
 
     private transient BinaryRow partition;
     private transient int bucket;
+    private transient @Nullable Integer totalBuckets;
     private transient DataIncrement dataIncrement;
     private transient CompactIncrement compactIncrement;
     private transient IndexIncrement indexIncrement;
@@ -53,11 +56,13 @@ public class CommitMessageImpl implements CommitMessage {
     public CommitMessageImpl(
             BinaryRow partition,
             int bucket,
+            @Nullable Integer totalBuckets,
             DataIncrement dataIncrement,
             CompactIncrement compactIncrement) {
         this(
                 partition,
                 bucket,
+                totalBuckets,
                 dataIncrement,
                 compactIncrement,
                 new IndexIncrement(Collections.emptyList()));
@@ -66,11 +71,13 @@ public class CommitMessageImpl implements CommitMessage {
     public CommitMessageImpl(
             BinaryRow partition,
             int bucket,
+            @Nullable Integer totalBuckets,
             DataIncrement dataIncrement,
             CompactIncrement compactIncrement,
             IndexIncrement indexIncrement) {
         this.partition = partition;
         this.bucket = bucket;
+        this.totalBuckets = totalBuckets;
         this.dataIncrement = dataIncrement;
         this.compactIncrement = compactIncrement;
         this.indexIncrement = indexIncrement;
@@ -86,6 +93,11 @@ public class CommitMessageImpl implements CommitMessage {
         return bucket;
     }
 
+    @Override
+    public @Nullable Integer totalBuckets() {
+        return totalBuckets;
+    }
+
     public DataIncrement newFilesIncrement() {
         return dataIncrement;
     }
@@ -116,6 +128,7 @@ public class CommitMessageImpl implements CommitMessage {
         CommitMessageImpl message = (CommitMessageImpl) 
CACHE.get().deserialize(version, bytes);
         this.partition = message.partition;
         this.bucket = message.bucket;
+        this.totalBuckets = message.totalBuckets;
         this.dataIncrement = message.dataIncrement;
         this.compactIncrement = message.compactIncrement;
         this.indexIncrement = message.indexIncrement;
@@ -133,6 +146,7 @@ public class CommitMessageImpl implements CommitMessage {
         CommitMessageImpl that = (CommitMessageImpl) o;
         return bucket == that.bucket
                 && Objects.equals(partition, that.partition)
+                && Objects.equals(totalBuckets, that.totalBuckets)
                 && Objects.equals(dataIncrement, that.dataIncrement)
                 && Objects.equals(compactIncrement, that.compactIncrement)
                 && Objects.equals(indexIncrement, that.indexIncrement);
@@ -140,7 +154,8 @@ public class CommitMessageImpl implements CommitMessage {
 
     @Override
     public int hashCode() {
-        return Objects.hash(partition, bucket, dataIncrement, 
compactIncrement, indexIncrement);
+        return Objects.hash(
+                partition, bucket, totalBuckets, dataIncrement, 
compactIncrement, indexIncrement);
     }
 
     @Override
@@ -149,9 +164,10 @@ public class CommitMessageImpl implements CommitMessage {
                 "FileCommittable {"
                         + "partition = %s, "
                         + "bucket = %d, "
+                        + "totalBuckets = %s, "
                         + "newFilesIncrement = %s, "
                         + "compactIncrement = %s, "
                         + "indexIncrement = %s}",
-                partition, bucket, dataIncrement, compactIncrement, 
indexIncrement);
+                partition, bucket, totalBuckets, dataIncrement, 
compactIncrement, indexIncrement);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
index 5da96da765..fe1bc6adb9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
@@ -71,6 +71,7 @@ public class CommitMessageLegacyV2Serializer {
         return new CommitMessageImpl(
                 deserializeBinaryRow(view),
                 view.readInt(),
+                null,
                 new DataIncrement(
                         dataFileSerializer.deserializeList(view),
                         Collections.emptyList(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 1c0b67d409..4d91359ec0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -48,7 +48,7 @@ import static 
org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 /** {@link VersionedSerializer} for {@link CommitMessage}. */
 public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessage> {
 
-    private static final int CURRENT_VERSION = 6;
+    private static final int CURRENT_VERSION = 7;
 
     private final DataFileMetaSerializer dataFileSerializer;
     private final IndexFileMetaSerializer indexEntrySerializer;
@@ -85,8 +85,18 @@ public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessag
 
     private void serialize(CommitMessage obj, DataOutputView view) throws 
IOException {
         CommitMessageImpl message = (CommitMessageImpl) obj;
+
         serializeBinaryRow(obj.partition(), view);
         view.writeInt(obj.bucket());
+
+        Integer totalBuckets = obj.totalBuckets();
+        if (totalBuckets != null) {
+            view.writeBoolean(true);
+            view.writeInt(totalBuckets);
+        } else {
+            view.writeBoolean(false);
+        }
+
         
dataFileSerializer.serializeList(message.newFilesIncrement().newFiles(), view);
         
dataFileSerializer.serializeList(message.newFilesIncrement().deletedFiles(), 
view);
         
dataFileSerializer.serializeList(message.newFilesIncrement().changelogFiles(), 
view);
@@ -120,6 +130,7 @@ public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessag
         return new CommitMessageImpl(
                 deserializeBinaryRow(view),
                 view.readInt(),
+                version >= 7 && view.readBoolean() ? view.readInt() : null,
                 new DataIncrement(
                         fileDeserializer.get(), fileDeserializer.get(), 
fileDeserializer.get()),
                 new CompactIncrement(
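
Backward compatibility rests on the short-circuit in the deserializer expression above: payloads written with version < 7 never serialized a presence flag, so the reader must not consume a boolean from them either. An expanded, equivalent form of that expression (sketch; variable names follow the hunk above):

    Integer totalBuckets = null;
    if (version >= 7) {                       // older payloads carry no flag at all
        boolean hasTotalBuckets = view.readBoolean();
        if (hasTotalBuckets) {
            totalBuckets = view.readInt();    // presence flag followed by the value
        }
    }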
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index e91153f5bc..e4f5cea8c8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -300,7 +300,7 @@ public class TableCommitImpl implements InnerTableCommit {
                                 f -> nonExists.test(f) ? singletonList(f) : 
emptyList(),
                                 files));
 
-        if (nonExistFiles.size() > 0) {
+        if (!nonExistFiles.isEmpty()) {
             String message =
                     String.join(
                             "\n",
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 5e39d3a71b..c34fca6997 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -58,12 +58,13 @@ public class DataSplit implements Split {
 
     private static final long serialVersionUID = 7L;
     private static final long MAGIC = -2394839472490812314L;
-    private static final int VERSION = 5;
+    private static final int VERSION = 6;
 
     private long snapshotId = 0;
     private BinaryRow partition;
     private int bucket = -1;
     private String bucketPath;
+    @Nullable private Integer totalBuckets;
 
     private List<DataFileMeta> beforeFiles = new ArrayList<>();
     @Nullable private List<DeletionFile> beforeDeletionFiles;
@@ -92,6 +93,10 @@ public class DataSplit implements Split {
         return bucketPath;
     }
 
+    public @Nullable Integer totalBuckets() {
+        return totalBuckets;
+    }
+
     public List<DataFileMeta> beforeFiles() {
         return beforeFiles;
     }
@@ -276,6 +281,7 @@ public class DataSplit implements Split {
                 && rawConvertible == dataSplit.rawConvertible
                 && Objects.equals(partition, dataSplit.partition)
                 && Objects.equals(bucketPath, dataSplit.bucketPath)
+                && Objects.equals(totalBuckets, dataSplit.totalBuckets)
                 && Objects.equals(beforeFiles, dataSplit.beforeFiles)
                 && Objects.equals(beforeDeletionFiles, 
dataSplit.beforeDeletionFiles)
                 && Objects.equals(dataFiles, dataSplit.dataFiles)
@@ -289,6 +295,7 @@ public class DataSplit implements Split {
                 partition,
                 bucket,
                 bucketPath,
+                totalBuckets,
                 beforeFiles,
                 beforeDeletionFiles,
                 dataFiles,
@@ -310,6 +317,7 @@ public class DataSplit implements Split {
         this.partition = other.partition;
         this.bucket = other.bucket;
         this.bucketPath = other.bucketPath;
+        this.totalBuckets = other.totalBuckets;
         this.beforeFiles = other.beforeFiles;
         this.beforeDeletionFiles = other.beforeDeletionFiles;
         this.dataFiles = other.dataFiles;
@@ -325,6 +333,12 @@ public class DataSplit implements Split {
         SerializationUtils.serializeBinaryRow(partition, out);
         out.writeInt(bucket);
         out.writeUTF(bucketPath);
+        if (totalBuckets != null) {
+            out.writeBoolean(true);
+            out.writeInt(totalBuckets);
+        } else {
+            out.writeBoolean(false);
+        }
 
         DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
         out.writeInt(beforeFiles.size());
@@ -354,6 +368,7 @@ public class DataSplit implements Split {
         BinaryRow partition = SerializationUtils.deserializeBinaryRow(in);
         int bucket = in.readInt();
         String bucketPath = in.readUTF();
+        Integer totalBuckets = version >= 6 && in.readBoolean() ? in.readInt() 
: null;
 
         FunctionWithIOException<DataInputView, DataFileMeta> dataFileSer =
                 getFileMetaSerde(version);
@@ -385,6 +400,7 @@ public class DataSplit implements Split {
                         .withPartition(partition)
                         .withBucket(bucket)
                         .withBucketPath(bucketPath)
+                        .withTotalBuckets(totalBuckets)
                         .withBeforeFiles(beforeFiles)
                         .withDataFiles(dataFiles)
                         .isStreaming(isStreaming)
@@ -458,6 +474,11 @@ public class DataSplit implements Split {
             return this;
         }
 
+        public Builder withTotalBuckets(Integer totalBuckets) {
+            this.split.totalBuckets = totalBuckets;
+            return this;
+        }
+
         public Builder withBeforeFiles(List<DataFileMeta> beforeFiles) {
             this.split.beforeFiles = new ArrayList<>(beforeFiles);
             return this;
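
DataSplit gets the same treatment: its serialization VERSION moves from 5 to 6, and the new field uses the same presence-flag encoding, so splits persisted by older versions read back with totalBuckets == null. A minimal sketch of building a split with the new builder method; the with* calls come from the hunks above, while the concrete values and the terminal build() call are illustrative assumptions:

    DataSplit split =
            DataSplit.builder()
                    .withSnapshot(1L)
                    .withPartition(partition)                 // BinaryRow, assumed in scope
                    .withBucket(0)
                    .withTotalBuckets(4)                      // new in this commit; may be left null
                    .withBucketPath("/warehouse/t/bucket-0")  // hypothetical path
                    .withDataFiles(dataFiles)                 // List<DataFileMeta>, assumed in scope
                    .rawConvertible(true)
                    .isStreaming(false)
                    .build();                                 // assumed terminal method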
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
index 80a56e4f0c..df837bab0a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
@@ -71,7 +70,7 @@ public class IncrementalDeltaStartingScanner extends 
AbstractStartingScanner {
 
     @Override
     public Result scan(SnapshotReader reader) {
-        Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new 
ConcurrentHashMap<>();
+        Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> grouped = new 
ConcurrentHashMap<>();
         ManifestsReader manifestsReader = reader.manifestsReader();
 
         List<Long> snapshots =
@@ -114,29 +113,28 @@ public class IncrementalDeltaStartingScanner extends 
AbstractStartingScanner {
             ManifestEntry entry = entries.next();
             checkArgument(
                     entry.kind() == FileKind.ADD, "Delta or changelog should 
only have ADD files.");
-            grouped.compute(
-                    Pair.of(entry.partition(), entry.bucket()),
-                    (key, files) -> {
-                        if (files == null) {
-                            files = new ArrayList<>();
-                        }
-                        files.add(entry.file());
-                        return files;
-                    });
+            grouped.computeIfAbsent(
+                            Pair.of(entry.partition(), entry.bucket()), ignore 
-> new ArrayList<>())
+                    .add(entry);
         }
 
         List<Split> result = new ArrayList<>();
-        for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> entry : 
grouped.entrySet()) {
+        for (Map.Entry<Pair<BinaryRow, Integer>, List<ManifestEntry>> entry : 
grouped.entrySet()) {
             BinaryRow partition = entry.getKey().getLeft();
             int bucket = entry.getKey().getRight();
             String bucketPath = reader.pathFactory().bucketPath(partition, 
bucket).toString();
             for (SplitGenerator.SplitGroup splitGroup :
-                    reader.splitGenerator().splitForBatch(entry.getValue())) {
+                    reader.splitGenerator()
+                            .splitForBatch(
+                                    entry.getValue().stream()
+                                            .map(ManifestEntry::file)
+                                            .collect(Collectors.toList()))) {
                 DataSplit.Builder dataSplitBuilder =
                         DataSplit.builder()
                                 .withSnapshot(endingSnapshotId)
                                 .withPartition(partition)
                                 .withBucket(bucket)
+                                
.withTotalBuckets(entry.getValue().get(0).totalBuckets())
                                 .withDataFiles(splitGroup.files)
                                 .rawConvertible(splitGroup.rawConvertible)
                                 .withBucketPath(bucketPath);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 649dc5b1e4..931e9d4eba 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -65,6 +65,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.Snapshot.FIRST_SNAPSHOT_ID;
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
@@ -311,17 +312,17 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
         FileStoreScan.Plan plan = scan.plan();
         @Nullable Snapshot snapshot = plan.snapshot();
 
-        Map<BinaryRow, Map<Integer, List<DataFileMeta>>> files =
+        Map<BinaryRow, Map<Integer, List<ManifestEntry>>> grouped =
                 groupByPartFiles(plan.files(FileKind.ADD));
         if (options.scanPlanSortPartition()) {
-            Map<BinaryRow, Map<Integer, List<DataFileMeta>>> newFiles = new 
LinkedHashMap<>();
-            files.entrySet().stream()
+            Map<BinaryRow, Map<Integer, List<ManifestEntry>>> sorted = new 
LinkedHashMap<>();
+            grouped.entrySet().stream()
                     .sorted((o1, o2) -> 
partitionComparator().compare(o1.getKey(), o2.getKey()))
-                    .forEach(entry -> newFiles.put(entry.getKey(), 
entry.getValue()));
-            files = newFiles;
+                    .forEach(entry -> sorted.put(entry.getKey(), 
entry.getValue()));
+            grouped = sorted;
         }
         List<DataSplit> splits =
-                generateSplits(snapshot, scanMode != ScanMode.ALL, 
splitGenerator, files);
+                generateSplits(snapshot, scanMode != ScanMode.ALL, 
splitGenerator, grouped);
         return new PlanImpl(
                 plan.watermark(), snapshot == null ? null : snapshot.id(), 
(List) splits);
     }
@@ -330,7 +331,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
             @Nullable Snapshot snapshot,
             boolean isStreaming,
             SplitGenerator splitGenerator,
-            Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles) 
{
+            Map<BinaryRow, Map<Integer, List<ManifestEntry>>> 
groupedManifestEntries) {
         List<DataSplit> splits = new ArrayList<>();
         // Read deletion indexes at once to reduce file IO
         Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> 
deletionIndexFilesMap = null;
@@ -338,22 +339,28 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
             deletionIndexFilesMap =
                     deletionVectors && snapshot != null
                             ? indexFileHandler.scan(
-                                    snapshot, DELETION_VECTORS_INDEX, 
groupedDataFiles.keySet())
+                                    snapshot,
+                                    DELETION_VECTORS_INDEX,
+                                    groupedManifestEntries.keySet())
                             : Collections.emptyMap();
         }
-        for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
-                groupedDataFiles.entrySet()) {
+        for (Map.Entry<BinaryRow, Map<Integer, List<ManifestEntry>>> entry :
+                groupedManifestEntries.entrySet()) {
             BinaryRow partition = entry.getKey();
-            Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
-            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : 
buckets.entrySet()) {
+            Map<Integer, List<ManifestEntry>> buckets = entry.getValue();
+            for (Map.Entry<Integer, List<ManifestEntry>> bucketEntry : 
buckets.entrySet()) {
                 int bucket = bucketEntry.getKey();
-                List<DataFileMeta> bucketFiles = bucketEntry.getValue();
+                List<DataFileMeta> bucketFiles =
+                        bucketEntry.getValue().stream()
+                                .map(ManifestEntry::file)
+                                .collect(Collectors.toList());
                 DataSplit.Builder builder =
                         DataSplit.builder()
                                 .withSnapshot(
                                         snapshot == null ? FIRST_SNAPSHOT_ID - 
1 : snapshot.id())
                                 .withPartition(partition)
                                 .withBucket(bucket)
+                                
.withTotalBuckets(bucketEntry.getValue().get(0).totalBuckets())
                                 .isStreaming(isStreaming);
                 List<SplitGenerator.SplitGroup> splitGroups =
                         isStreaming
@@ -406,9 +413,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
         withMode(ScanMode.DELTA);
         FileStoreScan.Plan plan = scan.plan();
 
-        Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles =
+        Map<BinaryRow, Map<Integer, List<ManifestEntry>>> beforeFiles =
                 groupByPartFiles(plan.files(FileKind.DELETE));
-        Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles =
+        Map<BinaryRow, Map<Integer, List<ManifestEntry>>> dataFiles =
                 groupByPartFiles(plan.files(FileKind.ADD));
         Snapshot beforeSnapshot = 
snapshotManager.snapshot(plan.snapshot().id() - 1);
         return toChangesPlan(true, plan, beforeSnapshot, beforeFiles, 
dataFiles);
@@ -418,8 +425,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
             boolean isStreaming,
             FileStoreScan.Plan plan,
             Snapshot beforeSnapshot,
-            Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles,
-            Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles) {
+            Map<BinaryRow, Map<Integer, List<ManifestEntry>>> beforeFiles,
+            Map<BinaryRow, Map<Integer, List<ManifestEntry>>> dataFiles) {
         Snapshot snapshot = plan.snapshot();
         List<DataSplit> splits = new ArrayList<>();
         Map<BinaryRow, Set<Integer>> buckets = new HashMap<>();
@@ -450,23 +457,38 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
         for (Map.Entry<BinaryRow, Set<Integer>> entry : buckets.entrySet()) {
             BinaryRow part = entry.getKey();
             for (Integer bucket : entry.getValue()) {
-                List<DataFileMeta> before =
+                List<ManifestEntry> beforeEntries =
                         beforeFiles
                                 .getOrDefault(part, Collections.emptyMap())
                                 .getOrDefault(bucket, Collections.emptyList());
-                List<DataFileMeta> data =
+                List<ManifestEntry> dataEntries =
                         dataFiles
                                 .getOrDefault(part, Collections.emptyMap())
                                 .getOrDefault(bucket, Collections.emptyList());
 
                 // deduplicate
-                before.removeIf(data::remove);
+                beforeEntries.removeIf(dataEntries::remove);
+
+                Integer totalBuckets = null;
+                if (!dataEntries.isEmpty()) {
+                    totalBuckets = dataEntries.get(0).totalBuckets();
+                } else if (!beforeEntries.isEmpty()) {
+                    totalBuckets = beforeEntries.get(0).totalBuckets();
+                }
+
+                List<DataFileMeta> before =
+                        beforeEntries.stream()
+                                .map(ManifestEntry::file)
+                                .collect(Collectors.toList());
+                List<DataFileMeta> data =
+                        
dataEntries.stream().map(ManifestEntry::file).collect(Collectors.toList());
 
                 DataSplit.Builder builder =
                         DataSplit.builder()
                                 .withSnapshot(snapshot.id())
                                 .withPartition(part)
                                 .withBucket(bucket)
+                                .withTotalBuckets(totalBuckets)
                                 .withBeforeFiles(before)
                                 .withDataFiles(data)
                                 .isStreaming(isStreaming)
@@ -497,9 +519,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
     public Plan readIncrementalDiff(Snapshot before) {
         withMode(ScanMode.ALL);
         FileStoreScan.Plan plan = scan.plan();
-        Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles =
+        Map<BinaryRow, Map<Integer, List<ManifestEntry>>> dataFiles =
                 groupByPartFiles(plan.files(FileKind.ADD));
-        Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles =
+        Map<BinaryRow, Map<Integer, List<ManifestEntry>>> beforeFiles =
                 
groupByPartFiles(scan.withSnapshot(before).plan().files(FileKind.ADD));
         return toChangesPlan(false, plan, before, beforeFiles, dataFiles);
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index 3c794e146c..aa629a67bc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -109,6 +109,7 @@ public class TestAppendFileStore extends 
AppendOnlyFileStore {
         return new CommitMessageImpl(
                 partition,
                 bucket,
+                options().bucket(),
                 DataIncrement.emptyIncrement(),
                 CompactIncrement.emptyIncrement(),
                 new IndexIncrement(Collections.emptyList(), indexFileMetas));
@@ -143,6 +144,7 @@ public class TestAppendFileStore extends 
AppendOnlyFileStore {
         return new CommitMessageImpl(
                 partition,
                 bucket,
+                options().bucket(),
                 DataIncrement.emptyIncrement(),
                 CompactIncrement.emptyIncrement(),
                 new IndexIncrement(dvMaintainer.writeDeletionVectorsIndex()));
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 0da4489e3f..3e09b5a887 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -340,6 +340,7 @@ public class TestFileStore extends KeyValueFileStore {
                         new CommitMessageImpl(
                                 entryWithPartition.getKey(),
                                 entryWithBucket.getKey(),
+                                options().bucket(),
                                 increment.newFilesIncrement(),
                                 increment.compactIncrement(),
                                 new IndexIncrement(indexFiles)));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
index 07446faab6..21ff27ae99 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
@@ -94,6 +94,7 @@ public class DeletionVectorsMaintainerTest extends 
PrimaryKeyTableTestBase {
                 new CommitMessageImpl(
                         BinaryRow.EMPTY_ROW,
                         0,
+                        1,
                         DataIncrement.emptyIncrement(),
                         CompactIncrement.emptyIncrement(),
                         new IndexIncrement(fileMetas1));
@@ -115,6 +116,7 @@ public class DeletionVectorsMaintainerTest extends 
PrimaryKeyTableTestBase {
                 new CommitMessageImpl(
                         BinaryRow.EMPTY_ROW,
                         0,
+                        1,
                         DataIncrement.emptyIncrement(),
                         CompactIncrement.emptyIncrement(),
                         new IndexIncrement(fileMetas2));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
index df71f204b7..43f594073f 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -208,10 +208,12 @@ public class HashBucketAssignerTest extends 
PrimaryKeyTableTestBase {
         assertThat(assigner.currentPartitions()).contains(row(2));
     }
 
-    private CommitMessage createCommitMessage(BinaryRow partition, int bucket, 
IndexFileMeta file) {
+    private CommitMessage createCommitMessage(
+            BinaryRow partition, int bucket, int totalBuckets, IndexFileMeta 
file) {
         return new CommitMessageImpl(
                 partition,
                 bucket,
+                totalBuckets,
                 new DataIncrement(
                         Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList()),
                 new CompactIncrement(
@@ -226,8 +228,8 @@ public class HashBucketAssignerTest extends 
PrimaryKeyTableTestBase {
         commit.commit(
                 0,
                 Arrays.asList(
-                        createCommitMessage(row(1), 0, bucket0),
-                        createCommitMessage(row(1), 2, bucket2)));
+                        createCommitMessage(row(1), 0, 3, bucket0),
+                        createCommitMessage(row(1), 2, 3, bucket2)));
 
         HashBucketAssigner assigner0 = createAssigner(3, 3, 0);
         HashBucketAssigner assigner2 = createAssigner(3, 3, 2);
@@ -252,8 +254,8 @@ public class HashBucketAssignerTest extends 
PrimaryKeyTableTestBase {
         commit.commit(
                 0,
                 Arrays.asList(
-                        createCommitMessage(row(1), 0, bucket0),
-                        createCommitMessage(row(1), 2, bucket2)));
+                        createCommitMessage(row(1), 0, 3, bucket0),
+                        createCommitMessage(row(1), 2, 3, bucket2)));
 
         HashBucketAssigner assigner0 = createAssigner(3, 3, 0, 1);
         HashBucketAssigner assigner2 = createAssigner(3, 3, 2, 1);
@@ -311,8 +313,10 @@ public class HashBucketAssignerTest extends 
PrimaryKeyTableTestBase {
         commit.commit(
                 0,
                 Arrays.asList(
-                        createCommitMessage(row(1), 0, 
fileHandler.writeHashIndex(new int[] {0})),
-                        createCommitMessage(row(2), 0, 
fileHandler.writeHashIndex(new int[] {0}))));
+                        createCommitMessage(
+                                row(1), 0, 1, fileHandler.writeHashIndex(new 
int[] {0})),
+                        createCommitMessage(
+                                row(2), 0, 1, fileHandler.writeHashIndex(new 
int[] {0}))));
         
assertThat(assigner.currentPartitions()).containsExactlyInAnyOrder(row(1), 
row(2));
 
         // checkpoint 1, but no commit
@@ -328,7 +332,8 @@ public class HashBucketAssignerTest extends 
PrimaryKeyTableTestBase {
         commit.commit(
                 1,
                 Collections.singletonList(
-                        createCommitMessage(row(1), 0, 
fileHandler.writeHashIndex(new int[] {1}))));
+                        createCommitMessage(
+                                row(1), 0, 1, fileHandler.writeHashIndex(new 
int[] {1}))));
         assigner.prepareCommit(3);
         assertThat(assigner.currentPartitions()).isEmpty();
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
index 1544e879c7..b33055e9d5 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -45,6 +45,78 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Compatibility Test for {@link ManifestCommittableSerializer}. */
 public class ManifestCommittableSerializerCompatibilityTest {
 
+    @Test
+    public void testCompatibilityToV3CommitV7() throws IOException {
+        SimpleStats keyStats =
+                new SimpleStats(
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        fromLongArray(new Long[] {0L}));
+        SimpleStats valueStats =
+                new SimpleStats(
+                        singleColumn("min_value"),
+                        singleColumn("max_value"),
+                        fromLongArray(new Long[] {0L}));
+        DataFileMeta dataFile =
+                new DataFileMeta(
+                        "my_file",
+                        1024 * 1024,
+                        1024,
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        keyStats,
+                        valueStats,
+                        15,
+                        200,
+                        5,
+                        3,
+                        Arrays.asList("extra1", "extra2"),
+                        
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+                        11L,
+                        new byte[] {1, 2, 4},
+                        FileSource.COMPACT,
+                        Arrays.asList("field1", "field2", "field3"),
+                        "hdfs://localhost:9000/path/to/file");
+        List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+        LinkedHashMap<String, DeletionVectorMeta> dvMetas = new 
LinkedHashMap<>();
+        dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, 3L));
+        dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, 5L));
+        IndexFileMeta indexFile =
+                new IndexFileMeta("my_index_type", "my_index_file", 1024 * 
100, 1002, dvMetas);
+        List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
+
+        CommitMessageImpl commitMessage =
+                new CommitMessageImpl(
+                        singleColumn("my_partition"),
+                        11,
+                        16,
+                        new DataIncrement(dataFiles, dataFiles, dataFiles),
+                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
+                        new IndexIncrement(indexFiles));
+
+        ManifestCommittable manifestCommittable =
+                new ManifestCommittable(
+                        5,
+                        202020L,
+                        Collections.singletonMap(5, 555L),
+                        Collections.singletonList(commitMessage));
+
+        ManifestCommittableSerializer serializer = new ManifestCommittableSerializer();
+        byte[] bytes = serializer.serialize(manifestCommittable);
+        ManifestCommittable deserialized = serializer.deserialize(3, bytes);
+        assertThat(deserialized).isEqualTo(manifestCommittable);
+
+        byte[] oldBytes =
+                IOUtils.readFully(
+                        ManifestCommittableSerializerCompatibilityTest.class
+                                .getClassLoader()
+                                .getResourceAsStream("compatibility/manifest-committable-v7"),
+                        true);
+        deserialized = serializer.deserialize(3, oldBytes);
+        assertThat(deserialized).isEqualTo(manifestCommittable);
+    }
+
     @Test
     public void testCompatibilityToV3CommitV6() throws IOException {
         SimpleStats keyStats =
@@ -90,6 +162,7 @@ public class ManifestCommittableSerializerCompatibilityTest {
                 new CommitMessageImpl(
                         singleColumn("my_partition"),
                         11,
+                        null,
                         new DataIncrement(dataFiles, dataFiles, dataFiles),
                         new CompactIncrement(dataFiles, dataFiles, dataFiles),
                         new IndexIncrement(indexFiles));
@@ -161,6 +234,7 @@ public class ManifestCommittableSerializerCompatibilityTest {
                 new CommitMessageImpl(
                         singleColumn("my_partition"),
                         11,
+                        null,
                         new DataIncrement(dataFiles, dataFiles, dataFiles),
                         new CompactIncrement(dataFiles, dataFiles, dataFiles),
                         new IndexIncrement(indexFiles));
@@ -231,6 +305,7 @@ public class ManifestCommittableSerializerCompatibilityTest {
                 new CommitMessageImpl(
                         singleColumn("my_partition"),
                         11,
+                        null,
                         new DataIncrement(dataFiles, dataFiles, dataFiles),
                         new CompactIncrement(dataFiles, dataFiles, dataFiles),
                         new IndexIncrement(indexFiles));
@@ -302,6 +377,7 @@ public class ManifestCommittableSerializerCompatibilityTest {
                 new CommitMessageImpl(
                         singleColumn("my_partition"),
                         11,
+                        null,
                         new DataIncrement(dataFiles, dataFiles, dataFiles),
                         new CompactIncrement(dataFiles, dataFiles, dataFiles),
                         new IndexIncrement(indexFiles));
@@ -373,6 +449,7 @@ public class ManifestCommittableSerializerCompatibilityTest {
                 new CommitMessageImpl(
                         singleColumn("my_partition"),
                         11,
+                        null,
                         new DataIncrement(dataFiles, dataFiles, dataFiles),
                         new CompactIncrement(dataFiles, dataFiles, dataFiles),
                         new IndexIncrement(indexFiles));
@@ -441,6 +518,7 @@ public class ManifestCommittableSerializerCompatibilityTest {
                 new CommitMessageImpl(
                         singleColumn("my_partition"),
                         11,
+                        null,
                         new DataIncrement(dataFiles, Collections.emptyList(), 
dataFiles),
                         new CompactIncrement(dataFiles, dataFiles, dataFiles),
                         new IndexIncrement(indexFiles));
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
index 8de8309bc8..6b2212d983 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
@@ -28,9 +28,7 @@ import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -61,23 +59,22 @@ public class ManifestCommittableSerializerTest {
                 rnd.nextBoolean()
                         ? new ManifestCommittable(rnd.nextLong(), rnd.nextLong())
                         : new ManifestCommittable(rnd.nextLong(), null);
-        addFileCommittables(committable, row(0), 0);
-        addFileCommittables(committable, row(0), 1);
-        addFileCommittables(committable, row(1), 0);
-        addFileCommittables(committable, row(1), 1);
+        addFileCommittables(committable, row(0), 0, 2);
+        addFileCommittables(committable, row(0), 1, 2);
+        addFileCommittables(committable, row(1), 0, 2);
+        addFileCommittables(committable, row(1), 1, 2);
         return committable;
     }
 
     private static void addFileCommittables(
-            ManifestCommittable committable, BinaryRow partition, int bucket) {
-        List<CommitMessage> commitMessages = new ArrayList<>();
+            ManifestCommittable committable, BinaryRow partition, int bucket, int totalBuckets) {
         int length = ThreadLocalRandom.current().nextInt(10) + 1;
         for (int i = 0; i < length; i++) {
             DataIncrement dataIncrement = randomNewFilesIncrement();
             CompactIncrement compactIncrement = randomCompactIncrement();
             CommitMessage commitMessage =
-                    new CommitMessageImpl(partition, bucket, dataIncrement, compactIncrement);
-            commitMessages.add(commitMessage);
+                    new CommitMessageImpl(
+                            partition, bucket, totalBuckets, dataIncrement, compactIncrement);
             committable.addFileCommittable(commitMessage);
         }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
index dc2af066e9..deab3a9703 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
@@ -100,6 +100,7 @@ public class FileStoreTestUtils {
                         new CommitMessageImpl(
                                 entryWithPartition.getKey(),
                                 entryWithBucket.getKey(),
+                                store.options().bucket(),
                                 increment.newFilesIncrement(),
                                 increment.compactIncrement()));
             }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index aba37d4cce..901a93737b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -330,6 +330,7 @@ public class PartitionExpireTest {
                 new CommitMessageImpl(
                         message.partition(),
                         message.bucket(),
+                        message.totalBuckets(),
                         new DataIncrement(emptyList(), emptyList(), 
emptyList()),
                         new CompactIncrement(singletonList(file), emptyList(), 
emptyList()));
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
index ba735594a9..74c652b052 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
@@ -58,6 +58,8 @@ public class TestCommitThread extends Thread {
     private final RowType valueType;
     private final boolean enableOverwrite;
     private final Map<BinaryRow, List<KeyValue>> data;
+    private final int totalBuckets;
+
     private final Map<BinaryRow, List<KeyValue>> result;
     private final Map<BinaryRow, MergeTreeWriter> writers;
     private final Set<BinaryRow> writtenPartitions;
@@ -79,6 +81,8 @@ public class TestCommitThread extends Thread {
         this.valueType = valueType;
         this.enableOverwrite = enableOverwrite;
         this.data = data;
+        this.totalBuckets = testStore.options().bucket();
+
         this.result = new HashMap<>();
         this.writers = new HashMap<>();
         this.writtenPartitions = new HashSet<>();
@@ -140,7 +144,11 @@ public class TestCommitThread extends Thread {
             CommitIncrement inc = entry.getValue().prepareCommit(true);
             committable.addFileCommittable(
                     new CommitMessageImpl(
-                            entry.getKey(), 0, inc.newFilesIncrement(), 
inc.compactIncrement()));
+                            entry.getKey(),
+                            0,
+                            totalBuckets,
+                            inc.newFilesIncrement(),
+                            inc.compactIncrement()));
         }
 
         runWithRetry(committable, () -> commit.commit(committable, 
Collections.emptyMap()));
@@ -152,7 +160,11 @@ public class TestCommitThread extends Thread {
         CommitIncrement inc = writers.get(partition).prepareCommit(true);
         committable.addFileCommittable(
                 new CommitMessageImpl(
-                        partition, 0, inc.newFilesIncrement(), 
inc.compactIncrement()));
+                        partition,
+                        0,
+                        totalBuckets,
+                        inc.newFilesIncrement(),
+                        inc.compactIncrement()));
 
         runWithRetry(
                 committable,
@@ -175,7 +187,11 @@ public class TestCommitThread extends Thread {
                     CommitIncrement inc = writer.prepareCommit(true);
                     committable.addFileCommittable(
                             new CommitMessageImpl(
-                                    partition, 0, inc.newFilesIncrement(), 
inc.compactIncrement()));
+                                    partition,
+                                    0,
+                                    totalBuckets,
+                                    inc.newFilesIncrement(),
+                                    inc.compactIncrement()));
                 }
                 commit.commit(committable, Collections.emptyMap());
                 break;
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
index 1f87838aea..351af6a3b7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
@@ -39,6 +39,7 @@ public class CommitMessageSerializerTest {
     @Test
     public void test() throws IOException {
         CommitMessageSerializer serializer = new CommitMessageSerializer();
+
         DataIncrement dataIncrement = randomNewFilesIncrement();
         CompactIncrement compactIncrement = randomCompactIncrement();
         IndexIncrement indexIncrement =
@@ -46,9 +47,14 @@ public class CommitMessageSerializerTest {
                         Arrays.asList(randomIndexFile(), randomIndexFile()),
                         Arrays.asList(randomIndexFile(), randomIndexFile()));
         CommitMessageImpl committable =
-                new CommitMessageImpl(row(0), 1, dataIncrement, compactIncrement, indexIncrement);
+                new CommitMessageImpl(
+                        row(0), 1, 2, dataIncrement, compactIncrement, indexIncrement);
+
         CommitMessageImpl newCommittable =
-                (CommitMessageImpl) serializer.deserialize(5, serializer.serialize(committable));
+                (CommitMessageImpl) serializer.deserialize(7, serializer.serialize(committable));
+        assertThat(newCommittable.partition()).isEqualTo(committable.partition());
+        assertThat(newCommittable.bucket()).isEqualTo(committable.bucket());
+        assertThat(newCommittable.totalBuckets()).isEqualTo(committable.totalBuckets());
         assertThat(newCommittable.compactIncrement()).isEqualTo(committable.compactIncrement());
         assertThat(newCommittable.newFilesIncrement()).isEqualTo(committable.newFilesIncrement());
         assertThat(newCommittable.indexIncrement()).isEqualTo(committable.indexIncrement());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index a87a645711..7213f11ca0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -511,6 +511,74 @@ public class SplitTest {
         assertThat(actual).isEqualTo(split);
     }
 
+    @Test
+    public void testSerializerCompatibleV6() throws Exception {
+        SimpleStats keyStats =
+                new SimpleStats(
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        fromLongArray(new Long[] {0L}));
+        SimpleStats valueStats =
+                new SimpleStats(
+                        singleColumn("min_value"),
+                        singleColumn("max_value"),
+                        fromLongArray(new Long[] {0L}));
+
+        DataFileMeta dataFile =
+                new DataFileMeta(
+                        "my_file",
+                        1024 * 1024,
+                        1024,
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        keyStats,
+                        valueStats,
+                        15,
+                        200,
+                        5,
+                        3,
+                        Arrays.asList("extra1", "extra2"),
+                        Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+                        11L,
+                        new byte[] {1, 2, 4},
+                        FileSource.COMPACT,
+                        Arrays.asList("field1", "field2", "field3"),
+                        "hdfs:///path/to/warehouse");
+        List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+        DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 33L);
+        List<DeletionFile> deletionFiles = Collections.singletonList(deletionFile);
+
+        BinaryRow partition = new BinaryRow(1);
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition);
+        binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa"));
+        binaryRowWriter.complete();
+
+        DataSplit split =
+                DataSplit.builder()
+                        .withSnapshot(18)
+                        .withPartition(partition)
+                        .withBucket(20)
+                        .withTotalBuckets(32)
+                        .withDataFiles(dataFiles)
+                        .withDataDeletionFiles(deletionFiles)
+                        .withBucketPath("my path")
+                        .build();
+
+        assertThat(InstantiationUtil.clone(split)).isEqualTo(split);
+
+        byte[] v6Bytes =
+                IOUtils.readFully(
+                        SplitTest.class
+                                .getClassLoader()
+                                .getResourceAsStream("compatibility/datasplit-v6"),
+                        true);
+
+        DataSplit actual =
+                InstantiationUtil.deserializeObject(v6Bytes, DataSplit.class.getClassLoader());
+        assertThat(actual).isEqualTo(split);
+    }
+
     private DataFileMeta newDataFile(long rowCount) {
         return newDataFile(rowCount, null, null);
     }
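
On the read side the same information is attached to splits: DataSplit.builder() gains withTotalBuckets, and the datasplit-v6 resource above pins down compatibility with the previous serializer version. A minimal sketch of the new builder call, mirroring the test above (snapshot id, bucket path and file list are placeholders, not taken from this patch):

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.io.DataFileMeta;
    import org.apache.paimon.table.source.DataSplit;

    import java.util.List;

    public class DataSplitSketch {
        // Builds a split that remembers how many buckets the table had when it was planned.
        public static DataSplit splitWithBucketCount(
                BinaryRow partition, List<DataFileMeta> dataFiles, int bucket, int totalBuckets) {
            return DataSplit.builder()
                    .withSnapshot(1L)
                    .withPartition(partition)
                    .withBucket(bucket)
                    .withTotalBuckets(totalBuckets) // new builder option in this change
                    .withDataFiles(dataFiles)
                    .withBucketPath("bucket-" + bucket) // placeholder path
                    .build();
        }
    }

The value is read back through totalBuckets() on the split, as the postpone-bucket operators further below do.
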
diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v6 b/paimon-core/src/test/resources/compatibility/datasplit-v6
new file mode 100644
index 0000000000..91cbc60001
Binary files /dev/null and b/paimon-core/src/test/resources/compatibility/datasplit-v6 differ
diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-v7 b/paimon-core/src/test/resources/compatibility/manifest-committable-v7
new file mode 100644
index 0000000000..31566ca7fd
Binary files /dev/null and b/paimon-core/src/test/resources/compatibility/manifest-committable-v7 differ
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 91bb30b45b..316655613b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -58,6 +58,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -239,9 +240,12 @@ public class CompactAction extends TableActionBase {
                 whereSql == null,
                 "Postpone bucket compaction currently does not support 
predicates");
 
+        Options options = new Options(table.options());
+        int defaultBucketNum = 
options.get(FlinkConnectorOptions.POSTPONE_DEFAULT_BUCKET_NUM);
+
         // change bucket to a positive value, so we can scan files from the 
bucket = -2 directory
         Map<String, String> bucketOptions = new HashMap<>(table.options());
-        bucketOptions.put(CoreOptions.BUCKET.key(), "1");
+        bucketOptions.put(CoreOptions.BUCKET.key(), 
String.valueOf(defaultBucketNum));
         FileStoreTable fileStoreTable = 
table.copy(table.schema().copy(bucketOptions));
 
         List<BinaryRow> partitions =
@@ -253,15 +257,16 @@ public class CompactAction extends TableActionBase {
             return false;
         }
 
-        Options options = new Options(fileStoreTable.options());
         InternalRowPartitionComputer partitionComputer =
                 new InternalRowPartitionComputer(
                         fileStoreTable.coreOptions().partitionDefaultName(),
                         fileStoreTable.rowType(),
                         fileStoreTable.partitionKeys().toArray(new String[0]),
                         fileStoreTable.coreOptions().legacyPartitionName());
+        String commitUser = CoreOptions.createCommitUser(options);
+        List<DataStream<Committable>> dataStreams = new ArrayList<>();
         for (BinaryRow partition : partitions) {
-            int bucketNum = 
options.get(FlinkConnectorOptions.POSTPONE_DEFAULT_BUCKET_NUM);
+            int bucketNum = defaultBucketNum;
 
             Iterator<ManifestEntry> it =
                     table.newSnapshotReader()
@@ -295,8 +300,6 @@ public class CompactAction extends TableActionBase {
                             new RowDataChannelComputer(realTable.schema(), 
false),
                             null);
             FixedBucketSink sink = new FixedBucketSink(realTable, null, null);
-            String commitUser =
-                    
CoreOptions.createCommitUser(realTable.coreOptions().toConfiguration());
             DataStream<Committable> written =
                     sink.doWrite(partitioned, commitUser, 
partitioned.getParallelism())
                             .forward()
@@ -304,9 +307,16 @@ public class CompactAction extends TableActionBase {
                                     "Rewrite compact committable",
                                     new CommittableTypeInfo(),
                                     new 
RewritePostponeBucketCommittableOperator(realTable));
-            sink.doCommit(written.union(sourcePair.getRight()), commitUser);
+            dataStreams.add(written);
+            dataStreams.add(sourcePair.getRight());
         }
 
+        FixedBucketSink sink = new FixedBucketSink(fileStoreTable, null, null);
+        DataStream<Committable> dataStream = dataStreams.get(0);
+        for (int i = 1; i < dataStreams.size(); i++) {
+            dataStream = dataStream.union(dataStreams.get(i));
+        }
+        sink.doCommit(dataStream, commitUser);
         return true;
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
index 8d014d66ee..996182e0b3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
@@ -174,6 +174,7 @@ public class RemoveUnexistingFilesAction extends TableActionBase {
                         new CommitMessageImpl(
                                 reuse,
                                 entry.getKey(),
+                                table.coreOptions().bucket(),
                                 new DataIncrement(
                                         Collections.emptyList(),
                                         new 
ArrayList<>(entry.getValue().values()),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperator.java
index c52c0b2d85..a906f9b414 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperator.java
@@ -61,6 +61,7 @@ public class UnawareBucketNewFilesCompactionCoordinatorOperator
 
     private final long targetFileSize;
     private final long compactionFileSize;
+    private final int totalBuckets;
 
     private transient UnawareBucketNewFilesCompactionCoordinator coordinator;
     private transient long checkpointId;
@@ -68,6 +69,7 @@ public class UnawareBucketNewFilesCompactionCoordinatorOperator
     public UnawareBucketNewFilesCompactionCoordinatorOperator(CoreOptions 
options) {
         this.targetFileSize = options.targetFileSize(false);
         this.compactionFileSize = options.compactionFileSize(false);
+        this.totalBuckets = options.bucket();
     }
 
     @Override
@@ -124,6 +126,7 @@ public class UnawareBucketNewFilesCompactionCoordinatorOperator
                 new CommitMessageImpl(
                         message.partition(),
                         message.bucket(),
+                        message.totalBuckets(),
                         new DataIncrement(
                                 skippedFiles,
                                 message.newFilesIncrement().deletedFiles(),
@@ -163,6 +166,7 @@ public class UnawareBucketNewFilesCompactionCoordinatorOperator
                         new CommitMessageImpl(
                                 p.getKey(),
                                 BucketMode.UNAWARE_BUCKET,
+                                totalBuckets,
                                 new DataIncrement(
                                         
Collections.singletonList(p.getValue().get(0)),
                                         Collections.emptyList(),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionWorkerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionWorkerOperator.java
index d634d22ec6..2ada98f6b9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionWorkerOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionWorkerOperator.java
@@ -102,6 +102,7 @@ public class UnawareBucketNewFilesCompactionWorkerOperator
         return new CommitMessageImpl(
                 message.partition(),
                 message.bucket(),
+                message.totalBuckets(),
                 new DataIncrement(
                         message.compactIncrement().compactAfter(),
                         Collections.emptyList(),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
index d1f6a5abd2..9f6bd4431d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
@@ -107,6 +107,7 @@ public class ChangelogCompactCoordinateOperator
                 new CommitMessageImpl(
                         message.partition(),
                         message.bucket(),
+                        message.totalBuckets(),
                         new DataIncrement(
                                 message.newFilesIncrement().newFiles(),
                                 message.newFilesIncrement().deletedFiles(),
@@ -137,6 +138,7 @@ public class ChangelogCompactCoordinateOperator
                                 new ChangelogCompactTask(
                                         checkpointId,
                                         partition,
+                                        table.coreOptions().bucket(),
                                         
partitionChangelog.newFileChangelogFiles,
                                         
partitionChangelog.compactChangelogFiles))));
         partitionChangelogs.remove(partition);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
index 7f3f730280..96fe15c344 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
@@ -49,18 +49,22 @@ import java.util.UUID;
  * file, in order to reduce the number of small files.
  */
 public class ChangelogCompactTask implements Serializable {
+
     private final long checkpointId;
     private final BinaryRow partition;
+    private final int totalBuckets;
     private final Map<Integer, List<DataFileMeta>> newFileChangelogFiles;
     private final Map<Integer, List<DataFileMeta>> compactChangelogFiles;
 
     public ChangelogCompactTask(
             long checkpointId,
             BinaryRow partition,
+            int totalBuckets,
             Map<Integer, List<DataFileMeta>> newFileChangelogFiles,
             Map<Integer, List<DataFileMeta>> compactChangelogFiles) {
         this.checkpointId = checkpointId;
         this.partition = partition;
+        this.totalBuckets = totalBuckets;
         this.newFileChangelogFiles = newFileChangelogFiles;
         this.compactChangelogFiles = compactChangelogFiles;
     }
@@ -73,6 +77,10 @@ public class ChangelogCompactTask implements Serializable {
         return partition;
     }
 
+    public int totalBuckets() {
+        return totalBuckets;
+    }
+
     public Map<Integer, List<DataFileMeta>> newFileChangelogFiles() {
         return newFileChangelogFiles;
     }
@@ -204,6 +212,7 @@ public class ChangelogCompactTask implements Serializable {
                     new CommitMessageImpl(
                             partition,
                             entry.getKey(),
+                            totalBuckets,
                             new DataIncrement(
                                     Collections.emptyList(),
                                     Collections.emptyList(),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java
index e21220b26d..c7f56dc5de 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java
@@ -39,7 +39,8 @@ import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 /** Serializer for {@link ChangelogCompactTask}. */
 public class ChangelogCompactTaskSerializer
         implements SimpleVersionedSerializer<ChangelogCompactTask> {
-    private static final int CURRENT_VERSION = 1;
+
+    private static final int CURRENT_VERSION = 2;
 
     private final DataFileMetaSerializer dataFileSerializer;
 
@@ -69,6 +70,7 @@ public class ChangelogCompactTaskSerializer
     private void serialize(ChangelogCompactTask task, DataOutputView view) throws IOException {
         view.writeLong(task.checkpointId());
         serializeBinaryRow(task.partition(), view);
+        view.writeInt(task.totalBuckets());
         // serialize newFileChangelogFiles map
         serializeMap(task.newFileChangelogFiles(), view);
         serializeMap(task.compactChangelogFiles(), view);
@@ -82,6 +84,7 @@ public class ChangelogCompactTaskSerializer
         return new ChangelogCompactTask(
                 view.readLong(),
                 deserializeBinaryRow(view),
+                view.readInt(),
                 deserializeMap(view),
                 deserializeMap(view));
     }
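
Because the wire format now carries totalBuckets between the partition and the changelog file maps, CURRENT_VERSION moves from 1 to 2. How older payloads are read back is not visible in this hunk; purely as a hypothetical sketch (the method shape, the DataInputView parameter and the -1 fallback are assumptions, not taken from this patch, and it reuses the helpers of the serializer above), a version-aware reader could branch like this:

    // Hypothetical sketch, not part of this patch: tolerate v1 payloads written
    // before totalBuckets existed by falling back to a sentinel value.
    private ChangelogCompactTask deserialize(int version, DataInputView view) throws IOException {
        long checkpointId = view.readLong();
        BinaryRow partition = deserializeBinaryRow(view);
        int totalBuckets = version >= 2 ? view.readInt() : -1;
        return new ChangelogCompactTask(
                checkpointId, partition, totalBuckets, deserializeMap(view), deserializeMap(view));
    }
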
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
index 0bfab0d196..3746f355c3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
@@ -108,6 +108,7 @@ public class PostponeBucketCompactSplitSource extends AbstractNonCoordinatedSour
                                         .withPartition(dataSplit.partition())
                                         .withBucket(dataSplit.bucket())
                                         .withBucketPath(dataSplit.bucketPath())
+                                        .withTotalBuckets(dataSplit.totalBuckets())
                                         
.withDataFiles(Collections.singletonList(meta))
                                         .isStreaming(false)
                                         .build();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RemovePostponeBucketFilesOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RemovePostponeBucketFilesOperator.java
index 63b44d0f85..09d3f829bf 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RemovePostponeBucketFilesOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RemovePostponeBucketFilesOperator.java
@@ -45,6 +45,7 @@ public class RemovePostponeBucketFilesOperator extends BoundedOneInputOperator<S
                 new CommitMessageImpl(
                         dataSplit.partition(),
                         dataSplit.bucket(),
+                        dataSplit.totalBuckets(),
                         DataIncrement.emptyIncrement(),
                         new CompactIncrement(
                                 dataSplit.dataFiles(),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
index f387222e4f..8cb2540aa5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
@@ -33,6 +33,8 @@ import org.apache.paimon.utils.FileStorePathFactory;
 
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -93,12 +95,14 @@ public class RewritePostponeBucketCommittableOperator
                 bucketFiles.entrySet()) {
             for (Map.Entry<Integer, BucketFiles> bucketEntry :
                     partitionEntry.getValue().entrySet()) {
+                BucketFiles bucketFiles = bucketEntry.getValue();
                 CommitMessageImpl message =
                         new CommitMessageImpl(
                                 partitionEntry.getKey(),
                                 bucketEntry.getKey(),
+                                bucketFiles.totalBuckets,
                                 DataIncrement.emptyIncrement(),
-                                bucketEntry.getValue().makeIncrement());
+                                bucketFiles.makeIncrement());
                 output.collect(
                         new StreamRecord<>(
                                 new Committable(checkpointId, 
Committable.Kind.FILE, message)));
@@ -112,6 +116,7 @@ public class RewritePostponeBucketCommittableOperator
         private final DataFilePathFactory pathFactory;
         private final FileIO fileIO;
 
+        private @Nullable Integer totalBuckets;
         private final Map<String, DataFileMeta> newFiles;
         private final List<DataFileMeta> compactBefore;
         private final List<DataFileMeta> compactAfter;
@@ -128,6 +133,8 @@ public class RewritePostponeBucketCommittableOperator
         }
 
         private void update(CommitMessageImpl message) {
+            totalBuckets = message.totalBuckets();
+
             for (DataFileMeta file : message.newFilesIncrement().newFiles()) {
                 newFiles.put(file.fileName(), file);
             }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
index bd7ae4a824..d377851d38 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
@@ -127,16 +127,16 @@ public class RewriteFileIndexSink extends FlinkWriteSink<ManifestEntry> {
             ManifestEntry entry = element.getValue();
             BinaryRow partition = entry.partition();
             int bucket = entry.bucket();
-            DataFileMeta file = entry.file();
-            DataFileMeta indexedFile = fileIndexProcessor.process(partition, 
bucket, file);
+            DataFileMeta indexedFile = fileIndexProcessor.process(partition, 
bucket, entry);
 
             CommitMessageImpl commitMessage =
                     new CommitMessageImpl(
                             partition,
                             bucket,
+                            entry.totalBuckets(),
                             DataIncrement.emptyIncrement(),
                             new CompactIncrement(
-                                    Collections.singletonList(file),
+                                    Collections.singletonList(entry.file()),
                                     Collections.singletonList(indexedFile),
                                     Collections.emptyList()));
 
@@ -176,8 +176,9 @@ public class RewriteFileIndexSink extends FlinkWriteSink<ManifestEntry> {
             this.sizeInMeta = 
table.coreOptions().fileIndexInManifestThreshold();
         }
 
-        public DataFileMeta process(BinaryRow partition, int bucket, 
DataFileMeta dataFileMeta)
+        public DataFileMeta process(BinaryRow partition, int bucket, 
ManifestEntry manifestEntry)
                 throws IOException {
+            DataFileMeta dataFileMeta = manifestEntry.file();
             DataFilePathFactory dataFilePathFactory = 
pathFactories.get(partition, bucket);
             SchemaInfo schemaInfo = 
schemaInfoCache.schemaInfo(dataFileMeta.schemaId());
             List<String> extras = new ArrayList<>(dataFileMeta.extraFiles());
@@ -245,6 +246,7 @@ public class RewriteFileIndexSink extends FlinkWriteSink<ManifestEntry> {
                                                         pathFactory
                                                                 
.bucketPath(partition, bucket)
                                                                 .toString())
+                                                .withTotalBuckets(manifestEntry.totalBuckets())
                                                 .withDataFiles(
                                                         
Collections.singletonList(dataFileMeta))
                                                 .rawConvertible(true)
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index d274a02cd8..bd7c6f898f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -264,10 +264,9 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                         + "  'postpone.default-bucket-num' = '2'\n"
                         + ")");
 
-        int numPartitions = 3;
         int numKeys = 100;
         List<String> values = new ArrayList<>();
-        for (int i = 0; i < numPartitions; i++) {
+        for (int i = 0; i < 3; i++) {
             for (int j = 0; j < numKeys; j++) {
                 values.add(String.format("(%d, %d, %d)", i, j, i * numKeys + 
j));
             }
@@ -276,7 +275,7 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
         tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
 
         List<String> expectedBuckets = new ArrayList<>();
-        for (int i = 0; i < numPartitions; i++) {
+        for (int i = 0; i < 3; i++) {
             expectedBuckets.add(String.format("+I[{%d}, 2]", i));
         }
         String bucketSql =
@@ -284,7 +283,7 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
         
assertThat(collect(tEnv.executeSql(bucketSql))).hasSameElementsAs(expectedBuckets);
 
         List<String> expectedData = new ArrayList<>();
-        for (int i = 0; i < numPartitions; i++) {
+        for (int i = 0; i < 3; i++) {
             expectedData.add(
                     String.format(
                             "+I[%d, %d]",
@@ -296,23 +295,20 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
         // before rescaling, write some files in bucket = -2 directory,
         // these files should not be touched by rescaling
         values.clear();
-        int changedPartition = 1;
         for (int j = 0; j < numKeys; j++) {
-            values.add(
-                    String.format(
-                            "(%d, %d, %d)",
-                            changedPartition, j, -(changedPartition * numKeys 
+ j)));
+            values.add(String.format("(1, %d, 0)", j));
+            values.add(String.format("(2, %d, 1)", j));
         }
         tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ", 
values)).await();
 
         tEnv.executeSql(
-                "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 4, 
`partition` => 'pt="
-                        + changedPartition
-                        + "')");
+                "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 4, 
`partition` => 'pt=1')");
+        tEnv.executeSql(
+                "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 8, 
`partition` => 'pt=2')");
         expectedBuckets.clear();
-        for (int i = 0; i < numPartitions; i++) {
-            expectedBuckets.add(String.format("+I[{%d}, %d]", i, i == 
changedPartition ? 4 : 2));
-        }
+        expectedBuckets.add("+I[{0}, 2]");
+        expectedBuckets.add("+I[{1}, 4]");
+        expectedBuckets.add("+I[{2}, 8]");
         
assertThat(collect(tEnv.executeSql(bucketSql))).hasSameElementsAs(expectedBuckets);
         
assertThat(collect(tEnv.executeSql(query))).hasSameElementsAs(expectedData);
 
@@ -321,13 +317,9 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
         
assertThat(collect(tEnv.executeSql(bucketSql))).hasSameElementsAs(expectedBuckets);
 
         expectedData.clear();
-        for (int i = 0; i < numPartitions; i++) {
-            int val = (i * numKeys + i * numKeys + numKeys - 1) * numKeys / 2;
-            if (i == changedPartition) {
-                val *= -1;
-            }
-            expectedData.add(String.format("+I[%d, %d]", i, val));
-        }
+        expectedData.add(String.format("+I[0, %d]", (numKeys - 1) * numKeys / 
2));
+        expectedData.add("+I[1, 0]");
+        expectedData.add(String.format("+I[2, %d]", numKeys));
         
assertThat(collect(tEnv.executeSql(query))).hasSameElementsAs(expectedData);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
index 4f734ff6da..5273e0c7db 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
@@ -218,6 +218,7 @@ public class UnawareBucketNewFilesCompactionCoordinatorOperatorTest {
                 new CommitMessageImpl(
                         partition,
                         BucketMode.UNAWARE_BUCKET,
+                        -1,
                         new DataIncrement(
                                 Arrays.stream(mbs)
                                         
.mapToObj(this::createDataFileMetaOfSize)
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
index 906fac8509..7fcde6214f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
@@ -49,6 +49,7 @@ public class ChangelogCompactTaskSerializerTest {
                 new ChangelogCompactTask(
                         1L,
                         partition,
+                        2,
                         new HashMap<Integer, List<DataFileMeta>>() {
                             {
                                 put(0, newFiles(20));
@@ -61,7 +62,7 @@ public class ChangelogCompactTaskSerializerTest {
                                 put(1, newFiles(10));
                             }
                         });
-        ChangelogCompactTask serializeTask = serializer.deserialize(1, 
serializer.serialize(task));
+        ChangelogCompactTask serializeTask = serializer.deserialize(2, 
serializer.serialize(task));
         assertThat(task).isEqualTo(serializeTask);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommittableSerializerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommittableSerializerTest.java
index 47f9ce570e..1cda7ddc18 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommittableSerializerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommittableSerializerTest.java
@@ -45,7 +45,7 @@ public class CommittableSerializerTest {
         DataIncrement dataIncrement = randomNewFilesIncrement();
         CompactIncrement compactIncrement = randomCompactIncrement();
         CommitMessage committable =
-                new CommitMessageImpl(row(0), 1, dataIncrement, 
compactIncrement);
+                new CommitMessageImpl(row(0), 1, 2, dataIncrement, 
compactIncrement);
         CommitMessage newCommittable =
                 (CommitMessage)
                         serializer
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 9d3bc135e2..41ccfbf79e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -336,6 +336,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                                             new CommitMessageImpl(
                                                     BinaryRow.EMPTY_ROW,
                                                     0,
+                                                    1,
                                                     new DataIncrement(
                                                             
Collections.emptyList(),
                                                             
Collections.emptyList(),
@@ -353,6 +354,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                                             new CommitMessageImpl(
                                                     BinaryRow.EMPTY_ROW,
                                                     0,
+                                                    1,
                                                     new DataIncrement(
                                                             
Collections.emptyList(),
                                                             
Collections.emptyList(),
@@ -369,6 +371,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                                             new CommitMessageImpl(
                                                     BinaryRow.EMPTY_ROW,
                                                     0,
+                                                    1,
                                                     new DataIncrement(
                                                             
Collections.emptyList(),
                                                             
Collections.emptyList(),
@@ -404,6 +407,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                                             new CommitMessageImpl(
                                                     BinaryRow.EMPTY_ROW,
                                                     0,
+                                                    1,
                                                     new DataIncrement(
                                                             
Collections.emptyList(),
                                                             
Collections.emptyList(),
@@ -421,6 +425,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                                             new CommitMessageImpl(
                                                     BinaryRow.EMPTY_ROW,
                                                     0,
+                                                    1,
                                                     new DataIncrement(
                                                             
Collections.emptyList(),
                                                             
Collections.emptyList(),
@@ -437,6 +442,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                                             new CommitMessageImpl(
                                                     BinaryRow.EMPTY_ROW,
                                                     0,
+                                                    1,
                                                     new DataIncrement(
                                                             
Collections.emptyList(),
                                                             
Collections.emptyList(),
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
index 16c1a7d044..549595ec7b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
@@ -49,7 +49,7 @@ class MultiTableCommittableSerializerTest {
         DataIncrement dataIncrement = randomNewFilesIncrement();
         CompactIncrement compactIncrement = randomCompactIncrement();
         CommitMessage commitMessage =
-                new CommitMessageImpl(row(0), 1, dataIncrement, 
compactIncrement);
+                new CommitMessageImpl(row(0), 1, 2, dataIncrement, 
compactIncrement);
         Committable committable = new Committable(9, Committable.Kind.FILE, 
commitMessage);
 
         Arrays.asList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database", 
"table"))
@@ -82,7 +82,7 @@ class MultiTableCommittableSerializerTest {
         DataIncrement newFilesIncrement = randomNewFilesIncrement();
         CompactIncrement compactIncrement = randomCompactIncrement();
         CommitMessage commitMessage =
-                new CommitMessageImpl(row(0), 1, newFilesIncrement, 
compactIncrement);
+                new CommitMessageImpl(row(0), 1, 2, newFilesIncrement, 
compactIncrement);
         Committable committable = new Committable(9, Committable.Kind.FILE, 
commitMessage);
 
         Arrays.asList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database", 
"table"))
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
index b0aa76f157..f82bfc4278 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
@@ -76,22 +76,23 @@ class WrappedManifestCommittableSerializerTest {
                 rnd.nextBoolean()
                         ? new ManifestCommittable(rnd.nextLong(), 
rnd.nextLong())
                         : new ManifestCommittable(rnd.nextLong(), null);
-        addFileCommittables(committable, row(0), 0);
-        addFileCommittables(committable, row(0), 1);
-        addFileCommittables(committable, row(1), 0);
-        addFileCommittables(committable, row(1), 1);
+        addFileCommittables(committable, row(0), 0, 2);
+        addFileCommittables(committable, row(0), 1, 2);
+        addFileCommittables(committable, row(1), 0, 2);
+        addFileCommittables(committable, row(1), 1, 2);
         return committable;
     }
 
     public static void addFileCommittables(
-            ManifestCommittable committable, BinaryRow partition, int bucket) {
+            ManifestCommittable committable, BinaryRow partition, int bucket, 
int totalBuckets) {
         List<CommitMessage> commitMessages = new ArrayList<>();
         int length = ThreadLocalRandom.current().nextInt(10) + 1;
         for (int i = 0; i < length; i++) {
             DataIncrement dataIncrement = randomNewFilesIncrement();
             CompactIncrement compactIncrement = randomCompactIncrement();
             CommitMessage commitMessage =
-                    new CommitMessageImpl(partition, bucket, dataIncrement, 
compactIncrement);
+                    new CommitMessageImpl(
+                            partition, bucket, totalBuckets, dataIncrement, 
compactIncrement);
             commitMessages.add(commitMessage);
             committable.addFileCommittable(commitMessage);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
index f737a19fa9..a220d566ad 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
@@ -112,6 +112,7 @@ class PartitionMarkDoneTest extends TableTestBase {
                     new CommitMessageImpl(
                             BinaryRow.singleColumn(0),
                             0,
+                            1,
                             new DataIncrement(emptyList(), emptyList(), 
emptyList()),
                             new CompactIncrement(singletonList(file), 
emptyList(), emptyList()),
                             new IndexIncrement(emptyList()));
@@ -120,6 +121,7 @@ class PartitionMarkDoneTest extends TableTestBase {
                     new CommitMessageImpl(
                             BinaryRow.singleColumn(0),
                             0,
+                            1,
                             new DataIncrement(singletonList(file), 
emptyList(), emptyList()),
                             new CompactIncrement(emptyList(), emptyList(), 
emptyList()),
                             new IndexIncrement(emptyList()));
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java
index 88fe069d46..f044596708 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java
@@ -45,6 +45,8 @@ import org.apache.orc.Reader;
 import org.apache.orc.StringColumnStatistics;
 import org.apache.orc.TimestampColumnStatistics;
 import org.apache.orc.TypeDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.Date;
@@ -54,6 +56,8 @@ import java.util.stream.IntStream;
 /** {@link SimpleStatsExtractor} for orc files. */
 public class OrcSimpleStatsExtractor implements SimpleStatsExtractor {
 
+    private static final Logger LOG = LoggerFactory.getLogger(OrcSimpleStatsExtractor.class);
+
     private final RowType rowType;
     private final SimpleColStatsCollector.Factory[] statsCollectors;
     private final boolean legacyTimestampLtzType;
@@ -131,135 +135,109 @@ public class OrcSimpleStatsExtractor implements SimpleStatsExtractor {
                         + "!");
 
         SimpleColStats fieldStats;
+        try {
+            fieldStats = toFieldStats(field, stats, nullCount);
+        } catch (Exception e) {
+            LOG.warn("Failed to extract field stats for field {}", field, e);
+            fieldStats = new SimpleColStats(null, null, nullCount);
+        }
+
+        return collector.convert(fieldStats);
+    }
+
+    private SimpleColStats toFieldStats(DataField field, ColumnStatistics stats, long nullCount) {
         switch (field.type().getTypeRoot()) {
             case CHAR:
             case VARCHAR:
                 assertStatsClass(field, stats, StringColumnStatistics.class);
                 StringColumnStatistics stringStats = (StringColumnStatistics) stats;
-                fieldStats =
-                        new SimpleColStats(
-                                BinaryString.fromString(stringStats.getMinimum()),
-                                BinaryString.fromString(stringStats.getMaximum()),
-                                nullCount);
-                break;
+                return new SimpleColStats(
+                        BinaryString.fromString(stringStats.getMinimum()),
+                        BinaryString.fromString(stringStats.getMaximum()),
+                        nullCount);
             case BOOLEAN:
                 assertStatsClass(field, stats, BooleanColumnStatistics.class);
                 BooleanColumnStatistics boolStats = (BooleanColumnStatistics) stats;
-                fieldStats =
-                        new SimpleColStats(
-                                boolStats.getFalseCount() == 0,
-                                boolStats.getTrueCount() != 0,
-                                nullCount);
-                break;
+                return new SimpleColStats(
+                        boolStats.getFalseCount() == 0, boolStats.getTrueCount() != 0, nullCount);
             case DECIMAL:
                 assertStatsClass(field, stats, DecimalColumnStatistics.class);
                 DecimalColumnStatistics decimalStats = (DecimalColumnStatistics) stats;
                 DecimalType decimalType = (DecimalType) (field.type());
                 int precision = decimalType.getPrecision();
                 int scale = decimalType.getScale();
-                fieldStats =
-                        new SimpleColStats(
-                                Decimal.fromBigDecimal(
-                                        decimalStats.getMinimum().bigDecimalValue(),
-                                        precision,
-                                        scale),
-                                Decimal.fromBigDecimal(
-                                        decimalStats.getMaximum().bigDecimalValue(),
-                                        precision,
-                                        scale),
-                                nullCount);
-                break;
+                return new SimpleColStats(
+                        Decimal.fromBigDecimal(
+                                decimalStats.getMinimum().bigDecimalValue(), precision, scale),
+                        Decimal.fromBigDecimal(
+                                decimalStats.getMaximum().bigDecimalValue(), precision, scale),
+                        nullCount);
             case TINYINT:
                 assertStatsClass(field, stats, IntegerColumnStatistics.class);
                 IntegerColumnStatistics byteStats = (IntegerColumnStatistics) stats;
-                fieldStats =
-                        new SimpleColStats(
-                                (byte) byteStats.getMinimum(),
-                                (byte) byteStats.getMaximum(),
-                                nullCount);
-                break;
+                return new SimpleColStats(
+                        (byte) byteStats.getMinimum(), (byte) byteStats.getMaximum(), nullCount);
             case SMALLINT:
                 assertStatsClass(field, stats, IntegerColumnStatistics.class);
                 IntegerColumnStatistics shortStats = (IntegerColumnStatistics) stats;
-                fieldStats =
-                        new SimpleColStats(
-                                (short) shortStats.getMinimum(),
-                                (short) shortStats.getMaximum(),
-                                nullCount);
-                break;
+                return new SimpleColStats(
+                        (short) shortStats.getMinimum(),
+                        (short) shortStats.getMaximum(),
+                        nullCount);
             case INTEGER:
             case TIME_WITHOUT_TIME_ZONE:
                 assertStatsClass(field, stats, IntegerColumnStatistics.class);
                 IntegerColumnStatistics intStats = (IntegerColumnStatistics) stats;
-                fieldStats =
-                        new SimpleColStats(
-                                Long.valueOf(intStats.getMinimum()).intValue(),
-                                Long.valueOf(intStats.getMaximum()).intValue(),
-                                nullCount);
-                break;
+                return new SimpleColStats(
+                        Long.valueOf(intStats.getMinimum()).intValue(),
+                        Long.valueOf(intStats.getMaximum()).intValue(),
+                        nullCount);
             case BIGINT:
                 assertStatsClass(field, stats, IntegerColumnStatistics.class);
                 IntegerColumnStatistics longStats = (IntegerColumnStatistics) stats;
-                fieldStats =
-                        new SimpleColStats(
-                                longStats.getMinimum(), longStats.getMaximum(), nullCount);
-                break;
+                return new SimpleColStats(
+                        longStats.getMinimum(), longStats.getMaximum(), nullCount);
             case FLOAT:
                 assertStatsClass(field, stats, DoubleColumnStatistics.class);
                 DoubleColumnStatistics floatStats = (DoubleColumnStatistics) stats;
-                fieldStats =
-                        new SimpleColStats(
-                                (float) floatStats.getMinimum(),
-                                (float) floatStats.getMaximum(),
-                                nullCount);
-                break;
+                return new SimpleColStats(
+                        (float) floatStats.getMinimum(),
+                        (float) floatStats.getMaximum(),
+                        nullCount);
             case DOUBLE:
                 assertStatsClass(field, stats, DoubleColumnStatistics.class);
                 DoubleColumnStatistics doubleStats = (DoubleColumnStatistics) stats;
-                fieldStats =
-                        new SimpleColStats(
-                                doubleStats.getMinimum(), doubleStats.getMaximum(), nullCount);
-                break;
+                return new SimpleColStats(
+                        doubleStats.getMinimum(), doubleStats.getMaximum(), nullCount);
             case DATE:
                 assertStatsClass(field, stats, DateColumnStatistics.class);
                 DateColumnStatistics dateStats = (DateColumnStatistics) stats;
-                fieldStats =
-                        new SimpleColStats(
-                                DateTimeUtils.toInternal(
-                                        new Date(dateStats.getMinimum().getTime())),
-                                DateTimeUtils.toInternal(
-                                        new Date(dateStats.getMaximum().getTime())),
-                                nullCount);
-                break;
+                return new SimpleColStats(
+                        DateTimeUtils.toInternal(new Date(dateStats.getMinimum().getTime())),
+                        DateTimeUtils.toInternal(new Date(dateStats.getMaximum().getTime())),
+                        nullCount);
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 assertStatsClass(field, stats, TimestampColumnStatistics.class);
                 TimestampColumnStatistics timestampStats = (TimestampColumnStatistics) stats;
-                fieldStats =
-                        new SimpleColStats(
-                                Timestamp.fromSQLTimestamp(timestampStats.getMinimum()),
-                                Timestamp.fromSQLTimestamp(timestampStats.getMaximum()),
-                                nullCount);
-                break;
+                return new SimpleColStats(
+                        Timestamp.fromSQLTimestamp(timestampStats.getMinimum()),
+                        Timestamp.fromSQLTimestamp(timestampStats.getMaximum()),
+                        nullCount);
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 assertStatsClass(field, stats, TimestampColumnStatistics.class);
                 TimestampColumnStatistics timestampLtzStats = (TimestampColumnStatistics) stats;
-                fieldStats =
-                        legacyTimestampLtzType
-                                ? new SimpleColStats(
-                                        Timestamp.fromSQLTimestamp(timestampLtzStats.getMinimum()),
-                                        Timestamp.fromSQLTimestamp(timestampLtzStats.getMaximum()),
-                                        nullCount)
-                                : new SimpleColStats(
-                                        Timestamp.fromInstant(
-                                                timestampLtzStats.getMinimum().toInstant()),
-                                        Timestamp.fromInstant(
-                                                timestampLtzStats.getMaximum().toInstant()),
-                                        nullCount);
-                break;
+                return legacyTimestampLtzType
+                        ? new SimpleColStats(
+                                Timestamp.fromSQLTimestamp(timestampLtzStats.getMinimum()),
+                                Timestamp.fromSQLTimestamp(timestampLtzStats.getMaximum()),
+                                nullCount)
+                        : new SimpleColStats(
+                                Timestamp.fromInstant(timestampLtzStats.getMinimum().toInstant()),
+                                Timestamp.fromInstant(timestampLtzStats.getMaximum().toInstant()),
+                                nullCount);
             default:
-                fieldStats = new SimpleColStats(null, null, nullCount);
+                return new SimpleColStats(null, null, nullCount);
         }
-        return collector.convert(fieldStats);
     }
 
     private void assertStatsClass(
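
    Illustrative sketch (not part of the patch): the hunk above turns per-column stats
    extraction from fail-fast into best effort. A hedged restatement of the same guard,
    using only names visible in the diff; the SLF4J warn() call takes the exception as the
    last argument so the stack trace is logged while the file keeps degraded (null min/max)
    stats.

        // Hypothetical helper mirroring the new try/catch around toFieldStats.
        private SimpleColStats bestEffortStats(DataField field, ColumnStatistics stats, long nullCount) {
            try {
                return toFieldStats(field, stats, nullCount);
            } catch (Exception e) {
                LOG.warn("Failed to extract field stats for field {}", field, e);
                return new SimpleColStats(null, null, nullCount); // unknown min/max, keep null count
            }
        }
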
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index d1478830ac..74f82cbbbe 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -426,7 +426,8 @@ public class HiveMigrator implements Migrator {
                             HIDDEN_PATH_FILTER,
                             newDir,
                             rollback);
-            return FileMetaUtils.commitFile(partitionRow, fileMetas);
+            return FileMetaUtils.commitFile(
+                    partitionRow, paimonTable.coreOptions().bucket(), fileMetas);
         }
     }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
index b5b56ba1d5..e68407903d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
@@ -137,6 +137,7 @@ trait ScanHelper extends Logging {
       .withSnapshot(split.snapshotId())
       .withPartition(split.partition())
       .withBucket(split.bucket())
+      .withTotalBuckets(split.totalBuckets())
       .withDataFiles(dataFiles.toList.asJava)
       .rawConvertible(split.rawConvertible)
       .withBucketPath(split.bucketPath)
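
    Illustrative sketch (not part of the patch): when splits are re-packed, the bucket
    count now has to be copied through, otherwise it is lost. A Java rendering of the
    builder chain from the Scala hunk above; the trailing build() call and the Java-style
    accessors rawConvertible()/bucketPath() are assumptions, not shown in this hunk.

        DataSplit repacked =
                DataSplit.builder()
                        .withSnapshot(split.snapshotId())
                        .withPartition(split.partition())
                        .withBucket(split.bucket())
                        .withTotalBuckets(split.totalBuckets()) // new in this patch
                        .withDataFiles(dataFiles)
                        .rawConvertible(split.rawConvertible())
                        .withBucketPath(split.bucketPath())
                        .build();
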
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index b83831ba21..fea0fd006d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -271,6 +271,7 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
           new CommitMessageImpl(
             partition,
             bucket,
+            files.head.totalBuckets,
             new DataIncrement(
               Collections.emptyList[DataFileMeta],
               deletedDataFileMetas,
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index e974ad98b4..b8a3cccf08 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -311,6 +311,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
           val commitMessage = new CommitMessageImpl(
             dvIndexFileMaintainer.getPartition,
             dvIndexFileMaintainer.getBucket,
+            null,
             DataIncrement.emptyIncrement(),
             CompactIncrement.emptyIncrement(),
             new IndexIncrement(added.map(_.indexFile).asJava, deleted.map(_.indexFile).asJava)
@@ -333,6 +334,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
           new CommitMessageImpl(
             partition,
             bucket,
+            null,
             DataIncrement.emptyIncrement(),
             CompactIncrement.emptyIncrement(),
             new IndexIncrement(added.map(_.indexFile()).asJava, removed.map(_.indexFile()).asJava))
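
    Illustrative sketch (not part of the patch): for commit messages that carry only
    index-file maintenance, the hunks above pass null, so the new field is evidently a
    nullable Integer rather than a primitive int. A hedged Java equivalent; the
    IndexFileMeta element type of the two lists is an assumption based on the surrounding
    code.

        static CommitMessage indexOnlyMessage(
                BinaryRow partition,
                int bucket,
                List<IndexFileMeta> addedIndexFiles,
                List<IndexFileMeta> removedIndexFiles) {
            return new CommitMessageImpl(
                    partition,
                    bucket,
                    null, // totalBuckets unknown for a pure index-maintenance commit
                    DataIncrement.emptyIncrement(),
                    CompactIncrement.emptyIncrement(),
                    new IndexIncrement(addedIndexFiles, removedIndexFiles));
        }
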
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
index 1f908aeb90..5a76030fd4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
@@ -49,19 +49,20 @@ object SparkDeletionVectors {
       root: Path,
       pathFactory: FileStorePathFactory,
       dataFilePathToMeta: Map[String, SparkDataFileMeta]): DataSplit = {
-    val (dataFiles, deletionFiles) = sparkDeletionVectors
+    val (dataFiles, deletionFiles, totalBuckets) = sparkDeletionVectors
       .relativePaths(pathFactory)
       .map {
         dataFile =>
           val meta = dataFilePathToMeta(dataFile)
-          (meta.dataFileMeta, meta.deletionFile.orNull)
+          (meta.dataFileMeta, meta.deletionFile.orNull, meta.totalBuckets)
       }
-      .unzip
+      .unzip3
     DataSplit
       .builder()
       .withBucketPath(root + "/" + sparkDeletionVectors.partitionAndBucket)
       .withPartition(SerializationUtils.deserializeBinaryRow(sparkDeletionVectors.partition))
       .withBucket(sparkDeletionVectors.bucket)
+      .withTotalBuckets(totalBuckets.head)
       .withDataFiles(dataFiles.toList.asJava)
       .withDataDeletionFiles(deletionFiles.toList.asJava)
       .rawConvertible(true)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala
index d370993882..f28a12824e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala
@@ -66,6 +66,7 @@ case class SparkRemoveUnexistingFiles(
                     val message = new CommitMessageImpl(
                       reuse,
                       bucket,
+                      table.coreOptions().bucket(),
                       new DataIncrement(
                         Collections.emptyList(),
                         new util.ArrayList[DataFileMeta](metaMap.values()),
