This is an automated email from the ASF dual-hosted git repository.

fmariani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 33c70e1db3f0bda1840a10d90fcbe1a46ceeba3c
Author: Croway <[email protected]>
AuthorDate: Tue Mar 31 15:15:10 2026 +0200

    Fix KafkaConnectRunnerService executor lifecycle and REST port conflicts
    
    The ExecutorService was created once and reused across test methods.
    After stop() called shutdown(), the next test's start() submitted to a
    dead executor causing RejectedExecutionException and herder=null.
    
    Create a fresh ExecutorService in start() so each test gets a clean
    runtime. Also shut the executor down properly in stop(): call shutdown()
    and fall back to shutdownNow() on timeout or interruption.
    
    Use port 0 for the REST server listener so the OS assigns a free port,
    avoiding bind conflicts between consecutive test runs where the previous
    REST server port is still being released.
---
 .../kafkaconnect/DefaultKafkaConnectPropertyFactory.java       |  4 ++--
 .../services/kafkaconnect/KafkaConnectRunnerService.java       | 10 +++++++---
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java
 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java
index 764b3ccfb7..04fff966c5 100644
--- 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java
+++ 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java
@@ -55,8 +55,8 @@ class DefaultKafkaConnectPropertyFactory implements 
KafkaConnectPropertyFactory
         props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, 
this.getClass().getResource("/").getPath() + "connect.offsets");
         props.put(StandaloneConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10000");
 
-        String address = NetworkUtils.getAddress("http");
-        LOG.info("Using the following address for  the listener configuration: 
{}", address);
+        String address = "http://localhost:0";
+        LOG.info("Using the following address for the listener configuration: 
{}", address);
         props.put(RestServerConfig.LISTENERS_CONFIG, address);
 
         String pluginPaths = PluginPathHelper.getInstance().pluginPaths();
diff --git 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java
 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java
index 0c48bb9e57..e3a27a8384 100644
--- 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java
+++ 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java
@@ -39,7 +39,7 @@ public class KafkaConnectRunnerService implements 
KafkaConnectService {
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaConnectRunnerService.class);
 
     private final KafkaConnectRunner kafkaConnectRunner;
-    private final ExecutorService service = Executors.newCachedThreadPool();
+    private ExecutorService service;
 
 
     public KafkaConnectRunnerService(KafkaService kafkaService) {
@@ -97,16 +97,20 @@ public class KafkaConnectRunnerService implements 
KafkaConnectService {
 
     public void stop() {
         kafkaConnectRunner.stop();
+        service.shutdown();
         try {
-            if (!service.awaitTermination(5, TimeUnit.SECONDS)) {
-                LOG.warn("Timed out while waiting for the embedded runner to 
stop");
+            if (!service.awaitTermination(10, TimeUnit.SECONDS)) {
+                LOG.warn("Timed out while waiting for the embedded runner to 
stop, forcing shutdown");
+                service.shutdownNow();
             }
         } catch (InterruptedException e) {
             LOG.warn("The test was interrupted while executing");
+            service.shutdownNow();
         }
     }
 
     public void start() {
+        service = Executors.newCachedThreadPool();
         CountDownLatch latch = new CountDownLatch(1);
         service.submit(() -> kafkaConnectRunner.run(latch));
 

Reply via email to