This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 21ba91e9f8 [flink] Make PostponeBucketSink no state and no intended 
failure (#5746)
21ba91e9f8 is described below

commit 21ba91e9f8af4c5937ed9f31928147ca0688d5eb
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 16 10:20:11 2025 +0800

    [flink] Make PostponeBucketSink no state and no intended failure (#5746)
---
 .../paimon/flink/sink/cdc/CdcAppendTableSink.java  |  7 +++
 .../sink/cdc/CdcAppendTableWriteOperator.java      | 13 +++++
 .../flink/sink/cdc/FlinkCdcMultiTableSink.java     |  2 +-
 .../apache/paimon/flink/sink/AppendTableSink.java  |  2 +
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java |  2 +-
 .../apache/paimon/flink/sink/FlinkWriteSink.java   | 50 +++++++++++++++++++
 .../paimon/flink/sink/PostponeBucketSink.java}     | 34 ++++++-------
 .../RestoreAndFailCommittableStateManager.java     | 56 ++--------------------
 ...er.java => RestoreCommittableStateManager.java} | 28 ++---------
 .../paimon/flink/sink/RowAppendTableSink.java      | 40 ++++------------
 .../sink/BatchWriteGeneratorTagOperatorTest.java   |  4 +-
 .../paimon/flink/sink/CommitterOperatorTest.java   |  2 +-
 .../paimon/flink/sink/StoreMultiCommitterTest.java |  2 +-
 13 files changed, 112 insertions(+), 130 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java
index d43690e5b2..7a5b2a0f11 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java
@@ -19,8 +19,10 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableStateManager;
 import org.apache.paimon.flink.sink.FlinkWriteSink;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -52,4 +54,9 @@ public class CdcAppendTableSink extends 
FlinkWriteSink<CdcRecord> {
             DataStream<CdcRecord> input, String initialCommitUser, @Nullable 
Integer parallelism) {
         return super.doWrite(input, initialCommitUser, this.parallelism);
     }
+
+    @Override
+    protected CommittableStateManager<ManifestCommittable> 
createCommittableStateManager() {
+        return createRestoreOnlyCommittableStateManager(table);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java
index 96b9fefcdf..b3e4153727 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java
@@ -19,11 +19,14 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.NoopStoreSinkWriteState;
 import org.apache.paimon.flink.sink.PrepareCommitOperator;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.sink.StoreSinkWriteState;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.RowKind;
 
+import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
@@ -40,6 +43,16 @@ public class CdcAppendTableWriteOperator extends 
CdcRecordStoreWriteOperator {
         super(parameters, table, storeSinkWriteProvider, initialCommitUser);
     }
 
+    @Override
+    protected StoreSinkWriteState createState(
+            int subtaskId,
+            StateInitializationContext context,
+            StoreSinkWriteState.StateValueFilter stateFilter) {
+        // No conflicts will occur in append only unaware bucket writer, so no 
state
+        // is needed.
+        return new NoopStoreSinkWriteState(subtaskId, stateFilter);
+    }
+
     @Override
     public void processElement(StreamRecord<CdcRecord> element) throws 
Exception {
         // only accepts INSERT record
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index 0cd2638179..a67e74ef55 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -177,6 +177,6 @@ public class FlinkCdcMultiTableSink implements Serializable 
{
 
     protected CommittableStateManager<WrappedManifestCommittable> 
createCommittableStateManager() {
         return new RestoreAndFailCommittableStateManager<>(
-                WrappedManifestCommittableSerializer::new);
+                WrappedManifestCommittableSerializer::new, true);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java
index 24d0e2db6f..094d61f6ea 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java
@@ -48,6 +48,8 @@ import static 
org.apache.paimon.flink.utils.ParallelismUtils.setParallelism;
  */
 public abstract class AppendTableSink<T> extends FlinkWriteSink<T> {
 
+    private static final long serialVersionUID = 1L;
+
     protected final FileStoreTable table;
     protected final LogSinkFunction logSinkFunction;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 7f211870b9..3484210014 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -300,7 +300,7 @@ public class FlinkSinkBuilder {
             channelComputer = new 
PostponeBucketChannelComputer(table.schema());
         }
         DataStream<InternalRow> partitioned = partition(input, 
channelComputer, parallelism);
-        FixedBucketSink sink = new FixedBucketSink(table, overwritePartition, 
null);
+        PostponeBucketSink sink = new PostponeBucketSink(table, 
overwritePartition);
         return sink.sinkFrom(partitioned);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
index 74f1febd90..8dc2044734 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
@@ -18,11 +18,17 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestCommittableSerializer;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
 import javax.annotation.Nullable;
 
 import java.util.Map;
@@ -63,4 +69,48 @@ public abstract class FlinkWriteSink<T> extends FlinkSink<T> 
{
                 ManifestCommittableSerializer::new,
                 options.get(PARTITION_MARK_DONE_RECOVER_FROM_STATE));
     }
+
+    protected static OneInputStreamOperatorFactory<InternalRow, Committable>
+            createNoStateRowWriteOperatorFactory(
+                    FileStoreTable table,
+                    LogSinkFunction logSinkFunction,
+                    StoreSinkWrite.Provider writeProvider,
+                    String commitUser) {
+        return new RowDataStoreWriteOperator.Factory(
+                table, logSinkFunction, writeProvider, commitUser) {
+            @Override
+            @SuppressWarnings("unchecked, rawtypes")
+            public StreamOperator 
createStreamOperator(StreamOperatorParameters parameters) {
+                return new RowDataStoreWriteOperator(
+                        parameters, table, logSinkFunction, writeProvider, 
commitUser) {
+
+                    @Override
+                    protected StoreSinkWriteState createState(
+                            int subtaskId,
+                            StateInitializationContext context,
+                            StoreSinkWriteState.StateValueFilter stateFilter) {
+                        // No conflicts will occur in append only unaware 
bucket writer, so no state
+                        // is needed.
+                        return new NoopStoreSinkWriteState(subtaskId, 
stateFilter);
+                    }
+
+                    @Override
+                    protected String getCommitUser(StateInitializationContext 
context)
+                            throws Exception {
+                        // No conflicts will occur in append only unaware 
bucket writer, so
+                        // commitUser does not matter.
+                        return commitUser;
+                    }
+                };
+            }
+        };
+    }
+
+    protected static CommittableStateManager<ManifestCommittable>
+            createRestoreOnlyCommittableStateManager(FileStoreTable table) {
+        Options options = table.coreOptions().toConfiguration();
+        return new RestoreCommittableStateManager<>(
+                ManifestCommittableSerializer::new,
+                options.get(PARTITION_MARK_DONE_RECOVER_FROM_STATE));
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketSink.java
similarity index 51%
copy from 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketSink.java
index d43690e5b2..1ab6f398c5 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketSink.java
@@ -16,40 +16,36 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.cdc;
+package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.flink.sink.Committable;
-import org.apache.paimon.flink.sink.FlinkWriteSink;
-import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.table.FileStoreTable;
 
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 
 import javax.annotation.Nullable;
 
-/**
- * CDC Sink for unaware bucket table. It should not add compaction node, 
because the compaction may
- * have old schema.
- */
-public class CdcAppendTableSink extends FlinkWriteSink<CdcRecord> {
+import java.util.Map;
+
+/** {@link FlinkSink} for writing records into fixed bucket Paimon table. */
+public class PostponeBucketSink extends FlinkWriteSink<InternalRow> {
 
-    private final Integer parallelism;
+    private static final long serialVersionUID = 1L;
 
-    public CdcAppendTableSink(FileStoreTable table, Integer parallelism) {
-        super(table, null);
-        this.parallelism = parallelism;
+    public PostponeBucketSink(
+            FileStoreTable table, @Nullable Map<String, String> 
overwritePartition) {
+        super(table, overwritePartition);
     }
 
     @Override
-    protected OneInputStreamOperatorFactory<CdcRecord, Committable> 
createWriteOperatorFactory(
+    protected OneInputStreamOperatorFactory<InternalRow, Committable> 
createWriteOperatorFactory(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new CdcAppendTableWriteOperator.Factory(table, writeProvider, 
commitUser);
+        return createNoStateRowWriteOperatorFactory(table, null, 
writeProvider, commitUser);
     }
 
     @Override
-    public DataStream<Committable> doWrite(
-            DataStream<CdcRecord> input, String initialCommitUser, @Nullable 
Integer parallelism) {
-        return super.doWrite(input, initialCommitUser, this.parallelism);
+    protected CommittableStateManager<ManifestCommittable> 
createCommittableStateManager() {
+        return createRestoreOnlyCommittableStateManager(table);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
index a9b0922d0b..8556cb6e77 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
@@ -19,18 +19,9 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.data.serializer.VersionedSerializer;
-import org.apache.paimon.flink.VersionedSerializerWrapper;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.utils.SerializableSupplier;
 
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
-
-import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -44,52 +35,20 @@ import java.util.List;
  * store writers.
  */
 public class RestoreAndFailCommittableStateManager<GlobalCommitT>
-        implements CommittableStateManager<GlobalCommitT> {
+        extends RestoreCommittableStateManager<GlobalCommitT> {
 
     private static final long serialVersionUID = 1L;
 
-    /** The committable's serializer. */
-    private final SerializableSupplier<VersionedSerializer<GlobalCommitT>> 
committableSerializer;
-
-    private final boolean partitionMarkDoneRecoverFromState;
-
-    /** GlobalCommitT state of this job. Used to filter out previous 
successful commits. */
-    private ListState<GlobalCommitT> streamingCommitterState;
-
-    public RestoreAndFailCommittableStateManager(
-            SerializableSupplier<VersionedSerializer<GlobalCommitT>> 
committableSerializer) {
-        this(committableSerializer, true);
-    }
-
     public RestoreAndFailCommittableStateManager(
             SerializableSupplier<VersionedSerializer<GlobalCommitT>> 
committableSerializer,
             boolean partitionMarkDoneRecoverFromState) {
-        this.committableSerializer = committableSerializer;
-        this.partitionMarkDoneRecoverFromState = 
partitionMarkDoneRecoverFromState;
+        super(committableSerializer, partitionMarkDoneRecoverFromState);
     }
 
     @Override
-    public void initializeState(
-            StateInitializationContext context, Committer<?, GlobalCommitT> 
committer)
+    protected int recover(List<GlobalCommitT> committables, Committer<?, 
GlobalCommitT> committer)
             throws Exception {
-        streamingCommitterState =
-                new SimpleVersionedListState<>(
-                        context.getOperatorStateStore()
-                                .getListState(
-                                        new ListStateDescriptor<>(
-                                                
"streaming_committer_raw_states",
-                                                
BytePrimitiveArraySerializer.INSTANCE)),
-                        new 
VersionedSerializerWrapper<>(committableSerializer.get()));
-        List<GlobalCommitT> restored = new ArrayList<>();
-        streamingCommitterState.get().forEach(restored::add);
-        streamingCommitterState.clear();
-        recover(restored, committer);
-    }
-
-    private void recover(List<GlobalCommitT> committables, Committer<?, 
GlobalCommitT> committer)
-            throws Exception {
-        int numCommitted =
-                committer.filterAndCommit(committables, true, 
partitionMarkDoneRecoverFromState);
+        int numCommitted = super.recover(committables, committer);
         if (numCommitted > 0) {
             throw new RuntimeException(
                     "This exception is intentionally thrown "
@@ -97,11 +56,6 @@ public class 
RestoreAndFailCommittableStateManager<GlobalCommitT>
                             + "By restarting the job we hope that "
                             + "writers can start writing based on these new 
commits.");
         }
-    }
-
-    @Override
-    public void snapshotState(StateSnapshotContext context, 
List<GlobalCommitT> committables)
-            throws Exception {
-        streamingCommitterState.update(committables);
+        return numCommitted;
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreCommittableStateManager.java
similarity index 75%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreCommittableStateManager.java
index a9b0922d0b..b1ed396bdc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreCommittableStateManager.java
@@ -36,14 +36,9 @@ import java.util.List;
 /**
  * A {@link CommittableStateManager} which stores uncommitted {@link 
ManifestCommittable}s in state.
  *
- * <p>When the job restarts, these {@link ManifestCommittable}s will be 
restored and committed, then
- * an intended failure will occur, hoping that after the job restarts, all 
writers can start writing
- * based on the restored snapshot.
- *
- * <p>Useful for committing snapshots containing records. For example 
snapshots produced by table
- * store writers.
+ * <p>When the job restarts, these {@link ManifestCommittable}s will be 
restored and committed.
  */
-public class RestoreAndFailCommittableStateManager<GlobalCommitT>
+public class RestoreCommittableStateManager<GlobalCommitT>
         implements CommittableStateManager<GlobalCommitT> {
 
     private static final long serialVersionUID = 1L;
@@ -56,12 +51,7 @@ public class 
RestoreAndFailCommittableStateManager<GlobalCommitT>
     /** GlobalCommitT state of this job. Used to filter out previous 
successful commits. */
     private ListState<GlobalCommitT> streamingCommitterState;
 
-    public RestoreAndFailCommittableStateManager(
-            SerializableSupplier<VersionedSerializer<GlobalCommitT>> 
committableSerializer) {
-        this(committableSerializer, true);
-    }
-
-    public RestoreAndFailCommittableStateManager(
+    public RestoreCommittableStateManager(
             SerializableSupplier<VersionedSerializer<GlobalCommitT>> 
committableSerializer,
             boolean partitionMarkDoneRecoverFromState) {
         this.committableSerializer = committableSerializer;
@@ -86,17 +76,9 @@ public class 
RestoreAndFailCommittableStateManager<GlobalCommitT>
         recover(restored, committer);
     }
 
-    private void recover(List<GlobalCommitT> committables, Committer<?, 
GlobalCommitT> committer)
+    protected int recover(List<GlobalCommitT> committables, Committer<?, 
GlobalCommitT> committer)
             throws Exception {
-        int numCommitted =
-                committer.filterAndCommit(committables, true, 
partitionMarkDoneRecoverFromState);
-        if (numCommitted > 0) {
-            throw new RuntimeException(
-                    "This exception is intentionally thrown "
-                            + "after committing the restored checkpoints. "
-                            + "By restarting the job we hope that "
-                            + "writers can start writing based on these new 
commits.");
-        }
+        return committer.filterAndCommit(committables, true, 
partitionMarkDoneRecoverFromState);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java
index 69a339a411..a58839a841 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java
@@ -19,18 +19,18 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.table.FileStoreTable;
 
-import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 
 import java.util.Map;
 
 /** An {@link AppendTableSink} which handles {@link InternalRow}. */
 public class RowAppendTableSink extends AppendTableSink<InternalRow> {
 
+    private static final long serialVersionUID = 1L;
+
     public RowAppendTableSink(
             FileStoreTable table,
             Map<String, String> overwritePartitions,
@@ -42,34 +42,12 @@ public class RowAppendTableSink extends 
AppendTableSink<InternalRow> {
     @Override
     protected OneInputStreamOperatorFactory<InternalRow, Committable> 
createWriteOperatorFactory(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new RowDataStoreWriteOperator.Factory(
-                table, logSinkFunction, writeProvider, commitUser) {
-            @Override
-            @SuppressWarnings("unchecked, rawtypes")
-            public StreamOperator 
createStreamOperator(StreamOperatorParameters parameters) {
-                return new RowDataStoreWriteOperator(
-                        parameters, table, logSinkFunction, writeProvider, 
commitUser) {
-
-                    @Override
-                    protected StoreSinkWriteState createState(
-                            int subtaskId,
-                            StateInitializationContext context,
-                            StoreSinkWriteState.StateValueFilter stateFilter)
-                            throws Exception {
-                        // No conflicts will occur in append only unaware 
bucket writer, so no state
-                        // is needed.
-                        return new NoopStoreSinkWriteState(subtaskId, 
stateFilter);
-                    }
+        return createNoStateRowWriteOperatorFactory(
+                table, logSinkFunction, writeProvider, commitUser);
+    }
 
-                    @Override
-                    protected String getCommitUser(StateInitializationContext 
context)
-                            throws Exception {
-                        // No conflicts will occur in append only unaware 
bucket writer, so
-                        // commitUser does not matter.
-                        return commitUser;
-                    }
-                };
-            }
-        };
+    @Override
+    protected CommittableStateManager<ManifestCommittable> 
createCommittableStateManager() {
+        return createRestoreOnlyCommittableStateManager(table);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
index 32ee2f42af..1bc68477a3 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
@@ -67,7 +67,7 @@ public class BatchWriteGeneratorTagOperatorTest extends 
CommitterOperatorTest {
                         table,
                         initialCommitUser,
                         new RestoreAndFailCommittableStateManager<>(
-                                ManifestCommittableSerializer::new));
+                                ManifestCommittableSerializer::new, true));
 
         OneInputStreamOperator<Committable, Committable> committerOperator =
                 committerOperatorFactory.createStreamOperator(
@@ -143,7 +143,7 @@ public class BatchWriteGeneratorTagOperatorTest extends 
CommitterOperatorTest {
                         table,
                         initialCommitUser,
                         new RestoreAndFailCommittableStateManager<>(
-                                ManifestCommittableSerializer::new));
+                                ManifestCommittableSerializer::new, true));
 
         OneInputStreamOperator<Committable, Committable> committerOperator =
                 committerOperatorFactory.createStreamOperator(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 4ad1dff9aa..220af9a73b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -665,7 +665,7 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
                         table,
                         null,
                         new RestoreAndFailCommittableStateManager<>(
-                                ManifestCommittableSerializer::new));
+                                ManifestCommittableSerializer::new, true));
         OneInputStreamOperatorTestHarness<Committable, Committable> 
testHarness =
                 createTestHarness(operatorFactory);
         testHarness.open();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index a61a379bde..041a692d9e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -652,7 +652,7 @@ class StoreMultiCommitterTest {
                         initialCommitUser,
                         context -> new StoreMultiCommitter(catalogLoader, 
context),
                         new RestoreAndFailCommittableStateManager<>(
-                                WrappedManifestCommittableSerializer::new));
+                                WrappedManifestCommittableSerializer::new, 
true));
         return createTestHarness(operator);
     }
 

Reply via email to