This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/1.10.x by this push:
     new 16b7dcb092 [1.10.x] cherry pick Flink: Ensure DynamicCommitter Idempotence in the presence of failures (#14461)
16b7dcb092 is described below

commit 16b7dcb092d5b712815c41b469852db7389c6852
Author: Huaxin Gao <[email protected]>
AuthorDate: Fri Oct 31 20:45:50 2025 -0700

    [1.10.x] cherry pick Flink: Ensure DynamicCommitter Idempotence in the presence of failures (#14461)
    
    * Flink: Ensure DynamicCommitter Idempotence in the presence of failures (#14182)
    
    (cherry picked from commit 3860284b763a3a744e9ebb3b58278c4ba91f1f5d)
    
    * Flink: Backport #14182: Ensure DynamicCommitter Idempotence in the presence of failures (#14213)
    
    (cherry picked from commit 441597e22ef3ec1ea03fd837cbc1e5dffce899a4)
    
    ---------
    
    Co-authored-by: Maximilian Michels <[email protected]>
---
 .../flink/sink/dynamic/DynamicCommitter.java       |  55 ++-
 .../flink/sink/dynamic/TestDynamicCommitter.java   | 535 ++++++++++++++++++---
 .../flink/sink/dynamic/TestDynamicIcebergSink.java |  92 +---
 .../flink/sink/dynamic/DynamicCommitter.java       |  55 ++-
 .../flink/sink/dynamic/TestDynamicCommitter.java   | 535 ++++++++++++++++++---
 .../flink/sink/dynamic/TestDynamicIcebergSink.java |  92 +---
 .../flink/sink/dynamic/DynamicCommitter.java       |  55 ++-
 .../flink/sink/dynamic/TestDynamicCommitter.java   | 535 ++++++++++++++++++---
 .../flink/sink/dynamic/TestDynamicIcebergSink.java |  92 +---
 9 files changed, 1485 insertions(+), 561 deletions(-)
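
For readers skimming the patch: the core of the change to DynamicCommitter.java is that append-only
results are no longer committed once per checkpoint but folded into a single snapshot tagged with the
last pending checkpoint id, which keeps a replayed commit from producing duplicate snapshots. The
sketch below is reconstructed from the hunks that follow; the class and method names and the
pendingResults/workerPool parameters are illustrative only, and the real committer routes the commit
through its commitOperation() helper so that metrics and the flink.max-committed-checkpoint-id
snapshot summary property are recorded.

    import java.util.Arrays;
    import java.util.List;
    import java.util.NavigableMap;
    import java.util.concurrent.ExecutorService;
    import org.apache.iceberg.ReplacePartitions;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.WriteResult;

    class AppendCommitSketch {
      // Illustrative only: commits every pending append result as one snapshot on the target branch.
      static void commitPendingAppends(
          Table table,
          String branch,
          NavigableMap<Long, List<WriteResult>> pendingResults,
          ExecutorService workerPool) {
        // Iceberg appends are unordered, so all pending write results can share one snapshot.
        ReplacePartitions overwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
        for (List<WriteResult> results : pendingResults.values()) {
          for (WriteResult result : results) {
            Arrays.stream(result.dataFiles()).forEach(overwrite::addFile);
          }
        }
        // The actual code passes pendingResults.lastKey() to commitOperation() so the snapshot
        // summary records the highest committed checkpoint id; a retry of an already-committed
        // checkpoint is then detected and skipped.
        overwrite.toBranch(branch).commit();
      }
    }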

diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index e58066aac6..54d506b663 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -274,26 +274,25 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
       CommitSummary summary,
       String newFlinkJobId,
       String operatorId) {
-    for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
-      // We don't commit the merged result into a single transaction because for the sequential
-      // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied
-      // to data files from txn1. Committing the merged one will lead to the incorrect delete
-      // semantic.
-      for (WriteResult result : e.getValue()) {
-        ReplacePartitions dynamicOverwrite =
-            table.newReplacePartitions().scanManifestsWith(workerPool);
+    // Iceberg tables are unsorted. So the order of the append data does not matter.
+    // Hence, we commit everything in one snapshot.
+    ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
+
+    for (List<WriteResult> writeResults : pendingResults.values()) {
+      for (WriteResult result : writeResults) {
         Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
-        commitOperation(
-            table,
-            branch,
-            dynamicOverwrite,
-            summary,
-            "dynamic partition overwrite",
-            newFlinkJobId,
-            operatorId,
-            e.getKey());
       }
     }
+
+    commitOperation(
+        table,
+        branch,
+        dynamicOverwrite,
+        summary,
+        "dynamic partition overwrite",
+        newFlinkJobId,
+        operatorId,
+        pendingResults.lastKey());
   }
 
   private void commitDeltaTxn(
@@ -304,11 +303,11 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
       String newFlinkJobId,
       String operatorId) {
     for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
-      // We don't commit the merged result into a single transaction because for the sequential
-      // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied
-      // to data files from txn1. Committing the merged one will lead to the incorrect delete
-      // semantic.
-      for (WriteResult result : e.getValue()) {
+      long checkpointId = e.getKey();
+      List<WriteResult> writeResults = e.getValue();
+
+      RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
+      for (WriteResult result : writeResults) {
         // Row delta validations are not needed for streaming changes that write equality deletes.
         // Equality deletes are applied to data in all previous sequence numbers, so retries may
         // push deletes further in the future, but do not affect correctness. Position deletes
@@ -316,13 +315,17 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
         // being added in this commit. There is no way for data files added along with the delete
         // files to be concurrently removed, so there is no need to validate the files referenced by
         // the position delete files that are being committed.
-        RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
-
         Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
         Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
-        commitOperation(
-            table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, e.getKey());
       }
+
+      // Every Flink checkpoint contains a set of independent changes which can be committed
+      // together. While it is technically feasible to combine append-only data across checkpoints,
+      // for the sake of simplicity, we do not implement this (premature) optimization. Multiple
+      // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint
+      // intervals or when concurrent checkpointing is enabled.
+      commitOperation(
+          table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId);
     }
   }
 
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 99a5465362..f5387aee88 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -19,8 +19,13 @@
 package org.apache.iceberg.flink.sink.dynamic;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
+import java.io.IOException;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Map;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
@@ -30,19 +35,24 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.sink.CommitSummary;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -74,6 +84,39 @@ class TestDynamicCommitter {
                   ))
           .build();
 
+  private static final DataFile DATA_FILE_2 =
+      DataFiles.builder(PartitionSpec.unpartitioned())
+          .withPath("/path/to/data-2.parquet")
+          .withFileSizeInBytes(0)
+          .withMetrics(
+              new Metrics(
+                  24L,
+                  null, // no column sizes
+                  ImmutableMap.of(1, 3L), // value count
+                  ImmutableMap.of(1, 0L), // null count
+                  null,
+                  ImmutableMap.of(1, ByteBuffer.allocate(1)), // lower bounds
+                  ImmutableMap.of(1, ByteBuffer.allocate(1)) // upper bounds
+                  ))
+          .build();
+
+  private static final DeleteFile DELETE_FILE =
+      FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
+          .withPath("/path/to/data-3.parquet")
+          .withFileSizeInBytes(0)
+          .withMetrics(
+              new Metrics(
+                  24L,
+                  null, // no column sizes
+                  ImmutableMap.of(1, 3L), // value count
+                  ImmutableMap.of(1, 0L), // null count
+                  null,
+                  ImmutableMap.of(1, ByteBuffer.allocate(1)), // lower bounds
+                  ImmutableMap.of(1, ByteBuffer.allocate(1)) // upper bounds
+                  ))
+          .ofPositionDeletes()
+          .build();
+
   @BeforeEach
   void before() {
     catalog = CATALOG_EXTENSION.catalog();
@@ -162,60 +205,57 @@ class TestDynamicCommitter {
     Snapshot first = Iterables.getFirst(table1.snapshots(), null);
     assertThat(first.summary())
         .containsAllEntriesOf(
-            (Map)
-                ImmutableMap.builder()
-                    .put("added-data-files", "1")
-                    .put("added-records", "42")
-                    .put("changed-partition-count", "1")
-                    .put("flink.job-id", jobId)
-                    .put("flink.max-committed-checkpoint-id", "" + checkpointId)
-                    .put("flink.operator-id", operatorId)
-                    .put("total-data-files", "1")
-                    .put("total-delete-files", "0")
-                    .put("total-equality-deletes", "0")
-                    .put("total-files-size", "0")
-                    .put("total-position-deletes", "0")
-                    .put("total-records", "42")
-                    .build());
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
     Snapshot second = Iterables.get(table1.snapshots(), 1, null);
     assertThat(second.summary())
         .containsAllEntriesOf(
-            (Map)
-                ImmutableMap.builder()
-                    .put("added-data-files", "1")
-                    .put("added-records", "42")
-                    .put("changed-partition-count", "1")
-                    .put("flink.job-id", jobId)
-                    .put("flink.max-committed-checkpoint-id", "" + checkpointId)
-                    .put("flink.operator-id", operatorId)
-                    .put("total-data-files", "1")
-                    .put("total-delete-files", "0")
-                    .put("total-equality-deletes", "0")
-                    .put("total-files-size", "0")
-                    .put("total-position-deletes", "0")
-                    .put("total-records", "42")
-                    .build());
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
 
     table2.refresh();
     assertThat(table2.snapshots()).hasSize(1);
     Snapshot third = Iterables.getFirst(table2.snapshots(), null);
     assertThat(third.summary())
         .containsAllEntriesOf(
-            (Map)
-                ImmutableMap.builder()
-                    .put("added-data-files", "1")
-                    .put("added-records", "42")
-                    .put("changed-partition-count", "1")
-                    .put("flink.job-id", jobId)
-                    .put("flink.max-committed-checkpoint-id", "" + checkpointId)
-                    .put("flink.operator-id", operatorId)
-                    .put("total-data-files", "1")
-                    .put("total-delete-files", "0")
-                    .put("total-equality-deletes", "0")
-                    .put("total-files-size", "0")
-                    .put("total-position-deletes", "0")
-                    .put("total-records", "42")
-                    .build());
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
   }
 
   @Test
@@ -277,21 +317,276 @@ class TestDynamicCommitter {
     Snapshot first = Iterables.getFirst(table1.snapshots(), null);
     assertThat(first.summary())
         .containsAllEntriesOf(
-            (Map)
-                ImmutableMap.builder()
-                    .put("added-data-files", "1")
-                    .put("added-records", "42")
-                    .put("changed-partition-count", "1")
-                    .put("flink.job-id", jobId)
-                    .put("flink.max-committed-checkpoint-id", "" + checkpointId)
-                    .put("flink.operator-id", operatorId)
-                    .put("total-data-files", "1")
-                    .put("total-delete-files", "0")
-                    .put("total-equality-deletes", "0")
-                    .put("total-files-size", "0")
-                    .put("total-position-deletes", "0")
-                    .put("total-records", "42")
-                    .build());
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
+  }
+
+  @Test
+  void testTableBranchAtomicCommitForAppendOnlyData() throws Exception {
+    Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table.snapshots()).isEmpty();
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    WriteTarget writeTarget1 =
+        new WriteTarget(TABLE1, "branch1", 42, 0, true, Sets.newHashSet(1, 2));
+    // writeTarget2 has a different schema
+    WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch1", 23, 0, true, Sets.newHashSet());
+    // Different branch for writeTarget3
+    WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch2", 23, 0, true, Sets.newHashSet());
+
+    WriteResult writeResult1 = WriteResult.builder().addDataFiles(DATA_FILE).build();
+    WriteResult writeResult2 = WriteResult.builder().addDataFiles(DATA_FILE_2).build();
+
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId1 = 1;
+    final int checkpointId2 = 2;
+
+    byte[] deltaManifest1 =
+        aggregator.writeToManifest(
+            writeTarget1,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget1, writeResult1)),
+            checkpointId1);
+
+    CommitRequest<DynamicCommittable> commitRequest1 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId1));
+
+    byte[] deltaManifest2 =
+        aggregator.writeToManifest(
+            writeTarget2,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget2, writeResult2)),
+            checkpointId1);
+
+    CommitRequest<DynamicCommittable> commitRequest2 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId1));
+
+    byte[] deltaManifest3 =
+        aggregator.writeToManifest(
+            writeTarget3,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget3, writeResult2)),
+            checkpointId2);
+
+    CommitRequest<DynamicCommittable> commitRequest3 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId2));
+
+    boolean overwriteMode = false;
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup);
+    DynamicCommitter dynamicCommitter =
+        new DynamicCommitter(
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2, commitRequest3));
+
+    table.refresh();
+    // Two committables, one for each snapshot / table / branch.
+    assertThat(table.snapshots()).hasSize(2);
+
+    Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null);
+    assertThat(snapshot1.snapshotId()).isEqualTo(table.refs().get("branch1").snapshotId());
+    assertThat(snapshot1.summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "2")
+                .put("added-records", "66")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId1)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "2")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "66")
+                .build());
+
+    Snapshot snapshot2 = Iterables.get(table.snapshots(), 1);
+    assertThat(snapshot2.snapshotId()).isEqualTo(table.refs().get("branch2").snapshotId());
+    assertThat(snapshot2.summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "24")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId2)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "24")
+                .build());
+  }
+
+  @Test
+  void testTableBranchAtomicCommitWithFailures() throws Exception {
+    Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table.snapshots()).isEmpty();
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    WriteTarget writeTarget1 = new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet());
+    // writeTarget2 has a different schema
+    WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch", 23, 0, false, Sets.newHashSet());
+    WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch", 23, 0, false, Sets.newHashSet());
+
+    WriteResult writeResult1 = WriteResult.builder().addDataFiles(DATA_FILE).build();
+    WriteResult writeResult2 = WriteResult.builder().addDeleteFiles(DELETE_FILE).build();
+    WriteResult writeResult3 = WriteResult.builder().addDataFiles(DATA_FILE).build();
+
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId1 = 1;
+    final int checkpointId2 = 2;
+
+    byte[] deltaManifest1 =
+        aggregator.writeToManifest(
+            writeTarget1,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget1, writeResult1)),
+            checkpointId1);
+
+    CommitRequest<DynamicCommittable> commitRequest1 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId1));
+
+    byte[] deltaManifest2 =
+        aggregator.writeToManifest(
+            writeTarget2,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget2, writeResult2)),
+            checkpointId2);
+
+    CommitRequest<DynamicCommittable> commitRequest2 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId2));
+
+    byte[] deltaManifest3 =
+        aggregator.writeToManifest(
+            writeTarget3,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget3, writeResult3)),
+            checkpointId2);
+
+    CommitRequest<DynamicCommittable> commitRequest3 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId2));
+
+    boolean overwriteMode = false;
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup);
+
+    // Use special hook to fail during various states of the commit operation
+    CommitHook commitHook = new FailBeforeAndAfterCommit();
+    DynamicCommitter dynamicCommitter =
+        new CommitHookEnabledDynamicCommitter(
+            commitHook,
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    ThrowingCallable commitExecutable =
+        () ->
+            dynamicCommitter.commit(
+                Sets.newHashSet(commitRequest1, commitRequest2, commitRequest3));
+
+    // First fail pre-commit
+    assertThatThrownBy(commitExecutable);
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
+
+    // Second fail during commit
+    assertThatThrownBy(commitExecutable);
+    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+
+    // Third fail after commit
+    assertThatThrownBy(commitExecutable);
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
+
+    // Finally commit must go through, although it is a NOOP because the third failure is directly
+    // after the commit finished.
+    try {
+      commitExecutable.call();
+    } catch (Throwable e) {
+      fail("Should not have thrown an exception");
+    }
+
+    table.refresh();
+    // Three committables, but only two snapshots! WriteResults from different checkpoints are not
+    // getting combined due to one writeResult2 containing a delete file.
+    assertThat(table.snapshots()).hasSize(2);
+
+    Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null);
+    assertThat(snapshot1.summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId1)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
+
+    Snapshot snapshot2 = Iterables.get(table.snapshots(), 1);
+    assertThat(snapshot2.summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId2)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "2")
+                .put("total-delete-files", "1")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "24")
+                .put("total-records", "84")
+                .build());
   }
 
   @Test
@@ -361,21 +656,109 @@ class TestDynamicCommitter {
     Snapshot latestSnapshot = Iterables.getLast(table1.snapshots());
     assertThat(latestSnapshot.summary())
         .containsAllEntriesOf(
-            (Map)
-                ImmutableMap.builder()
-                    .put("replace-partitions", "true")
-                    .put("added-data-files", "1")
-                    .put("added-records", "42")
-                    .put("changed-partition-count", "1")
-                    .put("flink.job-id", jobId)
-                    .put("flink.max-committed-checkpoint-id", String.valueOf(checkpointId + 1))
-                    .put("flink.operator-id", operatorId)
-                    .put("total-data-files", "1")
-                    .put("total-delete-files", "0")
-                    .put("total-equality-deletes", "0")
-                    .put("total-files-size", "0")
-                    .put("total-position-deletes", "0")
-                    .put("total-records", "42")
-                    .build());
+            ImmutableMap.<String, String>builder()
+                .put("replace-partitions", "true")
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", String.valueOf(checkpointId + 1))
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
+  }
+
+  interface CommitHook extends Serializable {
+    void beforeCommit();
+
+    void duringCommit();
+
+    void afterCommit();
+  }
+
+  static class FailBeforeAndAfterCommit implements CommitHook {
+
+    static boolean failedBeforeCommit;
+    static boolean failedDuringCommit;
+    static boolean failedAfterCommit;
+
+    FailBeforeAndAfterCommit() {
+      reset();
+    }
+
+    @Override
+    public void beforeCommit() {
+      if (!failedBeforeCommit) {
+        failedBeforeCommit = true;
+        throw new RuntimeException("Failing before commit");
+      }
+    }
+
+    @Override
+    public void duringCommit() {
+      if (!failedDuringCommit) {
+        failedDuringCommit = true;
+        throw new RuntimeException("Failing during commit");
+      }
+    }
+
+    @Override
+    public void afterCommit() {
+      if (!failedAfterCommit) {
+        failedAfterCommit = true;
+        throw new RuntimeException("Failing after commit");
+      }
+    }
+
+    static void reset() {
+      failedBeforeCommit = false;
+      failedDuringCommit = false;
+      failedAfterCommit = false;
+    }
+  }
+
+  static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
+    private final CommitHook commitHook;
+
+    CommitHookEnabledDynamicCommitter(
+        CommitHook commitHook,
+        Catalog catalog,
+        Map<String, String> snapshotProperties,
+        boolean replacePartitions,
+        int workerPoolSize,
+        String sinkId,
+        DynamicCommitterMetrics committerMetrics) {
+      super(
+          catalog, snapshotProperties, replacePartitions, workerPoolSize, sinkId, committerMetrics);
+      this.commitHook = commitHook;
+    }
+
+    @Override
+    public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
+        throws IOException, InterruptedException {
+      commitHook.beforeCommit();
+      super.commit(commitRequests);
+      commitHook.afterCommit();
+    }
+
+    @Override
+    void commitOperation(
+        Table table,
+        String branch,
+        SnapshotUpdate<?> operation,
+        CommitSummary summary,
+        String description,
+        String newFlinkJobId,
+        String operatorId,
+        long checkpointId) {
+      super.commitOperation(
+          table, branch, operation, summary, description, newFlinkJobId, operatorId, checkpointId);
+      commitHook.duringCommit();
+    }
   }
 }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index b61e297cc1..20fae212b4 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -25,7 +25,6 @@ import static org.assertj.core.api.Assertions.fail;
 import java.io.IOException;
 import java.io.Serializable;
 import java.time.Duration;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -56,7 +55,6 @@ import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -69,8 +67,9 @@ import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.flink.MiniFlinkClusterExtension;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestHelpers;
-import org.apache.iceberg.flink.sink.CommitSummary;
 import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
+import org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.CommitHook;
+import org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.FailBeforeAndAfterCommit;
 import org.apache.iceberg.inmemory.InMemoryInputFile;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -528,7 +527,8 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
     // Configure a Restart strategy to allow recovery
     Configuration configuration = new Configuration();
     configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
-    configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 2);
+    // Allow max 3 retries to make up for the three failures we are simulating here
+    configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 3);
     configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ZERO);
     env.configure(configuration);
 
@@ -539,14 +539,15 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
             new DynamicIcebergDataImpl(
                 SimpleDataUtil.SCHEMA, "t2", "main", PartitionSpec.unpartitioned()));
 
-    FailBeforeAndAfterCommit.reset();
     final CommitHook commitHook = new FailBeforeAndAfterCommit();
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse();
+    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isFalse();
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse();
 
     executeDynamicSink(rows, env, true, 1, commitHook);
 
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
   }
 
@@ -569,44 +570,6 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
     executeDynamicSink(rows, env, true, 1, commitHook);
   }
 
-  interface CommitHook extends Serializable {
-    void beforeCommit();
-
-    void duringCommit();
-
-    void afterCommit();
-  }
-
-  private static class FailBeforeAndAfterCommit implements CommitHook {
-
-    static boolean failedBeforeCommit;
-    static boolean failedAfterCommit;
-
-    @Override
-    public void beforeCommit() {
-      if (!failedBeforeCommit) {
-        failedBeforeCommit = true;
-        throw new RuntimeException("Failing before commit");
-      }
-    }
-
-    @Override
-    public void duringCommit() {}
-
-    @Override
-    public void afterCommit() {
-      if (!failedAfterCommit) {
-        failedAfterCommit = true;
-        throw new RuntimeException("Failing before commit");
-      }
-    }
-
-    static void reset() {
-      failedBeforeCommit = false;
-      failedAfterCommit = false;
-    }
-  }
-
   private static class AppendRightBeforeCommit implements CommitHook {
 
     final String tableIdentifier;
@@ -734,8 +697,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
 
     @Override
     public Committer<DynamicCommittable> createCommitter(CommitterInitContext context) {
-      //      return super.createCommitter(context);
-      return new CommitHookEnabledDynamicCommitter(
+      return new TestDynamicCommitter.CommitHookEnabledDynamicCommitter(
           commitHook,
           CATALOG_EXTENSION.catalogLoader().loadCatalog(),
           Collections.emptyMap(),
@@ -746,46 +708,6 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
     }
   }
 
-  static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
-    private final CommitHook commitHook;
-
-    CommitHookEnabledDynamicCommitter(
-        CommitHook commitHook,
-        Catalog catalog,
-        Map<String, String> snapshotProperties,
-        boolean replacePartitions,
-        int workerPoolSize,
-        String sinkId,
-        DynamicCommitterMetrics committerMetrics) {
-      super(
-          catalog, snapshotProperties, replacePartitions, workerPoolSize, sinkId, committerMetrics);
-      this.commitHook = commitHook;
-    }
-
-    @Override
-    public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
-        throws IOException, InterruptedException {
-      commitHook.beforeCommit();
-      super.commit(commitRequests);
-      commitHook.afterCommit();
-    }
-
-    @Override
-    void commitOperation(
-        Table table,
-        String branch,
-        SnapshotUpdate<?> operation,
-        CommitSummary summary,
-        String description,
-        String newFlinkJobId,
-        String operatorId,
-        long checkpointId) {
-      commitHook.duringCommit();
-      super.commitOperation(
-          table, branch, operation, summary, description, newFlinkJobId, operatorId, checkpointId);
-    }
-  }
-
   private void verifyResults(List<DynamicIcebergDataImpl> dynamicData) throws IOException {
     // Calculate the expected result
     Map<Tuple2<String, String>, List<RowData>> expectedData = Maps.newHashMap();
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index e58066aac6..54d506b663 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -274,26 +274,25 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
       CommitSummary summary,
       String newFlinkJobId,
       String operatorId) {
-    for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
-      // We don't commit the merged result into a single transaction because for the sequential
-      // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied
-      // to data files from txn1. Committing the merged one will lead to the incorrect delete
-      // semantic.
-      for (WriteResult result : e.getValue()) {
-        ReplacePartitions dynamicOverwrite =
-            table.newReplacePartitions().scanManifestsWith(workerPool);
+    // Iceberg tables are unsorted. So the order of the append data does not matter.
+    // Hence, we commit everything in one snapshot.
+    ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
+
+    for (List<WriteResult> writeResults : pendingResults.values()) {
+      for (WriteResult result : writeResults) {
         Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
-        commitOperation(
-            table,
-            branch,
-            dynamicOverwrite,
-            summary,
-            "dynamic partition overwrite",
-            newFlinkJobId,
-            operatorId,
-            e.getKey());
       }
     }
+
+    commitOperation(
+        table,
+        branch,
+        dynamicOverwrite,
+        summary,
+        "dynamic partition overwrite",
+        newFlinkJobId,
+        operatorId,
+        pendingResults.lastKey());
   }
 
   private void commitDeltaTxn(
@@ -304,11 +303,11 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
       String newFlinkJobId,
       String operatorId) {
     for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
-      // We don't commit the merged result into a single transaction because for the sequential
-      // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied
-      // to data files from txn1. Committing the merged one will lead to the incorrect delete
-      // semantic.
-      for (WriteResult result : e.getValue()) {
+      long checkpointId = e.getKey();
+      List<WriteResult> writeResults = e.getValue();
+
+      RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
+      for (WriteResult result : writeResults) {
         // Row delta validations are not needed for streaming changes that write equality deletes.
         // Equality deletes are applied to data in all previous sequence numbers, so retries may
         // push deletes further in the future, but do not affect correctness. Position deletes
@@ -316,13 +315,17 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
         // being added in this commit. There is no way for data files added along with the delete
         // files to be concurrently removed, so there is no need to validate the files referenced by
         // the position delete files that are being committed.
-        RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
-
         Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
         Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
-        commitOperation(
-            table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, e.getKey());
       }
+
+      // Every Flink checkpoint contains a set of independent changes which can be committed
+      // together. While it is technically feasible to combine append-only data across checkpoints,
+      // for the sake of simplicity, we do not implement this (premature) optimization. Multiple
+      // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint
+      // intervals or when concurrent checkpointing is enabled.
+      commitOperation(
+          table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId);
     }
   }
 
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 99a5465362..f5387aee88 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -19,8 +19,13 @@
 package org.apache.iceberg.flink.sink.dynamic;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
+import java.io.IOException;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Map;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
@@ -30,19 +35,24 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.sink.CommitSummary;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -74,6 +84,39 @@ class TestDynamicCommitter {
                   ))
           .build();
 
+  private static final DataFile DATA_FILE_2 =
+      DataFiles.builder(PartitionSpec.unpartitioned())
+          .withPath("/path/to/data-2.parquet")
+          .withFileSizeInBytes(0)
+          .withMetrics(
+              new Metrics(
+                  24L,
+                  null, // no column sizes
+                  ImmutableMap.of(1, 3L), // value count
+                  ImmutableMap.of(1, 0L), // null count
+                  null,
+                  ImmutableMap.of(1, ByteBuffer.allocate(1)), // lower bounds
+                  ImmutableMap.of(1, ByteBuffer.allocate(1)) // upper bounds
+                  ))
+          .build();
+
+  private static final DeleteFile DELETE_FILE =
+      FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
+          .withPath("/path/to/data-3.parquet")
+          .withFileSizeInBytes(0)
+          .withMetrics(
+              new Metrics(
+                  24L,
+                  null, // no column sizes
+                  ImmutableMap.of(1, 3L), // value count
+                  ImmutableMap.of(1, 0L), // null count
+                  null,
+                  ImmutableMap.of(1, ByteBuffer.allocate(1)), // lower bounds
+                  ImmutableMap.of(1, ByteBuffer.allocate(1)) // upper bounds
+                  ))
+          .ofPositionDeletes()
+          .build();
+
   @BeforeEach
   void before() {
     catalog = CATALOG_EXTENSION.catalog();
@@ -162,60 +205,57 @@ class TestDynamicCommitter {
     Snapshot first = Iterables.getFirst(table1.snapshots(), null);
     assertThat(first.summary())
         .containsAllEntriesOf(
-            (Map)
-                ImmutableMap.builder()
-                    .put("added-data-files", "1")
-                    .put("added-records", "42")
-                    .put("changed-partition-count", "1")
-                    .put("flink.job-id", jobId)
-                    .put("flink.max-committed-checkpoint-id", "" + checkpointId)
-                    .put("flink.operator-id", operatorId)
-                    .put("total-data-files", "1")
-                    .put("total-delete-files", "0")
-                    .put("total-equality-deletes", "0")
-                    .put("total-files-size", "0")
-                    .put("total-position-deletes", "0")
-                    .put("total-records", "42")
-                    .build());
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
     Snapshot second = Iterables.get(table1.snapshots(), 1, null);
     assertThat(second.summary())
         .containsAllEntriesOf(
-            (Map)
-                ImmutableMap.builder()
-                    .put("added-data-files", "1")
-                    .put("added-records", "42")
-                    .put("changed-partition-count", "1")
-                    .put("flink.job-id", jobId)
-                    .put("flink.max-committed-checkpoint-id", "" + checkpointId)
-                    .put("flink.operator-id", operatorId)
-                    .put("total-data-files", "1")
-                    .put("total-delete-files", "0")
-                    .put("total-equality-deletes", "0")
-                    .put("total-files-size", "0")
-                    .put("total-position-deletes", "0")
-                    .put("total-records", "42")
-                    .build());
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
 
     table2.refresh();
     assertThat(table2.snapshots()).hasSize(1);
     Snapshot third = Iterables.getFirst(table2.snapshots(), null);
     assertThat(third.summary())
         .containsAllEntriesOf(
-            (Map)
-                ImmutableMap.builder()
-                    .put("added-data-files", "1")
-                    .put("added-records", "42")
-                    .put("changed-partition-count", "1")
-                    .put("flink.job-id", jobId)
-                    .put("flink.max-committed-checkpoint-id", "" + checkpointId)
-                    .put("flink.operator-id", operatorId)
-                    .put("total-data-files", "1")
-                    .put("total-delete-files", "0")
-                    .put("total-equality-deletes", "0")
-                    .put("total-files-size", "0")
-                    .put("total-position-deletes", "0")
-                    .put("total-records", "42")
-                    .build());
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
   }
 
   @Test
@@ -277,21 +317,276 @@ class TestDynamicCommitter {
     Snapshot first = Iterables.getFirst(table1.snapshots(), null);
     assertThat(first.summary())
         .containsAllEntriesOf(
-            (Map)
-                ImmutableMap.builder()
-                    .put("added-data-files", "1")
-                    .put("added-records", "42")
-                    .put("changed-partition-count", "1")
-                    .put("flink.job-id", jobId)
-                    .put("flink.max-committed-checkpoint-id", "" + checkpointId)
-                    .put("flink.operator-id", operatorId)
-                    .put("total-data-files", "1")
-                    .put("total-delete-files", "0")
-                    .put("total-equality-deletes", "0")
-                    .put("total-files-size", "0")
-                    .put("total-position-deletes", "0")
-                    .put("total-records", "42")
-                    .build());
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
+  }
+
+  @Test
+  void testTableBranchAtomicCommitForAppendOnlyData() throws Exception {
+    Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table.snapshots()).isEmpty();
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    WriteTarget writeTarget1 =
+        new WriteTarget(TABLE1, "branch1", 42, 0, true, Sets.newHashSet(1, 2));
+    // writeTarget2 has a different schema
+    WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch1", 23, 0, true, Sets.newHashSet());
+    // Different branch for writeTarget3
+    WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch2", 23, 0, true, Sets.newHashSet());
+
+    WriteResult writeResult1 = WriteResult.builder().addDataFiles(DATA_FILE).build();
+    WriteResult writeResult2 = WriteResult.builder().addDataFiles(DATA_FILE_2).build();
+
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId1 = 1;
+    final int checkpointId2 = 2;
+
+    byte[] deltaManifest1 =
+        aggregator.writeToManifest(
+            writeTarget1,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget1, writeResult1)),
+            checkpointId1);
+
+    CommitRequest<DynamicCommittable> commitRequest1 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId1));
+
+    byte[] deltaManifest2 =
+        aggregator.writeToManifest(
+            writeTarget2,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget2, writeResult2)),
+            checkpointId1);
+
+    CommitRequest<DynamicCommittable> commitRequest2 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId1));
+
+    byte[] deltaManifest3 =
+        aggregator.writeToManifest(
+            writeTarget3,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget3, writeResult2)),
+            checkpointId2);
+
+    CommitRequest<DynamicCommittable> commitRequest3 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId2));
+
+    boolean overwriteMode = false;
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup);
+    DynamicCommitter dynamicCommitter =
+        new DynamicCommitter(
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2, commitRequest3));
+
+    table.refresh();
+    // Two committables, one for each snapshot / table / branch.
+    assertThat(table.snapshots()).hasSize(2);
+
+    Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null);
+    assertThat(snapshot1.snapshotId()).isEqualTo(table.refs().get("branch1").snapshotId());
+    assertThat(snapshot1.summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "2")
+                .put("added-records", "66")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId1)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "2")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "66")
+                .build());
+
+    Snapshot snapshot2 = Iterables.get(table.snapshots(), 1);
+    
assertThat(snapshot2.snapshotId()).isEqualTo(table.refs().get("branch2").snapshotId());
+    assertThat(snapshot2.summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "24")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId2)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "24")
+                .build());
+  }
+
+  @Test
+  void testTableBranchAtomicCommitWithFailures() throws Exception {
+    Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table.snapshots()).isEmpty();
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    WriteTarget writeTarget1 = new WriteTarget(TABLE1, "branch", 42, 0, false, 
Sets.newHashSet());
+    // writeTarget2 has a different schema
+    WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch", 23, 0, false, 
Sets.newHashSet());
+    WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch", 23, 0, false, 
Sets.newHashSet());
+
+    WriteResult writeResult1 = 
WriteResult.builder().addDataFiles(DATA_FILE).build();
+    WriteResult writeResult2 = 
WriteResult.builder().addDeleteFiles(DELETE_FILE).build();
+    WriteResult writeResult3 = 
WriteResult.builder().addDataFiles(DATA_FILE).build();
+
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId1 = 1;
+    final int checkpointId2 = 2;
+
+    byte[] deltaManifest1 =
+        aggregator.writeToManifest(
+            writeTarget1,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget1, 
writeResult1)),
+            checkpointId1);
+
+    CommitRequest<DynamicCommittable> commitRequest1 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget1, deltaManifest1, jobId, 
operatorId, checkpointId1));
+
+    byte[] deltaManifest2 =
+        aggregator.writeToManifest(
+            writeTarget2,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget2, 
writeResult2)),
+            checkpointId2);
+
+    CommitRequest<DynamicCommittable> commitRequest2 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget2, deltaManifest2, jobId, 
operatorId, checkpointId2));
+
+    byte[] deltaManifest3 =
+        aggregator.writeToManifest(
+            writeTarget3,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget3, 
writeResult3)),
+            checkpointId2);
+
+    CommitRequest<DynamicCommittable> commitRequest3 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget3, deltaManifest3, jobId, 
operatorId, checkpointId2));
+
+    boolean overwriteMode = false;
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new 
DynamicCommitterMetrics(metricGroup);
+
+    // Use a special hook to fail at various stages of the commit operation
+    CommitHook commitHook = new FailBeforeAndAfterCommit();
+    DynamicCommitter dynamicCommitter =
+        new CommitHookEnabledDynamicCommitter(
+            commitHook,
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    ThrowingCallable commitExecutable =
+        () ->
+            dynamicCommitter.commit(
+                Sets.newHashSet(commitRequest1, commitRequest2, 
commitRequest3));
+
+    // First fail pre-commit
+    assertThatThrownBy(commitExecutable);
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
+
+    // Second fail during commit
+    assertThatThrownBy(commitExecutable);
+    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+
+    // Third fail after commit
+    assertThatThrownBy(commitExecutable);
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
+
+    // Finally, the commit must go through, although it is a NOOP because the third failure
+    // occurred right after the commit finished.
+    try {
+      commitExecutable.call();
+    } catch (Throwable e) {
+      fail("Should not have thrown an exception");
+    }
+
+    table.refresh();
+    // Three committables, but only two snapshots! WriteResults from different checkpoints are
+    // not combined here because writeResult2 contains a delete file.
+    assertThat(table.snapshots()).hasSize(2);
+
+    Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null);
+    assertThat(snapshot1.summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId1)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
+
+    Snapshot snapshot2 = Iterables.get(table.snapshots(), 1);
+    assertThat(snapshot2.summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId2)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "2")
+                .put("total-delete-files", "1")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "24")
+                .put("total-records", "84")
+                .build());
   }
 
   @Test
@@ -361,21 +656,109 @@ class TestDynamicCommitter {
     Snapshot latestSnapshot = Iterables.getLast(table1.snapshots());
     assertThat(latestSnapshot.summary())
         .containsAllEntriesOf(
-            (Map)
-                ImmutableMap.builder()
-                    .put("replace-partitions", "true")
-                    .put("added-data-files", "1")
-                    .put("added-records", "42")
-                    .put("changed-partition-count", "1")
-                    .put("flink.job-id", jobId)
-                    .put("flink.max-committed-checkpoint-id", 
String.valueOf(checkpointId + 1))
-                    .put("flink.operator-id", operatorId)
-                    .put("total-data-files", "1")
-                    .put("total-delete-files", "0")
-                    .put("total-equality-deletes", "0")
-                    .put("total-files-size", "0")
-                    .put("total-position-deletes", "0")
-                    .put("total-records", "42")
-                    .build());
+            ImmutableMap.<String, String>builder()
+                .put("replace-partitions", "true")
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", 
String.valueOf(checkpointId + 1))
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
+  }
+
+  interface CommitHook extends Serializable {
+    void beforeCommit();
+
+    void duringCommit();
+
+    void afterCommit();
+  }
+
+  static class FailBeforeAndAfterCommit implements CommitHook {
+
+    static boolean failedBeforeCommit;
+    static boolean failedDuringCommit;
+    static boolean failedAfterCommit;
+
+    FailBeforeAndAfterCommit() {
+      reset();
+    }
+
+    @Override
+    public void beforeCommit() {
+      if (!failedBeforeCommit) {
+        failedBeforeCommit = true;
+        throw new RuntimeException("Failing before commit");
+      }
+    }
+
+    @Override
+    public void duringCommit() {
+      if (!failedDuringCommit) {
+        failedDuringCommit = true;
+        throw new RuntimeException("Failing during commit");
+      }
+    }
+
+    @Override
+    public void afterCommit() {
+      if (!failedAfterCommit) {
+        failedAfterCommit = true;
+        throw new RuntimeException("Failing before commit");
+      }
+    }
+
+    static void reset() {
+      failedBeforeCommit = false;
+      failedDuringCommit = false;
+      failedAfterCommit = false;
+    }
+  }
+
+  static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
+    private final CommitHook commitHook;
+
+    CommitHookEnabledDynamicCommitter(
+        CommitHook commitHook,
+        Catalog catalog,
+        Map<String, String> snapshotProperties,
+        boolean replacePartitions,
+        int workerPoolSize,
+        String sinkId,
+        DynamicCommitterMetrics committerMetrics) {
+      super(
+          catalog, snapshotProperties, replacePartitions, workerPoolSize, 
sinkId, committerMetrics);
+      this.commitHook = commitHook;
+    }
+
+    @Override
+    public void commit(Collection<CommitRequest<DynamicCommittable>> 
commitRequests)
+        throws IOException, InterruptedException {
+      commitHook.beforeCommit();
+      super.commit(commitRequests);
+      commitHook.afterCommit();
+    }
+
+    @Override
+    void commitOperation(
+        Table table,
+        String branch,
+        SnapshotUpdate<?> operation,
+        CommitSummary summary,
+        String description,
+        String newFlinkJobId,
+        String operatorId,
+        long checkpointId) {
+      super.commitOperation(
+          table, branch, operation, summary, description, newFlinkJobId, 
operatorId, checkpointId);
+      commitHook.duringCommit();
+    }
   }
 }
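
For readers following the backport: the idempotence that testTableBranchAtomicCommitWithFailures exercises relies on the committer recording flink.max-committed-checkpoint-id in the snapshot summary and then skipping committables whose checkpoint id is not above that value on the next attempt. Below is a simplified, self-contained sketch of that guard; the class and method names (MaxCommittedCheckpointGuard, maxCommittedCheckpointId, alreadyCommitted) are illustrative only and not part of DynamicCommitter, which differs in detail.

    import java.util.Map;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.SnapshotRef;
    import org.apache.iceberg.Table;

    // Simplified sketch of the checkpoint-id guard that makes re-running commit() a no-op.
    class MaxCommittedCheckpointGuard {

      // Summary property written by the Flink sink (see the assertions in the tests above).
      private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";

      // Walks the branch history and returns the highest checkpoint id already committed
      // by the given job/operator, or -1 if none is found.
      static long maxCommittedCheckpointId(Table table, String branch, String jobId, String operatorId) {
        SnapshotRef ref = table.refs().get(branch);
        if (ref == null) {
          return -1L;
        }

        Long snapshotId = ref.snapshotId();
        while (snapshotId != null) {
          Snapshot snapshot = table.snapshot(snapshotId);
          if (snapshot == null) {
            break;
          }

          Map<String, String> summary = snapshot.summary();
          if (jobId.equals(summary.get("flink.job-id"))
              && operatorId.equals(summary.get("flink.operator-id"))) {
            String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
            if (value != null) {
              return Long.parseLong(value);
            }
          }

          snapshotId = snapshot.parentId();
        }

        return -1L;
      }

      // A committable at or below the recorded maximum has already been committed and is skipped.
      static boolean alreadyCommitted(long checkpointId, long maxCommittedCheckpointId) {
        return checkpointId <= maxCommittedCheckpointId;
      }
    }

This is why the fourth commit attempt in the test succeeds but is a NOOP: after the third attempt, checkpoint ids 1 and 2 are already recorded on the branch.
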
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index b61e297cc1..20fae212b4 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -25,7 +25,6 @@ import static org.assertj.core.api.Assertions.fail;
 import java.io.IOException;
 import java.io.Serializable;
 import java.time.Duration;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -56,7 +55,6 @@ import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -69,8 +67,9 @@ import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.flink.MiniFlinkClusterExtension;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestHelpers;
-import org.apache.iceberg.flink.sink.CommitSummary;
 import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
+import org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.CommitHook;
+import 
org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.FailBeforeAndAfterCommit;
 import org.apache.iceberg.inmemory.InMemoryInputFile;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -528,7 +527,8 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
     // Configure a Restart strategy to allow recovery
     Configuration configuration = new Configuration();
     configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
-    
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 
2);
+    // Allow max 3 retries to make up for the three failures we are simulating 
here
+    
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 
3);
     
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, 
Duration.ZERO);
     env.configure(configuration);
 
@@ -539,14 +539,15 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
             new DynamicIcebergDataImpl(
                 SimpleDataUtil.SCHEMA, "t2", "main", 
PartitionSpec.unpartitioned()));
 
-    FailBeforeAndAfterCommit.reset();
     final CommitHook commitHook = new FailBeforeAndAfterCommit();
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse();
+    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isFalse();
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse();
 
     executeDynamicSink(rows, env, true, 1, commitHook);
 
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
   }
 
@@ -569,44 +570,6 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
     executeDynamicSink(rows, env, true, 1, commitHook);
   }
 
-  interface CommitHook extends Serializable {
-    void beforeCommit();
-
-    void duringCommit();
-
-    void afterCommit();
-  }
-
-  private static class FailBeforeAndAfterCommit implements CommitHook {
-
-    static boolean failedBeforeCommit;
-    static boolean failedAfterCommit;
-
-    @Override
-    public void beforeCommit() {
-      if (!failedBeforeCommit) {
-        failedBeforeCommit = true;
-        throw new RuntimeException("Failing before commit");
-      }
-    }
-
-    @Override
-    public void duringCommit() {}
-
-    @Override
-    public void afterCommit() {
-      if (!failedAfterCommit) {
-        failedAfterCommit = true;
-        throw new RuntimeException("Failing before commit");
-      }
-    }
-
-    static void reset() {
-      failedBeforeCommit = false;
-      failedAfterCommit = false;
-    }
-  }
-
   private static class AppendRightBeforeCommit implements CommitHook {
 
     final String tableIdentifier;
@@ -734,8 +697,7 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
 
     @Override
     public Committer<DynamicCommittable> createCommitter(CommitterInitContext 
context) {
-      //      return super.createCommitter(context);
-      return new CommitHookEnabledDynamicCommitter(
+      return new TestDynamicCommitter.CommitHookEnabledDynamicCommitter(
           commitHook,
           CATALOG_EXTENSION.catalogLoader().loadCatalog(),
           Collections.emptyMap(),
@@ -746,46 +708,6 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
     }
   }
 
-  static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
-    private final CommitHook commitHook;
-
-    CommitHookEnabledDynamicCommitter(
-        CommitHook commitHook,
-        Catalog catalog,
-        Map<String, String> snapshotProperties,
-        boolean replacePartitions,
-        int workerPoolSize,
-        String sinkId,
-        DynamicCommitterMetrics committerMetrics) {
-      super(
-          catalog, snapshotProperties, replacePartitions, workerPoolSize, 
sinkId, committerMetrics);
-      this.commitHook = commitHook;
-    }
-
-    @Override
-    public void commit(Collection<CommitRequest<DynamicCommittable>> 
commitRequests)
-        throws IOException, InterruptedException {
-      commitHook.beforeCommit();
-      super.commit(commitRequests);
-      commitHook.afterCommit();
-    }
-
-    @Override
-    void commitOperation(
-        Table table,
-        String branch,
-        SnapshotUpdate<?> operation,
-        CommitSummary summary,
-        String description,
-        String newFlinkJobId,
-        String operatorId,
-        long checkpointId) {
-      commitHook.duringCommit();
-      super.commitOperation(
-          table, branch, operation, summary, description, newFlinkJobId, 
operatorId, checkpointId);
-    }
-  }
-
   private void verifyResults(List<DynamicIcebergDataImpl> dynamicData) throws 
IOException {
     // Calculate the expected result
     Map<Tuple2<String, String>, List<RowData>> expectedData = 
Maps.newHashMap();
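
For the recovery test above, the fixed-delay restart strategy is sized to one attempt per injected failure (before, during, and after the commit). A self-contained sketch of that environment setup, mirroring the settings used in the test rather than introducing anything new:

    import java.time.Duration;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.RestartStrategyOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    class RestartStrategySetupSketch {

      static StreamExecutionEnvironment configuredEnvironment() {
        Configuration configuration = new Configuration();
        // One restart attempt per simulated failure: before, during, and after the commit.
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 3);
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ZERO);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.configure(configuration);
        return env;
      }
    }

With fewer than three attempts the job would exhaust its restarts while the commit hook is still injecting failures, and the final, successful commit would never run.
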
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index e58066aac6..54d506b663 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -274,26 +274,25 @@ class DynamicCommitter implements 
Committer<DynamicCommittable> {
       CommitSummary summary,
       String newFlinkJobId,
       String operatorId) {
-    for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
-      // We don't commit the merged result into a single transaction because 
for the sequential
-      // transaction txn1 and txn2, the equality-delete files of txn2 are 
required to be applied
-      // to data files from txn1. Committing the merged one will lead to the 
incorrect delete
-      // semantic.
-      for (WriteResult result : e.getValue()) {
-        ReplacePartitions dynamicOverwrite =
-            table.newReplacePartitions().scanManifestsWith(workerPool);
+    // Iceberg tables are unsorted. So the order of the append data does not 
matter.
+    // Hence, we commit everything in one snapshot.
+    ReplacePartitions dynamicOverwrite = 
table.newReplacePartitions().scanManifestsWith(workerPool);
+
+    for (List<WriteResult> writeResults : pendingResults.values()) {
+      for (WriteResult result : writeResults) {
         Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
-        commitOperation(
-            table,
-            branch,
-            dynamicOverwrite,
-            summary,
-            "dynamic partition overwrite",
-            newFlinkJobId,
-            operatorId,
-            e.getKey());
       }
     }
+
+    commitOperation(
+        table,
+        branch,
+        dynamicOverwrite,
+        summary,
+        "dynamic partition overwrite",
+        newFlinkJobId,
+        operatorId,
+        pendingResults.lastKey());
   }
 
   private void commitDeltaTxn(
@@ -304,11 +303,11 @@ class DynamicCommitter implements 
Committer<DynamicCommittable> {
       String newFlinkJobId,
       String operatorId) {
     for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
-      // We don't commit the merged result into a single transaction because 
for the sequential
-      // transaction txn1 and txn2, the equality-delete files of txn2 are 
required to be applied
-      // to data files from txn1. Committing the merged one will lead to the 
incorrect delete
-      // semantic.
-      for (WriteResult result : e.getValue()) {
+      long checkpointId = e.getKey();
+      List<WriteResult> writeResults = e.getValue();
+
+      RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
+      for (WriteResult result : writeResults) {
         // Row delta validations are not needed for streaming changes that 
write equality deletes.
         // Equality deletes are applied to data in all previous sequence 
numbers, so retries may
         // push deletes further in the future, but do not affect correctness. 
Position deletes
@@ -316,13 +315,17 @@ class DynamicCommitter implements 
Committer<DynamicCommittable> {
         // being added in this commit. There is no way for data files added 
along with the delete
         // files to be concurrently removed, so there is no need to validate 
the files referenced by
         // the position delete files that are being committed.
-        RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
-
         Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
         Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
-        commitOperation(
-            table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, 
operatorId, e.getKey());
       }
+
+      // Every Flink checkpoint contains a set of independent changes which 
can be committed
+      // together. While it is technically feasible to combine append-only 
data across checkpoints,
+      // for the sake of simplicity, we do not implement this (premature) 
optimization. Multiple
+      // pending checkpoints here are very rare to occur, i.e. only with very 
short checkpoint
+      // intervals or when concurrent checkpointing is enabled.
+      commitOperation(
+          table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, 
operatorId, checkpointId);
     }
   }
 
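
To make the new commit grouping concrete, here is a hedged sketch of the two shapes the change above distinguishes; it uses the public Iceberg table API directly and omits the worker pool, summary bookkeeping, and checkpoint-id handling of the real DynamicCommitter.

    import java.util.List;
    import java.util.NavigableMap;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.ReplacePartitions;
    import org.apache.iceberg.RowDelta;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.WriteResult;

    class CommitShapesSketch {

      // Append-only case: ordering does not matter, so all pending checkpoints share one snapshot.
      static void commitAppendsAsSingleSnapshot(
          Table table, String branch, NavigableMap<Long, List<WriteResult>> pendingResults) {
        ReplacePartitions overwrite = table.newReplacePartitions().toBranch(branch);
        for (List<WriteResult> results : pendingResults.values()) {
          for (WriteResult result : results) {
            for (DataFile dataFile : result.dataFiles()) {
              overwrite.addFile(dataFile);
            }
          }
        }

        // One commit covering every pending checkpoint; the highest checkpoint id
        // (pendingResults.lastKey()) is what ends up in the snapshot summary.
        overwrite.commit();
      }

      // Delta case: delete files stay tied to their checkpoint, so one RowDelta per checkpoint.
      static void commitDeltasPerCheckpoint(
          Table table, String branch, NavigableMap<Long, List<WriteResult>> pendingResults) {
        pendingResults.forEach(
            (checkpointId, results) -> {
              RowDelta rowDelta = table.newRowDelta().toBranch(branch);
              for (WriteResult result : results) {
                for (DataFile dataFile : result.dataFiles()) {
                  rowDelta.addRows(dataFile);
                }
                for (DeleteFile deleteFile : result.deleteFiles()) {
                  rowDelta.addDeletes(deleteFile);
                }
              }
              rowDelta.commit();
            });
      }
    }

As the in-code comment explains, the RowDelta is committed without extra validations such as validateDataFilesExist or validateDeletedFiles: equality deletes apply to all earlier sequence numbers, and the position deletes only reference data files added in the same commit.
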
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 99a5465362..f5387aee88 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -19,8 +19,13 @@
 package org.apache.iceberg.flink.sink.dynamic;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
+import java.io.IOException;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Map;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
@@ -30,19 +35,24 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.sink.CommitSummary;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -74,6 +84,39 @@ class TestDynamicCommitter {
                   ))
           .build();
 
+  private static final DataFile DATA_FILE_2 =
+      DataFiles.builder(PartitionSpec.unpartitioned())
+          .withPath("/path/to/data-2.parquet")
+          .withFileSizeInBytes(0)
+          .withMetrics(
+              new Metrics(
+                  24L,
+                  null, // no column sizes
+                  ImmutableMap.of(1, 3L), // value count
+                  ImmutableMap.of(1, 0L), // null count
+                  null,
+                  ImmutableMap.of(1, ByteBuffer.allocate(1)), // lower bounds
+                  ImmutableMap.of(1, ByteBuffer.allocate(1)) // upper bounds
+                  ))
+          .build();
+
+  private static final DeleteFile DELETE_FILE =
+      FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
+          .withPath("/path/to/data-3.parquet")
+          .withFileSizeInBytes(0)
+          .withMetrics(
+              new Metrics(
+                  24L,
+                  null, // no column sizes
+                  ImmutableMap.of(1, 3L), // value count
+                  ImmutableMap.of(1, 0L), // null count
+                  null,
+                  ImmutableMap.of(1, ByteBuffer.allocate(1)), // lower bounds
+                  ImmutableMap.of(1, ByteBuffer.allocate(1)) // upper bounds
+                  ))
+          .ofPositionDeletes()
+          .build();
+
   @BeforeEach
   void before() {
     catalog = CATALOG_EXTENSION.catalog();
@@ -162,60 +205,57 @@ class TestDynamicCommitter {
     Snapshot first = Iterables.getFirst(table1.snapshots(), null);
     assertThat(first.summary())
         .containsAllEntriesOf(
-            (Map)
-                ImmutableMap.builder()
-                    .put("added-data-files", "1")
-                    .put("added-records", "42")
-                    .put("changed-partition-count", "1")
-                    .put("flink.job-id", jobId)
-                    .put("flink.max-committed-checkpoint-id", "" + 
checkpointId)
-                    .put("flink.operator-id", operatorId)
-                    .put("total-data-files", "1")
-                    .put("total-delete-files", "0")
-                    .put("total-equality-deletes", "0")
-                    .put("total-files-size", "0")
-                    .put("total-position-deletes", "0")
-                    .put("total-records", "42")
-                    .build());
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
     Snapshot second = Iterables.get(table1.snapshots(), 1, null);
     assertThat(second.summary())
         .containsAllEntriesOf(
-            (Map)
-                ImmutableMap.builder()
-                    .put("added-data-files", "1")
-                    .put("added-records", "42")
-                    .put("changed-partition-count", "1")
-                    .put("flink.job-id", jobId)
-                    .put("flink.max-committed-checkpoint-id", "" + 
checkpointId)
-                    .put("flink.operator-id", operatorId)
-                    .put("total-data-files", "1")
-                    .put("total-delete-files", "0")
-                    .put("total-equality-deletes", "0")
-                    .put("total-files-size", "0")
-                    .put("total-position-deletes", "0")
-                    .put("total-records", "42")
-                    .build());
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
 
     table2.refresh();
     assertThat(table2.snapshots()).hasSize(1);
     Snapshot third = Iterables.getFirst(table2.snapshots(), null);
     assertThat(third.summary())
         .containsAllEntriesOf(
-            (Map)
-                ImmutableMap.builder()
-                    .put("added-data-files", "1")
-                    .put("added-records", "42")
-                    .put("changed-partition-count", "1")
-                    .put("flink.job-id", jobId)
-                    .put("flink.max-committed-checkpoint-id", "" + 
checkpointId)
-                    .put("flink.operator-id", operatorId)
-                    .put("total-data-files", "1")
-                    .put("total-delete-files", "0")
-                    .put("total-equality-deletes", "0")
-                    .put("total-files-size", "0")
-                    .put("total-position-deletes", "0")
-                    .put("total-records", "42")
-                    .build());
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
   }
 
   @Test
@@ -277,21 +317,276 @@ class TestDynamicCommitter {
     Snapshot first = Iterables.getFirst(table1.snapshots(), null);
     assertThat(first.summary())
         .containsAllEntriesOf(
-            (Map)
-                ImmutableMap.builder()
-                    .put("added-data-files", "1")
-                    .put("added-records", "42")
-                    .put("changed-partition-count", "1")
-                    .put("flink.job-id", jobId)
-                    .put("flink.max-committed-checkpoint-id", "" + 
checkpointId)
-                    .put("flink.operator-id", operatorId)
-                    .put("total-data-files", "1")
-                    .put("total-delete-files", "0")
-                    .put("total-equality-deletes", "0")
-                    .put("total-files-size", "0")
-                    .put("total-position-deletes", "0")
-                    .put("total-records", "42")
-                    .build());
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
+  }
+
+  @Test
+  void testTableBranchAtomicCommitForAppendOnlyData() throws Exception {
+    Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table.snapshots()).isEmpty();
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    WriteTarget writeTarget1 =
+        new WriteTarget(TABLE1, "branch1", 42, 0, true, Sets.newHashSet(1, 2));
+    // writeTarget2 has a different schema
+    WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch1", 23, 0, true, 
Sets.newHashSet());
+    // Different branch for writeTarget3
+    WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch2", 23, 0, true, 
Sets.newHashSet());
+
+    WriteResult writeResult1 = 
WriteResult.builder().addDataFiles(DATA_FILE).build();
+    WriteResult writeResult2 = 
WriteResult.builder().addDataFiles(DATA_FILE_2).build();
+
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId1 = 1;
+    final int checkpointId2 = 2;
+
+    byte[] deltaManifest1 =
+        aggregator.writeToManifest(
+            writeTarget1,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget1, 
writeResult1)),
+            checkpointId1);
+
+    CommitRequest<DynamicCommittable> commitRequest1 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget1, deltaManifest1, jobId, 
operatorId, checkpointId1));
+
+    byte[] deltaManifest2 =
+        aggregator.writeToManifest(
+            writeTarget2,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget2, 
writeResult2)),
+            checkpointId1);
+
+    CommitRequest<DynamicCommittable> commitRequest2 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget2, deltaManifest2, jobId, 
operatorId, checkpointId1));
+
+    byte[] deltaManifest3 =
+        aggregator.writeToManifest(
+            writeTarget3,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget3, 
writeResult2)),
+            checkpointId2);
+
+    CommitRequest<DynamicCommittable> commitRequest3 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget3, deltaManifest3, jobId, 
operatorId, checkpointId2));
+
+    boolean overwriteMode = false;
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new 
DynamicCommitterMetrics(metricGroup);
+    DynamicCommitter dynamicCommitter =
+        new DynamicCommitter(
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2, 
commitRequest3));
+
+    table.refresh();
+    // Two committables, one for each snapshot / table / branch.
+    assertThat(table.snapshots()).hasSize(2);
+
+    Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null);
+    
assertThat(snapshot1.snapshotId()).isEqualTo(table.refs().get("branch1").snapshotId());
+    assertThat(snapshot1.summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "2")
+                .put("added-records", "66")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId1)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "2")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "66")
+                .build());
+
+    Snapshot snapshot2 = Iterables.get(table.snapshots(), 1);
+    
assertThat(snapshot2.snapshotId()).isEqualTo(table.refs().get("branch2").snapshotId());
+    assertThat(snapshot2.summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "24")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId2)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "24")
+                .build());
+  }
+
+  @Test
+  void testTableBranchAtomicCommitWithFailures() throws Exception {
+    Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table.snapshots()).isEmpty();
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    WriteTarget writeTarget1 = new WriteTarget(TABLE1, "branch", 42, 0, false, 
Sets.newHashSet());
+    // writeTarget2 has a different schema
+    WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch", 23, 0, false, 
Sets.newHashSet());
+    WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch", 23, 0, false, 
Sets.newHashSet());
+
+    WriteResult writeResult1 = 
WriteResult.builder().addDataFiles(DATA_FILE).build();
+    WriteResult writeResult2 = 
WriteResult.builder().addDeleteFiles(DELETE_FILE).build();
+    WriteResult writeResult3 = 
WriteResult.builder().addDataFiles(DATA_FILE).build();
+
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId1 = 1;
+    final int checkpointId2 = 2;
+
+    byte[] deltaManifest1 =
+        aggregator.writeToManifest(
+            writeTarget1,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget1, 
writeResult1)),
+            checkpointId1);
+
+    CommitRequest<DynamicCommittable> commitRequest1 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget1, deltaManifest1, jobId, 
operatorId, checkpointId1));
+
+    byte[] deltaManifest2 =
+        aggregator.writeToManifest(
+            writeTarget2,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget2, 
writeResult2)),
+            checkpointId2);
+
+    CommitRequest<DynamicCommittable> commitRequest2 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget2, deltaManifest2, jobId, 
operatorId, checkpointId2));
+
+    byte[] deltaManifest3 =
+        aggregator.writeToManifest(
+            writeTarget3,
+            Sets.newHashSet(new DynamicWriteResult(writeTarget3, 
writeResult3)),
+            checkpointId2);
+
+    CommitRequest<DynamicCommittable> commitRequest3 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget3, deltaManifest3, jobId, 
operatorId, checkpointId2));
+
+    boolean overwriteMode = false;
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new 
DynamicCommitterMetrics(metricGroup);
+
+    // Use a special hook to fail at various stages of the commit operation
+    CommitHook commitHook = new FailBeforeAndAfterCommit();
+    DynamicCommitter dynamicCommitter =
+        new CommitHookEnabledDynamicCommitter(
+            commitHook,
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    ThrowingCallable commitExecutable =
+        () ->
+            dynamicCommitter.commit(
+                Sets.newHashSet(commitRequest1, commitRequest2, 
commitRequest3));
+
+    // First fail pre-commit
+    assertThatThrownBy(commitExecutable);
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
+
+    // Second fail during commit
+    assertThatThrownBy(commitExecutable);
+    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+
+    // Third fail after commit
+    assertThatThrownBy(commitExecutable);
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
+
+    // Finally, the commit must go through, although it is a NOOP because the third failure
+    // occurred right after the commit finished.
+    try {
+      commitExecutable.call();
+    } catch (Throwable e) {
+      fail("Should not have thrown an exception");
+    }
+
+    table.refresh();
+    // Three committables, but only two snapshots! WriteResults from different checkpoints are
+    // not combined here because writeResult2 contains a delete file.
+    assertThat(table.snapshots()).hasSize(2);
+
+    Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null);
+    assertThat(snapshot1.summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId1)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
+
+    Snapshot snapshot2 = Iterables.get(table.snapshots(), 1);
+    assertThat(snapshot2.summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId2)
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "2")
+                .put("total-delete-files", "1")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "24")
+                .put("total-records", "84")
+                .build());
   }
 
   @Test
@@ -361,21 +656,109 @@ class TestDynamicCommitter {
     Snapshot latestSnapshot = Iterables.getLast(table1.snapshots());
     assertThat(latestSnapshot.summary())
         .containsAllEntriesOf(
-            (Map)
-                ImmutableMap.builder()
-                    .put("replace-partitions", "true")
-                    .put("added-data-files", "1")
-                    .put("added-records", "42")
-                    .put("changed-partition-count", "1")
-                    .put("flink.job-id", jobId)
-                    .put("flink.max-committed-checkpoint-id", 
String.valueOf(checkpointId + 1))
-                    .put("flink.operator-id", operatorId)
-                    .put("total-data-files", "1")
-                    .put("total-delete-files", "0")
-                    .put("total-equality-deletes", "0")
-                    .put("total-files-size", "0")
-                    .put("total-position-deletes", "0")
-                    .put("total-records", "42")
-                    .build());
+            ImmutableMap.<String, String>builder()
+                .put("replace-partitions", "true")
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", 
String.valueOf(checkpointId + 1))
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
+  }
+
+  interface CommitHook extends Serializable {
+    void beforeCommit();
+
+    void duringCommit();
+
+    void afterCommit();
+  }
+
+  static class FailBeforeAndAfterCommit implements CommitHook {
+
+    static boolean failedBeforeCommit;
+    static boolean failedDuringCommit;
+    static boolean failedAfterCommit;
+
+    FailBeforeAndAfterCommit() {
+      reset();
+    }
+
+    @Override
+    public void beforeCommit() {
+      if (!failedBeforeCommit) {
+        failedBeforeCommit = true;
+        throw new RuntimeException("Failing before commit");
+      }
+    }
+
+    @Override
+    public void duringCommit() {
+      if (!failedDuringCommit) {
+        failedDuringCommit = true;
+        throw new RuntimeException("Failing during commit");
+      }
+    }
+
+    @Override
+    public void afterCommit() {
+      if (!failedAfterCommit) {
+        failedAfterCommit = true;
+        throw new RuntimeException("Failing before commit");
+      }
+    }
+
+    static void reset() {
+      failedBeforeCommit = false;
+      failedDuringCommit = false;
+      failedAfterCommit = false;
+    }
+  }
+
+  static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
+    private final CommitHook commitHook;
+
+    CommitHookEnabledDynamicCommitter(
+        CommitHook commitHook,
+        Catalog catalog,
+        Map<String, String> snapshotProperties,
+        boolean replacePartitions,
+        int workerPoolSize,
+        String sinkId,
+        DynamicCommitterMetrics committerMetrics) {
+      super(
+          catalog, snapshotProperties, replacePartitions, workerPoolSize, 
sinkId, committerMetrics);
+      this.commitHook = commitHook;
+    }
+
+    @Override
+    public void commit(Collection<CommitRequest<DynamicCommittable>> 
commitRequests)
+        throws IOException, InterruptedException {
+      commitHook.beforeCommit();
+      super.commit(commitRequests);
+      commitHook.afterCommit();
+    }
+
+    @Override
+    void commitOperation(
+        Table table,
+        String branch,
+        SnapshotUpdate<?> operation,
+        CommitSummary summary,
+        String description,
+        String newFlinkJobId,
+        String operatorId,
+        long checkpointId) {
+      super.commitOperation(
+          table, branch, operation, summary, description, newFlinkJobId, 
operatorId, checkpointId);
+      commitHook.duringCommit();
+    }
   }
 }
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index b61e297cc1..20fae212b4 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -25,7 +25,6 @@ import static org.assertj.core.api.Assertions.fail;
 import java.io.IOException;
 import java.io.Serializable;
 import java.time.Duration;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -56,7 +55,6 @@ import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -69,8 +67,9 @@ import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.flink.MiniFlinkClusterExtension;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestHelpers;
-import org.apache.iceberg.flink.sink.CommitSummary;
 import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
+import org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.CommitHook;
+import 
org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.FailBeforeAndAfterCommit;
 import org.apache.iceberg.inmemory.InMemoryInputFile;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -528,7 +527,8 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
     // Configure a Restart strategy to allow recovery
     Configuration configuration = new Configuration();
     configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
-    
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 
2);
+    // Allow max 3 retries to make up for the three failures we are simulating 
here
+    
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 
3);
     
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, 
Duration.ZERO);
     env.configure(configuration);
 
@@ -539,14 +539,15 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
             new DynamicIcebergDataImpl(
                 SimpleDataUtil.SCHEMA, "t2", "main", 
PartitionSpec.unpartitioned()));
 
-    FailBeforeAndAfterCommit.reset();
     final CommitHook commitHook = new FailBeforeAndAfterCommit();
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse();
+    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isFalse();
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse();
 
     executeDynamicSink(rows, env, true, 1, commitHook);
 
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
   }
 
@@ -569,44 +570,6 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
     executeDynamicSink(rows, env, true, 1, commitHook);
   }
 
-  interface CommitHook extends Serializable {
-    void beforeCommit();
-
-    void duringCommit();
-
-    void afterCommit();
-  }
-
-  private static class FailBeforeAndAfterCommit implements CommitHook {
-
-    static boolean failedBeforeCommit;
-    static boolean failedAfterCommit;
-
-    @Override
-    public void beforeCommit() {
-      if (!failedBeforeCommit) {
-        failedBeforeCommit = true;
-        throw new RuntimeException("Failing before commit");
-      }
-    }
-
-    @Override
-    public void duringCommit() {}
-
-    @Override
-    public void afterCommit() {
-      if (!failedAfterCommit) {
-        failedAfterCommit = true;
-        throw new RuntimeException("Failing before commit");
-      }
-    }
-
-    static void reset() {
-      failedBeforeCommit = false;
-      failedAfterCommit = false;
-    }
-  }
-
   private static class AppendRightBeforeCommit implements CommitHook {
 
     final String tableIdentifier;
@@ -734,8 +697,7 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
 
     @Override
     public Committer<DynamicCommittable> createCommitter(CommitterInitContext 
context) {
-      //      return super.createCommitter(context);
-      return new CommitHookEnabledDynamicCommitter(
+      return new TestDynamicCommitter.CommitHookEnabledDynamicCommitter(
           commitHook,
           CATALOG_EXTENSION.catalogLoader().loadCatalog(),
           Collections.emptyMap(),
@@ -746,46 +708,6 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
     }
   }
 
-  static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
-    private final CommitHook commitHook;
-
-    CommitHookEnabledDynamicCommitter(
-        CommitHook commitHook,
-        Catalog catalog,
-        Map<String, String> snapshotProperties,
-        boolean replacePartitions,
-        int workerPoolSize,
-        String sinkId,
-        DynamicCommitterMetrics committerMetrics) {
-      super(
-          catalog, snapshotProperties, replacePartitions, workerPoolSize, 
sinkId, committerMetrics);
-      this.commitHook = commitHook;
-    }
-
-    @Override
-    public void commit(Collection<CommitRequest<DynamicCommittable>> 
commitRequests)
-        throws IOException, InterruptedException {
-      commitHook.beforeCommit();
-      super.commit(commitRequests);
-      commitHook.afterCommit();
-    }
-
-    @Override
-    void commitOperation(
-        Table table,
-        String branch,
-        SnapshotUpdate<?> operation,
-        CommitSummary summary,
-        String description,
-        String newFlinkJobId,
-        String operatorId,
-        long checkpointId) {
-      commitHook.duringCommit();
-      super.commitOperation(
-          table, branch, operation, summary, description, newFlinkJobId, 
operatorId, checkpointId);
-    }
-  }
-
   private void verifyResults(List<DynamicIcebergDataImpl> dynamicData) throws 
IOException {
     // Calculate the expected result
     Map<Tuple2<String, String>, List<RowData>> expectedData = 
Maps.newHashMap();
