Greg Harris created KAFKA-14311:
-----------------------------------

Summary: Connect Worker clean shutdown does not cleanly stop connectors/tasks
Key: KAFKA-14311
URL: https://issues.apache.org/jira/browse/KAFKA-14311
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Affects Versions: 3.3.1
Reporter: Greg Harris
When the DistributedHerder::stop() method is called, it triggers an asynchronous shutdown of the background herder thread and then continues with a synchronous shutdown of several other resources, including the stopAndStartExecutor. This executor is responsible for cleanly stopping connectors and tasks, using jobs submitted by the DistributedHerder::halt() method. There is a race condition between halt(), running asynchronously on the herder thread, submitting these connector/task stop jobs, and stop() terminating the executor. If the executor is terminated first, the following exception appears:

{noformat}
[2022-10-17 16:29:23,396] ERROR [Worker clientId=connect-2, groupId=connect-integration-test-connect-cluster-1] Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:366)
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@62878e25[Not completed, task = org.apache.kafka.connect.runtime.distributed.DistributedHerder$$Lambda$2285/0x00000008015046a8@58deade3] rejected from java.util.concurrent.ThreadPoolExecutor@10351ac3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
	at java.base/java.util.concurrent.AbstractExecutorService.invokeAll(AbstractExecutorService.java:247)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startAndStop(DistributedHerder.java:1667)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.halt(DistributedHerder.java:765)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:361)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
{noformat}
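The ordering problem can be reproduced outside Connect with plain java.util.concurrent. This is a minimal sketch under stated assumptions, not the Connect code and not the committed fix. The class HerderShutdownRace and its methods are hypothetical. The two lambdas stand in for the connector/task stop jobs that halt() hands to the stopAndStartExecutor, and a single-thread executor plays the role of the herder thread. The first method forces the losing interleaving (executor terminated before invokeAll), which throws the same RejectedExecutionException seen in the log. The second shows one possible ordering that avoids it: stop() waits for the herder thread to finish halt() before shutting down the executor.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reproduction of the shutdown ordering described in KAFKA-14311.
public class HerderShutdownRace {

    // Hypothetical stand-ins for the connector/task stop jobs that halt() submits.
    static List<Callable<Void>> stopJobs(AtomicInteger completed) {
        return List.of(
            () -> { completed.incrementAndGet(); return null; }, // "stop connector"
            () -> { completed.incrementAndGet(); return null; }  // "stop task"
        );
    }

    // Losing interleaving, forced deterministically: stop() terminates the
    // executor before halt() calls invokeAll(), so submission is rejected.
    static boolean rejectedWhenExecutorStoppedFirst() throws InterruptedException {
        ExecutorService stopAndStartExecutor = Executors.newFixedThreadPool(2);
        stopAndStartExecutor.shutdown(); // stop() wins the race
        try {
            stopAndStartExecutor.invokeAll(stopJobs(new AtomicInteger())); // halt() loses
            return false;
        } catch (RejectedExecutionException e) {
            return true; // same exception type as in the reported log
        }
    }

    // Hypothetical corrected ordering: stop() lets the herder thread finish halt()
    // (including its invokeAll() of stop jobs) before shutting the executor down.
    static int stopJobsCompletedWhenHerderDrainedFirst() throws InterruptedException {
        ExecutorService herderExecutor = Executors.newSingleThreadExecutor();
        ExecutorService stopAndStartExecutor = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();

        // Herder thread: halt() submits the stop jobs and blocks until they finish.
        herderExecutor.submit(() -> {
            stopAndStartExecutor.invokeAll(stopJobs(completed));
            return null;
        });

        // stop(): first wait for the herder thread to exit ...
        herderExecutor.shutdown();
        herderExecutor.awaitTermination(10, TimeUnit.SECONDS);
        // ... and only then terminate the executor that halt() depends on.
        stopAndStartExecutor.shutdown();
        stopAndStartExecutor.awaitTermination(10, TimeUnit.SECONDS);
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected (executor stopped first): "
            + rejectedWhenExecutorStoppedFirst());
        System.out.println("stop jobs completed (herder drained first): "
            + stopJobsCompletedWhenHerderDrainedFirst());
    }
}
```

Running main should print "rejected (executor stopped first): true" followed by "stop jobs completed (herder drained first): 2". Waiting on the herder thread before terminating stopAndStartExecutor is only one way to remove the race. A real fix would also have to decide how long stop() may block, which the fixed 10-second timeouts in this sketch gloss over.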