Greg Harris created KAFKA-14311:
-----------------------------------

             Summary: Connect Worker clean shutdown does not cleanly stop 
connectors/tasks
                 Key: KAFKA-14311
                 URL: https://issues.apache.org/jira/browse/KAFKA-14311
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 3.3.1
            Reporter: Greg Harris


When the DistributedHerder::stop() method is called, it triggers an 
asynchronous shutdown of the background herder thread and then continues with 
a synchronous shutdown of other resources, including the stopAndStartExecutor.

This executor is responsible for cleanly stopping connectors and tasks, which 
it does in the DistributedHerder::halt() method. There is a race condition 
between the halt() method asynchronously submitting these connector/task stop 
jobs and the stop() method terminating the executor. If the executor is 
terminated first, this exception appears:
{noformat}
[2022-10-17 16:29:23,396] ERROR [Worker clientId=connect-2, groupId=connect-integration-test-connect-cluster-1] Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:366)
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@62878e25[Not completed, task = org.apache.kafka.connect.runtime.distributed.DistributedHerder$$Lambda$2285/0x00000008015046a8@58deade3] rejected from java.util.concurrent.ThreadPoolExecutor@10351ac3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
    at java.base/java.util.concurrent.AbstractExecutorService.invokeAll(AbstractExecutorService.java:247)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startAndStop(DistributedHerder.java:1667)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.halt(DistributedHerder.java:765)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:361)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833){noformat}
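
The failure mode in the trace can be reproduced outside of Connect with a 
plain java.util.concurrent executor. The following is a minimal sketch, not 
the DistributedHerder code: the class and method names are hypothetical, and 
the two "stop jobs" are placeholders that do nothing. It only illustrates that 
invokeAll() on an already shut-down ThreadPoolExecutor is rejected, while the 
same call made before shutdown completes normally.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the herder's executor usage; not Kafka code.
class ShutdownRaceSketch {

    // Placeholder for a connector/task stop job (assumption: does no real work).
    private static final Callable<Void> STOP_JOB = () -> null;

    // Models stop() winning the race: the executor is shut down first,
    // then the halt()-style code tries to submit its stop jobs.
    public static boolean rejectedAfterShutdown() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.shutdown();
        try {
            executor.invokeAll(List.of(STOP_JOB, STOP_JOB));
            return false;
        } catch (RejectedExecutionException e) {
            // Same exception type as in the reported stack trace.
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Models the other ordering: stop jobs are submitted and finish
    // before the executor is shut down. Returns how many jobs completed.
    public static int completedBeforeShutdown() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        int done = 0;
        try {
            // invokeAll blocks until every submitted job has finished.
            for (Future<Void> f : executor.invokeAll(List.of(STOP_JOB, STOP_JOB))) {
                if (f.isDone()) {
                    done++;
                }
            }
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done;
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + rejectedAfterShutdown());
        System.out.println("completed before shutdown: " + completedBeforeShutdown());
    }
}
```

In this sketch the ordering is forced in each method. In the reported bug the 
ordering depends on thread timing, which is why the exception appears only 
when the executor is terminated first.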



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
