This is an automated email from the ASF dual-hosted git repository.
laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 4c767c14b0 Kafka Connect: Add end-to-end test for commit failure
propagation (#16432)
4c767c14b0 is described below
commit 4c767c14b02b7b48e0030c7da01b9f87d65dcad5
Author: Anupam Yadav <[email protected]>
AuthorDate: Fri May 22 14:59:36 2026 -0700
Kafka Connect: Add end-to-end test for commit failure propagation (#16432)
Fixes #16380
---
.../iceberg/connect/channel/TestCommitterImpl.java | 51 ++++++++++++++++++++++
1 file changed, 51 insertions(+)
diff --git
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java
index c6b7c86e4c..f7440dacbe 100644
---
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java
+++
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java
@@ -19,12 +19,17 @@
package org.apache.iceberg.connect.channel;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.lang.reflect.Field;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.iceberg.connect.IcebergSinkConfig;
@@ -114,4 +119,50 @@ public class TestCommitterImpl {
assertThat(committer.hasLeaderPartition(nonLeaderAssignments)).isFalse();
}
}
+
+ @Test
+ public void testCommitFailurePropagatesAsNotRunningException()
+ throws NoSuchFieldException, IllegalAccessException {
+ Coordinator coordinator = mock(Coordinator.class);
+ doThrow(new RuntimeException("commit failed")).when(coordinator).process();
+
+ CoordinatorThread coordinatorThread = new CoordinatorThread(coordinator);
+ coordinatorThread.start();
+
+ // wait for the thread to catch the exception, set terminated, and call
stop
+ verify(coordinator, timeout(1000)).stop();
+ assertThat(coordinatorThread.isTerminated()).isTrue();
+
+ CommitterImpl committer = new CommitterImpl();
+ Field field = CommitterImpl.class.getDeclaredField("coordinatorThread");
+ field.setAccessible(true);
+ field.set(committer, coordinatorThread);
+
+ assertThatThrownBy(() -> committer.save(Collections.emptyList()))
+ .isInstanceOf(NotRunningException.class)
+ .hasMessageContaining("Coordinator unexpectedly terminated");
+ }
+
+ @Test
+ public void testStartFailurePropagatesAsNotRunningException()
+ throws NoSuchFieldException, IllegalAccessException {
+ Coordinator coordinator = mock(Coordinator.class);
+ doThrow(new RuntimeException("start failed")).when(coordinator).start();
+
+ CoordinatorThread coordinatorThread = new CoordinatorThread(coordinator);
+ coordinatorThread.start();
+
+ // wait for the thread to catch the exception, set terminated, and call
stop
+ verify(coordinator, timeout(1000)).stop();
+ assertThat(coordinatorThread.isTerminated()).isTrue();
+
+ CommitterImpl committer = new CommitterImpl();
+ Field field = CommitterImpl.class.getDeclaredField("coordinatorThread");
+ field.setAccessible(true);
+ field.set(committer, coordinatorThread);
+
+ assertThatThrownBy(() -> committer.save(Collections.emptyList()))
+ .isInstanceOf(NotRunningException.class)
+ .hasMessageContaining("Coordinator unexpectedly terminated");
+ }
}