This is an automated email from the ASF dual-hosted git repository. fmariani pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 33c70e1db3f0bda1840a10d90fcbe1a46ceeba3c Author: Croway <[email protected]> AuthorDate: Tue Mar 31 15:15:10 2026 +0200 Fix KafkaConnectRunnerService executor lifecycle and REST port conflicts The ExecutorService was created once and reused across test methods. After stop() called shutdown(), the next test's start() submitted to a dead executor causing RejectedExecutionException and herder=null. Create a fresh ExecutorService in start() so each test gets a clean runtime. Also properly shutdown/shutdownNow the executor in stop(). Use port 0 for the REST server listener so the OS assigns a free port, avoiding bind conflicts between consecutive test runs where the previous REST server port is still being released. --- .../kafkaconnect/DefaultKafkaConnectPropertyFactory.java | 4 ++-- .../services/kafkaconnect/KafkaConnectRunnerService.java | 10 +++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java index 764b3ccfb7..04fff966c5 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/DefaultKafkaConnectPropertyFactory.java @@ -55,8 +55,8 @@ class DefaultKafkaConnectPropertyFactory implements KafkaConnectPropertyFactory props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, this.getClass().getResource("/").getPath() + "connect.offsets"); props.put(StandaloneConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10000"); - String address = NetworkUtils.getAddress("http"); - LOG.info("Using the following address for the listener configuration: {}", address); + String address = "http://localhost:0"; + LOG.info("Using the following address for the listener configuration: {}", address); props.put(RestServerConfig.LISTENERS_CONFIG, address); String pluginPaths = PluginPathHelper.getInstance().pluginPaths(); diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java index 0c48bb9e57..e3a27a8384 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java @@ -39,7 +39,7 @@ public class KafkaConnectRunnerService implements KafkaConnectService { private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectRunnerService.class); private final KafkaConnectRunner kafkaConnectRunner; - private final ExecutorService service = Executors.newCachedThreadPool(); + private ExecutorService service; public KafkaConnectRunnerService(KafkaService kafkaService) { @@ -97,16 +97,20 @@ public class KafkaConnectRunnerService implements KafkaConnectService { public void stop() { kafkaConnectRunner.stop(); + service.shutdown(); try { - if (!service.awaitTermination(5, TimeUnit.SECONDS)) { - LOG.warn("Timed out while waiting for the embedded runner to stop"); + if (!service.awaitTermination(10, TimeUnit.SECONDS)) { + LOG.warn("Timed out while waiting for the embedded runner to stop, forcing shutdown"); + service.shutdownNow(); } } catch (InterruptedException e) { LOG.warn("The test was interrupted while executing"); + service.shutdownNow(); } } public void start() { + service = Executors.newCachedThreadPool(); CountDownLatch latch = new CountDownLatch(1); service.submit(() -> kafkaConnectRunner.run(latch));
