This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dee2a013a [cdc] Support postpone bucket tables in Paimon CDC (#5627)
8dee2a013a is described below

commit 8dee2a013a9d253a934f44111e302fb1d795db45
Author: tsreaper <[email protected]>
AuthorDate: Tue May 20 21:34:22 2025 +0800

    [cdc] Support postpone bucket tables in Paimon CDC (#5627)
---
 .../org/apache/paimon/AppendOnlyFileStore.java     |  8 +++-
 .../src/main/java/org/apache/paimon/FileStore.java |  5 +-
 .../java/org/apache/paimon/KeyValueFileStore.java  | 11 +++--
 .../postpone/PostponeBucketFileStoreWrite.java     | 25 ++++++++--
 .../paimon/postpone/PostponeBucketWriter.java      | 28 +++++++++---
 .../paimon/privilege/PrivilegedFileStore.java      |  7 ++-
 .../paimon/privilege/PrivilegedFileStoreTable.java |  9 +++-
 .../paimon/table/AppendOnlyFileStoreTable.java     | 10 ++--
 .../paimon/table/DelegatedFileStoreTable.java      |  9 +++-
 .../org/apache/paimon/table/FileStoreTable.java    |  7 ++-
 .../paimon/table/PrimaryKeyFileStoreTable.java     |  8 ++--
 .../apache/paimon/table/object/ObjectTable.java    |  7 ++-
 paimon-flink/paimon-flink-cdc/pom.xml              |  7 +--
 .../sink/cdc/CdcPostponeBucketChannelComputer.java | 53 ++++++++++++++++++++++
 .../sink/cdc/CdcRecordStoreMultiWriteOperator.java |  7 ++-
 .../paimon/flink/sink/cdc/CdcSinkBuilder.java      | 12 +++++
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  | 10 ++++
 .../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java   | 27 +++++++++++
 .../sink/cdc/FlinkCdcSyncTableSinkITCase.java      | 20 ++++++++
 .../sink/MultiTablesStoreCompactOperator.java      |  1 +
 .../paimon/flink/sink/NoopStoreSinkWriteState.java |  9 +++-
 .../paimon/flink/sink/RowAppendTableSink.java      | 10 ++--
 .../paimon/flink/sink/StoreCompactOperator.java    |  1 +
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |  3 +-
 .../paimon/flink/sink/StoreSinkWriteState.java     |  2 +
 .../paimon/flink/sink/StoreSinkWriteStateImpl.java |  9 ++++
 .../paimon/flink/sink/TableWriteOperator.java      | 11 +++--
 27 files changed, 266 insertions(+), 50 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index b901681050..05faeb5fc9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -36,6 +36,8 @@ import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.types.RowType;
 
+import javax.annotation.Nullable;
+
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
@@ -89,12 +91,14 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
 
     @Override
     public BaseAppendFileStoreWrite newWrite(String commitUser) {
-        return newWrite(commitUser, null);
+        return newWrite(commitUser, null, null);
     }
 
     @Override
     public BaseAppendFileStoreWrite newWrite(
-            String commitUser, ManifestCacheFilter manifestFilter) {
+            String commitUser,
+            @Nullable ManifestCacheFilter manifestFilter,
+            @Nullable Integer writeId) {
         DeletionVectorsMaintainer.Factory dvMaintainerFactory =
                 options.deletionVectorsEnabled()
                         ? 
DeletionVectorsMaintainer.factory(newIndexFileHandler())
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 34e04cfabb..12fc60a383 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -88,7 +88,10 @@ public interface FileStore<T> {
 
     FileStoreWrite<T> newWrite(String commitUser);
 
-    FileStoreWrite<T> newWrite(String commitUser, ManifestCacheFilter 
manifestFilter);
+    FileStoreWrite<T> newWrite(
+            String commitUser,
+            @Nullable ManifestCacheFilter manifestFilter,
+            @Nullable Integer writeId);
 
     FileStoreCommit newCommit(String commitUser, FileStoreTable table);
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 57f8db5f2d..81039da369 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -46,6 +46,8 @@ import org.apache.paimon.utils.KeyComparatorSupplier;
 import org.apache.paimon.utils.UserDefinedSeqComparator;
 import org.apache.paimon.utils.ValueEqualiserSupplier;
 
+import javax.annotation.Nullable;
+
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
@@ -154,12 +156,14 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
 
     @Override
     public AbstractFileStoreWrite<KeyValue> newWrite(String commitUser) {
-        return newWrite(commitUser, null);
+        return newWrite(commitUser, null, null);
     }
 
     @Override
     public AbstractFileStoreWrite<KeyValue> newWrite(
-            String commitUser, ManifestCacheFilter manifestFilter) {
+            String commitUser,
+            @Nullable ManifestCacheFilter manifestFilter,
+            @Nullable Integer writeId) {
         IndexMaintainer.Factory<KeyValue> indexFactory = null;
         if (bucketMode() == BucketMode.HASH_DYNAMIC) {
             indexFactory = new 
HashIndexMaintainer.Factory(newIndexFileHandler());
@@ -182,7 +186,8 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                     snapshotManager(),
                     
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
                     options,
-                    tableName);
+                    tableName,
+                    writeId);
         } else {
             return new KeyValueFileStoreWrite(
                     fileIO,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
index b52bd0b812..0bc6c48470 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
@@ -66,22 +66,28 @@ public class PostponeBucketFileStoreWrite extends 
AbstractFileStoreWrite<KeyValu
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             CoreOptions options,
-            String tableName) {
+            String tableName,
+            @Nullable Integer writeId) {
         super(snapshotManager, scan, null, null, tableName, options, 
partitionType);
 
         Options newOptions = new Options(options.toMap());
         // use avro for postpone bucket
         newOptions.set(CoreOptions.FILE_FORMAT, "avro");
         newOptions.set(CoreOptions.METADATA_STATS_MODE, "none");
-        // each writer should have its unique prefix, so files from the same 
writer can be consumed
-        // by the same compaction reader to keep the input order
+        // Each writer should have its unique prefix, so files from the same 
writer can be consumed
+        // by the same compaction reader to keep the input order.
+        // Also note that, for Paimon CDC, this object might be created 
multiple times in the same
+        // job, however the object will always stay in the same thread, so we 
use hash of thread
+        // name as the identifier.
         newOptions.set(
                 CoreOptions.DATA_FILE_PREFIX,
                 String.format(
                         "%s-u-%s-s-%d-w-",
                         options.dataFilePrefix(),
                         commitUser,
-                        
ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)));
+                        writeId == null
+                                ? 
ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)
+                                : writeId));
         this.options = new CoreOptions(newOptions);
 
         FileFormat fileFormat = fileFormat(this.options);
@@ -107,6 +113,12 @@ public class PostponeBucketFileStoreWrite extends 
AbstractFileStoreWrite<KeyValu
         withIgnorePreviousFiles(true);
     }
 
+    @Override
+    public void withIgnorePreviousFiles(boolean ignorePrevious) {
+        // see comments in constructor
+        super.withIgnorePreviousFiles(true);
+    }
+
     @Override
     protected PostponeBucketWriter createWriter(
             BinaryRow partition,
@@ -117,9 +129,12 @@ public class PostponeBucketFileStoreWrite extends 
AbstractFileStoreWrite<KeyValu
             ExecutorService compactExecutor,
             @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
         Preconditions.checkArgument(bucket == BucketMode.POSTPONE_BUCKET);
+        Preconditions.checkArgument(
+                restoreFiles.isEmpty(),
+                "Postpone bucket writers should not restore previous files. 
This is unexpected.");
         KeyValueFileWriterFactory writerFactory =
                 writerFactoryBuilder.build(partition, bucket, options);
-        return new PostponeBucketWriter(writerFactory);
+        return new PostponeBucketWriter(writerFactory, restoreIncrement);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
index 7bc7cafbf5..4eee650425 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
@@ -28,6 +28,9 @@ import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.RecordWriter;
 
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -36,10 +39,18 @@ import java.util.List;
 public class PostponeBucketWriter implements RecordWriter<KeyValue> {
 
     private final KeyValueFileWriterFactory writerFactory;
+    private final List<DataFileMeta> files;
+
     private RollingFileWriter<KeyValue, DataFileMeta> writer;
 
-    public PostponeBucketWriter(KeyValueFileWriterFactory writerFactory) {
+    public PostponeBucketWriter(
+            KeyValueFileWriterFactory writerFactory, @Nullable CommitIncrement 
restoreIncrement) {
         this.writerFactory = writerFactory;
+        this.files = new ArrayList<>();
+        if (restoreIncrement != null) {
+            files.addAll(restoreIncrement.newFilesIncrement().newFiles());
+        }
+
         this.writer = null;
     }
 
@@ -55,12 +66,13 @@ public class PostponeBucketWriter implements 
RecordWriter<KeyValue> {
     public void compact(boolean fullCompaction) throws Exception {}
 
     @Override
-    public void addNewFiles(List<DataFileMeta> files) {}
+    public void addNewFiles(List<DataFileMeta> files) {
+        this.files.addAll(files);
+    }
 
     @Override
     public Collection<DataFileMeta> dataFiles() {
-        // this method is only for checkpointing, while this writer does not 
need any checkpoint
-        return Collections.emptyList();
+        return new ArrayList<>(files);
     }
 
     @Override
@@ -71,14 +83,16 @@ public class PostponeBucketWriter implements 
RecordWriter<KeyValue> {
 
     @Override
     public CommitIncrement prepareCommit(boolean waitCompaction) throws 
Exception {
-        List<DataFileMeta> newFiles = Collections.emptyList();
         if (writer != null) {
             writer.close();
-            newFiles = writer.result();
+            files.addAll(writer.result());
             writer = null;
         }
+
+        List<DataFileMeta> result = new ArrayList<>(files);
+        files.clear();
         return new CommitIncrement(
-                new DataIncrement(newFiles, Collections.emptyList(), 
Collections.emptyList()),
+                new DataIncrement(result, Collections.emptyList(), 
Collections.emptyList()),
                 CompactIncrement.emptyIncrement(),
                 null);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index a6084350ba..c30b4a94be 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -147,9 +147,12 @@ public class PrivilegedFileStore<T> implements 
FileStore<T> {
     }
 
     @Override
-    public FileStoreWrite<T> newWrite(String commitUser, ManifestCacheFilter 
manifestFilter) {
+    public FileStoreWrite<T> newWrite(
+            String commitUser,
+            @Nullable ManifestCacheFilter manifestFilter,
+            @Nullable Integer writeId) {
         privilegeChecker.assertCanInsert(identifier);
-        return wrapped.newWrite(commitUser, manifestFilter);
+        return wrapped.newWrite(commitUser, manifestFilter, writeId);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index 716c243289..26e9ec1ea1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -41,6 +41,8 @@ import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.Map;
 import java.util.Objects;
@@ -228,9 +230,12 @@ public class PrivilegedFileStoreTable extends 
DelegatedFileStoreTable {
     }
 
     @Override
-    public TableWriteImpl<?> newWrite(String commitUser, ManifestCacheFilter 
manifestFilter) {
+    public TableWriteImpl<?> newWrite(
+            String commitUser,
+            @Nullable ManifestCacheFilter manifestFilter,
+            @Nullable Integer writeId) {
         privilegeChecker.assertCanInsert(identifier);
-        return wrapped.newWrite(commitUser, manifestFilter);
+        return wrapped.newWrite(commitUser, manifestFilter, writeId);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 69a024db9c..240163cf3d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -42,6 +42,8 @@ import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.function.BiConsumer;
 
@@ -125,13 +127,15 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
 
     @Override
     public TableWriteImpl<InternalRow> newWrite(String commitUser) {
-        return newWrite(commitUser, null);
+        return newWrite(commitUser, null, null);
     }
 
     @Override
     public TableWriteImpl<InternalRow> newWrite(
-            String commitUser, ManifestCacheFilter manifestFilter) {
-        BaseAppendFileStoreWrite writer = store().newWrite(commitUser, 
manifestFilter);
+            String commitUser,
+            @Nullable ManifestCacheFilter manifestFilter,
+            @Nullable Integer writeId) {
+        BaseAppendFileStoreWrite writer = store().newWrite(commitUser, 
manifestFilter, writeId);
         return new TableWriteImpl<>(
                 rowType(),
                 writer,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index dc4aeec073..112bb21dc2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -49,6 +49,8 @@ import org.apache.paimon.utils.TagManager;
 
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.Objects;
 import java.util.Optional;
@@ -292,8 +294,11 @@ public abstract class DelegatedFileStoreTable implements 
FileStoreTable {
     }
 
     @Override
-    public TableWriteImpl<?> newWrite(String commitUser, ManifestCacheFilter 
manifestFilter) {
-        return wrapped.newWrite(commitUser, manifestFilter);
+    public TableWriteImpl<?> newWrite(
+            String commitUser,
+            @Nullable ManifestCacheFilter manifestFilter,
+            @Nullable Integer writeId) {
+        return wrapped.newWrite(commitUser, manifestFilter, writeId);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 92b0fc7b6a..f3af21f266 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -41,6 +41,8 @@ import org.apache.paimon.utils.TagManager;
 
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
@@ -111,7 +113,10 @@ public interface FileStoreTable extends DataTable {
     @Override
     TableWriteImpl<?> newWrite(String commitUser);
 
-    TableWriteImpl<?> newWrite(String commitUser, ManifestCacheFilter 
manifestFilter);
+    TableWriteImpl<?> newWrite(
+            String commitUser,
+            @Nullable ManifestCacheFilter manifestFilter,
+            @Nullable Integer writeId);
 
     @Override
     TableCommitImpl newCommit(String commitUser);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index c375bb2900..1a22822779 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -152,16 +152,18 @@ public class PrimaryKeyFileStoreTable extends 
AbstractFileStoreTable {
 
     @Override
     public TableWriteImpl<KeyValue> newWrite(String commitUser) {
-        return newWrite(commitUser, null);
+        return newWrite(commitUser, null, null);
     }
 
     @Override
     public TableWriteImpl<KeyValue> newWrite(
-            String commitUser, ManifestCacheFilter manifestFilter) {
+            String commitUser,
+            @Nullable ManifestCacheFilter manifestFilter,
+            @Nullable Integer writeId) {
         KeyValue kv = new KeyValue();
         return new TableWriteImpl<>(
                 rowType(),
-                store().newWrite(commitUser, manifestFilter),
+                store().newWrite(commitUser, manifestFilter, writeId),
                 createRowKeyExtractor(),
                 (record, rowKind) ->
                         kv.replace(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
index 4013fede49..d63d547656 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
@@ -30,6 +30,8 @@ import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
+import javax.annotation.Nullable;
+
 import java.util.HashSet;
 import java.util.Map;
 
@@ -157,7 +159,10 @@ public interface ObjectTable extends FileStoreTable {
         }
 
         @Override
-        public TableWriteImpl<?> newWrite(String commitUser, 
ManifestCacheFilter manifestFilter) {
+        public TableWriteImpl<?> newWrite(
+                String commitUser,
+                @Nullable ManifestCacheFilter manifestFilter,
+                @Nullable Integer writeId) {
             throw new UnsupportedOperationException("Object table does not 
support Write.");
         }
 
diff --git a/paimon-flink/paimon-flink-cdc/pom.xml 
b/paimon-flink/paimon-flink-cdc/pom.xml
index 182637513e..b489aaf615 100644
--- a/paimon-flink/paimon-flink-cdc/pom.xml
+++ b/paimon-flink/paimon-flink-cdc/pom.xml
@@ -59,12 +59,7 @@ under the License.
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-flink-common</artifactId>
             <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>*</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
+            <scope>provided</scope>
         </dependency>
 
         <!-- Flink dependencies -->
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcPostponeBucketChannelComputer.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcPostponeBucketChannelComputer.java
new file mode 100644
index 0000000000..5dacef7cad
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcPostponeBucketChannelComputer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
+
+/**
+ * {@link ChannelComputer} for writing {@link CdcRecord}s into postpone bucket 
tables. Records with
+ * same primary keys are distributed to the same subtask.
+ */
+public class CdcPostponeBucketChannelComputer implements 
ChannelComputer<CdcRecord> {
+
+    private final TableSchema schema;
+
+    private transient int numChannels;
+    private transient KeyAndBucketExtractor<CdcRecord> extractor;
+
+    public CdcPostponeBucketChannelComputer(TableSchema schema) {
+        this.schema = schema;
+    }
+
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+        this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
+    }
+
+    @Override
+    public int channel(CdcRecord record) {
+        extractor.setRecord(record);
+        return Math.abs(
+                (extractor.partition().hashCode() + 
extractor.trimmedPrimaryKey().hashCode())
+                        % numChannels);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index fb59878b8b..ea6008d872 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -30,6 +30,7 @@ import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
 import org.apache.paimon.flink.sink.StoreSinkWriteState;
 import org.apache.paimon.flink.sink.StoreSinkWriteStateImpl;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.options.Options;
@@ -107,7 +108,11 @@ public class CdcRecordStoreMultiWriteOperator
                         context, "commit_user_state", String.class, 
initialCommitUser);
 
         // TODO: should use CdcRecordMultiChannelComputer to filter
-        state = new StoreSinkWriteStateImpl(context, (tableName, partition, 
bucket) -> true);
+        state =
+                new StoreSinkWriteStateImpl(
+                        
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
+                        context,
+                        (tableName, partition, bucket) -> true);
         tables = new HashMap<>();
         writes = new HashMap<>();
         compactExecutor =
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 9297670d3a..e4518d3a26 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -132,6 +132,8 @@ public class CdcSinkBuilder<T> {
             case HASH_DYNAMIC:
                 return new CdcDynamicBucketSink((FileStoreTable) table)
                         .build(converted, parallelism);
+            case POSTPONE_MODE:
+                return buildForPostponeBucket(converted);
             case BUCKET_UNAWARE:
                 return buildForUnawareBucket(converted);
             default:
@@ -146,6 +148,16 @@ public class CdcSinkBuilder<T> {
         return new CdcFixedBucketSink(dataTable).sinkFrom(partitioned);
     }
 
+    private DataStreamSink<?> buildForPostponeBucket(DataStream<CdcRecord> 
parsed) {
+        FileStoreTable dataTable = (FileStoreTable) table;
+        DataStream<CdcRecord> partitioned =
+                partition(
+                        parsed,
+                        new 
CdcPostponeBucketChannelComputer(dataTable.schema()),
+                        parallelism);
+        return new CdcFixedBucketSink(dataTable).sinkFrom(partitioned);
+    }
+
     private DataStreamSink<?> buildForUnawareBucket(DataStream<CdcRecord> 
parsed) {
         FileStoreTable dataTable = (FileStoreTable) table;
         // rebalance it to make sure schema change work to avoid infinite loop
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index a6b272eaf8..b53ec9c80f 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -204,6 +204,13 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
         new CdcFixedBucketSink(table).sinkFrom(partitioned);
     }
 
+    private void buildForPostponeBucket(FileStoreTable table, 
DataStream<CdcRecord> parsed) {
+        DataStream<CdcRecord> partitioned =
+                partition(
+                        parsed, new 
CdcPostponeBucketChannelComputer(table.schema()), parallelism);
+        new CdcFixedBucketSink(table).sinkFrom(partitioned);
+    }
+
     private void buildForUnawareBucket(FileStoreTable table, 
DataStream<CdcRecord> parsed) {
         // rebalance it to make sure schema change work to avoid infinite loop
         new CdcAppendTableSink(table, 
parallelism).sinkFrom(parsed.rebalance());
@@ -251,6 +258,9 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
                 case HASH_DYNAMIC:
                     new CdcDynamicBucketSink(table).build(converted, 
parallelism);
                     break;
+                case POSTPONE_MODE:
+                    buildForPostponeBucket(table, converted);
+                    break;
                 case BUCKET_UNAWARE:
                     buildForUnawareBucket(table, converted);
                     break;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index 484670e23c..4db53f7f80 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.action.CompactAction;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.FileIO;
@@ -35,6 +36,7 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -53,6 +55,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
@@ -80,6 +83,12 @@ public class FlinkCdcSyncDatabaseSinkITCase extends 
AbstractTestBase {
         innerTestRandomCdcEvents(() -> -1, false);
     }
 
+    @Test
+    @Timeout(120)
+    public void testRandomCdcEventsPostponeBucket() throws Exception {
+        innerTestRandomCdcEvents(() -> BucketMode.POSTPONE_BUCKET, false);
+    }
+
     @Test
     @Timeout(120)
     public void testRandomCdcEventsUnawareBucket() throws Exception {
@@ -183,6 +192,24 @@ public class FlinkCdcSyncDatabaseSinkITCase extends 
AbstractTestBase {
 
         // no failure when checking results
         FailingFileIO.reset(failingName, 0, 1);
+
+        for (int i = 0; i < numTables; i++) {
+            FileStoreTable table = 
fileStoreTables.get(i).copyWithLatestSchema();
+            if (table.coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) {
+                // postpone bucket tables must be compacted, so data can be 
consumed
+                StreamExecutionEnvironment compactEnv =
+                        
streamExecutionEnvironmentBuilder().batchMode().parallelism(2).build();
+                CompactAction compactAction =
+                        new CompactAction(
+                                DATABASE_NAME,
+                                table.name(),
+                                catalogOptions.toMap(),
+                                new HashMap<>());
+                compactAction.withStreamExecutionEnvironment(compactEnv);
+                compactAction.run();
+            }
+        }
+
         for (int i = 0; i < numTables; i++) {
             FileStoreTable table = 
fileStoreTables.get(i).copyWithLatestSchema();
             SchemaManager schemaManager = new SchemaManager(table.fileIO(), 
table.location());
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index 9fccaac992..ea02652971 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.action.CompactAction;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -35,6 +36,7 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -53,6 +55,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
@@ -77,6 +80,12 @@ public class FlinkCdcSyncTableSinkITCase extends 
AbstractTestBase {
         innerTestRandomCdcEvents(-1, false, false);
     }
 
+    @Test
+    @Timeout(120)
+    public void testRandomCdcEventsPostponeBucket() throws Exception {
+        innerTestRandomCdcEvents(BucketMode.POSTPONE_BUCKET, false, false);
+    }
+
     @Disabled
     @Test
     @Timeout(120)
@@ -177,6 +186,17 @@ public class FlinkCdcSyncTableSinkITCase extends 
AbstractTestBase {
         // no failure when checking results
         FailingFileIO.reset(failingName, 0, 1);
 
+        if (numBucket == BucketMode.POSTPONE_BUCKET) {
+            // postpone bucket tables must be compacted, so data can be 
consumed
+            StreamExecutionEnvironment compactEnv =
+                    
streamExecutionEnvironmentBuilder().batchMode().parallelism(3).build();
+            CompactAction compactAction =
+                    new CompactAction(
+                            DATABASE_NAME, TABLE_NAME, catalogOptions.toMap(), 
new HashMap<>());
+            compactAction.withStreamExecutionEnvironment(compactEnv);
+            compactAction.run();
+        }
+
         table = table.copyWithLatestSchema();
         SchemaManager schemaManager = new SchemaManager(table.fileIO(), 
table.location());
         TableSchema schema = schemaManager.latest().get();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 71c41d2f6b..e1eb9ac7d6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -113,6 +113,7 @@ public class MultiTablesStoreCompactOperator
 
         state =
                 new StoreSinkWriteStateImpl(
+                        
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
                         context,
                         (tableName, partition, bucket) ->
                                 ChannelComputer.select(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
index f1974da321..7e994e2015 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
@@ -28,9 +28,11 @@ import java.util.List;
  */
 public class NoopStoreSinkWriteState implements StoreSinkWriteState {
 
+    private final int subtaskId;
     private final StateValueFilter stateValueFilter;
 
-    public NoopStoreSinkWriteState(StateValueFilter stateValueFilter) {
+    public NoopStoreSinkWriteState(int subtaskId, StateValueFilter 
stateValueFilter) {
+        this.subtaskId = subtaskId;
         this.stateValueFilter = stateValueFilter;
     }
 
@@ -49,4 +51,9 @@ public class NoopStoreSinkWriteState implements 
StoreSinkWriteState {
 
     @Override
     public void snapshotState() throws Exception {}
+
+    @Override
+    public int getSubtaskId() {
+        return subtaskId;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java
index b6f2381c9f..69a339a411 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java
@@ -45,27 +45,27 @@ public class RowAppendTableSink extends 
AppendTableSink<InternalRow> {
         return new RowDataStoreWriteOperator.Factory(
                 table, logSinkFunction, writeProvider, commitUser) {
             @Override
+            @SuppressWarnings("unchecked, rawtypes")
             public StreamOperator 
createStreamOperator(StreamOperatorParameters parameters) {
                 return new RowDataStoreWriteOperator(
                         parameters, table, logSinkFunction, writeProvider, 
commitUser) {
 
                     @Override
                     protected StoreSinkWriteState createState(
+                            int subtaskId,
                             StateInitializationContext context,
                             StoreSinkWriteState.StateValueFilter stateFilter)
                             throws Exception {
                         // No conflicts will occur in append only unaware 
bucket writer, so no state
-                        // is
-                        // needed.
-                        return new NoopStoreSinkWriteState(stateFilter);
+                        // is needed.
+                        return new NoopStoreSinkWriteState(subtaskId, 
stateFilter);
                     }
 
                     @Override
                     protected String getCommitUser(StateInitializationContext 
context)
                             throws Exception {
                         // No conflicts will occur in append only unaware 
bucket writer, so
-                        // commitUser does
-                        // not matter.
+                        // commitUser does not matter.
                         return commitUser;
                     }
                 };
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 99ad6c0c00..6c0fe5c13a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -93,6 +93,7 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData, Committ
 
         state =
                 new StoreSinkWriteStateImpl(
+                        
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
                         context,
                         (tableName, partition, bucket) ->
                                 ChannelComputer.select(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index ef80428209..96ebf1b662 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -145,7 +145,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
                 table.newWrite(
                                 commitUser,
                                 (part, bucket) ->
-                                        
state.stateValueFilter().filter(table.name(), part, bucket))
+                                        
state.stateValueFilter().filter(table.name(), part, bucket),
+                                state.getSubtaskId())
                         .withIOManager(paimonIOManager)
                         .withIgnorePreviousFiles(ignorePreviousFiles)
                         .withExecutionMode(isStreamingMode)
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
index 8626a01a66..e7b3bcf254 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
@@ -39,6 +39,8 @@ public interface StoreSinkWriteState {
 
     void snapshotState() throws Exception;
 
+    int getSubtaskId();
+
     /**
      * A state value for {@link StoreSinkWrite}. All state values should be 
given a partition and a
      * bucket so that they can be redistributed once the sink parallelism is 
changed.
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
index a01cbcb68d..7f4379c241 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
@@ -46,6 +46,7 @@ import java.util.Map;
  */
 public class StoreSinkWriteStateImpl implements StoreSinkWriteState {
 
+    private final int subtaskId;
     private final StoreSinkWriteState.StateValueFilter stateValueFilter;
 
     private final ListState<Tuple5<String, String, byte[], Integer, byte[]>> 
listState;
@@ -53,10 +54,13 @@ public class StoreSinkWriteStateImpl implements 
StoreSinkWriteState {
 
     @SuppressWarnings("unchecked")
     public StoreSinkWriteStateImpl(
+            int subtaskId,
             StateInitializationContext context,
             StoreSinkWriteState.StateValueFilter stateValueFilter)
             throws Exception {
+        this.subtaskId = subtaskId;
         this.stateValueFilter = stateValueFilter;
+
         TupleSerializer<Tuple5<String, String, byte[], Integer, byte[]>> 
listStateSerializer =
                 new TupleSerializer<>(
                         (Class<Tuple5<String, String, byte[], Integer, 
byte[]>>)
@@ -120,4 +124,9 @@ public class StoreSinkWriteStateImpl implements 
StoreSinkWriteState {
         }
         listState.update(list);
     }
+
+    @Override
+    public int getSubtaskId() {
+        return subtaskId;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index ec69ce59f6..127acc97c8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -64,16 +64,17 @@ public abstract class TableWriteOperator<IN> extends 
PrepareCommitOperator<IN, C
 
         boolean containLogSystem = containLogSystem();
         int numTasks = 
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+        int subtaskId = 
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
         StateValueFilter stateFilter =
                 (tableName, partition, bucket) -> {
                     int task =
                             containLogSystem
                                     ? ChannelComputer.select(bucket, numTasks)
                                     : ChannelComputer.select(partition, 
bucket, numTasks);
-                    return task == 
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
+                    return task == subtaskId;
                 };
 
-        state = createState(context, stateFilter);
+        state = createState(subtaskId, context, stateFilter);
         write =
                 storeSinkWriteProvider.provide(
                         table,
@@ -85,9 +86,11 @@ public abstract class TableWriteOperator<IN> extends 
PrepareCommitOperator<IN, C
     }
 
     protected StoreSinkWriteState createState(
-            StateInitializationContext context, 
StoreSinkWriteState.StateValueFilter stateFilter)
+            int subtaskId,
+            StateInitializationContext context,
+            StoreSinkWriteState.StateValueFilter stateFilter)
             throws Exception {
-        return new StoreSinkWriteStateImpl(context, stateFilter);
+        return new StoreSinkWriteStateImpl(subtaskId, context, stateFilter);
     }
 
     protected String getCommitUser(StateInitializationContext context) throws 
Exception {


Reply via email to