Yu Wang created KAFKA-15005:
-------------------------------

             Summary: Status of KafkaConnect task not correct
                 Key: KAFKA-15005
                 URL: https://issues.apache.org/jira/browse/KAFKA-15005
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 3.3.2, 3.0.0, 2.5.1
            Reporter: Yu Wang


After a rebalance of our MM2 source tasks, we found several tasks were stuck in 
*UNASSIGNED* status. 

So we dumped the payload of the Kafka Connect status topic and found that the 
last two status changes were *RUNNING* followed by {*}UNASSIGNED{*}.

 
{code:java}
LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 headerKeys: 
[] key: task-7 payload: 
{"state":"RUNNING","trace":null,"worker_id":"xxxxx","generation":437643}

LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 headerKeys: 
[] key: task-7 payload: 
{"state":"UNASSIGNED","trace":null,"worker_id":"xxxxx","generation":437643}
 {code}
Usually the RUNNING status should be appended after the UNASSIGNED one, because 
the worker coordinator revokes tasks before starting new tasks.

Then we checked the logs of our MM2 workers and found that, during that time, 
the task was revoked on worker-2 and started on worker-1.

 

Worker-1

 
{code:java}
[2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, groupId=xxxx__group] 
Starting task task-7 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)

[2023-05-15 09:24:45,951] INFO Creating task task-7 
(org.apache.kafka.connect.runtime.Worker) {code}
Worker-2
{code:java}
[2023-05-15 09:24:40,922] INFO Stopping task task-7 
(org.apache.kafka.connect.runtime.Worker) {code}
So I think the incorrect status was caused by the revoked task finishing later 
than the newly started task, which made the UNASSIGNED status get appended to 
the status topic after the RUNNING status. 

After reading the code of DistributedHerder, I found that task revocation runs 
in a thread pool, and the revoke operation returns as soon as all the callables 
have been submitted. So even on the same worker, there is no guarantee that the 
revoke operation will finish before the new tasks start.
{code:java}
for (final ConnectorTaskId taskId : tasks) {
    callables.add(getTaskStoppingCallable(taskId));
}

// The actual timeout for graceful task/connector stop is applied in worker's
// stopAndAwaitTask/stopAndAwaitConnector methods.
startAndStop(callables);
log.info("Finished stopping tasks in preparation for rebalance"); {code}
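The race can be reproduced outside of Connect with a minimal sketch (the class and method names below are hypothetical, not Connect code): a stop callable is submitted to an executor and the caller moves on immediately, so the "stopped" status can land after the "running" status whenever the graceful stop is slow.

{code:java}
import java.util.List;
import java.util.concurrent.*;

public class RevokeRaceSketch {
    // Mimics the herder's pattern: submit the stop callable,
    // then return without waiting for it to complete.
    static List<String> rebalance() throws Exception {
        List<String> statusLog = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(1);

        pool.submit(() -> {
            Thread.sleep(100);           // graceful stop takes a while
            statusLog.add("UNASSIGNED"); // status written when the stop finishes
            return null;
        });
        // submit() has already returned, so the "new" task starts right away
        statusLog.add("RUNNING");

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return statusLog;
    }

    public static void main(String[] args) throws Exception {
        // prints [RUNNING, UNASSIGNED]: the terminal status is wrong
        System.out.println(rebalance());
    }
}
{code}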
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
