This is an automated email from the ASF dual-hosted git repository.

lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 15288b3d3 [FLINK-38450][iceberg] Fix duplicate records when schema change splits writes within a checkpoint (#4360)
15288b3d3 is described below

commit 15288b3d32989bf8c7ebaa6c4d8703ca1771d757
Author: Spoorthi Basu <[email protected]>
AuthorDate: Tue Jun 2 21:30:57 2026 -0400

    [FLINK-38450][iceberg] Fix duplicate records when schema change splits writes within a checkpoint (#4360)
---
 .../iceberg/sink/v2/IcebergCommitter.java          | 172 +++-
 .../connectors/iceberg/sink/v2/IcebergWriter.java  |  50 +-
 .../iceberg/sink/v2/WriteResultWrapper.java        |  22 +-
 .../sink/v2/WriteResultWrapperSerializer.java      |   6 +-
 .../iceberg/sink/v2/IcebergWriterTest.java         | 980 +++++++++++++++++++++
 .../pipeline/tests/MySqlToIcebergE2eITCase.java    |  20 +-
 6 files changed, 1202 insertions(+), 48 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
index 759777440..847292c5b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
@@ -47,6 +47,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.TreeMap;
 
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID;
@@ -62,6 +63,15 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
 
     public static final String TABLE_GROUP_KEY = "table";
 
+    // Use a flink-cdc. prefix so these don't clash with the flink. namespace 
reserved by the
+    // Iceberg Flink connector.
+
+    /** Snapshot summary key for the batch index; used to resume partial 
commits on retry. */
+    static final String FLINK_BATCH_INDEX = "flink-cdc.batch-index";
+
+    /** Snapshot summary key for the checkpoint ID on intermediate batch 
commits. */
+    static final String FLINK_CHECKPOINT_ID_PROP = "flink-cdc.checkpoint-id";
+
     private final Catalog catalog;
 
     private final SinkCommitterMetricGroup metricGroup;
@@ -96,74 +106,140 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
         if (writeResultWrappers.isEmpty()) {
             return;
         }
-        // all commits a same checkpoint-id
         long checkpointId = writeResultWrappers.get(0).getCheckpointId();
         String newFlinkJobId = writeResultWrappers.get(0).getJobId();
         String operatorId = writeResultWrappers.get(0).getOperatorId();
 
-        Map<TableId, List<WriteResult>> tableMap = new HashMap<>();
-        for (WriteResultWrapper writeResultWrapper : writeResultWrappers) {
-            List<WriteResult> writeResult =
-                    tableMap.getOrDefault(writeResultWrapper.getTableId(), new 
ArrayList<>());
-            writeResult.add(writeResultWrapper.getWriteResult());
-            tableMap.put(writeResultWrapper.getTableId(), writeResult);
-            LOGGER.info(writeResultWrapper.buildDescription());
+        Map<TableId, List<WriteResultWrapper>> tableMap = new HashMap<>();
+        for (WriteResultWrapper w : writeResultWrappers) {
+            tableMap.computeIfAbsent(w.getTableId(), k -> new 
ArrayList<>()).add(w);
         }
-        for (Map.Entry<TableId, List<WriteResult>> entry : 
tableMap.entrySet()) {
+
+        for (Map.Entry<TableId, List<WriteResultWrapper>> entry : 
tableMap.entrySet()) {
             TableId tableId = entry.getKey();
 
+            // Group by batchIndex so wrappers from different subtasks for the 
same batch
+            // are merged into one snapshot, not committed separately.
+            TreeMap<Integer, List<WriteResultWrapper>> batchGroups = new 
TreeMap<>();
+            for (WriteResultWrapper w : entry.getValue()) {
+                batchGroups.computeIfAbsent(w.getBatchIndex(), k -> new 
ArrayList<>()).add(w);
+                LOGGER.info(w.buildDescription());
+            }
+
             Table table =
                     catalog.loadTable(
                             TableIdentifier.of(tableId.getSchemaName(), 
tableId.getTableName()));
 
+            int startBatchIndex = 0;
             Snapshot snapshot = table.currentSnapshot();
             if (snapshot != null) {
                 Iterable<Snapshot> ancestors =
                         SnapshotUtil.ancestorsOf(snapshot.snapshotId(), 
table::snapshot);
-                long lastCheckpointId =
+                long lastCommittedCheckpointId =
                         getMaxCommittedCheckpointId(ancestors, newFlinkJobId, 
operatorId);
-                if (lastCheckpointId == checkpointId) {
+                if (lastCommittedCheckpointId >= checkpointId) {
                     LOGGER.warn(
                             "Checkpoint id {} has been committed to table {}, 
skipping",
                             checkpointId,
                             tableId.identifier());
                     continue;
                 }
+                ancestors = SnapshotUtil.ancestorsOf(snapshot.snapshotId(), 
table::snapshot);
+                startBatchIndex =
+                        getLastCommittedBatchIndex(
+                                        ancestors, newFlinkJobId, operatorId, 
checkpointId)
+                                + 1;
             }
 
             Optional<TableMetric> tableMetric = getTableMetric(tableId);
             tableMetric.ifPresent(TableMetric::increaseCommitTimes);
 
-            List<WriteResult> results = entry.getValue();
-            List<DataFile> dataFiles =
-                    results.stream()
-                            .filter(payload -> payload.dataFiles() != null)
-                            .flatMap(payload -> 
Arrays.stream(payload.dataFiles()))
-                            .filter(dataFile -> dataFile.recordCount() > 0)
-                            .collect(toList());
-            List<DeleteFile> deleteFiles =
-                    results.stream()
-                            .filter(payload -> payload.deleteFiles() != null)
-                            .flatMap(payload -> 
Arrays.stream(payload.deleteFiles()))
-                            .filter(deleteFile -> deleteFile.recordCount() > 0)
-                            .collect(toList());
-            if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
-                LOGGER.info(String.format("Nothing to commit to table %s, 
skipping", table.name()));
-            } else {
+            int lastNonEmptyBatchIndex = -1;
+            for (Map.Entry<Integer, List<WriteResultWrapper>> g : 
batchGroups.entrySet()) {
+                List<DataFile> df = collectDataFilesFromGroup(g.getValue());
+                List<DeleteFile> del = 
collectDeleteFilesFromGroup(g.getValue());
+                if (!df.isEmpty() || !del.isEmpty()) {
+                    lastNonEmptyBatchIndex = g.getKey();
+                }
+            }
+
+            // Commit each batch as a separate snapshot so sequence numbers 
increase per batch.
+            for (Map.Entry<Integer, List<WriteResultWrapper>> g : 
batchGroups.entrySet()) {
+                int batchIdx = g.getKey();
+                if (batchIdx < startBatchIndex) {
+                    LOGGER.info(
+                            "Batch {} for checkpoint {} of table {} already 
committed, skipping",
+                            batchIdx,
+                            checkpointId,
+                            tableId.identifier());
+                    continue;
+                }
+
+                List<DataFile> dataFiles = 
collectDataFilesFromGroup(g.getValue());
+                List<DeleteFile> deleteFiles = 
collectDeleteFilesFromGroup(g.getValue());
+
+                if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
+                    LOGGER.info(
+                            "Batch {} for checkpoint {} of table {} has 
nothing to commit, skipping",
+                            batchIdx,
+                            checkpointId,
+                            tableId.identifier());
+                    continue;
+                }
+
+                SnapshotUpdate<?> operation;
                 if (deleteFiles.isEmpty()) {
                     AppendFiles append = table.newAppend();
                     dataFiles.forEach(append::appendFile);
-                    commitOperation(append, newFlinkJobId, operatorId, 
checkpointId);
+                    operation = append;
                 } else {
                     RowDelta delta = table.newRowDelta();
                     dataFiles.forEach(delta::addRows);
                     deleteFiles.forEach(delta::addDeletes);
-                    commitOperation(delta, newFlinkJobId, operatorId, 
checkpointId);
+                    operation = delta;
                 }
+
+                operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId);
+                operation.set(SinkUtil.OPERATOR_ID, operatorId);
+                operation.set(FLINK_BATCH_INDEX, String.valueOf(batchIdx));
+                operation.set(FLINK_CHECKPOINT_ID_PROP, 
String.valueOf(checkpointId));
+                if (batchIdx == lastNonEmptyBatchIndex) {
+                    operation.set(
+                            SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, 
String.valueOf(checkpointId));
+                }
+                operation.commit();
             }
         }
     }
 
+    private static List<DataFile> 
collectDataFilesFromGroup(List<WriteResultWrapper> group) {
+        return group.stream()
+                .flatMap(w -> collectDataFiles(w.getWriteResult()).stream())
+                .collect(toList());
+    }
+
+    private static List<DeleteFile> 
collectDeleteFilesFromGroup(List<WriteResultWrapper> group) {
+        return group.stream()
+                .flatMap(w -> collectDeleteFiles(w.getWriteResult()).stream())
+                .collect(toList());
+    }
+
+    private static List<DataFile> collectDataFiles(WriteResult result) {
+        if (result.dataFiles() == null) {
+            return new ArrayList<>();
+        }
+        return Arrays.stream(result.dataFiles()).filter(f -> f.recordCount() > 
0).collect(toList());
+    }
+
+    private static List<DeleteFile> collectDeleteFiles(WriteResult result) {
+        if (result.deleteFiles() == null) {
+            return new ArrayList<>();
+        }
+        return Arrays.stream(result.deleteFiles())
+                .filter(f -> f.recordCount() > 0)
+                .collect(toList());
+    }
+
     private static long getMaxCommittedCheckpointId(
             Iterable<Snapshot> ancestors, String flinkJobId, String 
operatorId) {
         long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID - 1;
@@ -185,15 +261,35 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
         return lastCommittedCheckpointId;
     }
 
-    private static void commitOperation(
-            SnapshotUpdate<?> operation,
-            String newFlinkJobId,
-            String operatorId,
-            long checkpointId) {
-        operation.set(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, 
Long.toString(checkpointId));
-        operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId);
-        operation.set(SinkUtil.OPERATOR_ID, operatorId);
-        operation.commit();
+    /**
+     * Returns the highest batch index already committed for the given 
checkpoint, or -1 if none.
+     * Used to skip already-persisted batches on retry.
+     */
+    private static int getLastCommittedBatchIndex(
+            Iterable<Snapshot> ancestors, String flinkJobId, String 
operatorId, long checkpointId) {
+        for (Snapshot ancestor : ancestors) {
+            Map<String, String> summary = ancestor.summary();
+            if (!flinkJobId.equals(summary.get(SinkUtil.FLINK_JOB_ID))) {
+                continue;
+            }
+            String snapshotOperatorId = summary.get(SinkUtil.OPERATOR_ID);
+            if (snapshotOperatorId != null && 
!snapshotOperatorId.equals(operatorId)) {
+                continue;
+            }
+            // Stop once we pass a fully-committed earlier checkpoint; 
intermediate batch
+            // snapshots for the current checkpoint lie between it and the 
current tip.
+            String maxCommittedStr = 
summary.get(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID);
+            if (maxCommittedStr != null && Long.parseLong(maxCommittedStr) < 
checkpointId) {
+                break;
+            }
+            String snapshotCheckpointId = 
summary.get(FLINK_CHECKPOINT_ID_PROP);
+            if (snapshotCheckpointId != null
+                    && Long.parseLong(snapshotCheckpointId) == checkpointId) {
+                String batchIndexStr = summary.get(FLINK_BATCH_INDEX);
+                return batchIndexStr != null ? Integer.parseInt(batchIndexStr) 
: 0;
+            }
+        }
+        return -1;
     }
 
     private Optional<TableMetric> getTableMetric(TableId tableId) {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
index cbcd3b98e..9af29d88c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
@@ -72,6 +72,11 @@ public class IcebergWriter
 
     private final List<WriteResultWrapper> temporaryWriteResult;
 
+    /**
+     * Per-table batch index; incremented on each schema-change flush, even 
when no writer exists.
+     */
+    private Map<TableId, Integer> tableBatchIndexMap;
+
     private Catalog catalog;
 
     private final int taskId;
@@ -102,6 +107,7 @@ public class IcebergWriter
         writerFactoryMap = new HashMap<>();
         writerMap = new HashMap<>();
         schemaMap = new HashMap<>();
+        tableBatchIndexMap = new HashMap<>();
         temporaryWriteResult = new ArrayList<>();
         this.taskId = taskId;
         this.attemptId = attemptId;
@@ -129,6 +135,7 @@ public class IcebergWriter
         list.addAll(temporaryWriteResult);
         list.addAll(getWriteResult());
         temporaryWriteResult.clear();
+        tableBatchIndexMap.clear();
         lastCheckpointId++;
         return list;
     }
@@ -166,6 +173,11 @@ public class IcebergWriter
         } else {
             SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
             TableId tableId = schemaChangeEvent.tableId();
+            // Flush only when the table is already known; skip on initial 
CreateTableEvent since
+            // no data has been written yet and there is nothing to split.
+            if (schemaMap.containsKey(tableId)) {
+                flushTableWriter(tableId);
+            }
             TableSchemaWrapper tableSchemaWrapper = schemaMap.get(tableId);
 
             Schema newSchema =
@@ -179,21 +191,46 @@ public class IcebergWriter
 
     @Override
     public void flush(boolean flush) throws IOException {
-        // Notice: flush method may be called many times during one checkpoint.
-        temporaryWriteResult.addAll(getWriteResult());
+        // Clear the factory cache so the next write picks up the latest 
catalog schema.
+        // Writers keep running; schema-change splits are handled in 
flushTableWriter.
+        writerFactoryMap.clear();
+    }
+
+    private void flushTableWriter(TableId tableId) throws IOException {
+        TaskWriter<RowData> writer = writerMap.remove(tableId);
+        // Advance even when no writer exists, to keep batchIndex in sync 
across subtasks.
+        int batchIndex = tableBatchIndexMap.getOrDefault(tableId, 0);
+        tableBatchIndexMap.put(tableId, batchIndex + 1);
+        if (writer == null) {
+            return;
+        }
+        WriteResultWrapper writeResultWrapper =
+                new WriteResultWrapper(
+                        writer.complete(),
+                        tableId,
+                        lastCheckpointId + 1,
+                        jobId,
+                        operatorId,
+                        batchIndex);
+        temporaryWriteResult.add(writeResultWrapper);
+        LOGGER.info(writeResultWrapper.buildDescription());
+        writerFactoryMap.remove(tableId);
     }
 
     private List<WriteResultWrapper> getWriteResult() throws IOException {
         long currentCheckpointId = lastCheckpointId + 1;
         List<WriteResultWrapper> writeResults = new ArrayList<>();
         for (Map.Entry<TableId, TaskWriter<RowData>> entry : 
writerMap.entrySet()) {
+            TableId tableId = entry.getKey();
+            int batchIndex = tableBatchIndexMap.getOrDefault(tableId, 0);
             WriteResultWrapper writeResultWrapper =
                     new WriteResultWrapper(
                             entry.getValue().complete(),
-                            entry.getKey(),
+                            tableId,
                             currentCheckpointId,
                             jobId,
-                            operatorId);
+                            operatorId,
+                            batchIndex);
             writeResults.add(writeResultWrapper);
             LOGGER.info(writeResultWrapper.buildDescription());
         }
@@ -225,6 +262,11 @@ public class IcebergWriter
             writerFactoryMap = null;
         }
 
+        if (tableBatchIndexMap != null) {
+            tableBatchIndexMap.clear();
+            tableBatchIndexMap = null;
+        }
+
         catalog = null;
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java
index e64cc5535..3e8d733c5 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java
@@ -40,17 +40,31 @@ public class WriteResultWrapper implements Serializable {
 
     private final String operatorId;
 
+    /** Batch index within the checkpoint for this table; increments on each 
schema-change flush. */
+    private final int batchIndex;
+
     public WriteResultWrapper(
             WriteResult writeResult,
             TableId tableId,
             long checkpointId,
             String jobId,
-            String operatorId) {
+            String operatorId,
+            int batchIndex) {
         this.writeResult = writeResult;
         this.tableId = tableId;
         this.checkpointId = checkpointId;
         this.jobId = jobId;
         this.operatorId = operatorId;
+        this.batchIndex = batchIndex;
+    }
+
+    public WriteResultWrapper(
+            WriteResult writeResult,
+            TableId tableId,
+            long checkpointId,
+            String jobId,
+            String operatorId) {
+        this(writeResult, tableId, checkpointId, jobId, operatorId, 0);
     }
 
     public WriteResult getWriteResult() {
@@ -73,6 +87,10 @@ public class WriteResultWrapper implements Serializable {
         return operatorId;
     }
 
+    public int getBatchIndex() {
+        return batchIndex;
+    }
+
     /** Build a simple description for the write result. */
     public String buildDescription() {
         long addCount = 0;
@@ -95,6 +113,8 @@ public class WriteResultWrapper implements Serializable {
                 + jobId
                 + ", OperatorId: "
                 + operatorId
+                + ", BatchIndex: "
+                + batchIndex
                 + ", AddCount: "
                 + addCount
                 + ", DeleteCount: "
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapperSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapperSerializer.java
index ca8775d3e..26b5b942d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapperSerializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapperSerializer.java
@@ -29,7 +29,8 @@ import java.io.IOException;
 
 /** Serializer for {@link WriteResultWrapper}. */
 class WriteResultWrapperSerializer implements 
SimpleVersionedSerializer<WriteResultWrapper> {
-    private static final int VERSION = 1;
+    // v2 added the batchIndex field. v1 payloads still read back: the missing 
field defaults to 0.
+    private static final int VERSION = 2;
 
     @Override
     public int getVersion() {
@@ -47,7 +48,8 @@ class WriteResultWrapperSerializer implements SimpleVersionedSerializer<WriteRes
 
     @Override
     public WriteResultWrapper deserialize(int version, byte[] serialized) 
throws IOException {
-        if (version == 1) {
+        // v1 and v2 share the same byte layout, so both deserialize the same 
way.
+        if (version == 1 || version == 2) {
             DataInputDeserializer view = new DataInputDeserializer(serialized);
             byte[] resultBuf = new byte[serialized.length];
             view.read(resultBuf);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
index 745aea81f..71e08de4c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
@@ -42,7 +42,11 @@ import 
org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequ
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -66,6 +70,7 @@ import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -596,6 +601,981 @@ public class IcebergWriterTest {
         Assertions.assertThat(result).containsExactlyInAnyOrder("1, char1", 
"2, char2");
     }
 
+    @Test
+    public void testNoDuplicateWhenFlushSplitsSamePkUpdatesWithinCheckpoint() 
throws Exception {
+        Map<String, String> catalogOptions = new HashMap<>();
+        String warehouse =
+                new File(temporaryFolder.toFile(), 
UUID.randomUUID().toString()).toString();
+        catalogOptions.put("type", "hadoop");
+        catalogOptions.put("warehouse", warehouse);
+        catalogOptions.put("cache-enabled", "false");
+        Catalog catalog =
+                CatalogUtil.buildIcebergCatalog(
+                        "cdc-iceberg-catalog", catalogOptions, new 
Configuration());
+
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
+        IcebergWriter icebergWriter =
+                new IcebergWriter(
+                        catalogOptions,
+                        1,
+                        1,
+                        ZoneId.systemDefault(),
+                        0,
+                        jobId,
+                        operatorId,
+                        new HashMap<>());
+        IcebergMetadataApplier icebergMetadataApplier = new 
IcebergMetadataApplier(catalogOptions);
+
+        TableId tableId = TableId.parse("test.iceberg_table");
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(
+                        tableId,
+                        Schema.newBuilder()
+                                .physicalColumn("id", 
DataTypes.BIGINT().notNull())
+                                .physicalColumn("name", DataTypes.VARCHAR(100))
+                                .primaryKey("id")
+                                .build());
+        icebergMetadataApplier.applySchemaChange(createTableEvent);
+        icebergWriter.write(createTableEvent, null);
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(
+                        
createTableEvent.getSchema().getColumnDataTypes().toArray(new DataType[0]));
+
+        RecordData recordA =
+                generator.generate(new Object[] {1L, 
BinaryStringData.fromString("a")});
+        RecordData recordB =
+                generator.generate(new Object[] {1L, 
BinaryStringData.fromString("b")});
+
+        icebergWriter.write(DataChangeEvent.insertEvent(tableId, recordA), 
null);
+        // flush resets the factory cache but keeps the writer running, so the 
update
+        // lands in the same writer as the insert and uses a position delete.
+        icebergWriter.flush(false);
+        icebergWriter.write(DataChangeEvent.updateEvent(tableId, recordA, 
recordB), null);
+
+        Collection<WriteResultWrapper> writeResults = 
icebergWriter.prepareCommit();
+
+        // Both writes went through the same writer (batchIndex 0) since flush 
only
+        // cleared the factory. Position delete handles dedup within the 
snapshot.
+        List<Integer> batchIndexes =
+                writeResults.stream()
+                        .map(WriteResultWrapper::getBatchIndex)
+                        .sorted()
+                        .collect(Collectors.toList());
+        Assertions.assertThat(batchIndexes).containsExactly(0);
+
+        IcebergCommitter icebergCommitter = new 
IcebergCommitter(catalogOptions, new HashMap<>());
+        Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
+                
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
+        icebergCommitter.commit(collection);
+
+        List<String> result = fetchTableContent(catalog, tableId, null);
+        Assertions.assertThat(result).containsExactly("1, b");
+    }
+
+    /**
+     * A schema change mid-checkpoint splits writes into two batches for the 
same table. Both
+     * batches must not land in the same Iceberg snapshot: files in one 
snapshot share the same
+     * seq_num, so the equality-delete from batch 1 would silently miss the 
insert from batch 0.
+     * Each batch gets its own snapshot so the delete's seq_num is strictly 
higher than the data it
+     * targets.
+     */
+    @Test
+    public void testNoDuplicateWhenSchemaChangeFlushSplitsSamePkUpdates() 
throws Exception {
+        Map<String, String> catalogOptions = new HashMap<>();
+        String warehouse =
+                new File(temporaryFolder.toFile(), 
UUID.randomUUID().toString()).toString();
+        catalogOptions.put("type", "hadoop");
+        catalogOptions.put("warehouse", warehouse);
+        catalogOptions.put("cache-enabled", "false");
+        Catalog catalog =
+                CatalogUtil.buildIcebergCatalog(
+                        "cdc-iceberg-catalog", catalogOptions, new 
Configuration());
+
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
+        IcebergWriter icebergWriter =
+                new IcebergWriter(
+                        catalogOptions,
+                        1,
+                        1,
+                        ZoneId.systemDefault(),
+                        0,
+                        jobId,
+                        operatorId,
+                        new HashMap<>());
+        IcebergMetadataApplier icebergMetadataApplier = new 
IcebergMetadataApplier(catalogOptions);
+
+        TableId tableId = TableId.parse("test.iceberg_table");
+        Schema initialSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("name", DataTypes.VARCHAR(100))
+                        .primaryKey("id")
+                        .build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, 
initialSchema);
+        icebergMetadataApplier.applySchemaChange(createTableEvent);
+        icebergWriter.write(createTableEvent, null);
+
+        BinaryRecordDataGenerator oldGenerator =
+                new BinaryRecordDataGenerator(
+                        initialSchema.getColumnDataTypes().toArray(new 
DataType[0]));
+
+        // Insert (id=1, name="a") — goes into writer batch 0.
+        RecordData recordA =
+                oldGenerator.generate(new Object[] {1L, 
BinaryStringData.fromString("a")});
+        icebergWriter.write(DataChangeEvent.insertEvent(tableId, recordA), 
null);
+
+        // Schema change: AddColumn triggers flushTableWriter, completing 
batch 0.
+        AddColumnEvent addColumnEvent =
+                new AddColumnEvent(
+                        tableId,
+                        Arrays.asList(
+                                AddColumnEvent.last(
+                                        new PhysicalColumn(
+                                                "extra", DataTypes.STRING(), 
null, null))));
+        icebergMetadataApplier.applySchemaChange(addColumnEvent);
+        icebergWriter.write(addColumnEvent, null);
+
+        // Update (id=1) with the new schema — goes into writer batch 1.
+        Schema newSchema = SchemaUtils.applySchemaChangeEvent(initialSchema, 
addColumnEvent);
+        BinaryRecordDataGenerator newGenerator =
+                new BinaryRecordDataGenerator(
+                        newSchema.getColumnDataTypes().toArray(new 
DataType[0]));
+        RecordData recordANew =
+                newGenerator.generate(new Object[] {1L, 
BinaryStringData.fromString("a"), null});
+        RecordData recordB =
+                newGenerator.generate(new Object[] {1L, 
BinaryStringData.fromString("b"), null});
+        icebergWriter.write(DataChangeEvent.updateEvent(tableId, recordANew, 
recordB), null);
+
+        Collection<WriteResultWrapper> writeResults = 
icebergWriter.prepareCommit();
+        IcebergCommitter icebergCommitter = new 
IcebergCommitter(catalogOptions, new HashMap<>());
+        Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
+                
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
+        icebergCommitter.commit(collection);
+
+        // Expect only (1, b, null) — batch 0's stale (1, a, null) must be 
deleted.
+        List<String> result = fetchTableContent(catalog, tableId, null);
+        Assertions.assertThat(result).containsExactly("1, b, null");
+    }
+
+    /**
+     * Verifies idempotency when the committer crashes after committing batch 0 but before
+     * committing batch 1 of the same checkpoint.
+     *
+     * <p>On retry, Flink re-delivers the full collection of committables for that checkpoint. The
+     * committer must detect that batch 0's snapshot is already present in the table history (via
+     * {@code flink-cdc.batch-index} and {@code flink-cdc.checkpoint-id} snapshot properties) and
+     * skip it, then commit only batch 1. Without this skip, batch 0's files would be added a second
+     * time, causing duplicate records.
+     */
+    @Test
+    public void testRetryAfterPartialBatchCommit() throws Exception {
+        Map<String, String> catalogOptions = new HashMap<>();
+        String warehouse =
+                new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+        catalogOptions.put("type", "hadoop");
+        catalogOptions.put("warehouse", warehouse);
+        catalogOptions.put("cache-enabled", "false");
+        Catalog catalog =
+                CatalogUtil.buildIcebergCatalog(
+                        "cdc-iceberg-catalog", catalogOptions, new Configuration());
+
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
+        IcebergWriter icebergWriter =
+                new IcebergWriter(
+                        catalogOptions,
+                        1,
+                        1,
+                        ZoneId.systemDefault(),
+                        0,
+                        jobId,
+                        operatorId,
+                        new HashMap<>());
+        IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
+
+        TableId tableId = TableId.parse("test.iceberg_table");
+        Schema initialSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("name", DataTypes.VARCHAR(100))
+                        .primaryKey("id")
+                        .build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, initialSchema);
+        icebergMetadataApplier.applySchemaChange(createTableEvent);
+        icebergWriter.write(createTableEvent, null);
+
+        BinaryRecordDataGenerator oldGenerator =
+                new BinaryRecordDataGenerator(
+                        initialSchema.getColumnDataTypes().toArray(new DataType[0]));
+        RecordData recordA =
+                oldGenerator.generate(new Object[] {1L, BinaryStringData.fromString("a")});
+        icebergWriter.write(DataChangeEvent.insertEvent(tableId, recordA), null);
+
+        AddColumnEvent addColumnEvent =
+                new AddColumnEvent(
+                        tableId,
+                        Arrays.asList(
+                                AddColumnEvent.last(
+                                        new PhysicalColumn(
+                                                "extra", DataTypes.STRING(), null, null))));
+        icebergMetadataApplier.applySchemaChange(addColumnEvent);
+        icebergWriter.write(addColumnEvent, null);
+
+        Schema newSchema = SchemaUtils.applySchemaChangeEvent(initialSchema, addColumnEvent);
+        BinaryRecordDataGenerator newGenerator =
+                new BinaryRecordDataGenerator(
+                        newSchema.getColumnDataTypes().toArray(new DataType[0]));
+        RecordData recordANew =
+                newGenerator.generate(new Object[] {1L, BinaryStringData.fromString("a"), null});
+        RecordData recordB =
+                newGenerator.generate(new Object[] {1L, BinaryStringData.fromString("b"), null});
+        icebergWriter.write(DataChangeEvent.updateEvent(tableId, recordANew, recordB), null);
+
+        Collection<WriteResultWrapper> writeResults = icebergWriter.prepareCommit();
+        List<WriteResultWrapper> sortedBatches =
+                writeResults.stream()
+                        .sorted(Comparator.comparingInt(WriteResultWrapper::getBatchIndex))
+                        .collect(Collectors.toList());
+        Assertions.assertThat(sortedBatches).hasSize(2);
+        Assertions.assertThat(sortedBatches.get(0).getBatchIndex()).isEqualTo(0);
+        Assertions.assertThat(sortedBatches.get(1).getBatchIndex()).isEqualTo(1);
+
+        // Simulate a partial commit: manually commit only batch 0 using the Iceberg API,
+        // setting the intermediate batch properties but NOT MAX_COMMITTED_CHECKPOINT_ID.
+        // This replicates the on-disk state left behind when the committer crashes mid-checkpoint.
+        long checkpointId = sortedBatches.get(0).getCheckpointId();
+        Table table =
+                catalog.loadTable(
+                        TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName()));
+        RowDelta partialDelta = table.newRowDelta();
+        WriteResultWrapper batch0 = sortedBatches.get(0);
+        if (batch0.getWriteResult().dataFiles() != null) {
+            for (DataFile f : batch0.getWriteResult().dataFiles()) {
+                partialDelta.addRows(f);
+            }
+        }
+        if (batch0.getWriteResult().deleteFiles() != null) {
+            for (DeleteFile f : batch0.getWriteResult().deleteFiles()) {
+                partialDelta.addDeletes(f);
+            }
+        }
+        partialDelta.set(SinkUtil.FLINK_JOB_ID, jobId);
+        partialDelta.set(SinkUtil.OPERATOR_ID, operatorId);
+        partialDelta.set(IcebergCommitter.FLINK_BATCH_INDEX, "0");
+        partialDelta.set(IcebergCommitter.FLINK_CHECKPOINT_ID_PROP, String.valueOf(checkpointId));
+        // Intentionally omitting MAX_COMMITTED_CHECKPOINT_ID — this is an incomplete checkpoint.
+        partialDelta.commit();
+
+        // Retry: Flink re-delivers all committables for the checkpoint.
+        IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions, new HashMap<>());
+        Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
+                writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
+        icebergCommitter.commit(collection);
+
+        // Batch 0 must be skipped (its snapshot is already present); only batch 1 is committed.
+        // Batch 1's eqDelete (higher sequence number) supersedes batch 0's data file.
+        List<String> result = fetchTableContent(catalog, tableId, null);
+        Assertions.assertThat(result).containsExactly("1, b, null");
+
+        // Verify the final snapshot carries MAX_COMMITTED_CHECKPOINT_ID.
+        Map<String, String> finalSummary =
+                catalog.loadTable(
+                                TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName()))
+                        .currentSnapshot()
+                        .summary();
+        Assertions.assertThat(finalSummary.get(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID))
+                .isEqualTo(String.valueOf(checkpointId));
+    }
+
+    /**
+     * Verifies that two schema-change events within a single checkpoint (producing three batches)
+     * do not cause duplicate records for the same primary key.
+     *
+     * <p>Timeline: INSERT(id=1,"a") → AddColumn1 flush → UPDATE(id=1,"b") → AddColumn2 flush →
+     * UPDATE(id=1,"c") → commit. The three batches are committed as three sequential Iceberg
+     * snapshots (seq=N, N+1, N+2), so each batch's equality-delete supersedes all earlier data.
+     */
+    @Test
+    public void testNoDuplicateWithMultipleSchemaChangesInOneCheckpoint() throws Exception {
+        Map<String, String> catalogOptions = new HashMap<>();
+        String warehouse =
+                new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+        catalogOptions.put("type", "hadoop");
+        catalogOptions.put("warehouse", warehouse);
+        catalogOptions.put("cache-enabled", "false");
+        Catalog catalog =
+                CatalogUtil.buildIcebergCatalog(
+                        "cdc-iceberg-catalog", catalogOptions, new Configuration());
+
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
+        IcebergWriter icebergWriter =
+                new IcebergWriter(
+                        catalogOptions,
+                        1,
+                        1,
+                        ZoneId.systemDefault(),
+                        0,
+                        jobId,
+                        operatorId,
+                        new HashMap<>());
+        IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
+
+        TableId tableId = TableId.parse("test.iceberg_table");
+        Schema schema0 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("name", DataTypes.VARCHAR(100))
+                        .primaryKey("id")
+                        .build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema0);
+        icebergMetadataApplier.applySchemaChange(createTableEvent);
+        icebergWriter.write(createTableEvent, null);
+
+        // Batch 0: INSERT(id=1,"a")
+        BinaryRecordDataGenerator gen0 =
+                new BinaryRecordDataGenerator(
+                        schema0.getColumnDataTypes().toArray(new DataType[0]));
+        RecordData r0a = gen0.generate(new Object[] {1L, BinaryStringData.fromString("a")});
+        icebergWriter.write(DataChangeEvent.insertEvent(tableId, r0a), null);
+
+        // First schema change → flushTableWriter → batch 0 complete, batch index starts at 1.
+        AddColumnEvent addCol1 =
+                new AddColumnEvent(
+                        tableId,
+                        Arrays.asList(
+                                AddColumnEvent.last(
+                                        new PhysicalColumn(
+                                                "extra1", DataTypes.STRING(), null, null))));
+        icebergMetadataApplier.applySchemaChange(addCol1);
+        icebergWriter.write(addCol1, null);
+
+        // Batch 1: UPDATE(id=1,"a"→"b") with schema {id, name, extra1}
+        Schema schema1 = SchemaUtils.applySchemaChangeEvent(schema0, addCol1);
+        BinaryRecordDataGenerator gen1 =
+                new BinaryRecordDataGenerator(
+                        schema1.getColumnDataTypes().toArray(new DataType[0]));
+        RecordData r1a = gen1.generate(new Object[] {1L, BinaryStringData.fromString("a"), null});
+        RecordData r1b = gen1.generate(new Object[] {1L, BinaryStringData.fromString("b"), null});
+        icebergWriter.write(DataChangeEvent.updateEvent(tableId, r1a, r1b), null);
+
+        // Second schema change → flushTableWriter → batch 1 complete, batch index now at 2.
+        AddColumnEvent addCol2 =
+                new AddColumnEvent(
+                        tableId,
+                        Arrays.asList(
+                                AddColumnEvent.last(
+                                        new PhysicalColumn(
+                                                "extra2", DataTypes.STRING(), null, null))));
+        icebergMetadataApplier.applySchemaChange(addCol2);
+        icebergWriter.write(addCol2, null);
+
+        // Batch 2: UPDATE(id=1,"b"→"c") with schema {id, name, extra1, extra2}
+        Schema schema2 = SchemaUtils.applySchemaChangeEvent(schema1, addCol2);
+        BinaryRecordDataGenerator gen2 =
+                new BinaryRecordDataGenerator(
+                        schema2.getColumnDataTypes().toArray(new DataType[0]));
+        RecordData r2b =
+                gen2.generate(new Object[] {1L, BinaryStringData.fromString("b"), null, null});
+        RecordData r2c =
+                gen2.generate(new Object[] {1L, BinaryStringData.fromString("c"), null, null});
+        icebergWriter.write(DataChangeEvent.updateEvent(tableId, r2b, r2c), null);
+
+        // Verify three batches with indices 0, 1, 2 were produced.
+        Collection<WriteResultWrapper> writeResults = icebergWriter.prepareCommit();
+        List<WriteResultWrapper> sortedBatches =
+                writeResults.stream()
+                        .sorted(Comparator.comparingInt(WriteResultWrapper::getBatchIndex))
+                        .collect(Collectors.toList());
+        Assertions.assertThat(sortedBatches).hasSize(3);
+        Assertions.assertThat(sortedBatches.get(0).getBatchIndex()).isEqualTo(0);
+        Assertions.assertThat(sortedBatches.get(1).getBatchIndex()).isEqualTo(1);
+        Assertions.assertThat(sortedBatches.get(2).getBatchIndex()).isEqualTo(2);
+
+        IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions, new HashMap<>());
+        Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
+                writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
+        icebergCommitter.commit(collection);
+
+        // Only the final value (id=1,"c") should survive; stale "a" and "b" must be deleted.
+        List<String> result = fetchTableContent(catalog, tableId, null);
+        Assertions.assertThat(result).containsExactly("1, c, null, null");
+    }
+
+    /**
+     * Verifies that a schema-change flush on tableA does not affect tableB. TableB has no schema
+     * change and commits as a single batch, while tableA's two batches are committed sequentially.
+     * Both tables must contain exactly the correct final records.
+     */
+    @Test
+    public void testSchemaChangeFlushDoesNotAffectOtherTable() throws Exception {
+        Map<String, String> catalogOptions = new HashMap<>();
+        String warehouse =
+                new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+        catalogOptions.put("type", "hadoop");
+        catalogOptions.put("warehouse", warehouse);
+        catalogOptions.put("cache-enabled", "false");
+        Catalog catalog =
+                CatalogUtil.buildIcebergCatalog(
+                        "cdc-iceberg-catalog", catalogOptions, new Configuration());
+
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
+        IcebergWriter icebergWriter =
+                new IcebergWriter(
+                        catalogOptions,
+                        1,
+                        1,
+                        ZoneId.systemDefault(),
+                        0,
+                        jobId,
+                        operatorId,
+                        new HashMap<>());
+        IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
+
+        TableId tableA = TableId.parse("test.table_a");
+        TableId tableB = TableId.parse("test.table_b");
+
+        Schema schemaA =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("name", DataTypes.VARCHAR(100))
+                        .primaryKey("id")
+                        .build();
+        Schema schemaB =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("value", DataTypes.VARCHAR(100))
+                        .primaryKey("id")
+                        .build();
+
+        icebergMetadataApplier.applySchemaChange(new CreateTableEvent(tableA, schemaA));
+        icebergMetadataApplier.applySchemaChange(new CreateTableEvent(tableB, schemaB));
+        icebergWriter.write(new CreateTableEvent(tableA, schemaA), null);
+        icebergWriter.write(new CreateTableEvent(tableB, schemaB), null);
+
+        BinaryRecordDataGenerator genA =
+                new BinaryRecordDataGenerator(
+                        schemaA.getColumnDataTypes().toArray(new DataType[0]));
+        BinaryRecordDataGenerator genB =
+                new BinaryRecordDataGenerator(
+                        schemaB.getColumnDataTypes().toArray(new DataType[0]));
+
+        // TableA: INSERT(id=1,"a") → schema change flush → UPDATE(id=1,"b") [2 batches]
+        RecordData rAa = genA.generate(new Object[] {1L, BinaryStringData.fromString("a")});
+        icebergWriter.write(DataChangeEvent.insertEvent(tableA, rAa), null);
+
+        AddColumnEvent addColA =
+                new AddColumnEvent(
+                        tableA,
+                        Arrays.asList(
+                                AddColumnEvent.last(
+                                        new PhysicalColumn(
+                                                "extra", DataTypes.STRING(), null, null))));
+        icebergMetadataApplier.applySchemaChange(addColA);
+        icebergWriter.write(addColA, null);
+
+        Schema schemaANew = SchemaUtils.applySchemaChangeEvent(schemaA, addColA);
+        BinaryRecordDataGenerator genANew =
+                new BinaryRecordDataGenerator(
+                        schemaANew.getColumnDataTypes().toArray(new DataType[0]));
+        RecordData rAaNew =
+                genANew.generate(new Object[] {1L, BinaryStringData.fromString("a"), null});
+        RecordData rAb =
+                genANew.generate(new Object[] {1L, BinaryStringData.fromString("b"), null});
+        icebergWriter.write(DataChangeEvent.updateEvent(tableA, rAaNew, rAb), null);
+
+        // TableB: INSERT(id=2,"x") → UPDATE(id=2,"y") in the same checkpoint, no schema change.
+        // Both events land in tableB's single writer (no flush), so dedup is handled internally.
+        RecordData rBx = genB.generate(new Object[] {2L, BinaryStringData.fromString("x")});
+        RecordData rBy = genB.generate(new Object[] {2L, BinaryStringData.fromString("y")});
+        icebergWriter.write(DataChangeEvent.insertEvent(tableB, rBx), null);
+        icebergWriter.write(DataChangeEvent.updateEvent(tableB, rBx, rBy), null);
+
+        Collection<WriteResultWrapper> writeResults = icebergWriter.prepareCommit();
+
+        // TableA should produce 2 batches; tableB should produce 1 batch.
+        Map<TableId, Long> batchCountByTable =
+                writeResults.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        WriteResultWrapper::getTableId, Collectors.counting()));
+        Assertions.assertThat(batchCountByTable.get(tableA)).isEqualTo(2);
+        Assertions.assertThat(batchCountByTable.get(tableB)).isEqualTo(1);
+
+        IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions, new HashMap<>());
+        Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
+                writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
+        icebergCommitter.commit(collection);
+
+        // TableA: only final value survives (stale "a" deleted by batch 1's eqDelete).
+        List<String> resultA = fetchTableContent(catalog, tableA, null);
+        Assertions.assertThat(resultA).containsExactly("1, b, null");
+
+        // TableB: only final value survives (internal position-delete handles dedup within writer).
+        List<String> resultB = fetchTableContent(catalog, tableB, null);
+        Assertions.assertThat(resultB).containsExactly("2, y");
+    }
+
+    /**
+     * Verifies that batchIndex stays in sync across subtasks even when a subtask has no writer for
+     * the table at schema-change flush time (parallelism > 1 scenario).
+     */
+    @Test
+    public void testBatchIndexInSyncWhenSubtaskHasNoWriterAtSchemaChange() throws Exception {
+        Map<String, String> catalogOptions = new HashMap<>();
+        catalogOptions.put("type", "hadoop");
+        catalogOptions.put(
+                "warehouse",
+                new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString());
+        catalogOptions.put("cache-enabled", "false");
+        IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
+
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
+
+        // Two subtask writers sharing the same catalog and table.
+        IcebergWriter writer0 =
+                new IcebergWriter(
+                        catalogOptions,
+                        0,
+                        1,
+                        ZoneId.systemDefault(),
+                        0,
+                        jobId,
+                        operatorId,
+                        new HashMap<>());
+        IcebergWriter writer1 =
+                new IcebergWriter(
+                        catalogOptions,
+                        1,
+                        1,
+                        ZoneId.systemDefault(),
+                        0,
+                        jobId,
+                        operatorId,
+                        new HashMap<>());
+
+        TableId tableId = TableId.parse("test.iceberg_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("name", DataTypes.VARCHAR(100))
+                        .primaryKey("id")
+                        .build();
+        CreateTableEvent createEvent = new CreateTableEvent(tableId, schema);
+        icebergMetadataApplier.applySchemaChange(createEvent);
+        writer0.write(createEvent, null);
+        writer1.write(createEvent, null);
+
+        BinaryRecordDataGenerator gen =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+        // Only subtask 0 has data before the schema change.
+        writer0.write(
+                DataChangeEvent.insertEvent(
+                        tableId, gen.generate(new Object[] {1L, BinaryStringData.fromString("a")})),
+                null);
+        // Subtask 1 has no writer for the table yet.
+
+        // Both subtasks receive the same SchemaChangeEvent (broadcast).
+        AddColumnEvent addColumn =
+                new AddColumnEvent(
+                        tableId,
+                        Arrays.asList(
+                                AddColumnEvent.last(
+                                        new PhysicalColumn(
+                                                "extra", DataTypes.STRING(), null, null))));
+        icebergMetadataApplier.applySchemaChange(addColumn);
+        writer0.write(addColumn, null); // has writer → flushes at batchIndex=0; counter → 1
+        writer1.write(addColumn, null); // no writer  → counter must still advance to 1
+
+        Schema newSchema = SchemaUtils.applySchemaChangeEvent(schema, addColumn);
+        BinaryRecordDataGenerator newGen =
+                new BinaryRecordDataGenerator(
+                        newSchema.getColumnDataTypes().toArray(new DataType[0]));
+
+        // Subtask 1 writes data after the schema change.
+        writer1.write(
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        newGen.generate(new Object[] {2L, BinaryStringData.fromString("b"), null})),
+                null);
+
+        Collection<WriteResultWrapper> results0 = writer0.prepareCommit();
+        Collection<WriteResultWrapper> results1 = writer1.prepareCommit();
+
+        // subtask 0: one batch at batchIndex=0 (pre-schema-change flush)
+        Assertions.assertThat(results0).hasSize(1);
+        Assertions.assertThat(results0.iterator().next().getBatchIndex()).isEqualTo(0);
+
+        // subtask 1: must be at batchIndex=1, not 0 — counter advanced at E1 even without a writer
+        Assertions.assertThat(results1).hasSize(1);
+        Assertions.assertThat(results1.iterator().next().getBatchIndex()).isEqualTo(1);
+    }
+
+    /**
+     * Verifies no duplicates when parallel subtasks share a table and one subtask has no data
+     * before the schema-change flush while the other has an UPDATE that produces an
+     * equality-delete.
+     */
+    @Test
+    public void testNoDuplicateWithParallelSubtasksMissingPreSchemaChangeData() throws Exception {
+        Map<String, String> catalogOptions = new HashMap<>();
+        String warehouse =
+                new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+        catalogOptions.put("type", "hadoop");
+        catalogOptions.put("warehouse", warehouse);
+        catalogOptions.put("cache-enabled", "false");
+        Catalog catalog =
+                CatalogUtil.buildIcebergCatalog(
+                        "cdc-iceberg-catalog", catalogOptions, new Configuration());
+        IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
+
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
+
+        IcebergWriter writer0 =
+                new IcebergWriter(
+                        catalogOptions,
+                        0,
+                        1,
+                        ZoneId.systemDefault(),
+                        0,
+                        jobId,
+                        operatorId,
+                        new HashMap<>());
+        IcebergWriter writer1 =
+                new IcebergWriter(
+                        catalogOptions,
+                        1,
+                        1,
+                        ZoneId.systemDefault(),
+                        0,
+                        jobId,
+                        operatorId,
+                        new HashMap<>());
+
+        TableId tableId = TableId.parse("test.iceberg_table");
+        Schema initialSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("name", DataTypes.VARCHAR(100))
+                        .primaryKey("id")
+                        .build();
+        CreateTableEvent createEvent = new CreateTableEvent(tableId, initialSchema);
+        icebergMetadataApplier.applySchemaChange(createEvent);
+        writer0.write(createEvent, null);
+        writer1.write(createEvent, null);
+
+        BinaryRecordDataGenerator oldGen =
+                new BinaryRecordDataGenerator(
+                        initialSchema.getColumnDataTypes().toArray(new DataType[0]));
+
+        // Subtask 1 writes the "old" row before the schema change.
+        writer1.write(
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        oldGen.generate(new Object[] {1L, BinaryStringData.fromString("old")})),
+                null);
+        // Subtask 0 has no data for the table yet.
+
+        // Schema-change E1 is broadcast to both subtasks.
+        AddColumnEvent addColumn =
+                new AddColumnEvent(
+                        tableId,
+                        Arrays.asList(
+                                AddColumnEvent.last(
+                                        new PhysicalColumn(
+                                                "extra", DataTypes.STRING(), null, null))));
+        icebergMetadataApplier.applySchemaChange(addColumn);
+        writer0.write(addColumn, null); // no writer  → batchIndex must still advance to 1 (fix)
+        writer1.write(addColumn, null); // has writer → flushes "old" at batchIndex=0; counter → 1
+
+        // Subtask 0 processes the UPDATE after E1, using the new schema.
+        Schema newSchema = SchemaUtils.applySchemaChangeEvent(initialSchema, addColumn);
+        BinaryRecordDataGenerator newGen =
+                new BinaryRecordDataGenerator(
+                        newSchema.getColumnDataTypes().toArray(new DataType[0]));
+        RecordData before =
+                newGen.generate(new Object[] {1L, BinaryStringData.fromString("old"), null});
+        RecordData after =
+                newGen.generate(new Object[] {1L, BinaryStringData.fromString("new"), null});
+        writer0.write(DataChangeEvent.updateEvent(tableId, before, after), null);
+
+        // Collect and commit all results from both subtasks.
+        List<WriteResultWrapper> allResults = new ArrayList<>();
+        allResults.addAll(writer0.prepareCommit());
+        allResults.addAll(writer1.prepareCommit());
+
+        IcebergCommitter committer = new IcebergCommitter(catalogOptions, new HashMap<>());
+        committer.commit(
+                allResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()));
+
+        // Only the updated value must survive; "old" must be deleted by the equality-delete in
+        // batch 1 (higher sequence number). Without the fix both rows appear.
+        List<String> result = fetchTableContent(catalog, tableId, null);
+        Assertions.assertThat(result).containsExactly("1, new, null");
+    }
+
+    /**
+     * Verifies that wrappers from two subtasks sharing the same batchIndex 
are merged into exactly
+     * one Iceberg snapshot, not two. This directly tests the committer-side 
grouping fix.
+     */
+    @Test
+    public void testSameBatchIndexFromTwoSubtasksMergedIntoOneSnapshot() throws Exception {
+        // Hadoop catalog rooted in a warehouse directory unique to this test run.
+        String warehousePath =
+                new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+        Map<String, String> catalogOptions = new HashMap<>();
+        catalogOptions.put("type", "hadoop");
+        catalogOptions.put("warehouse", warehousePath);
+        catalogOptions.put("cache-enabled", "false");
+        Catalog catalog =
+                CatalogUtil.buildIcebergCatalog(
+                        "cdc-iceberg-catalog", catalogOptions, new Configuration());
+        IcebergMetadataApplier metadataApplier = new IcebergMetadataApplier(catalogOptions);
+
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
+        ZoneId zoneId = ZoneId.systemDefault();
+
+        // Two writers sharing job and operator ids; only the second constructor argument
+        // (0 vs. 1) differs, standing in for two parallel subtasks.
+        IcebergWriter subtask0Writer =
+                new IcebergWriter(
+                        catalogOptions, 0, 1, zoneId, 0, jobId, operatorId, new HashMap<>());
+        IcebergWriter subtask1Writer =
+                new IcebergWriter(
+                        catalogOptions, 1, 1, zoneId, 0, jobId, operatorId, new HashMap<>());
+
+        // Create the table in the catalog first, then announce it to both writers.
+        TableId tableId = TableId.parse("test.iceberg_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("name", DataTypes.VARCHAR(100))
+                        .primaryKey("id")
+                        .build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+        metadataApplier.applySchemaChange(createTableEvent);
+        subtask0Writer.write(createTableEvent, null);
+        subtask1Writer.write(createTableEvent, null);
+
+        DataType[] columnTypes = schema.getColumnDataTypes().toArray(new DataType[0]);
+        BinaryRecordDataGenerator rowGenerator = new BinaryRecordDataGenerator(columnTypes);
+
+        // Neither subtask sees a schema change, so both are expected to emit batchIndex=0.
+        DataChangeEvent insertRowA =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        rowGenerator.generate(
+                                new Object[] {1L, BinaryStringData.fromString("a")}));
+        subtask0Writer.write(insertRowA, null);
+        DataChangeEvent insertRowB =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        rowGenerator.generate(
+                                new Object[] {2L, BinaryStringData.fromString("b")}));
+        subtask1Writer.write(insertRowB, null);
+
+        // Gather the committables of subtask 0 first, then those of subtask 1.
+        List<WriteResultWrapper> committables = new ArrayList<>(subtask0Writer.prepareCommit());
+        committables.addAll(subtask1Writer.prepareCommit());
+
+        // One wrapper per subtask, and both must share a single batch index.
+        Assertions.assertThat(committables).hasSize(2);
+        long distinctBatchIndices =
+                committables.stream()
+                        .mapToInt(WriteResultWrapper::getBatchIndex)
+                        .distinct()
+                        .count();
+        Assertions.assertThat(distinctBatchIndices).isEqualTo(1);
+
+        // Record the snapshot count before committing so the delta can be asserted.
+        TableIdentifier identifier =
+                TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName());
+        Table table = catalog.loadTable(identifier);
+        long snapshotsBefore = countSnapshots(table);
+
+        IcebergCommitter committer = new IcebergCommitter(catalogOptions, new HashMap<>());
+        committer.commit(
+                committables.stream()
+                        .map(MockCommitRequestImpl::new)
+                        .collect(Collectors.toList()));
+
+        table.refresh();
+        long newSnapshots = countSnapshots(table) - snapshotsBefore;
+
+        // Wrappers that share a batchIndex must be merged into exactly ONE new snapshot.
+        Assertions.assertThat(newSnapshots).isEqualTo(1);
+
+        List<String> rows = fetchTableContent(catalog, tableId, null);
+        Assertions.assertThat(rows).containsExactlyInAnyOrder("1, a", "2, b");
+    }
+
+    /**
+     * Covers the most involved parallel case: subtask 0 only writes before the first schema
+     * change (SC1), subtask 1 only writes between SC1 and the second schema change (SC2), and
+     * both subtasks update their row after SC2. All three batchIndex slots are therefore in use
+     * across the two subtasks, and the test confirms that the equality-deletes of batch 2
+     * suppress the stale rows written in batches 0 and 1, leaving no duplicates.
+     */
+    @Test
+    public void testNoDuplicateWithMixedDataAcrossSubtasksAndMultipleSchemaChanges()
+            throws Exception {
+        // Hadoop catalog rooted in a warehouse directory unique to this test run.
+        String warehousePath =
+                new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+        Map<String, String> catalogOptions = new HashMap<>();
+        catalogOptions.put("type", "hadoop");
+        catalogOptions.put("warehouse", warehousePath);
+        catalogOptions.put("cache-enabled", "false");
+        Catalog catalog =
+                CatalogUtil.buildIcebergCatalog(
+                        "cdc-iceberg-catalog", catalogOptions, new Configuration());
+        IcebergMetadataApplier metadataApplier = new IcebergMetadataApplier(catalogOptions);
+
+        String jobId = UUID.randomUUID().toString();
+        String operatorId = UUID.randomUUID().toString();
+        ZoneId zoneId = ZoneId.systemDefault();
+
+        // Two writers sharing job and operator ids; only the second constructor argument
+        // (0 vs. 1) differs, standing in for two parallel subtasks.
+        IcebergWriter subtask0Writer =
+                new IcebergWriter(
+                        catalogOptions, 0, 1, zoneId, 0, jobId, operatorId, new HashMap<>());
+        IcebergWriter subtask1Writer =
+                new IcebergWriter(
+                        catalogOptions, 1, 1, zoneId, 0, jobId, operatorId, new HashMap<>());
+
+        // Create the table in the catalog first, then announce it to both writers.
+        TableId tableId = TableId.parse("test.iceberg_table");
+        Schema schema0 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("name", DataTypes.VARCHAR(100))
+                        .primaryKey("id")
+                        .build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema0);
+        metadataApplier.applySchemaChange(createTableEvent);
+        subtask0Writer.write(createTableEvent, null);
+        subtask1Writer.write(createTableEvent, null);
+
+        DataType[] columnTypes0 = schema0.getColumnDataTypes().toArray(new DataType[0]);
+        BinaryRecordDataGenerator rowsV0 = new BinaryRecordDataGenerator(columnTypes0);
+
+        // Batch 0: subtask 0 is the only one holding data before SC1; subtask 1 stays idle.
+        DataChangeEvent insertRow1 =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        rowsV0.generate(new Object[] {1L, BinaryStringData.fromString("a")}));
+        subtask0Writer.write(insertRow1, null);
+
+        // SC1 is broadcast to both subtasks.
+        PhysicalColumn extra1 = new PhysicalColumn("extra1", DataTypes.STRING(), null, null);
+        AddColumnEvent sc1 =
+                new AddColumnEvent(tableId, Arrays.asList(AddColumnEvent.last(extra1)));
+        metadataApplier.applySchemaChange(sc1);
+        // Subtask 0 has an open writer: it flushes batchIndex=0 and its counter becomes 1.
+        subtask0Writer.write(sc1, null);
+        // Subtask 1 has no open writer, yet its counter must advance to 1 as well.
+        subtask1Writer.write(sc1, null);
+
+        Schema schema1 = SchemaUtils.applySchemaChangeEvent(schema0, sc1);
+        DataType[] columnTypes1 = schema1.getColumnDataTypes().toArray(new DataType[0]);
+        BinaryRecordDataGenerator rowsV1 = new BinaryRecordDataGenerator(columnTypes1);
+
+        // Batch 1: subtask 1 is the only one holding data between SC1 and SC2.
+        DataChangeEvent insertRow2 =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        rowsV1.generate(
+                                new Object[] {2L, BinaryStringData.fromString("b"), null}));
+        subtask1Writer.write(insertRow2, null);
+
+        // SC2 is broadcast to both subtasks.
+        PhysicalColumn extra2 = new PhysicalColumn("extra2", DataTypes.STRING(), null, null);
+        AddColumnEvent sc2 =
+                new AddColumnEvent(tableId, Arrays.asList(AddColumnEvent.last(extra2)));
+        metadataApplier.applySchemaChange(sc2);
+        // Subtask 0 has no open writer, yet its counter must advance to 2 as well.
+        subtask0Writer.write(sc2, null);
+        // Subtask 1 has an open writer: it flushes batchIndex=1 and its counter becomes 2.
+        subtask1Writer.write(sc2, null);
+
+        Schema schema2 = SchemaUtils.applySchemaChangeEvent(schema1, sc2);
+        DataType[] columnTypes2 = schema2.getColumnDataTypes().toArray(new DataType[0]);
+        BinaryRecordDataGenerator rowsV2 = new BinaryRecordDataGenerator(columnTypes2);
+
+        // Batch 2: after SC2 each subtask updates its own row.
+        // Subtask 0 changes id=1 from "a" to "c"; subtask 1 changes id=2 from "b" to "d".
+        DataChangeEvent updateRow1 =
+                DataChangeEvent.updateEvent(
+                        tableId,
+                        rowsV2.generate(
+                                new Object[] {1L, BinaryStringData.fromString("a"), null, null}),
+                        rowsV2.generate(
+                                new Object[] {1L, BinaryStringData.fromString("c"), null, null}));
+        subtask0Writer.write(updateRow1, null);
+        DataChangeEvent updateRow2 =
+                DataChangeEvent.updateEvent(
+                        tableId,
+                        rowsV2.generate(
+                                new Object[] {2L, BinaryStringData.fromString("b"), null, null}),
+                        rowsV2.generate(
+                                new Object[] {2L, BinaryStringData.fromString("d"), null, null}));
+        subtask1Writer.write(updateRow2, null);
+
+        // Gather the committables of subtask 0 first, then those of subtask 1.
+        List<WriteResultWrapper> committables = new ArrayList<>(subtask0Writer.prepareCommit());
+        committables.addAll(subtask1Writer.prepareCommit());
+
+        // Three batches are expected: {0: sub0}, {1: sub1}, {2: sub0+sub1}.
+        Assertions.assertThat(
+                        committables.stream()
+                                .mapToInt(WriteResultWrapper::getBatchIndex)
+                                .distinct()
+                                .count())
+                .isEqualTo(3);
+
+        IcebergCommitter committer = new IcebergCommitter(catalogOptions, new HashMap<>());
+        committer.commit(
+                committables.stream()
+                        .map(MockCommitRequestImpl::new)
+                        .collect(Collectors.toList()));
+
+        // Only the final values may remain: the equality-deletes of batch 2 (seq N+2) must
+        // suppress the stale inserts of batch 0 (seq N) and batch 1 (seq N+1).
+        List<String> rows = fetchTableContent(catalog, tableId, null);
+        Assertions.assertThat(rows)
+                .containsExactlyInAnyOrder("1, c, null, null", "2, d, null, null");
+    }
+
+    /** Counts the snapshots yielded by iterating {@code table.snapshots()}. */
+    private static long countSnapshots(Table table) {
+        long total = 0L;
+        java.util.Iterator<Snapshot> remaining = table.snapshots().iterator();
+        while (remaining.hasNext()) {
+            // The snapshot itself is irrelevant here; only the number of elements matters.
+            remaining.next();
+            total++;
+        }
+        return total;
+    }
+
     /** Mock CommitRequestImpl. */
     public static class MockCommitRequestImpl<CommT> extends 
CommitRequestImpl<CommT> {
 
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
index 8b4d7768d..df5654a82 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
@@ -213,10 +213,25 @@ public class MySqlToIcebergE2eITCase extends 
PipelineTestEnvironment {
             stat.execute("UPDATE products SET description='Fay' WHERE 
id=106;");
             stat.execute("UPDATE products SET weight='5.125' WHERE id=107;");
 
+            // Same row updated twice before the schema-change flush below.
+            stat.execute("UPDATE products SET description='Bob v1' WHERE 
id=102;");
+            stat.execute("UPDATE products SET description='Bob v2' WHERE 
id=102;");
+
             // modify table schema
             stat.execute("ALTER TABLE products DROP COLUMN point_c;");
             stat.execute("DELETE FROM products WHERE id=101;");
 
+            // And once more after the flush; the latest value should win, no duplicate.
+            stat.execute("UPDATE products SET weight='1.125' WHERE id=102;");
+            // Update then delete the same row; it should be gone.
+            stat.execute("UPDATE products SET description='Cecily v2' WHERE 
id=103;");
+            stat.execute("DELETE FROM products WHERE id=103;");
+            // Delete then re-insert the same id; the re-inserted row should survive.
+            stat.execute("DELETE FROM products WHERE id=104;");
+            stat.execute(
+                    "INSERT INTO products (id, name, description, weight, 
enum_c, json_c) "
+                            + "VALUES (104, 'Four', 'Reborn', 9.875, 'white', 
null);");
+
             stat.execute(
                     "INSERT INTO products VALUES 
(default,'Eleven','Kryo',5.18, null, null);"); // 111
             stat.execute(
@@ -229,9 +244,8 @@ public class MySqlToIcebergE2eITCase extends 
PipelineTestEnvironment {
         List<String> recordsInSnapshotPhase =
                 new ArrayList<>(
                         Arrays.asList(
-                                "102, Two, Bob, 1.703, white, {\"key2\": 
\"value2\"}, null, null, null, null, null, null, null, null, null, null",
-                                "103, Three, Cecily, 4.105, red, {\"key3\": 
\"value3\"}, null, null, null, null, null, null, null, null, null, null",
-                                "104, Four, Derrida, 1.857, white, {\"key4\": 
\"value4\"}, null, null, null, null, null, null, null, null, null, null",
+                                "102, Two, Bob v2, 1.125, white, 
{\"key2\":\"value2\"}, null, null, null, null, null, null, null, null, null, 
null",
+                                "104, Four, Reborn, 9.875, white, null, null, 
null, null, null, null, null, null, null, null, null",
                                 "105, Five, Evelyn, 5.211, red, {\"K\": \"V\", 
\"k\": \"v\"}, null, null, null, null, null, null, null, null, null, null",
                                 "106, Six, Fay, 9.813, null, null, null, null, 
null, null, null, null, null, null, null, null",
                                 "107, Seven, Grace, 5.125, null, null, null, 
null, null, null, null, null, null, null, null, null",

Reply via email to