This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 8da07dcae8 Merge control topic and last persisted offsets (#14525)
8da07dcae8 is described below

commit 8da07dcae8ccf5ce1a0c61a7456413c1ce3b65fd
Author: Daniel Weeks <[email protected]>
AuthorDate: Fri Nov 7 07:55:15 2025 -0800

    Merge control topic and last persisted offsets (#14525)
    
    * Merge control topic and last persisted offsets
    
    * Add tests
---
 .../iceberg/connect/channel/Coordinator.java       | 20 ++++++++++++------
 .../iceberg/connect/channel/TestCoordinator.java   | 24 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 6 deletions(-)

diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
index 97e06a8278..30ae5f33c7 100644
--- 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
@@ -153,8 +154,6 @@ class Coordinator extends Channel {
 
   private void doCommit(boolean partialCommit) {
     Map<TableReference, List<Envelope>> commitMap = 
commitState.tableCommitMap();
-
-    String offsetsJson = offsetsJson();
     OffsetDateTime validThroughTs = commitState.validThroughTs(partialCommit);
 
     Tasks.foreach(commitMap.entrySet())
@@ -162,7 +161,8 @@ class Coordinator extends Channel {
         .stopOnFailure()
         .run(
             entry -> {
-              commitToTable(entry.getKey(), entry.getValue(), offsetsJson, 
validThroughTs);
+              commitToTable(
+                  entry.getKey(), entry.getValue(), controlTopicOffsets(), 
validThroughTs);
             });
 
     // we should only get here if all tables committed successfully...
@@ -182,9 +182,9 @@ class Coordinator extends Channel {
         validThroughTs);
   }
 
-  private String offsetsJson() {
+  private String offsetsToJson(Map<Integer, Long> offsets) {
     try {
-      return MAPPER.writeValueAsString(controlTopicOffsets());
+      return MAPPER.writeValueAsString(offsets);
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
@@ -193,7 +193,7 @@ class Coordinator extends Channel {
   private void commitToTable(
       TableReference tableReference,
       List<Envelope> envelopeList,
-      String offsetsJson,
+      Map<Integer, Long> controlTopicOffsets,
       OffsetDateTime validThroughTs) {
     TableIdentifier tableIdentifier = tableReference.identifier();
     Table table;
@@ -206,7 +206,15 @@ class Coordinator extends Channel {
 
     String branch = 
config.tableConfig(tableIdentifier.toString()).commitBranch();
 
+    // Control topic partition offsets may include a subset of partition ids 
if there were no
+    // records for other partitions.  Merge the updated topic partitions with 
the last committed
+    // offsets.
     Map<Integer, Long> committedOffsets = lastCommittedOffsetsForTable(table, 
branch);
+    Map<Integer, Long> mergedOffsets =
+        Stream.of(committedOffsets, controlTopicOffsets)
+            .flatMap(map -> map.entrySet().stream())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, 
Long::max));
+    String offsetsJson = offsetsToJson(mergedOffsets);
 
     List<DataWritten> payloads =
         envelopeList.stream()
diff --git 
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
index 44ef43877c..4b1a878e56 100644
--- 
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
+++ 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
@@ -229,4 +229,28 @@ public class TestCoordinator extends ChannelTestBase {
     sourceConsumer.rebalance(ImmutableList.of(tp1));
     assertThat(mockIcebergSinkTask.isCoordinatorRunning()).isFalse();
   }
+
+  @Test
+  public void testCoordinatorCommittedOffsetMerging() {
+    // Set the initial offsets based on a message from partition 1
+    table
+        .newAppend()
+        .appendFile(EventTestUtil.createDataFile())
+        .set(OFFSETS_SNAPSHOT_PROP, "{\"1\":7}")
+        .commit();
+
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(1);
+    
assertThat(table.currentSnapshot().summary()).containsEntry(OFFSETS_SNAPSHOT_PROP,
 "{\"1\":7}");
+
+    // Trigger commit to the table that will include partition 0 offsets
+    coordinatorTest(
+        ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), 
EventTestUtil.now());
+
+    // Assert that the table was not updated and both offsets are represented
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(2);
+    assertThat(table.currentSnapshot().summary())
+        .containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":3,\"1\":7}");
+  }
 }

Reply via email to