This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a9532bd175 [flink] Postpone bucket table supports to batch write fixed bucket (#6460)
a9532bd175 is described below

commit a9532bd175fe94b0bc907fd94f8d9bd192aff605
Author: yuzelin <[email protected]>
AuthorDate: Fri Oct 31 10:06:47 2025 +0800

    [flink] Postpone bucket table supports to batch write fixed bucket (#6460)
---
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  11 +
 .../apache/paimon/table/sink/TableWriteImpl.java   |   2 -
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java |  63 ++-
 .../apache/paimon/flink/sink/FlinkWriteSink.java   |  26 +-
 .../flink/sink/PostponeBatchWriteOperator.java     | 118 ++++++
 .../sink/PostponeFixedBucketChannelComputer.java   |  66 +++
 .../paimon/flink/sink/PostponeFixedBucketSink.java |  93 +++++
 .../flink/sink/RowDataStoreWriteOperator.java      |  16 +-
 .../sink/StatelessRowDataStoreWriteOperator.java   |  58 +++
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |   2 -
 .../paimon/flink/CrossPartitionTableITCase.java    |   2 +-
 .../paimon/flink/PostponeBucketTableITCase.java    | 450 ++++++++++++++++++++-
 13 files changed, 859 insertions(+), 54 deletions(-)
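
The change, in short: batch INSERT into a postpone bucket table ('bucket' = '-2') now writes directly into fixed buckets by default (new option 'postpone.batch-write-fixed-bucket', default true), so the rows are readable without first running a compaction; setting the option to 'false' restores the previous route through bucket -2. A minimal usage sketch in the style of the new ITCases (not part of this commit; the catalog name and warehouse path are placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;

public class PostponeBatchWriteExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);

        // Placeholder catalog; adjust 'warehouse' to your environment.
        tEnv.executeSql(
                "CREATE CATALOG mycat WITH ('type' = 'paimon', 'warehouse' = '/tmp/warehouse')");
        tEnv.executeSql("USE CATALOG mycat");
        tEnv.executeSql(
                "CREATE TABLE T (pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED) "
                        + "PARTITIONED BY (pt) WITH ('bucket' = '-2')");

        // Default after this commit: batch writes go into fixed buckets and are readable right away.
        tEnv.executeSql("INSERT INTO T VALUES (1, 10, 110), (2, 20, 220)").await();

        // Previous behavior: route everything to the postpone bucket (-2); those rows only
        // become visible after a compaction.
        tEnv.executeSql(
                        "INSERT INTO T /*+ OPTIONS('postpone.batch-write-fixed-bucket' = 'false') */ "
                                + "VALUES (3, 30, 330)")
                .await();
        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
    }
}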

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 9a5f3c99ad..d26fa9b59d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -941,6 +941,12 @@ This config option does not affect the default filesystem metastore.</td>
             <td>String</td>
             <td>You can specify a pattern to get a timestamp from partitions. The formatter pattern is defined by 'partition.timestamp-formatter'.<ul><li>By default, read from the first field.</li><li>If the timestamp in the partition is a single field called 'dt', you can use '$dt'.</li><li>If it is spread across multiple fields for year, month, day, and hour, you can use '$year-$month-$day $hour:00:00'.</li><li>If the timestamp is in fields dt and hour, you can use '$dt $hour:00:00'.</ [...]
         </tr>
+        <tr>
+            <td><h5>postpone.batch-write-fixed-bucket</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to write the data into fixed bucket for batch writing a postpone bucket table.</td>
+        </tr>
         <tr>
             <td><h5>primary-key</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index bac0d58fd9..f5c90b4196 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2042,6 +2042,13 @@ public class CoreOptions implements Serializable {
                     .defaultValue(false)
                     .withDescription("Whether discard duplicate files in commit.");
 
+    public static final ConfigOption<Boolean> POSTPONE_BATCH_WRITE_FIXED_BUCKET =
+            key("postpone.batch-write-fixed-bucket")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to write the data into fixed bucket for batch writing a postpone bucket table.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -3143,6 +3150,10 @@ public class CoreOptions implements Serializable {
         return options.get(BLOB_AS_DESCRIPTOR);
     }
 
+    public boolean postponeBatchWriteFixedBucket() {
+        return options.get(POSTPONE_BATCH_WRITE_FIXED_BUCKET);
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 325911b0ea..8d5757dcd3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.table.sink;
 
 import org.apache.paimon.FileStore;
-import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.casting.DefaultValueRow;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
@@ -277,7 +276,6 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
         write.restore(state);
     }
 
-    @VisibleForTesting
     public FileStoreWrite<T> getWrite() {
         return write;
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index af3d407d95..0d50178dd5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -22,12 +22,14 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.PartitionSinkStrategy;
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
 import org.apache.paimon.flink.sorter.TableSortInfo;
 import org.apache.paimon.flink.sorter.TableSorter;
+import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
@@ -54,6 +56,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.WRITE_ONLY;
 import static org.apache.paimon.CoreOptions.clusteringStrategy;
 import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
 import static org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
@@ -292,16 +296,59 @@ public class FlinkSinkBuilder {
     }
 
     private DataStreamSink<?> buildPostponeBucketSink(DataStream<InternalRow> input) {
-        ChannelComputer<InternalRow> channelComputer;
-        if (!table.partitionKeys().isEmpty()
-                && table.coreOptions().partitionSinkStrategy() == PartitionSinkStrategy.HASH) {
-            channelComputer = new RowDataHashPartitionChannelComputer(table.schema());
+        if (isStreaming(input) || !table.coreOptions().postponeBatchWriteFixedBucket()) {
+            ChannelComputer<InternalRow> channelComputer;
+            if (!table.partitionKeys().isEmpty()
+                    && table.coreOptions().partitionSinkStrategy() == PartitionSinkStrategy.HASH) {
+                channelComputer = new RowDataHashPartitionChannelComputer(table.schema());
+            } else {
+                channelComputer = new PostponeBucketChannelComputer(table.schema());
+            }
+            DataStream<InternalRow> partitioned = partition(input, channelComputer, parallelism);
+            PostponeBucketSink sink = new PostponeBucketSink(table, overwritePartition);
+            return sink.sinkFrom(partitioned);
         } else {
-            channelComputer = new PostponeBucketChannelComputer(table.schema());
+            Map<BinaryRow, Integer> knownNumBuckets = getKnownNumBuckets();
+
+            DataStream<InternalRow> partitioned =
+                    partition(
+                            input,
+                            new PostponeFixedBucketChannelComputer(table.schema(), knownNumBuckets),
+                            parallelism);
+
+            Map<String, String> batchWriteOptions = new HashMap<>();
+            batchWriteOptions.put(WRITE_ONLY.key(), "true");
+            // It's just used to create merge tree writer for writing files to fixed bucket.
+            // The real bucket number is determined at runtime.
+            batchWriteOptions.put(BUCKET.key(), "1");
+            FileStoreTable tableForWrite = table.copy(batchWriteOptions);
+
+            PostponeFixedBucketSink sink =
+                    new PostponeFixedBucketSink(tableForWrite, overwritePartition, knownNumBuckets);
+            return sink.sinkFrom(partitioned);
         }
-        DataStream<InternalRow> partitioned = partition(input, channelComputer, parallelism);
-        PostponeBucketSink sink = new PostponeBucketSink(table, overwritePartition);
-        return sink.sinkFrom(partitioned);
+    }
+
+    private Map<BinaryRow, Integer> getKnownNumBuckets() {
+        Map<BinaryRow, Integer> knownNumBuckets = new HashMap<>();
+        List<SimpleFileEntry> simpleFileEntries =
+                table.store().newScan().onlyReadRealBuckets().readSimpleEntries();
+        for (SimpleFileEntry entry : simpleFileEntries) {
+            if (entry.totalBuckets() >= 0) {
+                Integer oldTotalBuckets =
+                        knownNumBuckets.put(entry.partition(), entry.totalBuckets());
+                if (oldTotalBuckets != null && oldTotalBuckets != entry.totalBuckets()) {
+                    throw new IllegalStateException(
+                            "Partition "
+                                    + entry.partition()
+                                    + " has different totalBuckets "
+                                    + oldTotalBuckets
+                                    + " and "
+                                    + entry.totalBuckets());
+                }
+            }
+        }
+        return knownNumBuckets;
     }
 
     private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow> input) {
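
The routing above, reduced to a self-contained sketch: partitions that already contain fixed-bucket data keep the bucket count recorded in their file entries, while partitions seen for the first time fall back to a default taken from the sink parallelism. The types and the hash below are simplified stand-ins for illustration only, not the Paimon classes used in the diff:

import java.util.HashMap;
import java.util.Map;

public class FixedBucketRoutingSketch {
    public static void main(String[] args) {
        // Per-partition bucket counts discovered from existing data files
        // (the real code reads totalBuckets() from SimpleFileEntry scans).
        Map<String, Integer> knownNumBuckets = new HashMap<>();
        knownNumBuckets.put("pt=pt1", 3);
        knownNumBuckets.put("pt=pt2", 4);

        int defaultNumBuckets = 5; // e.g. the sink parallelism

        String[][] records = {{"pt=pt1", "k1"}, {"pt=pt2", "k2"}, {"pt=pt3", "k3"}};
        for (String[] record : records) {
            String partition = record[0];
            String bucketKey = record[1];
            // Known partition -> its recorded count; unseen partition -> the default.
            int numBuckets = knownNumBuckets.computeIfAbsent(partition, p -> defaultNumBuckets);
            int bucket = Math.floorMod(bucketKey.hashCode(), numBuckets);
            System.out.printf("%s / %s -> bucket %d of %d%n", partition, bucketKey, bucket, numBuckets);
        }
    }
}
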
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
index 1656ba8a8f..d694604627 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
@@ -24,7 +24,6 @@ import org.apache.paimon.manifest.ManifestCommittableSerializer;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 
-import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
@@ -40,7 +39,7 @@ public abstract class FlinkWriteSink<T> extends FlinkSink<T> {
 
     private static final long serialVersionUID = 1L;
 
-    @Nullable private final Map<String, String> overwritePartition;
+    @Nullable protected final Map<String, String> overwritePartition;
 
     public FlinkWriteSink(FileStoreTable table, @Nullable Map<String, String> overwritePartition) {
         super(table, overwritePartition != null);
@@ -81,27 +80,8 @@ public abstract class FlinkWriteSink<T> extends FlinkSink<T> {
             @Override
             @SuppressWarnings("unchecked, rawtypes")
             public StreamOperator createStreamOperator(StreamOperatorParameters parameters) {
-                return new RowDataStoreWriteOperator(
-                        parameters, table, logSinkFunction, writeProvider, commitUser) {
-
-                    @Override
-                    protected StoreSinkWriteState createState(
-                            int subtaskId,
-                            StateInitializationContext context,
-                            StoreSinkWriteState.StateValueFilter stateFilter) {
-                        // No conflicts will occur in append only unaware bucket writer, so no state
-                        // is needed.
-                        return new NoopStoreSinkWriteState(subtaskId);
-                    }
-
-                    @Override
-                    protected String getCommitUser(StateInitializationContext context)
-                            throws Exception {
-                        // No conflicts will occur in append only unaware bucket writer, so
-                        // commitUser does not matter.
-                        return commitUser == null ? initialCommitUser : commitUser;
-                    }
-                };
+                return new StatelessRowDataStoreWriteOperator(
+                        parameters, table, logSinkFunction, writeProvider, commitUser);
             }
         };
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBatchWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBatchWriteOperator.java
new file mode 100644
index 0000000000..a47925accf
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBatchWriteOperator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.bucket.BucketFunction;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.table.sink.SinkRecord;
+
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** Writer to write {@link InternalRow} to postpone fixed bucket. */
+public class PostponeBatchWriteOperator extends StatelessRowDataStoreWriteOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Map<BinaryRow, Integer> knownNumBuckets;
+    private final BucketFunction bucketFunction;
+
+    private transient RowPartitionKeyExtractor partitionKeyExtractor;
+    private transient int defaultNumBuckets;
+    private transient Projection bucketKeyProjection;
+
+    public PostponeBatchWriteOperator(
+            StreamOperatorParameters<Committable> parameters,
+            FileStoreTable table,
+            StoreSinkWrite.Provider storeSinkWriteProvider,
+            String initialCommitUser,
+            Map<BinaryRow, Integer> knownNumBuckets) {
+        super(parameters, table, null, storeSinkWriteProvider, initialCommitUser);
+        this.knownNumBuckets = new HashMap<>(knownNumBuckets);
+        this.bucketFunction =
+                BucketFunction.create(
+                        new CoreOptions(table.options()), table.schema().logicalBucketKeyType());
+    }
+
+    public void open() throws Exception {
+        super.open();
+
+        int sinkParallelism = RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+        this.defaultNumBuckets = sinkParallelism <= 0 ? 1 : sinkParallelism;
+
+        TableSchema schema = table.schema();
+        this.partitionKeyExtractor = new RowPartitionKeyExtractor(schema);
+        this.bucketKeyProjection =
+                CodeGenUtils.newProjection(
+                        schema.logicalRowType(), schema.projection(schema.bucketKeys()));
+        ((StoreSinkWriteImpl) write).getWrite().getWrite().withIgnoreNumBucketCheck(true);
+    }
+
+    @Override
+    @Nullable
+    protected SinkRecord write(InternalRow row) throws Exception {
+        BinaryRow partition = partitionKeyExtractor.partition(row);
+        int numBuckets = knownNumBuckets.computeIfAbsent(partition.copy(), p -> defaultNumBuckets);
+        int bucket = bucketFunction.bucket(bucketKeyProjection.apply(row), numBuckets);
+        return write.write(row, bucket);
+    }
+
+    @Override
+    protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
+            throws IOException {
+        List<Committable> committables = new ArrayList<>();
+        for (Committable committable : super.prepareCommit(waitCompaction, checkpointId)) {
+            if (committable.kind() == Committable.Kind.FILE) {
+                CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
+                committables.add(
+                        new Committable(
+                                committable.checkpointId(),
+                                committable.kind(),
+                                new CommitMessageImpl(
+                                        message.partition(),
+                                        message.bucket(),
+                                        checkNotNull(knownNumBuckets.get(message.partition())),
+                                        message.newFilesIncrement(),
+                                        message.compactIncrement())));
+            } else {
+                committables.add(committable);
+            }
+        }
+
+        return committables;
+    }
+}
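
The prepareCommit override above exists because the writer is created with a placeholder 'bucket' = '1' (see FlinkSinkBuilder), so the bucket count recorded in each FILE commit message has to be patched to the real per-partition value before committing. A toy, stand-alone sketch of that rewrite, using a hypothetical record type instead of Paimon's CommitMessageImpl:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TotalBucketsRewriteSketch {
    // Hypothetical stand-in for the fields of a file commit message.
    record CommitMessage(String partition, int bucket, int totalBuckets) {}

    static List<CommitMessage> rewrite(List<CommitMessage> messages, Map<String, Integer> knownNumBuckets) {
        return messages.stream()
                // Replace the placeholder totalBuckets with the real per-partition count.
                .map(m -> new CommitMessage(m.partition(), m.bucket(), knownNumBuckets.get(m.partition())))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<CommitMessage> rewritten =
                rewrite(List.of(new CommitMessage("pt=pt1", 2, 1)), Map.of("pt=pt1", 3));
        System.out.println(rewritten); // [CommitMessage[partition=pt=pt1, bucket=2, totalBuckets=3]]
    }
}
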
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java
new file mode 100644
index 0000000000..5fd55db13e
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+
+import java.util.Map;
+
+/**
+ * {@link ChannelComputer} for writing {@link InternalRow}s into the fixed bucket of postpone-bucket
+ * tables.
+ */
+public class PostponeFixedBucketChannelComputer implements ChannelComputer<InternalRow> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TableSchema schema;
+    private final Map<BinaryRow, Integer> knownNumBuckets;
+
+    private transient int numChannels;
+    private transient RowPartitionKeyExtractor partitionKeyExtractor;
+
+    public PostponeFixedBucketChannelComputer(
+            TableSchema schema, Map<BinaryRow, Integer> knownNumBuckets) {
+        this.schema = schema;
+        this.knownNumBuckets = knownNumBuckets;
+    }
+
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+        this.partitionKeyExtractor = new RowPartitionKeyExtractor(schema);
+    }
+
+    @Override
+    public int channel(InternalRow record) {
+        BinaryRow partition = partitionKeyExtractor.partition(record);
+        int bucket = knownNumBuckets.computeIfAbsent(partition, p -> numChannels);
+        return ChannelComputer.select(partition, bucket, numChannels);
+    }
+
+    @Override
+    public String toString() {
+        return "shuffle by bucket";
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
new file mode 100644
index 0000000000..92763c0cbe
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+
+/** {@link FlinkSink} for writing records into fixed bucket of postpone table. */
+public class PostponeFixedBucketSink extends FlinkWriteSink<InternalRow> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Map<BinaryRow, Integer> knownNumBuckets;
+
+    public PostponeFixedBucketSink(
+            FileStoreTable table,
+            @Nullable Map<String, String> overwritePartition,
+            Map<BinaryRow, Integer> knownNumBuckets) {
+        super(table, overwritePartition);
+        this.knownNumBuckets = knownNumBuckets;
+    }
+
+    @Override
+    protected OneInputStreamOperatorFactory<InternalRow, Committable> createWriteOperatorFactory(
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
+        return new RowDataStoreWriteOperator.Factory(table, null, writeProvider, commitUser) {
+            @Override
+            @SuppressWarnings("unchecked, rawtypes")
+            public StreamOperator createStreamOperator(StreamOperatorParameters parameters) {
+                return new PostponeBatchWriteOperator(
+                        parameters, table, writeProvider, commitUser, knownNumBuckets);
+            }
+        };
+    }
+
+    @Override
+    protected CommittableStateManager<ManifestCommittable> createCommittableStateManager() {
+        return createRestoreOnlyCommittableStateManager(table);
+    }
+
+    @Override
+    protected Committer.Factory<Committable, ManifestCommittable> createCommitterFactory() {
+        if (overwritePartition == null) {
+            // The table has copied bucket option outside, no need to change anything
+            return super.createCommitterFactory();
+        } else {
+            // When overwriting, the postpone bucket files need to be deleted, so using a postpone
+            // bucket table commit here
+            FileStoreTable tableForCommit =
+                    table.copy(
+                            Collections.singletonMap(
+                                    BUCKET.key(), String.valueOf(BucketMode.POSTPONE_BUCKET)));
+            return context ->
+                    new StoreCommitter(
+                            tableForCommit,
+                            tableForCommit
+                                    .newCommit(context.commitUser())
+                                    .withOverwrite(overwritePartition)
+                                    .ignoreEmptyCommit(!context.streamingCheckpointEnabled()),
+                            context);
+        }
+    }
+}
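
The committer choice above has a visible effect that the new testOverwriteWithBatchWriteFixedBucket ITCase checks: INSERT OVERWRITE must also drop the pending bucket -2 files of the overwritten partition, which is why the commit goes through a table copy in postpone bucket mode. A hedged usage sketch (same placeholder catalog and table layout as in the first example; not part of this commit):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;

public class PostponeOverwriteExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
        tEnv.executeSql(
                "CREATE CATALOG mycat WITH ('type' = 'paimon', 'warehouse' = '/tmp/warehouse')");
        tEnv.executeSql("USE CATALOG mycat");
        tEnv.executeSql(
                "CREATE TABLE T (pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED) "
                        + "PARTITIONED BY (pt) WITH ('bucket' = '-2')");

        // Partition 2 first receives postponed data (bucket -2), not yet readable.
        tEnv.executeSql(
                        "INSERT INTO T /*+ OPTIONS('postpone.batch-write-fixed-bucket' = 'false') */ "
                                + "VALUES (2, 10, 210), (2, 20, 220)")
                .await();

        // The overwrite is committed in postpone bucket mode, so the pending bucket -2 files of
        // partition 2 are deleted along with it and the new rows are readable immediately.
        tEnv.executeSql("INSERT OVERWRITE T VALUES (2, 20, 221), (2, 30, 230)").await();
    }
}
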
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index 96ab1be510..4a9b64f7b1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -137,12 +137,7 @@ public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
     public void processElement(StreamRecord<InternalRow> element) throws Exception {
         sinkContext.timestamp = element.hasTimestamp() ? element.getTimestamp() : null;
 
-        SinkRecord record;
-        try {
-            record = write.write(element.getValue());
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
+        SinkRecord record = write(element.getValue());
 
         if (record != null
                 && logSinkFunction != null
@@ -153,6 +148,15 @@ public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
         }
     }
 
+    @Nullable
+    protected SinkRecord write(InternalRow row) throws Exception {
+        try {
+            return write.write(row);
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StatelessRowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StatelessRowDataStoreWriteOperator.java
new file mode 100644
index 0000000000..3c8af5e629
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StatelessRowDataStoreWriteOperator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+import javax.annotation.Nullable;
+
+/** Stateless writer used for unaware append and postpone bucket table. */
+public class StatelessRowDataStoreWriteOperator extends RowDataStoreWriteOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    public StatelessRowDataStoreWriteOperator(
+            StreamOperatorParameters<Committable> parameters,
+            FileStoreTable table,
+            @Nullable LogSinkFunction logSinkFunction,
+            StoreSinkWrite.Provider storeSinkWriteProvider,
+            String initialCommitUser) {
+        super(parameters, table, logSinkFunction, storeSinkWriteProvider, initialCommitUser);
+    }
+
+    @Override
+    protected StoreSinkWriteState createState(
+            int subtaskId,
+            StateInitializationContext context,
+            StoreSinkWriteState.StateValueFilter stateFilter) {
+        // No conflicts will occur in append only unaware bucket writer, so no state
+        // is needed.
+        return new NoopStoreSinkWriteState(subtaskId);
+    }
+
+    @Override
+    protected String getCommitUser(StateInitializationContext context) throws Exception {
+        // No conflicts will occur in append only unaware bucket writer, so
+        // commitUser does not matter.
+        return commitUser == null ? initialCommitUser : commitUser;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 2d3742a95a..73c49be1e0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManagerImpl;
@@ -189,7 +188,6 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
         write.restore((List) states);
     }
 
-    @VisibleForTesting
     public TableWriteImpl<?> getWrite() {
         return write;
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CrossPartitionTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CrossPartitionTableITCase.java
index 98747a31d6..ddad7c5e62 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CrossPartitionTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CrossPartitionTableITCase.java
@@ -198,7 +198,7 @@ public class CrossPartitionTableITCase extends CatalogITCaseBase {
             throws ExecutionException, InterruptedException {
         sql(
                 "create table cross_postpone (pt int, k int, v int, primary key (k) not enforced) "
-                        + "partitioned by (pt) with ('bucket' = '-2')");
+                        + "partitioned by (pt) with ('bucket' = '-2', 'postpone.batch-write-fixed-bucket' = 'false')");
         sql("insert into cross_postpone values (1, 1, 1)");
         sql("insert into cross_postpone values (2, 2, 2)");
         tEnv.executeSql("CALL sys.compact(`table` => 'default.cross_postpone')").await();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index be19cdff42..c6b832eebe 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -20,6 +20,8 @@ package org.apache.paimon.flink;
 
 import org.apache.paimon.flink.util.AbstractTestBase;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
@@ -30,11 +32,17 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT cases for postpone bucket tables. */
@@ -71,9 +79,22 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                         + "  'merge-engine' = 'partial-update',\n"
                         + "  'rowkind.field' = 'row_kind_col'\n"
                         + ")");
-        assertThatThrownBy(() -> tEnv.executeSql("INSERT INTO T VALUES (1, 1, 1, '-D')").await())
-                .rootCause()
-                .hasMessageContaining("By default, Partial update can not accept delete records");
+
+        boolean writeFixedBucket = ThreadLocalRandom.current().nextBoolean();
+        tEnv.executeSql(
+                String.format(
+                        "ALTER TABLE T SET ('postpone.batch-write-fixed-bucket' = '%s')",
+                        writeFixedBucket));
+        if (writeFixedBucket) {
+            assertThatCode(() -> tEnv.executeSql("INSERT INTO T VALUES (1, 1, 1, '-U')").await())
+                    .doesNotThrowAnyException();
+        } else {
+            assertThatThrownBy(
+                            () -> tEnv.executeSql("INSERT INTO T VALUES (1, 1, 1, '-D')").await())
+                    .rootCause()
+                    .hasMessageContaining(
+                            "By default, Partial update can not accept delete records");
+        }
     }
 
     @Test
@@ -100,7 +121,8 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                         + "  v INT,\n"
                         + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
                         + ") PARTITIONED BY (pt) WITH (\n"
-                        + "  'bucket' = '-2'\n"
+                        + "  'bucket' = '-2',\n"
+                        + "  'postpone.batch-write-fixed-bucket' = 'false'\n"
                         + ")");
 
         int numPartitions = 3;
@@ -159,7 +181,7 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testOverwrite() throws Exception {
+    public void testOverwriteWithoutBatchWriteFixedBucket() throws Exception {
         String warehouse = getTempDirPath();
         TableEnvironment tEnv =
                 tableEnvironmentBuilder()
@@ -182,7 +204,8 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                         + "  v INT,\n"
                         + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
                         + ") PARTITIONED BY (pt) WITH (\n"
-                        + "  'bucket' = '-2'\n"
+                        + "  'bucket' = '-2',\n"
+                        + "  'postpone.batch-write-fixed-bucket' = 'false'\n"
                         + ")");
 
         tEnv.executeSql(
@@ -211,6 +234,52 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                         "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[20, 221, 2]", "+I[30, 230, 2]");
     }
 
+    @Test
+    public void testOverwriteWithBatchWriteFixedBucket() throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+
+        tEnv.executeSql(
+                "CREATE CATALOG mycat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  pt INT,\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'bucket' = '-2'\n"
+                        + ")");
+
+        // write postpone bucket with partition 1 and 2
+        tEnv.executeSql(
+                        "INSERT INTO T /*+ OPTIONS ('postpone.batch-write-fixed-bucket' = 'false') */ "
+                                + "VALUES (1, 10, 110), (1, 20, 120), (2, 10, 210), (2, 20, 220)")
+                .await();
+        assertThat(collect(tEnv.executeSql("SELECT k, v, pt FROM T"))).isEmpty();
+
+        // batch overwrite partition 2 and the new data can be read
+        tEnv.executeSql("INSERT OVERWRITE T VALUES (2, 20, 221), (2, 30, 230)").await();
+        assertThat(collect(tEnv.executeSql("SELECT k, v, pt FROM T")))
+                .containsExactlyInAnyOrder("+I[20, 221, 2]", "+I[30, 230, 2]");
+
+        // compact then partition 1 can be read
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+        assertThat(collect(tEnv.executeSql("SELECT k, v, pt FROM T")))
+                .containsExactlyInAnyOrder(
+                        "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[20, 221, 2]", "+I[30, 230, 2]");
+    }
+
     @Timeout(TIMEOUT)
     @Test
     public void testLookupChangelogProducer() throws Exception {
@@ -238,7 +307,8 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                         + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
                         + ") PARTITIONED BY (pt) WITH (\n"
                         + "  'bucket' = '-2',\n"
-                        + "  'changelog-producer' = 'lookup'\n"
+                        + "  'changelog-producer' = 'lookup',\n"
+                        + "  'postpone.batch-write-fixed-bucket' = 'false'\n"
                         + ")");
 
         TableEnvironment sEnv =
@@ -305,7 +375,8 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                         + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
                         + ") PARTITIONED BY (pt) WITH (\n"
                         + "  'bucket' = '-2',\n"
-                        + "  'postpone.default-bucket-num' = '2'\n"
+                        + "  'postpone.default-bucket-num' = '2',\n"
+                        + "  'postpone.batch-write-fixed-bucket' = 'false'\n"
                         + ")");
 
         int numKeys = 100;
@@ -461,6 +532,11 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                         + "  'snapshot.num-retained.max' = '3'\n"
                         + ")");
 
+        tEnv.executeSql(
+                String.format(
+                        "ALTER TABLE T SET ('postpone.batch-write-fixed-bucket' = '%s')",
+                        ThreadLocalRandom.current().nextBoolean()));
+
         for (int i = 0; i < 5; i++) {
             tEnv.executeSql(String.format("INSERT INTO T VALUES (%d, 0, 0)", i)).await();
         }
@@ -494,7 +570,8 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                         + "  v INT,\n"
                         + "  PRIMARY KEY (k) NOT ENFORCED\n"
                         + ") WITH (\n"
-                        + "  'bucket' = '-2'\n"
+                        + "  'bucket' = '-2',\n"
+                        + "  'postpone.batch-write-fixed-bucket' = 'false'\n"
                         + ")");
         bEnv.executeSql("CREATE TABLE SRC (i INT, `proctime` AS PROCTIME())");
 
@@ -557,7 +634,8 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                         + "  v INT,\n"
                         + "  PRIMARY KEY (k, pt) NOT ENFORCED\n"
                         + ") PARTITIONED BY (pt) WITH (\n"
-                        + "  'bucket' = '-2'\n"
+                        + "  'bucket' = '-2',\n"
+                        + "  'postpone.batch-write-fixed-bucket' = 'false'\n"
                         + ")");
         bEnv.executeSql("CREATE TABLE SRC (i INT, pt INT, `proctime` AS 
PROCTIME())");
 
@@ -618,6 +696,7 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                         + "  PRIMARY KEY (k) NOT ENFORCED\n"
                         + ") WITH (\n"
                         + "  'bucket' = '-2',\n"
+                        + "  'postpone.batch-write-fixed-bucket' = 'false',\n"
                         + "  'deletion-vectors.enabled' = 'true'\n"
                         + ")");
 
@@ -662,7 +741,8 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                         + "  v INT,\n"
                         + "  PRIMARY KEY (k) NOT ENFORCED\n"
                         + ") WITH (\n"
-                        + "  'bucket' = '-2'\n"
+                        + "  'bucket' = '-2',\n"
+                        + "  'postpone.batch-write-fixed-bucket' = 'false'\n"
                         + ")");
 
         tEnv.executeSql(
@@ -696,7 +776,8 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                         + "  v TIMESTAMP(9),\n"
                         + "  PRIMARY KEY (k) NOT ENFORCED\n"
                         + ") WITH (\n"
-                        + "  'bucket' = '-2'\n"
+                        + "  'bucket' = '-2',\n"
+                        + "  'postpone.batch-write-fixed-bucket' = 'false'\n"
                         + ")");
 
         tEnv.executeSql(
@@ -733,6 +814,7 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                         + "  PRIMARY KEY (k) NOT ENFORCED\n"
                         + ") WITH (\n"
                         + "  'bucket' = '-2',\n"
+                        + "  'postpone.batch-write-fixed-bucket' = 'false',\n"
                         + "  'changelog-producer' = 'none',\n"
                         + "  'scan.remove-normalize' = 'true',\n"
                         + "  'continuous.discovery-interval' = '1ms'\n"
@@ -773,6 +855,342 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                         "+I[5, 53]");
     }
 
+    @Test
+    public void testWriteFixedBucketWithDifferentBucketNumber() throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+
+        tEnv.executeSql(
+                "CREATE CATALOG mycat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  k INT,\n"
+                        + "  v STRING,\n"
+                        + "  pt STRING,"
+                        + "  PRIMARY KEY (k, pt) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'bucket' = '-2'\n"
+                        + ")");
+
+        // use sink.parallelism
+        List<String> values1 = new ArrayList<>();
+        List<String> expected1 = new ArrayList<>();
+        for (int i = 1; i <= 8; i++) {
+            values1.add(String.format("(%d, '%c', 'pt1')", i, (i - 1 + 'a')));
+            values1.add(String.format("(%d, '%c', 'pt2')", i, (i - 1 + 'a')));
+            expected1.add(String.format("+I[%d, %c, pt1]", i, (i - 1 + 'a')));
+            expected1.add(String.format("+I[%d, %c, pt2]", i, (i - 1 + 'a')));
+        }
+        tEnv.executeSql(
+                        "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '4') */ VALUES "
+                                + String.join(",", values1))
+                .await();
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+                .containsExactlyInAnyOrderElementsOf(expected1);
+        assertThat(
+                        collectRow(tEnv.executeSql("SELECT * FROM `T$buckets`")).stream()
+                                .map(row -> (Integer) row.getField(1))
+                                .collect(Collectors.toSet()))
+                .containsExactlyInAnyOrder(0, 1, 2, 3);
+        assertThat(collect(tEnv.executeSql("SELECT * FROM `T$files` WHERE level > 0"))).isEmpty();
+
+        // test bucket number strategy: pt1 = 3, pt2 = 4, pt3 = 5 (runtime)
+        tEnv.executeSql(
+                "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 3, `partition` => 'pt=pt1')");
+
+        List<String> values2 = new ArrayList<>();
+        List<String> expected2 = new ArrayList<>();
+        for (int i = 1; i <= 8; i++) {
+            values2.add(String.format("(%d, '%c', 'pt1')", i, (i - 1 + 'A')));
+            values2.add(String.format("(%d, '%c', 'pt2')", i, (i - 1 + 'A')));
+            values2.add(String.format("(%d, '%c', 'pt3')", i, (i - 1 + 'A')));
+            expected2.add(String.format("+I[%d, %c, pt1]", i, (i - 1 + 'A')));
+            expected2.add(String.format("+I[%d, %c, pt2]", i, (i - 1 + 'A')));
+            expected2.add(String.format("+I[%d, %c, pt3]", i, (i - 1 + 'A')));
+        }
+        for (int i = 9; i <= 16; i++) {
+            values2.add(String.format("(%d, '%d', 'pt1')", i, i));
+            values2.add(String.format("(%d, '%d', 'pt2')", i, i));
+            values2.add(String.format("(%d, '%d', 'pt3')", i, i));
+            expected2.add(String.format("+I[%d, %d, pt1]", i, i));
+            expected2.add(String.format("+I[%d, %d, pt2]", i, i));
+            expected2.add(String.format("+I[%d, %d, pt3]", i, i));
+        }
+
+        tEnv.executeSql(
+                        "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '5') */ VALUES "
+                                + String.join(",", values2))
+                .await();
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+                .containsExactlyInAnyOrderElementsOf(expected2);
+        Map<String, Set<Integer>> partitionBuckets = new HashMap<>();
+        for (Row row : collectRow(tEnv.executeSql("SELECT * FROM `T$buckets`"))) {
+            partitionBuckets
+                    .computeIfAbsent((String) row.getField(0), p -> new 
HashSet<>())
+                    .add((Integer) row.getField(1));
+        }
+        assertThat(partitionBuckets).hasSize(3);
+        assertThat(partitionBuckets.get("{pt1}")).containsExactly(0, 1, 2);
+        assertThat(partitionBuckets.get("{pt2}")).containsExactly(0, 1, 2, 3);
+        assertThat(partitionBuckets.get("{pt3}")).containsExactly(0, 1, 2, 3, 4);
+        assertThat(collect(tEnv.executeSql("SELECT * FROM `T$files` WHERE level > 0"))).isEmpty();
+    }
+
+    @Test
+    public void testWriteFixedBucketThenWritePostponeBucket() throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+
+        tEnv.executeSql(
+                "CREATE CATALOG mycat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  k INT,\n"
+                        + "  v STRING,\n"
+                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ") WITH (\n"
+                        + "  'bucket' = '-2'\n"
+                        + ")");
+
+        // use sink.parallelism
+        tEnv.executeSql(
+                        "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '4') */ "
+                                + "VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e'), (6, 'f'), (7, 'g'), (8, 'h');")
+                .await();
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+                .containsExactlyInAnyOrder(
+                        "+I[1, a]",
+                        "+I[2, b]",
+                        "+I[3, c]",
+                        "+I[4, d]",
+                        "+I[5, e]",
+                        "+I[6, f]",
+                        "+I[7, g]",
+                        "+I[8, h]");
+        assertThat(
+                        collectRow(tEnv.executeSql("SELECT * FROM `T$buckets`")).stream()
+                                .map(row -> (Integer) row.getField(1))
+                                .collect(Collectors.toSet()))
+                .containsExactlyInAnyOrder(0, 1, 2, 3);
+        assertThat(collect(tEnv.executeSql("SELECT * FROM `T$files` WHERE level > 0"))).isEmpty();
+
+        // write to postpone bucket, new record cannot be read before compact
+        tEnv.executeSql(
+                        "INSERT INTO T /*+ OPTIONS('postpone.batch-write-fixed-bucket' = 'false') */ "
+                                + "VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E'), (6, 'F'), (7, 'G'), (8, 'H'), "
+                                + "(9, '9'), (10, '10'), (11, '11'), (12, '12'), (13, '13'), (14, '14'), (15, '15'), (16, '16');")
+                .await();
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+                .containsExactlyInAnyOrder(
+                        "+I[1, a]",
+                        "+I[2, b]",
+                        "+I[3, c]",
+                        "+I[4, d]",
+                        "+I[5, e]",
+                        "+I[6, f]",
+                        "+I[7, g]",
+                        "+I[8, h]");
+        assertThat(
+                        collectRow(tEnv.executeSql("SELECT * FROM `T$buckets`")).stream()
+                                .map(row -> (Integer) row.getField(1))
+                                .collect(Collectors.toSet()))
+                .containsExactlyInAnyOrder(-2, 0, 1, 2, 3);
+        assertThat(collect(tEnv.executeSql("SELECT * FROM `T$files` WHERE level > 0"))).isEmpty();
+
+        // compact and check result again
+        boolean forceUpLevel0 = ThreadLocalRandom.current().nextBoolean();
+        if (forceUpLevel0) {
+            tEnv.executeSql("ALTER TABLE T set ('compaction.force-up-level-0' = 'true')").await();
+        }
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+                .containsExactlyInAnyOrder(
+                        "+I[1, A]",
+                        "+I[2, B]",
+                        "+I[3, C]",
+                        "+I[4, D]",
+                        "+I[5, E]",
+                        "+I[6, F]",
+                        "+I[7, G]",
+                        "+I[8, H]",
+                        "+I[9, 9]",
+                        "+I[10, 10]",
+                        "+I[11, 11]",
+                        "+I[12, 12]",
+                        "+I[13, 13]",
+                        "+I[14, 14]",
+                        "+I[15, 15]",
+                        "+I[16, 16]");
+        assertThat(
+                        collectRow(tEnv.executeSql("SELECT * FROM `T$buckets`")).stream()
+                                .map(row -> (Integer) row.getField(1))
+                                .collect(Collectors.toSet()))
+                .containsExactlyInAnyOrder(0, 1, 2, 3);
+        if (forceUpLevel0) {
+            assertThat(collect(tEnv.executeSql("SELECT * FROM `T$files` WHERE level = 0")))
+                    .isEmpty();
+        }
+    }
+
+    @Test
+    public void testWriteWritePostponeBucketThenWriteFixedBucket() throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+
+        tEnv.executeSql(
+                "CREATE CATALOG mycat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  k INT,\n"
+                        + "  v STRING,\n"
+                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ") WITH (\n"
+                        + "  'bucket' = '-2'\n"
+                        + ")");
+
+        tEnv.executeSql(
+                        "INSERT INTO T /*+ OPTIONS('postpone.batch-write-fixed-bucket' = 'false') */ "
+                                + "VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e'), (6, 'f'), (7, 'g'), (8, 'h');")
+                .await();
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T"))).isEmpty();
+
+        // use sink.parallelism
+        tEnv.executeSql(
+                        "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '4') */ "
+                                + "VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E'), (6, 'F'), (7, 'G'), (8, 'H'), "
+                                + "(9, '9'), (10, '10'), (11, '11'), (12, '12'), (13, '13'), (14, '14'), (15, '15'), (16, '16');")
+                .await();
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+                .containsExactlyInAnyOrder(
+                        "+I[1, A]",
+                        "+I[2, B]",
+                        "+I[3, C]",
+                        "+I[4, D]",
+                        "+I[5, E]",
+                        "+I[6, F]",
+                        "+I[7, G]",
+                        "+I[8, H]",
+                        "+I[9, 9]",
+                        "+I[10, 10]",
+                        "+I[11, 11]",
+                        "+I[12, 12]",
+                        "+I[13, 13]",
+                        "+I[14, 14]",
+                        "+I[15, 15]",
+                        "+I[16, 16]");
+        assertThat(
+                        collectRow(tEnv.executeSql("SELECT * FROM `T$buckets`")).stream()
+                                .map(row -> (Integer) row.getField(1))
+                                .collect(Collectors.toSet()))
+                .containsExactlyInAnyOrder(-2, 0, 1, 2, 3);
+        assertThat(collect(tEnv.executeSql("SELECT * FROM `T$files` WHERE level > 0"))).isEmpty();
+
+        // compact and check result again
+        boolean forceUpLevel0 = ThreadLocalRandom.current().nextBoolean();
+        if (forceUpLevel0) {
+            tEnv.executeSql("ALTER TABLE T set ('compaction.force-up-level-0' = 'true')").await();
+        }
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+                .containsExactlyInAnyOrder(
+                        "+I[1, a]",
+                        "+I[2, b]",
+                        "+I[3, c]",
+                        "+I[4, d]",
+                        "+I[5, e]",
+                        "+I[6, f]",
+                        "+I[7, g]",
+                        "+I[8, h]",
+                        "+I[9, 9]",
+                        "+I[10, 10]",
+                        "+I[11, 11]",
+                        "+I[12, 12]",
+                        "+I[13, 13]",
+                        "+I[14, 14]",
+                        "+I[15, 15]",
+                        "+I[16, 16]");
+        assertThat(
+                        collectRow(tEnv.executeSql("SELECT * FROM `T$buckets`")).stream()
+                                .map(row -> (Integer) row.getField(1))
+                                .collect(Collectors.toSet()))
+                .containsExactlyInAnyOrder(0, 1, 2, 3);
+        if (forceUpLevel0) {
+            assertThat(collect(tEnv.executeSql("SELECT * FROM `T$files` WHERE level = 0")))
+                    .isEmpty();
+        }
+    }
+
+    @Test
+    public void testCompactPostponeThenWriteFixedBucket() throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+
+        tEnv.executeSql(
+                "CREATE CATALOG mycat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  k INT,\n"
+                        + "  v STRING,\n"
+                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ") WITH (\n"
+                        + "  'bucket' = '-2'\n"
+                        + ")");
+
+        tEnv.executeSql(
+                        "INSERT INTO T /*+ OPTIONS('postpone.batch-write-fixed-bucket' = 'false') */ VALUES (1, 'a')")
+                .await();
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T"))).isEmpty();
+
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+                .containsExactlyInAnyOrder("+I[1, a]");
+        tEnv.executeSql("INSERT INTO T VALUES (1, 'A')").await();
+        assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+                .containsExactlyInAnyOrder("+I[1, A]");
+    }
+
     private List<String> collect(TableResult result) throws Exception {
         List<String> ret = new ArrayList<>();
         try (CloseableIterator<Row> it = result.collect()) {
@@ -812,4 +1230,12 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
         timerThread.join();
         return ret;
     }
+
+    private List<Row> collectRow(TableResult result) {
+        try (CloseableIterator<Row> iter = result.collect()) {
+            return ImmutableList.copyOf(iter);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
