This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a8322506 [FLINK-39056][pipeline-connector][iceberg] Fix Duplicate Data Issue in Iceberg Sink During Two-Phase Commit (#4269)
7a8322506 is described below

commit 7a8322506318cdeb12c0384b1301e8dde24d0bee
Author: fcfangcc <[email protected]>
AuthorDate: Wed Feb 11 16:17:26 2026 +0800

    [FLINK-39056][pipeline-connector][iceberg] Fix Duplicate Data Issue in Iceberg Sink During Two-Phase Commit (#4269)
    
    Co-authored-by: Copilot <[email protected]>
    Co-authored-by: lvyanquan <[email protected]>
    Co-authored-by: Kunni <[email protected]>
---
 .../connectors/iceberg/sink/IcebergDataSink.java   |   9 +-
 .../iceberg/sink/IcebergDataSinkFactory.java       |   6 +-
 .../iceberg/sink/IcebergDataSinkOptions.java       |   8 ++
 .../iceberg/sink/v2/IcebergCommitter.java          |  71 ++++++++++++++-
 .../connectors/iceberg/sink/v2/IcebergSink.java    |  61 ++++++++++++-
 .../connectors/iceberg/sink/v2/IcebergWriter.java  |  46 +++++++++-
 .../iceberg/sink/v2/IcebergWriterState.java        |  72 +++++++++++++++
 .../sink/v2/IcebergWriterStateSerializer.java      |  57 ++++++++++++
 .../iceberg/sink/v2/WriteResultWrapper.java        |  34 ++++++-
 .../iceberg/sink/IcebergDataSinkFactoryTest.java   |   2 +
 .../iceberg/sink/v2/CompactionOperatorTest.java    |  17 +++-
 .../iceberg/sink/v2/IcebergSinkITCase.java         |   7 +-
 .../sink/v2/IcebergWriterStateSerializerTest.java  |  37 ++++++++
 .../iceberg/sink/v2/IcebergWriterTest.java         | 101 ++++++++++++++++++++-
 14 files changed, 506 insertions(+), 22 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
index 0581858da..96c2b5f76 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
@@ -47,25 +47,30 @@ public class IcebergDataSink implements DataSink, Serializable {
 
     public final CompactionOptions compactionOptions;
 
+    public final String jobIdPrefix;
+
     public IcebergDataSink(
             Map<String, String> catalogOptions,
             Map<String, String> tableOptions,
             Map<TableId, List<String>> partitionMaps,
             ZoneId zoneId,
             String schemaOperatorUid,
-            CompactionOptions compactionOptions) {
+            CompactionOptions compactionOptions,
+            String jobIdPrefix) {
         this.catalogOptions = catalogOptions;
         this.tableOptions = tableOptions;
         this.partitionMaps = partitionMaps;
         this.zoneId = zoneId;
         this.schemaOperatorUid = schemaOperatorUid;
         this.compactionOptions = compactionOptions;
+        this.jobIdPrefix = jobIdPrefix;
     }
 
     @Override
     public EventSinkProvider getEventSinkProvider() {
         IcebergSink icebergEventSink =
-                new IcebergSink(catalogOptions, tableOptions, zoneId, compactionOptions);
+                new IcebergSink(
+                        catalogOptions, tableOptions, zoneId, compactionOptions, jobIdPrefix);
         return FlinkSinkProvider.of(icebergEventSink);
     }
 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
index 80f1df659..c08565294 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
@@ -106,6 +106,8 @@ public class IcebergDataSinkFactory implements DataSinkFactory {
                 }
             }
         }
+        String jobIdPrefix =
+                context.getFactoryConfiguration().get(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX);
 
         return new IcebergDataSink(
                 catalogOptions,
@@ -113,7 +115,8 @@ public class IcebergDataSinkFactory implements DataSinkFactory {
                 partitionMaps,
                 zoneId,
                 schemaOperatorUid,
-                compactionOptions);
+                compactionOptions,
+                jobIdPrefix);
     }
 
     private CompactionOptions getCompactionStrategy(Configuration configuration) {
@@ -144,6 +147,7 @@ public class IcebergDataSinkFactory implements DataSinkFactory {
         options.add(IcebergDataSinkOptions.SINK_COMPACTION_ENABLED);
         options.add(IcebergDataSinkOptions.SINK_COMPACTION_COMMIT_INTERVAL);
         options.add(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM);
+        options.add(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX);
         return options;
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
index 517e1c8eb..f989909fa 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
@@ -78,4 +78,12 @@ public class IcebergDataSinkOptions {
                     .defaultValue(-1)
                     .withDescription(
                             "The parallelism for file compaction, default value is -1, which means that compaction parallelism is equal to sink writer parallelism.");
+
+    @Experimental
+    public static final ConfigOption<String> SINK_JOB_ID_PREFIX =
+            key("sink.job.id.prefix")
+                    .stringType()
+                    .defaultValue("cdc")
+                    .withDescription(
+                            "The prefix of job id, which is used to distinguish different jobs.");
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
index 9d61a8e43..6cadc3d91 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
@@ -28,10 +28,14 @@ import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.sink.SinkUtil;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +48,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import static java.util.stream.Collectors.toList;
+import static org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID;
 
 /** A {@link Committer} for Apache Iceberg. */
 public class IcebergCommitter implements Committer<WriteResultWrapper> {
@@ -83,6 +88,14 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
     }
 
     private void commit(List<WriteResultWrapper> writeResultWrappers) {
+        if (writeResultWrappers.isEmpty()) {
+            return;
+        }
+        // All write results in one commit batch share the same checkpoint id.
+        long checkpointId = writeResultWrappers.get(0).getCheckpointId();
+        String newFlinkJobId = writeResultWrappers.get(0).getJobId();
+        String operatorId = writeResultWrappers.get(0).getOperatorId();
+
         Map<TableId, List<WriteResult>> tableMap = new HashMap<>();
         for (WriteResultWrapper writeResultWrapper : writeResultWrappers) {
             List<WriteResult> writeResult =
@@ -93,11 +106,29 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
         }
         for (Map.Entry<TableId, List<WriteResult>> entry : tableMap.entrySet()) {
             TableId tableId = entry.getKey();
-            Optional<TableMetric> tableMetric = getTableMetric(tableId);
-            tableMetric.ifPresent(TableMetric::increaseCommitTimes);
+
             Table table =
                     catalog.loadTable(
                             TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName()));
+
+            Snapshot snapshot = table.currentSnapshot();
+            if (snapshot != null) {
+                Iterable<Snapshot> ancestors =
+                        SnapshotUtil.ancestorsOf(snapshot.snapshotId(), table::snapshot);
+                long lastCheckpointId =
+                        getMaxCommittedCheckpointId(ancestors, newFlinkJobId, operatorId);
+                if (lastCheckpointId == checkpointId) {
+                    LOGGER.warn(
+                            "Checkpoint id {} has been committed to table {}, skipping",
+                            checkpointId,
+                            tableId.identifier());
+                    continue;
+                }
+            }
+
+            Optional<TableMetric> tableMetric = getTableMetric(tableId);
+            tableMetric.ifPresent(TableMetric::increaseCommitTimes);
+
             List<WriteResult> results = entry.getValue();
             List<DataFile> dataFiles =
                     results.stream()
@@ -117,15 +148,47 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
                 if (deleteFiles.isEmpty()) {
                     AppendFiles append = table.newAppend();
                     dataFiles.forEach(append::appendFile);
-                    append.commit();
+                    commitOperation(append, newFlinkJobId, operatorId, checkpointId);
                 } else {
                     RowDelta delta = table.newRowDelta();
                     dataFiles.forEach(delta::addRows);
                     deleteFiles.forEach(delta::addDeletes);
-                    delta.commit();
+                    commitOperation(delta, newFlinkJobId, operatorId, checkpointId);
+                }
+            }
+        }
+    }
+
+    private static long getMaxCommittedCheckpointId(
+            Iterable<Snapshot> ancestors, String flinkJobId, String operatorId) {
+        long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID - 1;
+
+        for (Snapshot ancestor : ancestors) {
+            Map<String, String> summary = ancestor.summary();
+            String snapshotFlinkJobId = summary.get(SinkUtil.FLINK_JOB_ID);
+            String snapshotOperatorId = summary.get(SinkUtil.OPERATOR_ID);
+            if (flinkJobId.equals(snapshotFlinkJobId)
+                    && (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) {
+                String value = summary.get(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID);
+                if (value != null) {
+                    lastCommittedCheckpointId = Long.parseLong(value);
+                    break;
                 }
             }
         }
+
+        return lastCommittedCheckpointId;
+    }
+
+    private static void commitOperation(
+            SnapshotUpdate<?> operation,
+            String newFlinkJobId,
+            String operatorId,
+            long checkpointId) {
+        operation.set(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
+        operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId);
+        operation.set(SinkUtil.OPERATOR_ID, operatorId);
+        operation.commit();
     }
 
     private Optional<TableMetric> getTableMetric(TableId tableId) {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
index 2d3269f55..db43adc3a 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsWriterState;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.cdc.common.event.Event;
@@ -30,6 +32,7 @@ import org.apache.flink.cdc.connectors.iceberg.sink.v2.compaction.CompactionOper
 import org.apache.flink.cdc.connectors.iceberg.sink.v2.compaction.CompactionOptions;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
@@ -39,8 +42,10 @@ import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
 import org.apache.flink.streaming.api.datastream.DataStream;
 
 import java.time.ZoneId;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 
 /** A {@link Sink} implementation for Apache Iceberg. */
 public class IcebergSink
@@ -48,7 +53,8 @@ public class IcebergSink
                 WithPreWriteTopology<Event>,
                 WithPreCommitTopology<Event, WriteResultWrapper>,
                 TwoPhaseCommittingSink<Event, WriteResultWrapper>,
-                WithPostCommitTopology<Event, WriteResultWrapper> {
+                WithPostCommitTopology<Event, WriteResultWrapper>,
+                SupportsWriterState<Event, IcebergWriterState> {
 
     protected final Map<String, String> catalogOptions;
     protected final Map<String, String> tableOptions;
@@ -57,15 +63,22 @@ public class IcebergSink
 
     private final CompactionOptions compactionOptions;
 
+    private String jobId;
+
+    private String operatorId;
+
     public IcebergSink(
             Map<String, String> catalogOptions,
             Map<String, String> tableOptions,
             ZoneId zoneId,
-            CompactionOptions compactionOptions) {
+            CompactionOptions compactionOptions,
+            String jobIdPrefix) {
         this.catalogOptions = catalogOptions;
         this.tableOptions = tableOptions;
         this.zoneId = zoneId;
         this.compactionOptions = compactionOptions;
+        this.jobId = jobIdPrefix + UUID.randomUUID();
+        this.operatorId = UUID.randomUUID().toString();
     }
 
     @Override
@@ -92,20 +105,60 @@ public class IcebergSink
 
     @Override
     public SinkWriter<Event> createWriter(InitContext context) {
+        long lastCheckpointId =
+                context.getRestoredCheckpointId()
+                        .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
         return new IcebergWriter(
                 catalogOptions,
                 context.getTaskInfo().getIndexOfThisSubtask(),
                 context.getTaskInfo().getAttemptNumber(),
-                zoneId);
+                zoneId,
+                lastCheckpointId,
+                jobId,
+                operatorId);
     }
 
     @Override
     public SinkWriter<Event> createWriter(WriterInitContext context) {
+        long lastCheckpointId =
+                context.getRestoredCheckpointId()
+                        .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
+        return new IcebergWriter(
+                catalogOptions,
+                context.getTaskInfo().getIndexOfThisSubtask(),
+                context.getTaskInfo().getAttemptNumber(),
+                zoneId,
+                lastCheckpointId,
+                jobId,
+                operatorId);
+    }
+
+    @Override
+    public StatefulSinkWriter<Event, IcebergWriterState> restoreWriter(
+            WriterInitContext context, Collection<IcebergWriterState> writerStates) {
+        // No need to read checkpointId from state
+        long lastCheckpointId =
+                context.getRestoredCheckpointId()
+                        .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
+        if (writerStates != null && !writerStates.isEmpty()) {
+            IcebergWriterState icebergWriterState = writerStates.iterator().next();
+            jobId = icebergWriterState.getJobId();
+            operatorId = icebergWriterState.getOperatorId();
+        }
+
         return new IcebergWriter(
                 catalogOptions,
                 context.getTaskInfo().getIndexOfThisSubtask(),
                 context.getTaskInfo().getAttemptNumber(),
-                zoneId);
+                zoneId,
+                lastCheckpointId,
+                jobId,
+                operatorId);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<IcebergWriterState> getWriterStateSerializer() {
+        return new IcebergWriterStateSerializer();
     }
 
     @Override
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
index 2914f3bca..62e47d897 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.iceberg.sink.v2;
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
 import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
@@ -46,12 +47,15 @@ import java.io.IOException;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /** A {@link SinkWriter} for Apache Iceberg. */
-public class IcebergWriter implements CommittingSinkWriter<Event, WriteResultWrapper> {
+public class IcebergWriter
+        implements CommittingSinkWriter<Event, WriteResultWrapper>,
+                StatefulSinkWriter<Event, IcebergWriterState> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(IcebergWriter.class);
 
@@ -75,8 +79,20 @@ public class IcebergWriter implements CommittingSinkWriter<Event, WriteResultWra
 
     private final ZoneId zoneId;
 
+    private long lastCheckpointId;
+
+    private final String jobId;
+
+    private final String operatorId;
+
     public IcebergWriter(
-            Map<String, String> catalogOptions, int taskId, int attemptId, ZoneId zoneId) {
+            Map<String, String> catalogOptions,
+            int taskId,
+            int attemptId,
+            ZoneId zoneId,
+            long lastCheckpointId,
+            String jobId,
+            String operatorId) {
         catalog =
                 CatalogUtil.buildIcebergCatalog(
                         this.getClass().getSimpleName(), catalogOptions, new Configuration());
@@ -87,14 +103,30 @@ public class IcebergWriter implements CommittingSinkWriter<Event, WriteResultWra
         this.taskId = taskId;
         this.attemptId = attemptId;
         this.zoneId = zoneId;
+        this.lastCheckpointId = lastCheckpointId;
+        this.jobId = jobId;
+        this.operatorId = operatorId;
+        LOGGER.info(
+                "IcebergWriter created, taskId: {}, attemptId: {}, lastCheckpointId: {}, jobId: {}, operatorId: {}",
+                taskId,
+                attemptId,
+                lastCheckpointId,
+                jobId,
+                operatorId);
+    }
+
+    @Override
+    public List<IcebergWriterState> snapshotState(long checkpointId) {
+        return Collections.singletonList(new IcebergWriterState(jobId, operatorId));
     }
 
     @Override
-    public Collection<WriteResultWrapper> prepareCommit() throws IOException, InterruptedException {
+    public Collection<WriteResultWrapper> prepareCommit() throws IOException {
         List<WriteResultWrapper> list = new ArrayList<>();
         list.addAll(temporaryWriteResult);
         list.addAll(getWriteResult());
         temporaryWriteResult.clear();
+        lastCheckpointId++;
         return list;
     }
 
@@ -149,10 +181,16 @@ public class IcebergWriter implements CommittingSinkWriter<Event, WriteResultWra
     }
 
     private List<WriteResultWrapper> getWriteResult() throws IOException {
+        long currentCheckpointId = lastCheckpointId + 1;
         List<WriteResultWrapper> writeResults = new ArrayList<>();
         for (Map.Entry<TableId, TaskWriter<RowData>> entry : writerMap.entrySet()) {
             WriteResultWrapper writeResultWrapper =
-                    new WriteResultWrapper(entry.getValue().complete(), entry.getKey());
+                    new WriteResultWrapper(
+                            entry.getValue().complete(),
+                            entry.getKey(),
+                            currentCheckpointId,
+                            jobId,
+                            operatorId);
             writeResults.add(writeResultWrapper);
             LOGGER.info(writeResultWrapper.buildDescription());
         }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterState.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterState.java
new file mode 100644
index 000000000..767e3dca5
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterState.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.iceberg.sink.v2;
+
+import java.util.Objects;
+
+/** The state of the {@link IcebergWriter}. */
+public class IcebergWriterState {
+
+    // The job ID associated with this writer state
+    private final String jobId;
+
+    // The operator ID associated with this writer state
+    private final String operatorId;
+
+    public IcebergWriterState(String jobId, String operatorId) {
+        this.jobId = jobId;
+        this.operatorId = operatorId;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public String getOperatorId() {
+        return operatorId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(jobId, operatorId);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        IcebergWriterState that = (IcebergWriterState) obj;
+        return Objects.equals(jobId, that.jobId) && Objects.equals(operatorId, that.operatorId);
+    }
+
+    @Override
+    public String toString() {
+        return "IcebergWriterState{"
+                + "jobId='"
+                + jobId
+                + '\''
+                + ", operatorId='"
+                + operatorId
+                + '\''
+                + '}';
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterStateSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterStateSerializer.java
new file mode 100644
index 000000000..139d725fd
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterStateSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.iceberg.sink.v2;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import java.io.IOException;
+
+/** A {@link SimpleVersionedSerializer} for {@link IcebergWriterState}. */
+public class IcebergWriterStateSerializer implements SimpleVersionedSerializer<IcebergWriterState> {
+
+    private static final int VERSION = 0;
+
+    private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+            ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
+
+    @Override
+    public int getVersion() {
+        return VERSION;
+    }
+
+    @Override
+    public byte[] serialize(IcebergWriterState icebergWriterState) throws IOException {
+        final DataOutputSerializer out = SERIALIZER_CACHE.get();
+        out.writeUTF(icebergWriterState.getJobId());
+        out.writeUTF(icebergWriterState.getOperatorId());
+        final byte[] result = out.getCopyOfBuffer();
+        out.clear();
+        return result;
+    }
+
+    @Override
+    public IcebergWriterState deserialize(int version, byte[] serialized) throws IOException {
+        if (version != VERSION) {
+            throw new IOException("Unknown version: " + version);
+        }
+        final DataInputDeserializer in = new DataInputDeserializer(serialized);
+        return new IcebergWriterState(in.readUTF(), in.readUTF());
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java
index 5aae210c9..e64cc5535 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java
@@ -34,9 +34,23 @@ public class WriteResultWrapper implements Serializable {
 
     private final TableId tableId;
 
-    public WriteResultWrapper(WriteResult writeResult, TableId tableId) {
+    private final long checkpointId;
+
+    private final String jobId;
+
+    private final String operatorId;
+
+    public WriteResultWrapper(
+            WriteResult writeResult,
+            TableId tableId,
+            long checkpointId,
+            String jobId,
+            String operatorId) {
         this.writeResult = writeResult;
         this.tableId = tableId;
+        this.checkpointId = checkpointId;
+        this.jobId = jobId;
+        this.operatorId = operatorId;
     }
 
     public WriteResult getWriteResult() {
@@ -47,6 +61,18 @@ public class WriteResultWrapper implements Serializable {
         return tableId;
     }
 
+    public long getCheckpointId() {
+        return checkpointId;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public String getOperatorId() {
+        return operatorId;
+    }
+
     /** Build a simple description for the write result. */
     public String buildDescription() {
         long addCount = 0;
@@ -63,6 +89,12 @@ public class WriteResultWrapper implements Serializable {
         }
         return "WriteResult of "
                 + tableId
+                + ", CheckpointId: "
+                + checkpointId
+                + ", JobId: "
+                + jobId
+                + ", OperatorId: "
+                + operatorId
                 + ", AddCount: "
                 + addCount
                 + ", DeleteCount: "
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
index dff510a41..848fd2584 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
@@ -57,11 +57,13 @@ public class IcebergDataSinkFactoryTest {
         Configuration conf = Configuration.fromMap(ImmutableMap.<String, String>builder().build());
         conf.set(IcebergDataSinkOptions.WAREHOUSE, "/tmp/warehouse");
         conf.set(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM, 4);
+        conf.set(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX, "FlinkCDC");
         DataSink dataSink =
                 sinkFactory.createDataSink(
                         new FactoryHelper.DefaultContext(
                                 conf, conf, Thread.currentThread().getContextClassLoader()));
         Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
+        Assertions.assertThat(((IcebergDataSink) dataSink).jobIdPrefix).isEqualTo("FlinkCDC");
     }
 
     @Test
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
index f73a8c921..6d0fce3eb 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
@@ -60,6 +60,7 @@ public class CompactionOperatorTest {
 
     @Test
     public void testCompationOperator() throws IOException, InterruptedException {
+        long checkpointId = 0;
         Map<String, String> catalogOptions = new HashMap<>();
         String warehouse =
                 new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
@@ -69,8 +70,17 @@ public class CompactionOperatorTest {
         Catalog catalog =
                 CatalogUtil.buildIcebergCatalog(
                         "cdc-iceberg-catalog", catalogOptions, new Configuration());
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
         IcebergWriter icebergWriter =
-                new IcebergWriter(catalogOptions, 1, 1, ZoneId.systemDefault());
+                new IcebergWriter(
+                        catalogOptions,
+                        1,
+                        1,
+                        ZoneId.systemDefault(),
+                        checkpointId,
+                        jobId,
+                        operatorId);
         IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
         TableId tableId = TableId.parse("test.iceberg_table");
 
@@ -155,7 +165,10 @@ public class CompactionOperatorTest {
         compactionOperator.processElement(
                 new StreamRecord<>(
                         new CommittableWithLineage<>(
-                                new WriteResultWrapper(null, tableId), 0L, 0)));
+                                new WriteResultWrapper(
+                                        null, tableId, checkpointId, jobId, operatorId),
+                                0L,
+                                0)));
         Map<String, String> summary =
                 catalog.loadTable(TableIdentifier.parse(tableId.identifier()))
                         .currentSnapshot()
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java
index cb8daccba..3c36f12cb 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java
@@ -100,7 +100,12 @@ public class IcebergSinkITCase {
         DataStream<Event> stream = env.fromData(events, TypeInformation.of(Event.class));
 
         Sink<Event> icebergSink =
-                new IcebergSink(catalogOptions, null, null, CompactionOptions.builder().build());
+                new IcebergSink(
+                        catalogOptions,
+                        null,
+                        null,
+                        CompactionOptions.builder().build(),
+                        "FlinkCDC");
         String[] expected = new String[] {"21, 1.732, Disenchanted", "17, 6.28, Doris Day"};
         stream.sinkTo(icebergSink);
         env.execute("Values to Iceberg Sink");
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterStateSerializerTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterStateSerializerTest.java
new file mode 100644
index 000000000..591c9709f
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterStateSerializerTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.iceberg.sink.v2;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+/** Tests for {@link IcebergWriterState} and {@link IcebergWriterStateSerializer}. */
+public class IcebergWriterStateSerializerTest {
+
+    @Test
+    public void testSerializer() throws IOException {
+        IcebergWriterState icebergWriterState = new IcebergWriterState("jobId", "operatorId");
+        IcebergWriterStateSerializer icebergWriterStateSerializer =
+                new IcebergWriterStateSerializer();
+        byte[] bytes = icebergWriterStateSerializer.serialize(icebergWriterState);
+        Assertions.assertThat(icebergWriterStateSerializer.deserialize(0, bytes))
+                .isEqualTo(icebergWriterState);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
index 2e28a39d0..b16b88931 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
@@ -50,6 +50,7 @@ import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.sink.SinkUtil;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
@@ -88,8 +89,11 @@ public class IcebergWriterTest {
         Catalog catalog =
                 CatalogUtil.buildIcebergCatalog(
                         "cdc-iceberg-catalog", catalogOptions, new Configuration());
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
         IcebergWriter icebergWriter =
-                new IcebergWriter(catalogOptions, 1, 1, ZoneId.systemDefault());
+                new IcebergWriter(
+                        catalogOptions, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
         IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
         TableId tableId = TableId.parse("test.iceberg_table");
 
@@ -277,7 +281,10 @@ public class IcebergWriterTest {
                 CatalogUtil.buildIcebergCatalog(
                         "cdc-iceberg-catalog", catalogOptions, new Configuration());
         ZoneId pipelineZoneId = ZoneId.systemDefault();
-        IcebergWriter icebergWriter = new IcebergWriter(catalogOptions, 1, 1, pipelineZoneId);
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
+        IcebergWriter icebergWriter =
+                new IcebergWriter(catalogOptions, 1, 1, pipelineZoneId, 0, jobId, operatorId);
         IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
         TableId tableId = TableId.parse("test.iceberg_table");
 
@@ -382,8 +389,11 @@ public class IcebergWriterTest {
         Catalog catalog =
                 CatalogUtil.buildIcebergCatalog(
                         "cdc-iceberg-catalog", catalogOptions, new Configuration());
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
         IcebergWriter icebergWriter =
-                new IcebergWriter(catalogOptions, 1, 1, ZoneId.systemDefault());
+                new IcebergWriter(
+                        catalogOptions, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
 
         TableId tableId = TableId.parse("test.iceberg_table");
         Map<TableId, List<String>> partitionMaps = new HashMap<>();
@@ -457,6 +467,91 @@ public class IcebergWriterTest {
         Assertions.assertThat(result.size()).isEqualTo(2);
     }
 
+    @Test
+    public void testWithRepeatCommit() throws Exception {
+        Map<String, String> catalogOptions = new HashMap<>();
+        String warehouse =
+                new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+        catalogOptions.put("type", "hadoop");
+        catalogOptions.put("warehouse", warehouse);
+        catalogOptions.put("cache-enabled", "false");
+        Catalog catalog =
+                CatalogUtil.buildIcebergCatalog(
+                        "cdc-iceberg-catalog", catalogOptions, new Configuration());
+        ZoneId pipelineZoneId = ZoneId.systemDefault();
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
+        IcebergWriter icebergWriter =
+                new IcebergWriter(catalogOptions, 1, 1, pipelineZoneId, 0, jobId, operatorId);
+        IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
+        TableId tableId = TableId.parse("test.iceberg_table");
+        TableIdentifier tableIdentifier =
+                TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName());
+        // Create Table.
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(
+                        tableId,
+                        Schema.newBuilder()
+                                .physicalColumn(
+                                        "id",
+                                        DataTypes.BIGINT().notNull(),
+                                        "column for id",
+                                        "AUTO_DECREMENT()")
+                                .physicalColumn(
+                                        "name", DataTypes.VARCHAR(100), "column for name", null)
+                                .primaryKey("id")
+                                .build());
+        icebergMetadataApplier.applySchemaChange(createTableEvent);
+        icebergWriter.write(createTableEvent, null);
+        BinaryRecordDataGenerator dataGenerator =
+                new BinaryRecordDataGenerator(
+                        createTableEvent.getSchema().getColumnDataTypes().toArray(new DataType[0]));
+        BinaryRecordData record1 =
+                dataGenerator.generate(
+                        new Object[] {
+                            1L, BinaryStringData.fromString("char1"),
+                        });
+        DataChangeEvent dataChangeEvent = DataChangeEvent.insertEvent(tableId, record1);
+        icebergWriter.write(dataChangeEvent, null);
+        Collection<WriteResultWrapper> writeResults = icebergWriter.prepareCommit();
+        IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions);
+        Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
+                writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
+        icebergCommitter.commit(collection);
+        List<String> result = fetchTableContent(catalog, tableId, null);
+        Assertions.assertThat(result.size()).isEqualTo(1);
+        Assertions.assertThat(result).containsExactlyInAnyOrder("1, char1");
+        Map<String, String> summary =
+                catalog.loadTable(tableIdentifier).currentSnapshot().summary();
+        Assertions.assertThat(summary.get(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID)).isEqualTo("1");
+        Assertions.assertThat(summary.get(SinkUtil.FLINK_JOB_ID)).isEqualTo(jobId);
+        Assertions.assertThat(summary.get(SinkUtil.OPERATOR_ID)).isEqualTo(operatorId);
+
+        // Repeat the commit with the same committables; this must not cause duplicate data.
+        BinaryRecordData record2 =
+                dataGenerator.generate(
+                        new Object[] {
+                            2L, BinaryStringData.fromString("char2"),
+                        });
+        DataChangeEvent dataChangeEvent2 = DataChangeEvent.insertEvent(tableId, record2);
+        icebergWriter.write(dataChangeEvent2, null);
+        writeResults = icebergWriter.prepareCommit();
+        collection =
+                writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
+        icebergCommitter.commit(collection);
+        icebergCommitter.commit(collection);
+        summary = catalog.loadTable(tableIdentifier).currentSnapshot().summary();
+        Assertions.assertThat(summary.get("total-data-files")).isEqualTo("2");
+        Assertions.assertThat(summary.get("added-records")).isEqualTo("1");
+        Assertions.assertThat(summary.get(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID)).isEqualTo("2");
+        Assertions.assertThat(summary.get(SinkUtil.FLINK_JOB_ID)).isEqualTo(jobId);
+        Assertions.assertThat(summary.get(SinkUtil.OPERATOR_ID)).isEqualTo(operatorId);
+
+        result = fetchTableContent(catalog, tableId, null);
+        Assertions.assertThat(result.size()).isEqualTo(2);
+        Assertions.assertThat(result).containsExactlyInAnyOrder("1, char1", "2, char2");
+    }
+
     /** Mock CommitRequestImpl. */
     public static class MockCommitRequestImpl<CommT> extends CommitRequestImpl<CommT> {
 
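A minimal usage sketch for the new sink.job.id.prefix option introduced above, modeled on the IcebergDataSinkFactoryTest change in this commit; the Configuration import path and the wrapper class below are illustrative assumptions, not part of the commit:

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions;

import java.util.HashMap;

// Hypothetical helper (not part of the commit): builds a sink configuration that
// overrides the default job id prefix ("cdc") recorded in Iceberg snapshot summaries.
public class JobIdPrefixExample {
    public static Configuration sinkConfig() {
        Configuration conf = Configuration.fromMap(new HashMap<>());
        conf.set(IcebergDataSinkOptions.WAREHOUSE, "/tmp/warehouse");
        conf.set(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX, "FlinkCDC");
        return conf;
    }
}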

