This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c0fc7df1c [core][flink] Introduce postpone bucket tables. (#5095)
3c0fc7df1c is described below

commit 3c0fc7df1cd19a4d902713fbb44e5e7c742448fc
Author: tsreaper <[email protected]>
AuthorDate: Thu Feb 20 16:04:13 2025 +0800

    [core][flink] Introduce postpone bucket tables. (#5095)
---
 docs/content/flink/procedures.md                   |  15 +
 .../content/primary-key-table/data-distribution.md |  22 ++
 .../shortcodes/generated/core_configuration.html   |   2 +-
 .../generated/flink_connector_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |   4 +-
 .../java/org/apache/paimon/table/BucketMode.java   |   2 +
 .../java/org/apache/paimon/AbstractFileStore.java  |  13 +-
 .../org/apache/paimon/AppendOnlyFileStore.java     |  13 +-
 .../java/org/apache/paimon/KeyValueFileStore.java  | 105 +++--
 .../paimon/operation/AbstractFileStoreScan.java    |  11 +
 .../paimon/operation/FileStoreCommitImpl.java      |  14 +-
 .../org/apache/paimon/operation/FileStoreScan.java |   2 +
 .../paimon/operation/KeyValueFileStoreScan.java    |   8 +-
 .../paimon/operation/MergeFileSplitRead.java       |   3 +-
 .../postpone/PostponeBucketFileStoreWrite.java     | 118 ++++++
 .../paimon/postpone/PostponeBucketWriter.java      | 104 +++++
 .../org/apache/paimon/schema/SchemaManager.java    |   5 +
 .../org/apache/paimon/schema/SchemaValidation.java |   7 +-
 .../paimon/table/AbstractFileStoreTable.java       |   3 +-
 .../paimon/table/source/DataTableStreamScan.java   |   7 +-
 ...pScanner.java => ChangelogFollowUpScanner.java} |  14 +-
 .../CompactionChangelogFollowUpScanner.java        |  54 ---
 .../org/apache/paimon/utils/BatchRecordWriter.java |   2 +-
 .../apache/paimon/utils/FileStorePathFactory.java  |   7 +-
 .../org/apache/paimon/schema/TableSchemaTest.java  |   2 +-
 .../CompactionChangelogFollowUpScannerTest.java    |   7 +-
 .../InputChangelogFollowUpScannerTest.java         |   7 +-
 .../apache/paimon/flink/FlinkConnectorOptions.java |   7 +
 .../apache/paimon/flink/action/CompactAction.java  | 156 +++++++-
 .../paimon/flink/action/CompactActionFactory.java  |  28 +-
 .../apache/paimon/flink/action/RescaleAction.java  | 103 +++++
 .../paimon/flink/action/RescaleActionFactory.java  |  70 ++++
 .../postpone/PostponeBucketCompactSplitSource.java | 189 +++++++++
 .../RemovePostponeBucketFilesOperator.java         |  60 +++
 .../RewritePostponeBucketCommittableOperator.java  | 162 ++++++++
 .../paimon/flink/procedure/CompactProcedure.java   |   4 +-
 .../paimon/flink/procedure/RescaleProcedure.java   |  69 ++++
 .../org/apache/paimon/flink/sink/FlinkSink.java    |   2 +-
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java |  12 +-
 .../sink/PostponeBucketTableWriteOperator.java     |  98 +++++
 .../paimon/flink/sink/PostponeBucketWriteSink.java |  58 +++
 .../paimon/flink/sink/TableWriteOperator.java      |  16 +-
 .../services/org.apache.paimon.factories.Factory   |   2 +
 .../paimon/flink/PostponeBucketTableITCase.java    | 438 +++++++++++++++++++++
 .../apache/paimon/flink/RescaleBucketITCase.java   |  10 +-
 45 files changed, 1864 insertions(+), 177 deletions(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index db364ff0e3..3fc2ed7d37 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -754,5 +754,20 @@ All available procedures are listed below.
          CALL sys.compact_manifest(`table` => 'default.T')
       </td>
    </tr>
+   <tr>
+      <td>rescale</td>
+      <td>
+         CALL sys.rescale(`table` => 'identifier', `bucket_num` => bucket_num, `partition` => 'partition')
+      </td>
+      <td>
+         Rescale one partition of a table. Arguments:
+         <li>identifier: The target table identifier. Cannot be empty.</li>
+         <li>bucket_num: The resulting bucket number after rescaling. Defaults to the current bucket number of the table. Cannot be empty for postpone bucket tables.</li>
+         <li>partition: The partition to rescale. For partitioned tables, this argument cannot be empty.</li>
+      </td>
+      <td>
+         CALL sys.rescale(`table` => 'default.T', `bucket_num` => 16, `partition` => 'dt=20250217,hh=08')
+      </td>
+   </tr>
    </tbody>
 </table>
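
For reference, the new `sys.rescale` procedure can also be invoked programmatically. A minimal sketch in Java, assuming a Paimon catalog is already available (the catalog name, warehouse path and table identifier below are placeholders, not part of this patch):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class RescaleExample {
        public static void main(String[] args) {
            // Batch mode is sufficient for a one-off rescale call.
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());

            // Hypothetical catalog setup; adjust the warehouse path to your environment.
            tEnv.executeSql(
                    "CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse' = 'file:///tmp/paimon')");
            tEnv.executeSql("USE CATALOG paimon");

            // Rescale one partition of default.T to 16 buckets, mirroring the CALL example above.
            tEnv.executeSql(
                    "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 16, `partition` => 'dt=20250217,hh=08')");
        }
    }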
diff --git a/docs/content/primary-key-table/data-distribution.md b/docs/content/primary-key-table/data-distribution.md
index baf3327ed9..edfb0133f8 100644
--- a/docs/content/primary-key-table/data-distribution.md
+++ b/docs/content/primary-key-table/data-distribution.md
@@ -87,6 +87,28 @@ If your upsert does not rely on too old data, you can consider configuring index
 
 But please note that this may also cause data duplication.
 
+## Postpone Bucket
+
+Postpone bucket mode is configured by `'bucket' = '-2'`.
+This mode avoids the difficulty of determining a fixed bucket number upfront
+and supports different bucket numbers for different partitions.
+
+Currently, only Flink supports this mode.
+
+When writing records into the table,
+all records are first stored in the `bucket-postpone` directory of each partition
+and are not yet visible to readers.
+
+To move the records into the correct buckets and make them readable,
+you need to run a compaction job.
+See the `compact` [procedure]({{< ref "flink/procedures" >}}).
+The bucket number for partitions compacted for the first time
+is configured by the option `postpone.default-bucket-num`, whose default value is `4`.
+
+Finally, if the bucket number of some partition turns out to be too small,
+you can run a rescale job.
+See the `rescale` [procedure]({{< ref "flink/procedures" >}}).
+
 ## Pick Partition Fields
 
 The following three types of fields may be defined as partition fields in the warehouse:
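
To make the workflow above concrete, here is a hedged end-to-end sketch in Java (table name, schema and warehouse path are illustrative; the `bucket` and `postpone.default-bucket-num` options come from this patch, and setting the latter in the WITH clause is an assumption):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PostponeBucketExample {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());
            tEnv.executeSql(
                    "CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse' = 'file:///tmp/paimon')");
            tEnv.executeSql("USE CATALOG paimon");

            // A primary-key table in postpone bucket mode ('bucket' = '-2').
            // postpone.default-bucket-num controls the bucket number chosen when a
            // partition is compacted for the first time (default 4); 8 is illustrative.
            tEnv.executeSql(
                    "CREATE TABLE T ("
                            + "  k INT, v STRING, dt STRING,"
                            + "  PRIMARY KEY (k, dt) NOT ENFORCED"
                            + ") PARTITIONED BY (dt) WITH ("
                            + "  'bucket' = '-2',"
                            + "  'postpone.default-bucket-num' = '8'"
                            + ")");

            // Writes land in the bucket-postpone directory and are not yet visible to readers.
            tEnv.executeSql("INSERT INTO T VALUES (1, 'a', '20250217')").await();

            // Compaction moves the records into real buckets and makes them readable.
            tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
        }
    }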
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index e3e1c0f673..fc5c04477b 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -48,7 +48,7 @@ under the License.
             <td><h5>bucket</h5></td>
             <td style="word-wrap: break-word;">-1</td>
             <td>Integer</td>
-            <td>Bucket number for file store.<br />It should either be equal to -1 (dynamic bucket mode), or it must be greater than 0 (fixed bucket mode).</td>
+            <td>Bucket number for file store.<br />It should either be equal to -1 (dynamic bucket mode), -2 (postpone bucket mode), or it must be greater than 0 (fixed bucket mode).</td>
         </tr>
         <tr>
             <td><h5>bucket-key</h5></td>
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index eb15eb3145..14a96eed0d 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -98,6 +98,12 @@ under the License.
             <td>Duration</td>
             <td>You can specify time interval for partition, for example, daily partition is '1 d', hourly partition is '1 h'.</td>
         </tr>
+        <tr>
+            <td><h5>postpone.default-bucket-num</h5></td>
+            <td style="word-wrap: break-word;">4</td>
+            <td>Integer</td>
+            <td>Bucket number for the partitions compacted for the first time in postpone bucket tables.</td>
+        </tr>
         <tr>
             <td><h5>precommit-compact</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 380cf84744..9e0c679b57 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -104,7 +104,9 @@ public class CoreOptions implements Serializable {
                                     .text("Bucket number for file store.")
                                     .linebreak()
                                     .text(
-                                            "It should either be equal to -1 
(dynamic bucket mode), or it must be greater than 0 (fixed bucket mode).")
+                                            "It should either be equal to -1 
(dynamic bucket mode), "
+                                                    + "-2 (postpone bucket 
mode), "
+                                                    + "or it must be greater 
than 0 (fixed bucket mode).")
                                     .build());
 
     @Immutable
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java 
b/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java
similarity index 97%
rename from paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
rename to paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java
index c3b7ca1abd..74ff34613e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
+++ b/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java
@@ -60,4 +60,6 @@ public enum BucketMode {
     BUCKET_UNAWARE;
 
     public static final int UNAWARE_BUCKET = 0;
+
+    public static final int POSTPONE_BUCKET = -2;
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index d9a1929dec..b3398035b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -33,6 +33,7 @@ import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.metastore.AddPartitionTagCallback;
 import org.apache.paimon.operation.ChangelogDeletion;
 import org.apache.paimon.operation.FileStoreCommitImpl;
+import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.operation.ManifestsReader;
 import org.apache.paimon.operation.PartitionExpire;
@@ -257,6 +258,14 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
         return schemaManager.mergeSchema(rowType, allowExplicitCast);
     }
 
+    protected abstract FileStoreScan newScan(ScanType scanType);
+
+    protected enum ScanType {
+        FOR_READ,
+        FOR_WRITE,
+        FOR_COMMIT
+    }
+
     @Override
     public FileStoreCommitImpl newCommit(String commitUser) {
         return newCommit(commitUser, Collections.emptyList());
@@ -283,7 +292,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 manifestFileFactory(),
                 manifestListFactory(),
                 indexManifestFileFactory(),
-                newScan(),
+                newScan(ScanType.FOR_COMMIT),
                 options.bucket(),
                 options.manifestTargetSize(),
                 options.manifestFullCompactionThresholdSize(),
@@ -363,7 +372,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 partitionExpireTime,
                 options.partitionExpireCheckInterval(),
                 PartitionExpireStrategy.createPartitionExpireStrategy(options, 
partitionType()),
-                newScan(),
+                newScan(ScanType.FOR_COMMIT),
                 newCommit(commitUser),
                 partitionHandler,
                 options.endInputCheckPartitionExpire(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index a06b98d7b3..dc5171a744 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -72,7 +72,7 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
 
     @Override
     public AppendOnlyFileStoreScan newScan() {
-        return newScan(false);
+        return newScan(ScanType.FOR_READ);
     }
 
     @Override
@@ -108,7 +108,7 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                     partitionType,
                     pathFactory(),
                     snapshotManager(),
-                    newScan(true).withManifestCacheFilter(manifestFilter),
+                    
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
                     options,
                     dvMaintainerFactory,
                     tableName);
@@ -122,14 +122,15 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                     partitionType,
                     pathFactory(),
                     snapshotManager(),
-                    newScan(true).withManifestCacheFilter(manifestFilter),
+                    
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
                     options,
                     dvMaintainerFactory,
                     tableName);
         }
     }
 
-    private AppendOnlyFileStoreScan newScan(boolean forWrite) {
+    @Override
+    protected AppendOnlyFileStoreScan newScan(ScanType scanType) {
         BucketSelectConverter bucketSelectConverter =
                 predicate -> {
                     if (bucketMode() != BucketMode.HASH_FIXED) {
@@ -152,12 +153,12 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 };
 
         return new AppendOnlyFileStoreScan(
-                newManifestsReader(forWrite),
+                newManifestsReader(scanType == ScanType.FOR_WRITE),
                 bucketSelectConverter,
                 snapshotManager(),
                 schemaManager,
                 schema,
-                manifestFileFactory(forWrite),
+                manifestFileFactory(scanType == ScanType.FOR_WRITE),
                 options.scanManifestParallelism(),
                 options.fileIndexReadEnabled());
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index a969fca037..18316901bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -28,11 +28,13 @@ import org.apache.paimon.index.IndexMaintainer;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
 import org.apache.paimon.operation.BucketSelectConverter;
 import org.apache.paimon.operation.KeyValueFileStoreScan;
 import org.apache.paimon.operation.KeyValueFileStoreWrite;
 import org.apache.paimon.operation.MergeFileSplitRead;
 import org.apache.paimon.operation.RawFileSplitRead;
+import org.apache.paimon.postpone.PostponeBucketFileStoreWrite;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
@@ -112,7 +114,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
 
     @Override
     public KeyValueFileStoreScan newScan() {
-        return newScan(false);
+        return newScan(ScanType.FOR_READ);
     }
 
     @Override
@@ -152,12 +154,13 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
     }
 
     @Override
-    public KeyValueFileStoreWrite newWrite(String commitUser) {
+    public AbstractFileStoreWrite<KeyValue> newWrite(String commitUser) {
         return newWrite(commitUser, null);
     }
 
     @Override
-    public KeyValueFileStoreWrite newWrite(String commitUser, 
ManifestCacheFilter manifestFilter) {
+    public AbstractFileStoreWrite<KeyValue> newWrite(
+            String commitUser, ManifestCacheFilter manifestFilter) {
         IndexMaintainer.Factory<KeyValue> indexFactory = null;
         if (bucketMode() == BucketMode.HASH_DYNAMIC) {
             indexFactory = new 
HashIndexMaintainer.Factory(newIndexFileHandler());
@@ -167,27 +170,42 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
             deletionVectorsMaintainerFactory =
                     new 
DeletionVectorsMaintainer.Factory(newIndexFileHandler());
         }
-        return new KeyValueFileStoreWrite(
-                fileIO,
-                schemaManager,
-                schema,
-                commitUser,
-                partitionType,
-                keyType,
-                valueType,
-                keyComparatorSupplier,
-                () -> UserDefinedSeqComparator.create(valueType, options),
-                logDedupEqualSupplier,
-                mfFactory,
-                pathFactory(),
-                format2PathFactory(),
-                snapshotManager(),
-                newScan(true).withManifestCacheFilter(manifestFilter),
-                indexFactory,
-                deletionVectorsMaintainerFactory,
-                options,
-                keyValueFieldsExtractor,
-                tableName);
+
+        if (options.bucket() == BucketMode.POSTPONE_BUCKET) {
+            return new PostponeBucketFileStoreWrite(
+                    fileIO,
+                    schema,
+                    partitionType,
+                    keyType,
+                    valueType,
+                    format2PathFactory(),
+                    snapshotManager(),
+                    
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
+                    options,
+                    tableName);
+        } else {
+            return new KeyValueFileStoreWrite(
+                    fileIO,
+                    schemaManager,
+                    schema,
+                    commitUser,
+                    partitionType,
+                    keyType,
+                    valueType,
+                    keyComparatorSupplier,
+                    () -> UserDefinedSeqComparator.create(valueType, options),
+                    logDedupEqualSupplier,
+                    mfFactory,
+                    pathFactory(),
+                    format2PathFactory(),
+                    snapshotManager(),
+                    
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
+                    indexFactory,
+                    deletionVectorsMaintainerFactory,
+                    options,
+                    keyValueFieldsExtractor,
+                    tableName);
+        }
     }
 
     private Map<String, FileStorePathFactory> format2PathFactory() {
@@ -198,7 +216,8 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
         return pathFactoryMap;
     }
 
-    private KeyValueFileStoreScan newScan(boolean forWrite) {
+    @Override
+    protected KeyValueFileStoreScan newScan(ScanType scanType) {
         BucketSelectConverter bucketSelectConverter =
                 keyFilter -> {
                     if (bucketMode() != BucketMode.HASH_FIXED) {
@@ -210,24 +229,32 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                                     splitAnd(keyFilter),
                                     keyType.getFieldNames(),
                                     bucketKeyType.getFieldNames());
-                    if (bucketFilters.size() > 0) {
+                    if (!bucketFilters.isEmpty()) {
                         return 
BucketSelectConverter.create(and(bucketFilters), bucketKeyType);
                     }
                     return Optional.empty();
                 };
-        return new KeyValueFileStoreScan(
-                newManifestsReader(forWrite),
-                bucketSelectConverter,
-                snapshotManager(),
-                schemaManager,
-                schema,
-                keyValueFieldsExtractor,
-                manifestFileFactory(forWrite),
-                options.scanManifestParallelism(),
-                options.deletionVectorsEnabled(),
-                options.mergeEngine(),
-                options.changelogProducer(),
-                options.fileIndexReadEnabled() && 
options.deletionVectorsEnabled());
+
+        KeyValueFileStoreScan scan =
+                new KeyValueFileStoreScan(
+                        newManifestsReader(scanType == ScanType.FOR_WRITE),
+                        bucketSelectConverter,
+                        snapshotManager(),
+                        schemaManager,
+                        schema,
+                        keyValueFieldsExtractor,
+                        manifestFileFactory(scanType == ScanType.FOR_WRITE),
+                        options.scanManifestParallelism(),
+                        options.deletionVectorsEnabled(),
+                        options.mergeEngine(),
+                        options.changelogProducer(),
+                        options.fileIndexReadEnabled() && 
options.deletionVectorsEnabled());
+
+        if (options.bucket() == BucketMode.POSTPONE_BUCKET && scanType == 
ScanType.FOR_READ) {
+            scan.onlyReadRealBuckets();
+        }
+
+        return scan;
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 27ba4703b9..861128155d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -78,6 +78,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private final TableSchema schema;
 
     private Snapshot specifiedSnapshot = null;
+    private boolean onlyReadRealBuckets = false;
     private Filter<Integer> bucketFilter = null;
     private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
     protected ScanMode scanMode = ScanMode.ALL;
@@ -136,6 +137,12 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         return this;
     }
 
+    @Override
+    public FileStoreScan onlyReadRealBuckets() {
+        this.onlyReadRealBuckets = true;
+        return this;
+    }
+
     @Override
     public FileStoreScan withBucketFilter(Filter<Integer> bucketFilter) {
         this.bucketFilter = bucketFilter;
@@ -497,6 +504,10 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
             }
 
             int bucket = bucketGetter.apply(row);
+            if (onlyReadRealBuckets && bucket < 0) {
+                return false;
+            }
+
             if (bucketFilter != null && !bucketFilter.test(bucket)) {
                 return false;
             }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index af7e450e8e..4a59de8dac 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -775,12 +775,14 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
         List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
         if (latestSnapshot != null) {
-            List<ManifestEntry> currentEntries =
-                    scan.withSnapshot(latestSnapshot)
-                            .withPartitionFilter(partitionFilter)
-                            .withKind(ScanMode.ALL)
-                            .plan()
-                            .files();
+            scan.withSnapshot(latestSnapshot)
+                    .withPartitionFilter(partitionFilter)
+                    .withKind(ScanMode.ALL);
+            if (numBucket != BucketMode.POSTPONE_BUCKET) {
+                // bucket = -2 can only be overwritten in postpone bucket 
tables
+                scan.withBucketFilter(bucket -> bucket >= 0);
+            }
+            List<ManifestEntry> currentEntries = scan.plan().files();
             for (ManifestEntry entry : currentEntries) {
                 changesWithOverwrite.add(
                         new ManifestEntry(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 99ae3ef47d..3e0bd25475 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -59,6 +59,8 @@ public interface FileStoreScan {
 
     FileStoreScan withBucket(int bucket);
 
+    FileStoreScan onlyReadRealBuckets();
+
     FileStoreScan withBucketFilter(Filter<Integer> bucketFilter);
 
     FileStoreScan withTotalAwareBucketFilter(BiFilter<Integer, Integer> 
bucketFilter);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index e39ad2e3c2..22ba024460 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -54,16 +54,14 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
     private final SimpleStatsEvolutions fieldKeyStatsConverters;
     private final SimpleStatsEvolutions fieldValueStatsConverters;
     private final BucketSelectConverter bucketSelectConverter;
-
-    private Predicate keyFilter;
-    private Predicate valueFilter;
     private final boolean deletionVectorsEnabled;
     private final MergeEngine mergeEngine;
     private final ChangelogProducer changelogProducer;
-
     private final boolean fileIndexReadEnabled;
-    private final Map<Long, Predicate> schemaId2DataFilter = new HashMap<>();
 
+    private Predicate keyFilter;
+    private Predicate valueFilter;
+    private final Map<Long, Predicate> schemaId2DataFilter = new HashMap<>();
     private boolean valueFilterForceEnabled = false;
 
     public KeyValueFileStoreScan(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index 23a3a576e4..67a15d2058 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -42,6 +42,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.types.DataField;
@@ -245,7 +246,7 @@ public class MergeFileSplitRead implements 
SplitRead<KeyValue> {
             throw new IllegalArgumentException("This read cannot accept split 
with before files.");
         }
 
-        if (split.isStreaming()) {
+        if (split.isStreaming() || split.bucket() == 
BucketMode.POSTPONE_BUCKET) {
             return createNoMergeReader(
                     split.partition(),
                     split.bucket(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
new file mode 100644
index 0000000000..b7cc166251
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.postpone;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.operation.FileStoreWrite;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+
+/** {@link FileStoreWrite} for {@code bucket = -2} tables. */
+public class PostponeBucketFileStoreWrite extends 
AbstractFileStoreWrite<KeyValue> {
+
+    private final CoreOptions options;
+    private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
+
+    public PostponeBucketFileStoreWrite(
+            FileIO fileIO,
+            TableSchema schema,
+            RowType partitionType,
+            RowType keyType,
+            RowType valueType,
+            Map<String, FileStorePathFactory> format2PathFactory,
+            SnapshotManager snapshotManager,
+            FileStoreScan scan,
+            CoreOptions options,
+            String tableName) {
+        super(
+                snapshotManager,
+                scan,
+                null,
+                null,
+                tableName,
+                options,
+                options.bucket(),
+                partitionType,
+                options.writeMaxWritersToSpill(),
+                options.legacyPartitionName());
+
+        this.options = options;
+        this.writerFactoryBuilder =
+                KeyValueFileWriterFactory.builder(
+                        fileIO,
+                        schema.id(),
+                        keyType,
+                        valueType,
+                        options.fileFormat(),
+                        format2PathFactory,
+                        options.targetFileSize(true));
+
+        // Ignoring previous files saves scanning time.
+        //
+        // For postpone bucket tables, we only append new files to bucket = -2 directories.
+        //
+        // Also, we don't need to know the current largest sequence id, because when compacting
+        // these files, we will read the records file by file without merging, and then give them
+        // to normal bucket writers.
+        //
+        // Because there is no merging when reading, sequence ids across files are useless.
+        withIgnorePreviousFiles(true);
+    }
+
+    @Override
+    protected PostponeBucketWriter createWriter(
+            @Nullable Long snapshotId,
+            BinaryRow partition,
+            int bucket,
+            List<DataFileMeta> restoreFiles,
+            long restoredMaxSeqNumber,
+            @Nullable CommitIncrement restoreIncrement,
+            ExecutorService compactExecutor,
+            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+        Preconditions.checkArgument(bucket == BucketMode.POSTPONE_BUCKET);
+        KeyValueFileWriterFactory writerFactory =
+                writerFactoryBuilder.build(partition, bucket, options);
+        return new PostponeBucketWriter(writerFactory);
+    }
+
+    @Override
+    protected Function<WriterContainer<KeyValue>, Boolean> 
createWriterCleanChecker() {
+        return createNoConflictAwareWriterCleanChecker();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
new file mode 100644
index 0000000000..47cafb9645
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.postpone;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.RecordWriter;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/** {@link RecordWriter} for {@code bucket = -2} tables. */
+public class PostponeBucketWriter implements RecordWriter<KeyValue> {
+
+    private final KeyValueFileWriterFactory writerFactory;
+    private RollingFileWriter<KeyValue, DataFileMeta> writer;
+
+    public PostponeBucketWriter(KeyValueFileWriterFactory writerFactory) {
+        this.writerFactory = writerFactory;
+        this.writer = null;
+    }
+
+    @Override
+    public void write(KeyValue record) throws Exception {
+        if (writer == null) {
+            writer = writerFactory.createRollingMergeTreeFileWriter(0, 
FileSource.APPEND);
+        }
+        writer.write(record);
+    }
+
+    @Override
+    public void compact(boolean fullCompaction) throws Exception {}
+
+    @Override
+    public void addNewFiles(List<DataFileMeta> files) {}
+
+    @Override
+    public Collection<DataFileMeta> dataFiles() {
+        // this method is only for checkpointing, while this writer does not need any checkpoint
+        return Collections.emptyList();
+    }
+
+    @Override
+    public long maxSequenceNumber() {
+        // see comments in the constructor of PostponeBucketFileStoreWrite
+        return 0;
+    }
+
+    @Override
+    public CommitIncrement prepareCommit(boolean waitCompaction) throws 
Exception {
+        List<DataFileMeta> newFiles = Collections.emptyList();
+        if (writer != null) {
+            writer.close();
+            newFiles = writer.result();
+            writer = null;
+        }
+        return new CommitIncrement(
+                new DataIncrement(newFiles, Collections.emptyList(), 
Collections.emptyList()),
+                CompactIncrement.emptyIncrement(),
+                null);
+    }
+
+    @Override
+    public boolean isCompacting() {
+        return false;
+    }
+
+    @Override
+    public void sync() throws Exception {}
+
+    @Override
+    public void withInsertOnly(boolean insertOnly) {}
+
+    @Override
+    public void close() throws Exception {
+        if (writer != null) {
+            writer.abort();
+            writer = null;
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 753bc34d95..98e3b48fe4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -35,6 +35,7 @@ import 
org.apache.paimon.schema.SchemaChange.UpdateColumnNullability;
 import org.apache.paimon.schema.SchemaChange.UpdateColumnPosition;
 import org.apache.paimon.schema.SchemaChange.UpdateColumnType;
 import org.apache.paimon.schema.SchemaChange.UpdateComment;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
@@ -847,6 +848,10 @@ public class SchemaManager implements Serializable {
             if (newBucket == -1) {
                 throw new UnsupportedOperationException("Cannot change bucket 
to -1.");
             }
+            if (oldBucket == BucketMode.POSTPONE_BUCKET) {
+                throw new UnsupportedOperationException(
+                        "Cannot change bucket for postpone bucket tables.");
+            }
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 3336d1e5c1..a0a217b39d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
@@ -605,7 +606,7 @@ public class SchemaValidation {
                 throw new RuntimeException(
                         "AppendOnlyTable of unware or dynamic bucket does not 
support 'full-compaction.delta-commits'");
             }
-        } else if (bucket < 1) {
+        } else if (bucket < 1 && !isPostponeBucketTable(schema, bucket)) {
             throw new RuntimeException("The number of buckets needs to be 
greater than 0.");
         } else {
             if (schema.crossPartitionUpdate()) {
@@ -645,4 +646,8 @@ public class SchemaValidation {
             }
         }
     }
+
+    private static boolean isPostponeBucketTable(TableSchema schema, int 
bucket) {
+        return !schema.primaryKeys().isEmpty() && bucket == 
BucketMode.POSTPONE_BUCKET;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index b23e50de19..9ee95afa8c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -317,7 +317,8 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 });
     }
 
-    private FileStoreTable copyInternal(Map<String, String> dynamicOptions, 
boolean tryTimeTravel) {
+    protected FileStoreTable copyInternal(
+            Map<String, String> dynamicOptions, boolean tryTimeTravel) {
         Map<String, String> options = new HashMap<>(tableSchema.options());
 
         // merge non-null dynamic options into schema.options
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index 9dab573471..5d92d6f707 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -27,10 +27,9 @@ import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.BoundedChecker;
-import 
org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
+import org.apache.paimon.table.source.snapshot.ChangelogFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.DeltaFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.FollowUpScanner;
-import org.apache.paimon.table.source.snapshot.InputChangelogFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingContext;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
@@ -264,11 +263,9 @@ public class DataTableStreamScan extends 
AbstractDataTableScan implements Stream
                 followUpScanner = new DeltaFollowUpScanner();
                 break;
             case INPUT:
-                followUpScanner = new InputChangelogFollowUpScanner();
-                break;
             case FULL_COMPACTION:
             case LOOKUP:
-                followUpScanner = new CompactionChangelogFollowUpScanner();
+                followUpScanner = new ChangelogFollowUpScanner();
                 break;
             default:
                 throw new UnsupportedOperationException(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ChangelogFollowUpScanner.java
similarity index 71%
rename from 
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
rename to 
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ChangelogFollowUpScanner.java
index 58edc40f45..2f0ece0b16 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ChangelogFollowUpScanner.java
@@ -18,28 +18,24 @@
 
 package org.apache.paimon.table.source.snapshot;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.table.source.ScanMode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** {@link FollowUpScanner} for {@link CoreOptions.ChangelogProducer#INPUT} 
changelog producer. */
-public class InputChangelogFollowUpScanner implements FollowUpScanner {
+/** {@link FollowUpScanner} for tables with a changelog producer. */
+public class ChangelogFollowUpScanner implements FollowUpScanner {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(InputChangelogFollowUpScanner.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChangelogFollowUpScanner.class);
 
     @Override
     public boolean shouldScanSnapshot(Snapshot snapshot) {
-        if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
+        if (snapshot.changelogManifestList() != null) {
             return true;
         }
 
-        LOG.debug(
-                "Next snapshot id {} is not APPEND, but is {}, check next 
one.",
-                snapshot.id(),
-                snapshot.commitKind());
+        LOG.debug("Next snapshot id {} has no changelog, check next one.", 
snapshot.id());
         return false;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
deleted file mode 100644
index 2688b43468..0000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.snapshot;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.table.source.ScanMode;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link FollowUpScanner} for {@link 
CoreOptions.ChangelogProducer#FULL_COMPACTION} changelog
- * producer.
- */
-public class CompactionChangelogFollowUpScanner implements FollowUpScanner {
-
-    private static final Logger LOG =
-            LoggerFactory.getLogger(CompactionChangelogFollowUpScanner.class);
-
-    @Override
-    public boolean shouldScanSnapshot(Snapshot snapshot) {
-        if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
-            return true;
-        }
-
-        LOG.debug(
-                "Next snapshot id {} is not COMPACT, but is {}, check next 
one.",
-                snapshot.id(),
-                snapshot.commitKind());
-        return false;
-    }
-
-    @Override
-    public SnapshotReader.Plan scan(Snapshot snapshot, SnapshotReader 
snapshotReader) {
-        return 
snapshotReader.withMode(ScanMode.CHANGELOG).withSnapshot(snapshot).read();
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/BatchRecordWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/BatchRecordWriter.java
index 3c4c2bd390..cc874e2e28 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BatchRecordWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BatchRecordWriter.java
@@ -24,6 +24,6 @@ import org.apache.paimon.io.BundleRecords;
 /** Write {@link BundleRecords} directly. */
 public interface BatchRecordWriter extends RecordWriter<InternalRow> {
 
-    /** Add a batch elemens to the writer. */
+    /** Add a batch of elements to the writer. */
     void writeBundle(BundleRecords record) throws Exception;
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 811a2a4e6d..e8e40b76c1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.ExternalPathProvider;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
@@ -175,7 +176,11 @@ public class FileStorePathFactory {
     }
 
     public Path relativeBucketPath(BinaryRow partition, int bucket) {
-        Path relativeBucketPath = new Path(BUCKET_PATH_PREFIX + bucket);
+        String bucketName = String.valueOf(bucket);
+        if (bucket == BucketMode.POSTPONE_BUCKET) {
+            bucketName = "postpone";
+        }
+        Path relativeBucketPath = new Path(BUCKET_PATH_PREFIX + bucketName);
         String partitionPath = getPartitionString(partition);
         if (!partitionPath.isEmpty()) {
             relativeBucketPath = new Path(partitionPath, relativeBucketPath);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java 
b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
index 25d4601c6b..7f0cd60425 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
@@ -212,7 +212,7 @@ public class TableSchemaTest {
         TableSchema schema =
                 new TableSchema(1, fields, 10, partitionKeys, primaryKeys, 
options, "");
 
-        options.put(BUCKET.key(), "-2");
+        options.put(BUCKET.key(), "-10");
         assertThatThrownBy(() -> validateTableSchema(schema))
                 .hasMessageContaining("The number of buckets needs to be 
greater than 0.");
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
index 1bf70b64b4..a51d24eaf4 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
@@ -35,7 +35,10 @@ import java.util.Arrays;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link CompactionChangelogFollowUpScanner}. */
+/**
+ * Tests for {@link ChangelogFollowUpScanner} when changelog producer is {@link
+ * CoreOptions.ChangelogProducer#FULL_COMPACTION}.
+ */
 public class CompactionChangelogFollowUpScannerTest extends ScannerTestBase {
 
     @Test
@@ -66,7 +69,7 @@ public class CompactionChangelogFollowUpScannerTest extends 
ScannerTestBase {
 
         snapshotReader.withLevelFilter(level -> level == 
table.coreOptions().numLevels() - 1);
         TableRead read = table.newRead();
-        CompactionChangelogFollowUpScanner scanner = new 
CompactionChangelogFollowUpScanner();
+        ChangelogFollowUpScanner scanner = new ChangelogFollowUpScanner();
 
         Snapshot snapshot = snapshotManager.snapshot(1);
         
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
index 09686a7c7d..14012068b9 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
@@ -35,7 +35,10 @@ import java.util.Arrays;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link InputChangelogFollowUpScanner}. */
+/**
+ * Tests for {@link ChangelogFollowUpScanner} when changelog producer is {@link
+ * CoreOptions.ChangelogProducer#INPUT}.
+ */
 public class InputChangelogFollowUpScannerTest extends ScannerTestBase {
 
     @Test
@@ -59,7 +62,7 @@ public class InputChangelogFollowUpScannerTest extends 
ScannerTestBase {
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(3);
 
         TableRead read = table.newRead();
-        InputChangelogFollowUpScanner scanner = new 
InputChangelogFollowUpScanner();
+        ChangelogFollowUpScanner scanner = new ChangelogFollowUpScanner();
 
         Snapshot snapshot = snapshotManager.snapshot(1);
         
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index cf1016a56a..195b5163f8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -460,6 +460,13 @@ public class FlinkConnectorOptions {
                             "Bounded mode for Paimon consumer. "
                                     + "By default, Paimon automatically 
selects bounded mode based on the mode of the Flink job.");
 
+    public static final ConfigOption<Integer> POSTPONE_DEFAULT_BUCKET_NUM =
+            key("postpone.default-bucket-num")
+                    .intType()
+                    .defaultValue(4)
+                    .withDescription(
+                            "Bucket number for the partitions compacted for 
the first time in postpone bucket tables.");
+
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = FlinkConnectorOptions.class.getFields();
         final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 73c96b2c4b..9722847852 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -19,19 +19,37 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
+import org.apache.paimon.flink.postpone.PostponeBucketCompactSplitSource;
+import 
org.apache.paimon.flink.postpone.RewritePostponeBucketCommittableOperator;
 import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
 import org.apache.paimon.flink.sink.CompactorSinkBuilder;
+import org.apache.paimon.flink.sink.FixedBucketSink;
+import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
+import org.apache.paimon.flink.sink.RowDataChannelComputer;
 import org.apache.paimon.flink.source.CompactorSourceBuilder;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.PartitionPredicateVisitor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
@@ -40,8 +58,12 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.time.Duration;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -103,22 +125,23 @@ public class CompactAction extends TableActionBase {
 
     @Override
     public void build() throws Exception {
+        buildImpl();
+    }
+
+    private boolean buildImpl() throws Exception {
         ReadableConfig conf = env.getConfiguration();
         boolean isStreaming =
                 conf.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.STREAMING;
         FileStoreTable fileStoreTable = (FileStoreTable) table;
-        switch (fileStoreTable.bucketMode()) {
-            case BUCKET_UNAWARE:
-                {
-                    buildForUnawareBucketCompaction(env, fileStoreTable, isStreaming);
-                    break;
-                }
-            case HASH_FIXED:
-            case HASH_DYNAMIC:
-            default:
-                {
-                    buildForTraditionalCompaction(env, fileStoreTable, isStreaming);
-                }
+
+        if (fileStoreTable.coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) {
+            return buildForPostponeBucketCompaction(env, fileStoreTable, isStreaming);
+        } else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) {
+            buildForUnawareBucketCompaction(env, fileStoreTable, isStreaming);
+            return true;
+        } else {
+            buildForTraditionalCompaction(env, fileStoreTable, isStreaming);
+            return true;
         }
     }
 
@@ -207,9 +230,114 @@ public class CompactAction extends TableActionBase {
         return predicate;
     }
 
+    private boolean buildForPostponeBucketCompaction(
+            StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) {
+        Preconditions.checkArgument(
+                !isStreaming, "Postpone bucket compaction currently only supports batch mode");
+        Preconditions.checkArgument(
+                partitions == null,
+                "Postpone bucket compaction currently does not support specifying partitions");
+        Preconditions.checkArgument(
+                whereSql == null,
+                "Postpone bucket compaction currently does not support predicates");
+
+        // change bucket to a positive value, so we can scan files from the bucket = -2 directory
+        Map<String, String> bucketOptions = new HashMap<>(table.options());
+        bucketOptions.put(CoreOptions.BUCKET.key(), "1");
+        FileStoreTable fileStoreTable = table.copy(table.schema().copy(bucketOptions));
+
+        List<BinaryRow> partitions =
+                fileStoreTable
+                        .newScan()
+                        .withBucketFilter(new PostponeBucketFilter())
+                        .listPartitions();
+        if (partitions.isEmpty()) {
+            return false;
+        }
+
+        Options options = new Options(fileStoreTable.options());
+        InternalRowPartitionComputer partitionComputer =
+                new InternalRowPartitionComputer(
+                        fileStoreTable.coreOptions().partitionDefaultName(),
+                        fileStoreTable.rowType(),
+                        fileStoreTable.partitionKeys().toArray(new String[0]),
+                        fileStoreTable.coreOptions().legacyPartitionName());
+        for (BinaryRow partition : partitions) {
+            int bucketNum = options.get(FlinkConnectorOptions.POSTPONE_DEFAULT_BUCKET_NUM);
+
+            Iterator<ManifestEntry> it =
+                    fileStoreTable
+                            .newSnapshotReader()
+                            .withPartitionFilter(Collections.singletonList(partition))
+                            .withBucketFilter(new NormalBucketFilter())
+                            .readFileIterator();
+            if (it.hasNext()) {
+                bucketNum = it.next().totalBuckets();
+            }
+
+            bucketOptions = new HashMap<>(table.options());
+            bucketOptions.put(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
+            FileStoreTable realTable = table.copy(table.schema().copy(bucketOptions));
+
+            LinkedHashMap<String, String> partitionSpec =
+                    partitionComputer.generatePartValues(partition);
+            Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
+                    PostponeBucketCompactSplitSource.buildSource(
+                            env,
+                            realTable.fullName() + partitionSpec,
+                            realTable.rowType(),
+                            realTable
+                                    .newReadBuilder()
+                                    .withPartitionFilter(partitionSpec)
+                                    .withBucketFilter(new PostponeBucketFilter()),
+                            options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
+
+            DataStream<InternalRow> partitioned =
+                    FlinkStreamPartitioner.partition(
+                            FlinkSinkBuilder.mapToInternalRow(
+                                    sourcePair.getLeft(), realTable.rowType()),
+                            new RowDataChannelComputer(realTable.schema(), false),
+                            null);
+            FixedBucketSink sink = new FixedBucketSink(realTable, null, null);
+            String commitUser =
+                    CoreOptions.createCommitUser(realTable.coreOptions().toConfiguration());
+            DataStream<Committable> written =
+                    sink.doWrite(partitioned, commitUser, partitioned.getParallelism())
+                            .forward()
+                            .transform(
+                                    "Rewrite compact committable",
+                                    new CommittableTypeInfo(),
+                                    new RewritePostponeBucketCommittableOperator(realTable));
+            sink.doCommit(written.union(sourcePair.getRight()), commitUser);
+        }
+
+        return true;
+    }
+
+    private static class PostponeBucketFilter implements Filter<Integer>, Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public boolean test(Integer bucket) {
+            return bucket == BucketMode.POSTPONE_BUCKET;
+        }
+    }
+
+    private static class NormalBucketFilter implements Filter<Integer>, Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public boolean test(Integer bucket) {
+            return bucket >= 0;
+        }
+    }
+
     @Override
     public void run() throws Exception {
-        build();
-        execute("Compact job");
+        if (buildImpl()) {
+            execute("Compact job : " + table.fullName());
+        }
     }
 }
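For context, a quick way to exercise the new postpone-bucket path (a sketch only, assuming a postpone bucket table 'default.T' created with 'bucket' = '-2' as in the IT case added by this commit, and an existing batch TableEnvironment named tEnv):

    // For postpone bucket tables, buildImpl() above routes this call to
    // buildForPostponeBucketCompaction, which reads the bucket = -2 files and
    // rewrites them into fixed buckets before committing a compact snapshot.
    tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
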
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
index 238e23aaeb..dc9614ce34 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
@@ -101,12 +101,12 @@ public class CompactActionFactory implements ActionFactory {
 
         System.out.println("Syntax:");
         System.out.println(
-                "  compact --warehouse <warehouse_path> --database <database_name> \\\n"
-                        + "--table <table_name> [--partition <partition_name>] \\\n"
-                        + "[--order_strategy <order_strategy>] \\\n"
-                        + "[--table_conf <key>=<value>] \\\n"
-                        + "[--order_by <order_columns>] \\\n"
-                        + "[--partition_idle_time <partition_idle_time>] \\\n"
+                "  compact --warehouse <warehouse_path> --database 
<database_name> \n"
+                        + "--table <table_name> [--partition <partition_name>] 
\n"
+                        + "[--order_strategy <order_strategy>] \n"
+                        + "[--table_conf <key>=<value>] \n"
+                        + "[--order_by <order_columns>] \n"
+                        + "[--partition_idle_time <partition_idle_time>] \n"
                         + "[--compact_strategy <compact_strategy>]");
         System.out.println(
                 "  compact --warehouse s3://path/to/warehouse --database <database_name> "
@@ -140,14 +140,14 @@ public class CompactActionFactory implements ActionFactory {
                        + "`full` : Only supports batch mode. All files will be selected for merging."
                        + "`minor`: Pick the set of files that need to be merged based on specified conditions.");
         System.out.println(
-                "  compact --warehouse s3:///path/to/warehouse \\\n"
-                        + "--database test_db \\\n"
-                        + "--table test_table \\\n"
-                        + "--order_strategy zorder \\\n"
-                        + "--order_by a,b,c \\\n"
-                        + "--table_conf sink.parallelism=9 \\\n"
-                        + "--catalog_conf s3.endpoint=https://****.com \\\n"
-                        + "--catalog_conf s3.access-key=***** \\\n"
+                "  compact --warehouse s3:///path/to/warehouse \n"
+                        + "--database test_db \n"
+                        + "--table test_table \n"
+                        + "--order_strategy zorder \n"
+                        + "--order_by a,b,c \n"
+                        + "--table_conf sink.parallelism=9 \n"
+                        + "--catalog_conf s3.endpoint=https://****.com \n"
+                        + "--catalog_conf s3.access-key=***** \n"
                         + "--catalog_conf s3.secret-key=***** ");
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
new file mode 100644
index 0000000000..64a205aee0
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.flink.source.FlinkSourceBuilder;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Action to rescale one partition of a table. */
+public class RescaleAction extends TableActionBase {
+
+    private @Nullable Integer bucketNum;
+    private Map<String, String> partition = new HashMap<>();
+
+    public RescaleAction(String databaseName, String tableName, Map<String, String> catalogConfig) {
+        super(databaseName, tableName, catalogConfig);
+    }
+
+    public RescaleAction withBucketNum(int bucketNum) {
+        this.bucketNum = bucketNum;
+        return this;
+    }
+
+    public RescaleAction withPartition(Map<String, String> partition) {
+        this.partition = partition;
+        return this;
+    }
+
+    @Override
+    public void build() throws Exception {
+        Configuration flinkConf = new Configuration();
+        flinkConf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        env.configure(flinkConf);
+
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        RowType partitionType = fileStoreTable.schema().logicalPartitionType();
+        Predicate partitionPredicate =
+                PartitionPredicate.createPartitionPredicate(
+                        partitionType,
+                        InternalRowPartitionComputer.convertSpecToInternal(
+                                partition,
+                                partitionType,
+                                fileStoreTable.coreOptions().partitionDefaultName()));
+        DataStream<RowData> source =
+                new FlinkSourceBuilder(fileStoreTable)
+                        .env(env)
+                        .sourceBounded(true)
+                        .predicate(partitionPredicate)
+                        .build();
+
+        Map<String, String> bucketOptions = new HashMap<>(fileStoreTable.options());
+        if (bucketNum == null) {
+            Preconditions.checkArgument(
+                    fileStoreTable.coreOptions().bucket() != BucketMode.POSTPONE_BUCKET,
+                    "When rescaling postpone bucket tables, you must provide the resulting bucket number.");
+        } else {
+            bucketOptions.put(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
+        }
+        FileStoreTable rescaledTable =
+                fileStoreTable.copy(fileStoreTable.schema().copy(bucketOptions));
+        new FlinkSinkBuilder(rescaledTable).overwrite(partition).forRowData(source).build();
+    }
+
+    @Override
+    public void run() throws Exception {
+        build();
+        env.execute("Rescale Postpone Bucket : " + table.fullName());
+    }
+}
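A minimal sketch of driving the action programmatically; the warehouse path, table name, bucket count, and partition spec below are placeholders, not part of this commit:

    import org.apache.paimon.flink.action.RescaleAction;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class RescaleExample {
        public static void main(String[] args) throws Exception {
            // Placeholder catalog configuration; point it at a real warehouse.
            Map<String, String> catalogConfig = new HashMap<>();
            catalogConfig.put("warehouse", "/path/to/warehouse");

            // Overwrites partition pt=1 of default.T, rewriting it into 16 buckets.
            new RescaleAction("default", "T", catalogConfig)
                    .withBucketNum(16)
                    .withPartition(Collections.singletonMap("pt", "1"))
                    .run();
        }
    }
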
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java
new file mode 100644
index 0000000000..8a5465ff41
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Optional;
+
+/** Factory to create {@link RescaleAction}. */
+public class RescaleActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "rescale";
+    private static final String BUCKET_NUM = "bucket_num";
+    private static final String PARTITION = "partition";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        RescaleAction action =
+                new RescaleAction(
+                        params.getRequired(DATABASE),
+                        params.getRequired(TABLE),
+                        catalogConfigMap(params));
+
+        if (params.has(BUCKET_NUM)) {
+            action.withBucketNum(Integer.parseInt(params.get(BUCKET_NUM)));
+        }
+
+        if (params.has(PARTITION)) {
+            action.withPartition(getPartitions(params).get(0));
+        }
+
+        return Optional.of(action);
+    }
+
+    @Override
+    public void printHelp() {
+        System.out.println("Action \"rescale\" rescales one partition of a table.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  rescale --warehouse <warehouse_path> --database <database_name> "
+                        + "--table <table_name> [--bucket_num <bucket_num>] "
+                        + "[--partition <partition>]");
+        System.out.println(
+                "The default value of argument bucket_num is the current bucket number of the table. "
+                        + "For postpone bucket tables, this argument must be specified.");
+        System.out.println(
+                "Argument partition must be specified if the table is a partitioned table.");
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
new file mode 100644
index 0000000000..0bfab0d196
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.postpone;
+
+import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
+import org.apache.paimon.flink.source.SimpleSourceSplit;
+import org.apache.paimon.flink.source.operator.ReadOperator;
+import org.apache.paimon.flink.utils.JavaTypeInfo;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Source for compacting postpone bucket tables. This source scans all files from the
+ * {@code bucket = -2} directory and distributes them to the readers.
+ */
+public class PostponeBucketCompactSplitSource extends AbstractNonCoordinatedSource<Split> {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PostponeBucketCompactSplitSource.class);
+
+    private final ReadBuilder readBuilder;
+
+    public PostponeBucketCompactSplitSource(ReadBuilder readBuilder) {
+        this.readBuilder = readBuilder;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<Split, SimpleSourceSplit> createReader(SourceReaderContext readerContext)
+            throws Exception {
+        return new Reader();
+    }
+
+    private class Reader extends AbstractNonCoordinatedSourceReader<Split> {
+
+        private final TableScan scan = readBuilder.newScan();
+
+        @Override
+        public InputStatus pollNext(ReaderOutput<Split> output) throws Exception {
+            try {
+                List<Split> splits = scan.plan().splits();
+
+                for (Split split : splits) {
+                    DataSplit dataSplit = (DataSplit) split;
+                    List<DataFileMeta> files = new ArrayList<>(dataSplit.dataFiles());
+                    // we must replay the written records in exact order
+                    files.sort(Comparator.comparing(DataFileMeta::creationTime));
+                    for (DataFileMeta meta : files) {
+                        DataSplit s =
+                                DataSplit.builder()
+                                        .withPartition(dataSplit.partition())
+                                        .withBucket(dataSplit.bucket())
+                                        .withBucketPath(dataSplit.bucketPath())
+                                        .withDataFiles(Collections.singletonList(meta))
+                                        .isStreaming(false)
+                                        .build();
+                        output.collect(s);
+                    }
+                }
+            } catch (EndOfScanException esf) {
+                LOG.info("Caught EndOfScanException, the stream is finished.");
+                return InputStatus.END_OF_INPUT;
+            }
+            return InputStatus.MORE_AVAILABLE;
+        }
+    }
+
+    public static Pair<DataStream<RowData>, DataStream<Committable>> buildSource(
+            StreamExecutionEnvironment env,
+            String name,
+            RowType rowType,
+            ReadBuilder readBuilder,
+            @Nullable Integer parallelism) {
+        DataStream<Split> source =
+                env.fromSource(
+                                new PostponeBucketCompactSplitSource(readBuilder),
+                                WatermarkStrategy.noWatermarks(),
+                                "Compact split generator: " + name,
+                                new JavaTypeInfo<>(Split.class))
+                        .forceNonParallel();
+
+        FlinkStreamPartitioner<Split> partitioner =
+                new FlinkStreamPartitioner<>(new SplitChannelComputer());
+        PartitionTransformation<Split> partitioned =
+                new PartitionTransformation<>(source.getTransformation(), partitioner);
+        if (parallelism != null) {
+            partitioned.setParallelism(parallelism);
+        }
+
+        return Pair.of(
+                new DataStream<>(source.getExecutionEnvironment(), partitioned)
+                        .transform(
+                                "Compact split reader: " + name,
+                                InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(rowType)),
+                                new ReadOperator(readBuilder, null)),
+                source.forward()
+                        .transform(
+                                "Remove new files",
+                                new CommittableTypeInfo(),
+                                new RemovePostponeBucketFilesOperator())
+                        .forceNonParallel());
+    }
+
+    private static class SplitChannelComputer implements ChannelComputer<Split> {
+
+        private transient int numChannels;
+        private transient Pattern pattern;
+
+        @Override
+        public void setup(int numChannels) {
+            this.numChannels = numChannels;
+            // see PostponeBucketTableWriteOperator
+            this.pattern = Pattern.compile("-s-(\\d+?)-");
+        }
+
+        @Override
+        public int channel(Split record) {
+            DataSplit dataSplit = (DataSplit) record;
+            String fileName = dataSplit.dataFiles().get(0).fileName();
+
+            Matcher matcher = pattern.matcher(fileName);
+            Preconditions.checkState(
+                    matcher.find(),
+                    "Data file name does not match the pattern. This is unexpected.");
+            int subtaskId = Integer.parseInt(matcher.group(1));
+
+            // send records written by the same subtask to the same subtask
+            // to make sure we replay the written records in the exact order
+            return (Math.abs(dataSplit.partition().hashCode()) + subtaskId) % numChannels;
+        }
+    }
+}
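The channel computer above routes every split by the writing subtask id that PostponeBucketTableWriteOperator encodes into the data file name. A standalone sketch of that extraction, using a made-up file name that only assumes the "-s-<subtask>-" fragment of the prefix:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class SubtaskIdFromFileName {
        public static void main(String[] args) {
            // Hypothetical name following the "<prefix>-u-<commitUser>-s-<subtask>-w-" scheme.
            String fileName = "data-u-3f2a-s-7-w-0.orc";
            Matcher matcher = Pattern.compile("-s-(\\d+?)-").matcher(fileName);
            if (matcher.find()) {
                // Prints 7; all files written by subtask 7 land on the same downstream channel,
                // so their records are replayed in the order they were written.
                System.out.println(Integer.parseInt(matcher.group(1)));
            }
        }
    }
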
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RemovePostponeBucketFilesOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RemovePostponeBucketFilesOperator.java
new file mode 100644
index 0000000000..63b44d0f85
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RemovePostponeBucketFilesOperator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.postpone;
+
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.utils.BoundedOneInputOperator;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collections;
+
+/**
+ * Operator used with {@link PostponeBucketCompactSplitSource} to remove files in the
+ * {@code bucket = -2} directory.
+ */
+public class RemovePostponeBucketFilesOperator extends BoundedOneInputOperator<Split, Committable> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void processElement(StreamRecord<Split> element) throws Exception {
+        DataSplit dataSplit = (DataSplit) element.getValue();
+        CommitMessageImpl message =
+                new CommitMessageImpl(
+                        dataSplit.partition(),
+                        dataSplit.bucket(),
+                        DataIncrement.emptyIncrement(),
+                        new CompactIncrement(
+                                dataSplit.dataFiles(),
+                                Collections.emptyList(),
+                                Collections.emptyList()));
+        output.collect(
+                new StreamRecord<>(
+                        new Committable(Long.MAX_VALUE, Committable.Kind.FILE, message)));
+    }
+
+    @Override
+    public void endInput() throws Exception {}
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
new file mode 100644
index 0000000000..f387222e4f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.postpone;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.utils.BoundedOneInputOperator;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.utils.FileStorePathFactory;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Rewrites committables from the postpone bucket table compactor. It moves all new files into the
+ * compact result and deletes unused new files, because the compactor only produces compact
+ * snapshots.
+ */
+public class RewritePostponeBucketCommittableOperator
+        extends BoundedOneInputOperator<Committable, Committable> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FileStoreTable table;
+
+    private transient FileStorePathFactory pathFactory;
+    private transient Map<BinaryRow, Map<Integer, BucketFiles>> bucketFiles;
+
+    public RewritePostponeBucketCommittableOperator(FileStoreTable table) {
+        this.table = table;
+    }
+
+    @Override
+    public void open() throws Exception {
+        pathFactory = table.store().pathFactory();
+        bucketFiles = new HashMap<>();
+    }
+
+    @Override
+    public void processElement(StreamRecord<Committable> element) throws Exception {
+        Committable committable = element.getValue();
+        if (committable.kind() != Committable.Kind.FILE) {
+            output.collect(element);
+            return;
+        }
+
+        CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
+        bucketFiles
+                .computeIfAbsent(message.partition(), p -> new HashMap<>())
+                .computeIfAbsent(
+                        message.bucket(),
+                        b ->
+                                new BucketFiles(
+                                        pathFactory.createDataFilePathFactory(
+                                                message.partition(), message.bucket()),
+                                        table.fileIO()))
+                .update(message);
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        emitAll(Long.MAX_VALUE);
+    }
+
+    protected void emitAll(long checkpointId) {
+        for (Map.Entry<BinaryRow, Map<Integer, BucketFiles>> partitionEntry :
+                bucketFiles.entrySet()) {
+            for (Map.Entry<Integer, BucketFiles> bucketEntry :
+                    partitionEntry.getValue().entrySet()) {
+                CommitMessageImpl message =
+                        new CommitMessageImpl(
+                                partitionEntry.getKey(),
+                                bucketEntry.getKey(),
+                                DataIncrement.emptyIncrement(),
+                                bucketEntry.getValue().makeIncrement());
+                output.collect(
+                        new StreamRecord<>(
+                                new Committable(checkpointId, Committable.Kind.FILE, message)));
+            }
+        }
+        bucketFiles.clear();
+    }
+
+    private static class BucketFiles {
+
+        private final DataFilePathFactory pathFactory;
+        private final FileIO fileIO;
+
+        private final Map<String, DataFileMeta> newFiles;
+        private final List<DataFileMeta> compactBefore;
+        private final List<DataFileMeta> compactAfter;
+        private final List<DataFileMeta> changelogFiles;
+
+        private BucketFiles(DataFilePathFactory pathFactory, FileIO fileIO) {
+            this.pathFactory = pathFactory;
+            this.fileIO = fileIO;
+
+            this.newFiles = new LinkedHashMap<>();
+            this.compactBefore = new ArrayList<>();
+            this.compactAfter = new ArrayList<>();
+            this.changelogFiles = new ArrayList<>();
+        }
+
+        private void update(CommitMessageImpl message) {
+            for (DataFileMeta file : message.newFilesIncrement().newFiles()) {
+                newFiles.put(file.fileName(), file);
+            }
+
+            Map<String, Path> toDelete = new HashMap<>();
+            for (DataFileMeta file : message.compactIncrement().compactBefore()) {
+                if (newFiles.containsKey(file.fileName())) {
+                    toDelete.put(file.fileName(), pathFactory.toPath(file));
+                    newFiles.remove(file.fileName());
+                } else {
+                    compactBefore.add(file);
+                }
+            }
+
+            for (DataFileMeta file : message.compactIncrement().compactAfter()) {
+                compactAfter.add(file);
+                toDelete.remove(file.fileName());
+            }
+
+            changelogFiles.addAll(message.newFilesIncrement().changelogFiles());
+            changelogFiles.addAll(message.compactIncrement().changelogFiles());
+
+            toDelete.forEach((fileName, path) -> fileIO.deleteQuietly(path));
+        }
+
+        private CompactIncrement makeIncrement() {
+            List<DataFileMeta> realCompactAfter = new ArrayList<>(newFiles.values());
+            realCompactAfter.addAll(compactAfter);
+            return new CompactIncrement(compactBefore, realCompactAfter, changelogFiles);
+        }
+    }
+}
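A simplified, string-based sketch of the BucketFiles bookkeeping above (plain strings instead of DataFileMeta, so this is an illustration rather than the real types): a new file that is immediately rewritten by compaction is deleted instead of committed, and the surviving new files are reported as compaction output so the commit stays a pure compact snapshot.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    public class BucketFilesSketch {
        public static void main(String[] args) {
            // Two files were newly written, then a compaction rewrote f1 into f3.
            Set<String> newFiles = new LinkedHashSet<>(Arrays.asList("f1", "f2"));
            List<String> compactBefore = new ArrayList<>();
            List<String> compactAfter = new ArrayList<>(Arrays.asList("f3"));

            // f1 was never committed, so it is deleted from disk rather than listed as compactBefore.
            String rewritten = "f1";
            if (newFiles.remove(rewritten)) {
                System.out.println("delete " + rewritten);
            } else {
                compactBefore.add(rewritten);
            }

            // Remaining new files join the compaction output: compactBefore=[], compactAfter=[f2, f3].
            List<String> realCompactAfter = new ArrayList<>(newFiles);
            realCompactAfter.addAll(compactAfter);
            System.out.println(compactBefore + " -> " + realCompactAfter);
        }
    }
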
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index b80693b39e..74ae5f786d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -99,7 +99,7 @@ public class CompactProcedure extends ProcedureBase {
             if (checkCompactStrategy(compactStrategy)) {
                 action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
             }
-            jobName = "Compact Job";
+            jobName = "Compact Job : " + identifier.getFullName();
         } else if (!isNullOrWhitespaceOnly(orderStrategy)
                 && !isNullOrWhitespaceOnly(orderByColumns)) {
             Preconditions.checkArgument(
@@ -113,7 +113,7 @@ public class CompactProcedure extends ProcedureBase {
                                     tableConf)
                             .withOrderStrategy(orderStrategy)
                             .withOrderColumns(orderByColumns.split(","));
-            jobName = "Sort Compact Job";
+            jobName = "Sort Compact Job : " + identifier.getFullName();
         } else {
             throw new IllegalArgumentException(
                     "You must specify 'order strategy' and 'order by columns' both.");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java
new file mode 100644
index 0000000000..c7d2a1f2e7
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.RescaleAction;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import javax.annotation.Nullable;
+
+/** Procedure to rescale one partition of a table. */
+public class RescaleProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "rescale";
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "bucket_num", type = @DataTypeHint("INT"), isOptional = true),
+                @ArgumentHint(name = "partition", type = @DataTypeHint("STRING"), isOptional = true)
+            })
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            @Nullable Integer bucketNum,
+            @Nullable String partition)
+            throws Exception {
+        Identifier identifier = Identifier.fromString(tableId);
+        String databaseName = identifier.getDatabaseName();
+        String tableName = identifier.getObjectName();
+
+        RescaleAction action = new RescaleAction(databaseName, tableName, catalog.options());
+        if (bucketNum != null) {
+            action.withBucketNum(bucketNum);
+        }
+        if (partition != null) {
+            action.withPartition(ParameterUtils.getPartitions(partition).get(0));
+        }
+
+        return execute(
+                procedureContext, action, "Rescale Postpone Bucket : " + identifier.getFullName());
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
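A minimal usage sketch from Flink SQL, in the same style as the IT cases in this commit; the table name, bucket count, and partition spec are placeholders:

    // Assumes tEnv is a batch TableEnvironment with the Paimon catalog selected.
    tEnv.executeSql(
                    "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 16, `partition` => 'pt=1')")
            .await();
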
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 2098729858..fb0f8184b5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -262,7 +262,7 @@ public abstract class FlinkSink<T> implements Serializable {
         return written;
     }
 
-    protected DataStreamSink<?> doCommit(DataStream<Committable> written, String commitUser) {
+    public DataStreamSink<?> doCommit(DataStream<Committable> written, String commitUser) {
         StreamExecutionEnvironment env = written.getExecutionEnvironment();
         ReadableConfig conf = env.getConfiguration();
         CheckpointConfig checkpointConfig = env.getCheckpointConfig();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index d99a7d818f..35a7d744c9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -234,7 +234,11 @@ public class FlinkSinkBuilder {
         BucketMode bucketMode = table.bucketMode();
         switch (bucketMode) {
             case HASH_FIXED:
-                return buildForFixedBucket(input);
+                if (table.coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) {
+                    return buildPostponeBucketSink(input);
+                } else {
+                    return buildForFixedBucket(input);
+                }
             case HASH_DYNAMIC:
                 return buildDynamicBucketSink(input, false);
             case CROSS_PARTITION:
@@ -246,7 +250,7 @@ public class FlinkSinkBuilder {
         }
     }
 
-    protected DataStream<InternalRow> mapToInternalRow(
+    public static DataStream<InternalRow> mapToInternalRow(
             DataStream<RowData> input, org.apache.paimon.types.RowType 
rowType) {
         SingleOutputStreamOperator<InternalRow> result =
                 input.transform(
@@ -291,6 +295,10 @@ public class FlinkSinkBuilder {
         return sink.sinkFrom(partitioned);
     }
 
+    private DataStreamSink<?> buildPostponeBucketSink(DataStream<InternalRow> input) {
+        return new PostponeBucketWriteSink(table, overwritePartition).sinkFrom(input, parallelism);
+    }
+
     private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow> input) {
         checkArgument(
                 table.primaryKeys().isEmpty(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketTableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketTableWriteOperator.java
new file mode 100644
index 0000000000..a233c6a6cc
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketTableWriteOperator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** {@link TableWriteOperator} for writing records into a postpone bucket table. */
+public class PostponeBucketTableWriteOperator extends TableWriteOperator<InternalRow> {
+
+    private static final long serialVersionUID = 1L;
+
+    public PostponeBucketTableWriteOperator(
+            StreamOperatorParameters<Committable> parameters,
+            FileStoreTable table,
+            StoreSinkWrite.Provider storeSinkWriteProvider,
+            String initialCommitUser) {
+        super(parameters, table, storeSinkWriteProvider, initialCommitUser);
+    }
+
+    @Override
+    protected boolean containLogSystem() {
+        return false;
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        Map<String, String> dynamicOptions = new HashMap<>();
+        dynamicOptions.put(
+                CoreOptions.DATA_FILE_PREFIX.key(),
+                String.format(
+                        "%s-u-%s-s-%d-w-",
+                        table.coreOptions().dataFilePrefix(),
+                        getCommitUser(context),
+                        RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext())));
+        table = table.copy(dynamicOptions);
+
+        super.initializeState(context);
+    }
+
+    @Override
+    public void processElement(StreamRecord<InternalRow> element) throws Exception {
+        write.write(element.getValue(), BucketMode.POSTPONE_BUCKET);
+    }
+
+    /** Factory to create {@link PostponeBucketTableWriteOperator}. */
+    public static class Factory extends TableWriteOperator.Factory<InternalRow> {
+
+        protected Factory(
+                FileStoreTable table,
+                StoreSinkWrite.Provider storeSinkWriteProvider,
+                String initialCommitUser) {
+            super(table, storeSinkWriteProvider, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Committable>> T createStreamOperator(
+                StreamOperatorParameters<Committable> parameters) {
+            return (T)
+                    new PostponeBucketTableWriteOperator(
+                            parameters, table, storeSinkWriteProvider, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return PostponeBucketTableWriteOperator.class;
+        }
+    }
+}
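The commit user and subtask index that the compact source later parses back out of file names are injected here through the data-file prefix. A small sketch of how that prefix is composed, with placeholder values (the real prefix and commit user come from the table options and restored state):

    public class DataFilePrefixSketch {
        public static void main(String[] args) {
            String dataFilePrefix = "data-"; // placeholder for table.coreOptions().dataFilePrefix()
            String commitUser = "3f2a";      // placeholder for the restored commit user
            int subtask = 7;                 // index of this writer subtask

            // Mirrors the String.format in initializeState above; every file written by this
            // subtask starts with this prefix, so "-s-7-" can later be matched by
            // PostponeBucketCompactSplitSource's channel computer.
            System.out.println(String.format("%s-u-%s-s-%d-w-", dataFilePrefix, commitUser, subtask));
        }
    }
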
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketWriteSink.java
new file mode 100644
index 0000000000..4c6d271f72
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketWriteSink.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static org.apache.paimon.CoreOptions.createCommitUser;
+
+/** {@link FlinkSink} for writing records into a postpone bucket table. */
+public class PostponeBucketWriteSink extends FlinkWriteSink<InternalRow> {
+
+    public PostponeBucketWriteSink(
+            FileStoreTable table, @Nullable Map<String, String> overwritePartition) {
+        super(table, overwritePartition);
+    }
+
+    @Override
+    protected OneInputStreamOperatorFactory<InternalRow, Committable> createWriteOperatorFactory(
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
+        return new PostponeBucketTableWriteOperator.Factory(table, writeProvider, commitUser);
+    }
+
+    public DataStreamSink<?> sinkFrom(
+            DataStream<InternalRow> input, @Nullable Integer parallelism) {
+        String initialCommitUser = createCommitUser(table.coreOptions().toConfiguration());
+        DataStream<Committable> written =
+                doWrite(
+                        input,
+                        initialCommitUser,
+                        parallelism == null ? input.getParallelism() : parallelism);
+        return doCommit(written, initialCommitUser);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index fd876698c0..ec69ce59f6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -43,6 +43,7 @@ public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, C
     private final StoreSinkWrite.Provider storeSinkWriteProvider;
     private final String initialCommitUser;
 
+    private transient String commitUser;
     private transient StoreSinkWriteState state;
     protected transient StoreSinkWrite write;
 
@@ -90,11 +91,16 @@ public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, C
     }
 
     protected String getCommitUser(StateInitializationContext context) throws Exception {
-        // Each job can only have one username and this name must be consistent across restarts.
-        // We cannot use job id as commit username here because user may change job id by creating
-        // a savepoint, stop the job and then resume from savepoint.
-        return StateUtils.getSingleValueFromState(
-                context, "commit_user_state", String.class, initialCommitUser);
+        if (commitUser == null) {
+            // Each job can only have one username and this name must be consistent across restarts.
+            // We cannot use job id as commit username here because user may change job id by
+            // creating a savepoint, stop the job and then resume from savepoint.
+            commitUser =
+                    StateUtils.getSingleValueFromState(
+                            context, "commit_user_state", String.class, initialCommitUser);
+        }
+
+        return commitUser;
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index efaa25627d..ae624f848d 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -45,6 +45,7 @@ org.apache.paimon.flink.action.RewriteFileIndexActionFactory
 org.apache.paimon.flink.action.ExpireSnapshotsActionFactory
 org.apache.paimon.flink.action.RemoveUnexistingFilesActionFactory
 org.apache.paimon.flink.action.ClearConsumerActionFactory
+org.apache.paimon.flink.action.RescaleActionFactory
 
 ### procedure factories
 org.apache.paimon.flink.procedure.CompactDatabaseProcedure
@@ -87,3 +88,4 @@ org.apache.paimon.flink.procedure.RefreshObjectTableProcedure
 org.apache.paimon.flink.procedure.RemoveUnexistingFilesProcedure
 org.apache.paimon.flink.procedure.ClearConsumersProcedure
 org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure
+org.apache.paimon.flink.procedure.RescaleProcedure
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
new file mode 100644
index 0000000000..d274a02cd8
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.util.AbstractTestBase;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for postpone bucket tables. */
+public class PostponeBucketTableITCase extends AbstractTestBase {
+
+    private static final int TIMEOUT = 120;
+
+    @Test
+    public void testWriteThenCompact() throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+
+        tEnv.executeSql(
+                "CREATE CATALOG mycat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  pt INT,\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'bucket' = '-2'\n"
+                        + ")");
+
+        int numPartitions = 3;
+        int numKeys = 100;
+        List<String> values = new ArrayList<>();
+        for (int i = 0; i < numPartitions; i++) {
+            for (int j = 0; j < numKeys; j++) {
+                values.add(String.format("(%d, %d, %d)", i, j, i * numKeys + j));
+            }
+        }
+        tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ", values)).await();
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T"))).isEmpty();
+
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
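+        // expected SUM(v) per partition: v runs over i * numKeys .. i * numKeys + numKeys - 1,
+        // an arithmetic series summing to (first + last) * numKeys / 2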
+        List<String> expected = new ArrayList<>();
+        for (int i = 0; i < numPartitions; i++) {
+            expected.add(
+                    String.format(
+                            "+I[%d, %d]",
+                            i, (i * numKeys + i * numKeys + numKeys - 1) * numKeys / 2));
+        }
+        String query = "SELECT pt, SUM(v) FROM T GROUP BY pt";
+        assertThat(collect(tEnv.executeSql(query))).hasSameElementsAs(expected);
+
+        values.clear();
+        int changedPartition = 1;
+        for (int j = 0; j < numKeys; j++) {
+            values.add(
+                    String.format(
+                            "(%d, %d, %d)",
+                            changedPartition, j, -(changedPartition * numKeys + j)));
+        }
+        tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ", values)).await();
+        assertThat(collect(tEnv.executeSql(query))).hasSameElementsAs(expected);
+
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
+        expected.clear();
+        for (int i = 0; i < numPartitions; i++) {
+            int val = (i * numKeys + i * numKeys + numKeys - 1) * numKeys / 2;
+            if (i == changedPartition) {
+                val *= -1;
+            }
+            expected.add(String.format("+I[%d, %d]", i, val));
+        }
+        assertThat(collect(tEnv.executeSql(query))).hasSameElementsAs(expected);
+    }
+
+    @Test
+    public void testOverwrite() throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+
+        tEnv.executeSql(
+                "CREATE CATALOG mycat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  pt INT,\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'bucket' = '-2'\n"
+                        + ")");
+
+        tEnv.executeSql(
+                        "INSERT INTO T VALUES (1, 10, 110), (1, 20, 120), (2, 10, 210), (2, 20, 220)")
+                .await();
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T"))).isEmpty();
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+        assertThat(collect(tEnv.executeSql("SELECT k, v, pt FROM T")))
+                .containsExactlyInAnyOrder(
+                        "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[10, 210, 2]", "+I[20, 220, 2]");
+
+        // no compact, so the result is the same
+        tEnv.executeSql("INSERT INTO T VALUES (2, 40, 240)").await();
+        assertThat(collect(tEnv.executeSql("SELECT k, v, pt FROM T")))
+                .containsExactlyInAnyOrder(
+                        "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[10, 210, 2]", "+I[20, 220, 2]");
+
+        tEnv.executeSql("INSERT OVERWRITE T VALUES (2, 20, 221), (2, 30, 230)").await();
+        assertThat(collect(tEnv.executeSql("SELECT k, v, pt FROM T")))
+                .containsExactlyInAnyOrder("+I[10, 110, 1]", "+I[20, 120, 1]");
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+        // overwrite should also clean up files in the bucket = -2 directory,
+        // which contain the record with key = 40
+        assertThat(collect(tEnv.executeSql("SELECT k, v, pt FROM T")))
+                .containsExactlyInAnyOrder(
+                        "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[20, 221, 2]", "+I[30, 230, 2]");
+    }
+
+    @Timeout(TIMEOUT)
+    @Test
+    public void testLookupChangelogProducer() throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment bEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+
+        String createCatalogSql =
+                "CREATE CATALOG mycat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")";
+        bEnv.executeSql(createCatalogSql);
+        bEnv.executeSql("USE CATALOG mycat");
+        bEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  pt INT,\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'bucket' = '-2',\n"
+                        + "  'changelog-producer' = 'lookup'\n"
+                        + ")");
+
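+        // with 'changelog-producer' = 'lookup', changelog for this postpone bucket table
+        // is produced when sys.compact assigns the pending records to buckets, so the
+        // streaming read below only receives rows after each compact call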
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .parallelism(1)
+                        .checkpointIntervalMs(1000)
+                        .build();
+        sEnv.executeSql(createCatalogSql);
+        sEnv.executeSql("USE CATALOG mycat");
+        TableResult streamingSelect = sEnv.executeSql("SELECT k, v, pt FROM T");
+        JobClient client = streamingSelect.getJobClient().get();
+        CloseableIterator<Row> it = streamingSelect.collect();
+
+        bEnv.executeSql(
+                        "INSERT INTO T VALUES (1, 10, 110), (1, 20, 120), (2, 10, 210), (2, 20, 220)")
+                .await();
+        assertThat(collect(bEnv.executeSql("SELECT * FROM T"))).isEmpty();
+        bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+        assertThat(collect(bEnv.executeSql("SELECT k, v, pt FROM T")))
+                .containsExactlyInAnyOrder(
+                        "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[10, 210, 2]", "+I[20, 220, 2]");
+        assertThat(collect(client, it, 4))
+                .containsExactlyInAnyOrder(
+                        "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[10, 210, 2]", "+I[20, 220, 2]");
+
+        bEnv.executeSql("INSERT INTO T VALUES (1, 20, 121), (2, 30, 230)").await();
+        bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+        assertThat(collect(bEnv.executeSql("SELECT k, v, pt FROM T")))
+                .containsExactlyInAnyOrder(
+                        "+I[10, 110, 1]",
+                        "+I[20, 121, 1]",
+                        "+I[10, 210, 2]",
+                        "+I[20, 220, 2]",
+                        "+I[30, 230, 2]");
+        assertThat(collect(client, it, 3))
+                .containsExactlyInAnyOrder("-U[20, 120, 1]", "+U[20, 121, 1]", "+I[30, 230, 2]");
+
+        it.close();
+    }
+
+    @Test
+    public void testRescaleBucket() throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+
+        tEnv.executeSql(
+                "CREATE CATALOG mycat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  pt INT,\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'bucket' = '-2',\n"
+                        + "  'postpone.default-bucket-num' = '2'\n"
+                        + ")");
+
+        int numPartitions = 3;
+        int numKeys = 100;
+        List<String> values = new ArrayList<>();
+        for (int i = 0; i < numPartitions; i++) {
+            for (int j = 0; j < numKeys; j++) {
+                values.add(String.format("(%d, %d, %d)", i, j, i * numKeys + j));
+            }
+        }
+        tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ", values)).await();
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
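+        // 'postpone.default-bucket-num' = '2' makes the first compaction assign
+        // 2 buckets per partition; verified through the T$files system table below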
+        List<String> expectedBuckets = new ArrayList<>();
+        for (int i = 0; i < numPartitions; i++) {
+            expectedBuckets.add(String.format("+I[{%d}, 2]", i));
+        }
+        String bucketSql =
+                "SELECT `partition`, COUNT(DISTINCT bucket) FROM `T$files` GROUP BY `partition`";
+        assertThat(collect(tEnv.executeSql(bucketSql))).hasSameElementsAs(expectedBuckets);
+
+        List<String> expectedData = new ArrayList<>();
+        for (int i = 0; i < numPartitions; i++) {
+            expectedData.add(
+                    String.format(
+                            "+I[%d, %d]",
+                            i, (i * numKeys + i * numKeys + numKeys - 1) * numKeys / 2));
+        }
+        String query = "SELECT pt, SUM(v) FROM T GROUP BY pt";
+        assertThat(collect(tEnv.executeSql(query))).hasSameElementsAs(expectedData);
+
+        // before rescaling, write some files in bucket = -2 directory,
+        // these files should not be touched by rescaling
+        values.clear();
+        int changedPartition = 1;
+        for (int j = 0; j < numKeys; j++) {
+            values.add(
+                    String.format(
+                            "(%d, %d, %d)",
+                            changedPartition, j, -(changedPartition * numKeys + j)));
+        }
+        tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ", values)).await();
+
+        tEnv.executeSql(
+                "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 4, `partition` => 'pt="
+                        + changedPartition
+                        + "')");
+        expectedBuckets.clear();
+        for (int i = 0; i < numPartitions; i++) {
+            expectedBuckets.add(String.format("+I[{%d}, %d]", i, i == changedPartition ? 4 : 2));
+        }
+        assertThat(collect(tEnv.executeSql(bucketSql))).hasSameElementsAs(expectedBuckets);
+        assertThat(collect(tEnv.executeSql(query))).hasSameElementsAs(expectedData);
+
+        // rescaling bucket should not touch the files in bucket = -2 directory
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+        assertThat(collect(tEnv.executeSql(bucketSql))).hasSameElementsAs(expectedBuckets);
+
+        expectedData.clear();
+        for (int i = 0; i < numPartitions; i++) {
+            int val = (i * numKeys + i * numKeys + numKeys - 1) * numKeys / 2;
+            if (i == changedPartition) {
+                val *= -1;
+            }
+            expectedData.add(String.format("+I[%d, %d]", i, val));
+        }
+        assertThat(collect(tEnv.executeSql(query))).hasSameElementsAs(expectedData);
+    }
+
+    @Timeout(TIMEOUT)
+    @Test
+    public void testInputChangelogProducer() throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .parallelism(1)
+                        .checkpointIntervalMs(500)
+                        .build();
+        String createCatalog =
+                "CREATE CATALOG mycat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")";
+        sEnv.executeSql(createCatalog);
+        sEnv.executeSql("USE CATALOG mycat");
+        sEnv.executeSql(
+                "CREATE TEMPORARY TABLE S (\n"
+                        + "  i INT\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'datagen',\n"
+                        + "  'fields.i.kind' = 'sequence',\n"
+                        + "  'fields.i.start' = '0',\n"
+                        + "  'fields.i.end' = '199',\n"
+                        + "  'number-of-rows' = '200',\n"
+                        + "  'rows-per-second' = '50'\n"
+                        + ")");
+        sEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ") WITH (\n"
+                        + "  'bucket' = '-2',\n"
+                        + "  'changelog-producer' = 'input',\n"
+                        + "  'continuous.discovery-interval' = '1ms'\n"
+                        + ")");
+        sEnv.executeSql(
+                "CREATE TEMPORARY VIEW V AS SELECT MOD(i, 2) AS x, IF(MOD(i, 2) = 0, 1, 1000) AS y FROM S");
+        sEnv.executeSql("INSERT INTO T SELECT SUM(y), x FROM V GROUP BY x").await();
+
+        TableEnvironment bEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .parallelism(2)
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+        bEnv.executeSql(createCatalog);
+        bEnv.executeSql("USE CATALOG mycat");
+        bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
+        // if the read order when compacting is wrong, this check will fail
+        assertThat(collect(bEnv.executeSql("SELECT * FROM T")))
+                .containsExactlyInAnyOrder("+U[100, 0]", "+U[100000, 1]");
+        TableResult streamingSelect =
+                sEnv.executeSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id' = '1') */");
+        JobClient client = streamingSelect.getJobClient().get();
+        CloseableIterator<Row> it = streamingSelect.collect();
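+        // 200 source rows aggregated into 2 keys yield one +I plus an -U/+U pair for
+        // every later update, i.e. 2 * 200 - 2 = 398 changelog rows in total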
+        // if the number of changelog records is not sufficient, this call will fail
+        collect(client, it, 400 - 2);
+    }
+
+    private List<String> collect(TableResult result) throws Exception {
+        List<String> ret = new ArrayList<>();
+        try (CloseableIterator<Row> it = result.collect()) {
+            while (it.hasNext()) {
+                ret.add(it.next().toString());
+            }
+        }
+        return ret;
+    }
+
+    private List<String> collect(JobClient client, CloseableIterator<Row> it, int limit)
+            throws Exception {
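+        // watchdog: cancel the streaming job if the expected rows do not arrive within
+        // TIMEOUT seconds, so a missing changelog fails the test instead of hanging it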
+        AtomicBoolean shouldStop = new AtomicBoolean(false);
+        Thread timerThread =
+                new Thread(
+                        () -> {
+                            try {
+                                for (int i = 0; i < TIMEOUT; i++) {
+                                    Thread.sleep(1000);
+                                    if (shouldStop.get()) {
+                                        return;
+                                    }
+                                }
+                                client.cancel().get();
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+        timerThread.start();
+
+        List<String> ret = new ArrayList<>();
+        for (int i = 0; i < limit && it.hasNext(); i++) {
+            ret.add(it.next().toString());
+        }
+
+        shouldStop.set(true);
+        timerThread.join();
+        return ret;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
index d5747d2e28..d44ba03f12 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
@@ -26,6 +26,7 @@ import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
 
@@ -35,6 +36,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -204,7 +206,13 @@ public class RescaleBucketITCase extends CatalogITCaseBase {
                         "Try to write table with a new bucket num 4, but the previous bucket num is 2. "
                                 + "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.");
 
-        batchSql(rescaleOverwriteSql, tableName, tableName);
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            batchSql(rescaleOverwriteSql, tableName, tableName);
+        } else {
+            tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+            batchSql(String.format("CALL sys.rescale(`table` => 'default.%s')", tableName));
+        }
+
         snapshot = findLatestSnapshot(tableName);
         assertThat(snapshot).isNotNull();
         assertThat(snapshot.id()).isEqualTo(2L);
