This is an automated email from the ASF dual-hosted git repository.
laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 2fe32aa8f7 Kafka Connect: Surface commit failures instead of silently
swallowing them (#16237)
2fe32aa8f7 is described below
commit 2fe32aa8f77ebaee5359cabd4592300e1df69e72
Author: Anupam Yadav <[email protected]>
AuthorDate: Mon May 18 13:44:30 2026 -0700
Kafka Connect: Surface commit failures instead of silently swallowing them
(#16237)
* Kafka Connect: Surface commit failures instead of silently swallowing them
Narrow the catch around doCommit() from Exception to RuntimeException
and rethrow on full-commit failures after logging them at ERROR.
Partial-commit failures (triggered by commit timeout) are logged at
WARN and swallowed since the coordinator will retry on the next cycle.
This ensures commit failures surface to operators by terminating
the coordinator thread, which transitions the Connect task to FAILED.
Fixes #15878
* Retrigger CI
* Retrigger CI (attempt 2)
---
.../iceberg/connect/channel/Coordinator.java | 17 +++++---
.../iceberg/connect/channel/TestCoordinator.java | 48 ++++++++++++++++++----
2 files changed, 51 insertions(+), 14 deletions(-)
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
index c986f8afc2..4b46b941f4 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
@@ -150,12 +150,17 @@ class Coordinator extends Channel {
private void commit(boolean partialCommit) {
try {
doCommit(partialCommit);
- } catch (Exception e) {
- LOG.warn(
- "Coordinator {} failed to commit for commit {}, will try again next
cycle",
- taskId,
- commitState.currentCommitId(),
- e);
+ } catch (RuntimeException e) {
+ if (partialCommit) {
+ LOG.warn(
+ "Partial commit {} failed for task {}, will retry",
+ commitState.currentCommitId(),
+ taskId,
+ e);
+ } else {
+ LOG.error("Commit {} failed for task {}",
commitState.currentCommitId(), taskId, e);
+ throw e;
+ }
} finally {
commitState.endCurrentCommit();
}
diff --git
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
index ed370fcdad..0b5553e127 100644
---
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
+++
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
@@ -19,12 +19,16 @@
package org.apache.iceberg.connect.channel;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
+import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DataOperations;
@@ -45,6 +49,8 @@ import org.apache.iceberg.connect.events.PayloadType;
import org.apache.iceberg.connect.events.StartCommit;
import org.apache.iceberg.connect.events.TableReference;
import org.apache.iceberg.connect.events.TopicPartitionOffset;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types.StructType;
@@ -135,14 +141,35 @@ public class TestCoordinator extends ChannelTestBase {
.withRecordCount(5)
.build();
- coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null);
+ assertThatThrownBy(
+ () -> coordinatorTest(ImmutableList.of(badDataFile),
ImmutableList.of(), null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot find partition spec");
- // no commit messages sent
assertThat(producer.history()).hasSize(1);
-
assertThat(table.snapshots()).isEmpty();
}
+ @Test
+ public void testCommitFailedExceptionPropagates() {
+ Table spiedTable = spy(table);
+ AppendFiles spiedAppend = spy(table.newAppend());
+ doThrow(new CommitFailedException("Glue detected concurrent update"))
+ .when(spiedAppend)
+ .commit();
+ when(spiedTable.newAppend()).thenReturn(spiedAppend);
+ when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(spiedTable);
+
+ assertThatThrownBy(
+ () ->
+ coordinatorTest(
+ ImmutableList.of(EventTestUtil.createDataFile()),
+ ImmutableList.of(),
+ EventTestUtil.now()))
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessageContaining("Glue detected concurrent update");
+ }
+
private void assertCommitTable(int idx, UUID commitId, OffsetDateTime ts) {
byte[] bytes = producer.history().get(idx).value();
Event commitTable = AvroUtil.decode(bytes);
@@ -289,13 +316,18 @@ public class TestCoordinator extends ChannelTestBase {
Snapshot firstSnapshot = table.currentSnapshot();
assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP,
"{\"0\":7}");
- // Trigger commit to the table
- coordinatorTest(
- ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(),
EventTestUtil.now());
+ // Trigger commit to the table - should throw ValidationException
+ assertThatThrownBy(
+ () ->
+ coordinatorTest(
+ ImmutableList.of(EventTestUtil.createDataFile()),
+ ImmutableList.of(),
+ EventTestUtil.now()))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("stale offsets");
- // Assert that the table was not updated and offsets remain
table.refresh();
assertThat(table.snapshots()).hasSize(2);
- assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP,
"{\"0\":7}");
+
assertThat(table.currentSnapshot().summary()).containsEntry(OFFSETS_SNAPSHOT_PROP,
"{\"0\":7}");
}
}