This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 8da07dcae8 Merge control topic and last persisted offsets (#14525)
8da07dcae8 is described below
commit 8da07dcae8ccf5ce1a0c61a7456413c1ce3b65fd
Author: Daniel Weeks <[email protected]>
AuthorDate: Fri Nov 7 07:55:15 2025 -0800
Merge control topic and last persisted offsets (#14525)
* Merge control topic and last persisted offsets
* Add tests
---
.../iceberg/connect/channel/Coordinator.java | 20 ++++++++++++------
.../iceberg/connect/channel/TestCoordinator.java | 24 ++++++++++++++++++++++
2 files changed, 38 insertions(+), 6 deletions(-)
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
index 97e06a8278..30ae5f33c7 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
@@ -153,8 +154,6 @@ class Coordinator extends Channel {
private void doCommit(boolean partialCommit) {
Map<TableReference, List<Envelope>> commitMap =
commitState.tableCommitMap();
-
- String offsetsJson = offsetsJson();
OffsetDateTime validThroughTs = commitState.validThroughTs(partialCommit);
Tasks.foreach(commitMap.entrySet())
@@ -162,7 +161,8 @@ class Coordinator extends Channel {
.stopOnFailure()
.run(
entry -> {
- commitToTable(entry.getKey(), entry.getValue(), offsetsJson,
validThroughTs);
+ commitToTable(
+ entry.getKey(), entry.getValue(), controlTopicOffsets(),
validThroughTs);
});
// we should only get here if all tables committed successfully...
@@ -182,9 +182,9 @@ class Coordinator extends Channel {
validThroughTs);
}
- private String offsetsJson() {
+ private String offsetsToJson(Map<Integer, Long> offsets) {
try {
- return MAPPER.writeValueAsString(controlTopicOffsets());
+ return MAPPER.writeValueAsString(offsets);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
@@ -193,7 +193,7 @@ class Coordinator extends Channel {
private void commitToTable(
TableReference tableReference,
List<Envelope> envelopeList,
- String offsetsJson,
+ Map<Integer, Long> controlTopicOffsets,
OffsetDateTime validThroughTs) {
TableIdentifier tableIdentifier = tableReference.identifier();
Table table;
@@ -206,7 +206,15 @@ class Coordinator extends Channel {
String branch =
config.tableConfig(tableIdentifier.toString()).commitBranch();
+ // Control topic partition offsets may include a subset of partition ids
if there were no
+ // records for other partitions. Merge the updated topic partitions with
the last committed
+ // offsets.
Map<Integer, Long> committedOffsets = lastCommittedOffsetsForTable(table,
branch);
+ Map<Integer, Long> mergedOffsets =
+ Stream.of(committedOffsets, controlTopicOffsets)
+ .flatMap(map -> map.entrySet().stream())
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
Long::max));
+ String offsetsJson = offsetsToJson(mergedOffsets);
List<DataWritten> payloads =
envelopeList.stream()
diff --git
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
index 44ef43877c..4b1a878e56 100644
---
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
+++
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
@@ -229,4 +229,28 @@ public class TestCoordinator extends ChannelTestBase {
sourceConsumer.rebalance(ImmutableList.of(tp1));
assertThat(mockIcebergSinkTask.isCoordinatorRunning()).isFalse();
}
+
+ @Test
+ public void testCoordinatorCommittedOffsetMerging() {
+ // Set the initial offsets based on a message from partition 1
+ table
+ .newAppend()
+ .appendFile(EventTestUtil.createDataFile())
+ .set(OFFSETS_SNAPSHOT_PROP, "{\"1\":7}")
+ .commit();
+
+ table.refresh();
+ assertThat(table.snapshots()).hasSize(1);
+
assertThat(table.currentSnapshot().summary()).containsEntry(OFFSETS_SNAPSHOT_PROP,
"{\"1\":7}");
+
+ // Trigger commit to the table that will include partition 0 offsets
+ coordinatorTest(
+ ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(),
EventTestUtil.now());
+
+ // Assert that the table was now updated and both offsets are represented
+ table.refresh();
+ assertThat(table.snapshots()).hasSize(2);
+ assertThat(table.currentSnapshot().summary())
+ .containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":3,\"1\":7}");
+ }
}