Yu Wang created KAFKA-15005:
-------------------------------

             Summary: Status of KafkaConnect task not correct
                 Key: KAFKA-15005
                 URL: https://issues.apache.org/jira/browse/KAFKA-15005
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 3.3.2, 3.0.0, 2.5.1
            Reporter: Yu Wang

After a rebalance of our MM2 source tasks, we found that several tasks were stuck in *UNASSIGNED* status. We dumped the payload of the Kafka Connect status topic and found that the last two status changes were *RUNNING* followed by *UNASSIGNED*.
{code:java}
LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 headerKeys: [] key: task-7 payload: {"state":"RUNNING","trace":null,"worker_id":"xxxxx","generation":437643}
LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 headerKeys: [] key: task-7 payload: {"state":"UNASSIGNED","trace":null,"worker_id":"xxxxx","generation":437643}
{code}
Usually the RUNNING status should be appended after the UNASSIGNED status, because the worker coordinator revokes tasks before starting new ones.

We then checked the logs of our MM2 workers and found that, during that time, a task was revoked on worker-2 and started on worker-1.

Worker-1
{code:java}
[2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, groupId=xxxx__group] Starting task task-7 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2023-05-15 09:24:45,951] INFO Creating task task-7 (org.apache.kafka.connect.runtime.Worker)
{code}
Worker-2
{code:java}
[2023-05-15 09:24:40,922] INFO Stopping task task-7 (org.apache.kafka.connect.runtime.Worker)
{code}
So I think the incorrect status was caused by the revoked task finishing later than the newly started task, which caused the UNASSIGNED status to be appended to the status topic after the RUNNING status.

After reading the code of DistributedHerder, I found that task revocation runs in a thread pool, and the revoke operation returns right after submitting all the callables. So I think that even within the same worker, there is no guarantee that the revoke operation always finishes before the new tasks start.
{code:java}
for (final ConnectorTaskId taskId : tasks) {
    callables.add(getTaskStoppingCallable(taskId));
}

// The actual timeout for graceful task/connector stop is applied in worker's
// stopAndAwaitTask/stopAndAwaitConnector methods.
startAndStop(callables);
log.info("Finished stopping tasks in preparation for rebalance");
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
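
To illustrate why the append order in the dump matters, here is a minimal sketch of a reader that takes the last record per key as the current status. It assumes that status readers resolve a task's state this way; `StatusTopicSketch`, `StatusRecord`, `replay`, `reportedOrder` and `expectedOrder` are hypothetical names for illustration and are not Kafka Connect classes or APIs. Only the key `task-7` and the two states come from the dump in this report.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a status topic reader; not a Kafka Connect class.
final class StatusTopicSketch {

    // One appended status record: the task key and the state from its payload.
    static final class StatusRecord {
        final String key;
        final String state;

        StatusRecord(String key, String state) {
            this.key = key;
            this.state = state;
        }
    }

    // Replays records in append order and keeps the last state seen per key
    // (assumption: the latest record for a key is treated as its status).
    static Map<String, String> replay(List<StatusRecord> log) {
        Map<String, String> latest = new HashMap<>();
        for (StatusRecord record : log) {
            latest.put(record.key, record.state);
        }
        return latest;
    }

    // Append order seen in the dump: RUNNING first, then UNASSIGNED.
    static List<StatusRecord> reportedOrder() {
        List<StatusRecord> log = new ArrayList<>();
        log.add(new StatusRecord("task-7", "RUNNING"));
        log.add(new StatusRecord("task-7", "UNASSIGNED"));
        return log;
    }

    // Order the report expects: the revoke is recorded before the new start.
    static List<StatusRecord> expectedOrder() {
        List<StatusRecord> log = new ArrayList<>();
        log.add(new StatusRecord("task-7", "UNASSIGNED"));
        log.add(new StatusRecord("task-7", "RUNNING"));
        return log;
    }
}
```

Under that assumption, `StatusTopicSketch.replay(StatusTopicSketch.reportedOrder()).get("task-7")` yields `UNASSIGNED` even though the task is running on worker-1, while the expected order yields `RUNNING`. The sketch models only the ordering effect, not the herder or the rebalance protocol.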