This is an automated email from the ASF dual-hosted git repository.

laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c767c14b0 Kafka Connect: Add end-to-end test for commit failure 
propagation (#16432)
4c767c14b0 is described below

commit 4c767c14b02b7b48e0030c7da01b9f87d65dcad5
Author: Anupam Yadav <[email protected]>
AuthorDate: Fri May 22 14:59:36 2026 -0700

    Kafka Connect: Add end-to-end test for commit failure propagation (#16432)
    
    Fixes #16380
---
 .../iceberg/connect/channel/TestCommitterImpl.java | 51 ++++++++++++++++++++++
 1 file changed, 51 insertions(+)

diff --git 
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java
 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java
index c6b7c86e4c..f7440dacbe 100644
--- 
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java
+++ 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java
@@ -19,12 +19,17 @@
 package org.apache.iceberg.connect.channel;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.lang.reflect.Field;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import org.apache.iceberg.connect.IcebergSinkConfig;
@@ -114,4 +119,50 @@ public class TestCommitterImpl {
       assertThat(committer.hasLeaderPartition(nonLeaderAssignments)).isFalse();
     }
   }
+
+  @Test
+  public void testCommitFailurePropagatesAsNotRunningException()
+      throws NoSuchFieldException, IllegalAccessException {
+    Coordinator coordinator = mock(Coordinator.class);
+    doThrow(new RuntimeException("commit failed")).when(coordinator).process();
+
+    CoordinatorThread coordinatorThread = new CoordinatorThread(coordinator);
+    coordinatorThread.start();
+
+    // wait for the thread to catch the exception, set terminated, and call 
stop
+    verify(coordinator, timeout(1000)).stop();
+    assertThat(coordinatorThread.isTerminated()).isTrue();
+
+    CommitterImpl committer = new CommitterImpl();
+    Field field = CommitterImpl.class.getDeclaredField("coordinatorThread");
+    field.setAccessible(true);
+    field.set(committer, coordinatorThread);
+
+    assertThatThrownBy(() -> committer.save(Collections.emptyList()))
+        .isInstanceOf(NotRunningException.class)
+        .hasMessageContaining("Coordinator unexpectedly terminated");
+  }
+
+  @Test
+  public void testStartFailurePropagatesAsNotRunningException()
+      throws NoSuchFieldException, IllegalAccessException {
+    Coordinator coordinator = mock(Coordinator.class);
+    doThrow(new RuntimeException("start failed")).when(coordinator).start();
+
+    CoordinatorThread coordinatorThread = new CoordinatorThread(coordinator);
+    coordinatorThread.start();
+
+    // wait for the thread to catch the exception, set terminated, and call 
stop
+    verify(coordinator, timeout(1000)).stop();
+    assertThat(coordinatorThread.isTerminated()).isTrue();
+
+    CommitterImpl committer = new CommitterImpl();
+    Field field = CommitterImpl.class.getDeclaredField("coordinatorThread");
+    field.setAccessible(true);
+    field.set(committer, coordinatorThread);
+
+    assertThatThrownBy(() -> committer.save(Collections.emptyList()))
+        .isInstanceOf(NotRunningException.class)
+        .hasMessageContaining("Coordinator unexpectedly terminated");
+  }
 }

Reply via email to