[ https://issues.apache.org/jira/browse/KAFKA-14520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Waleed Fateem resolved KAFKA-14520.
-----------------------------------
    Resolution: Duplicate

> TimeoutException Raised by KafkaConsumer Leads to: User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14520
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14520
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.2.1
>            Reporter: Waleed Fateem
>            Priority: Minor
>
> I'm on the fence about whether this should be considered a bug, but I
> decided to open it as one from the perspective of a sink developer. Even
> though there are signs of a potential issue on the Kafka broker side, we
> depend on Kafka Connect to be robust enough that we don't have to intervene
> manually to restart the connector.
> We don't have access to the Kafka broker cluster, so we don't know what
> underlying issue caused the following error during a rebalance:
> {code:java}
> Nov 21, 2022 @ 06:09:44.234","org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition topic-partition-2 could be determined
> {code}
> That leads to the following problem:
> {code:java}
> Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR [Consumer clientId=connector-consumer-the-sink-1, groupId=connect-the-sink] User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned for partitions [<list of partitions>] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-the-sink-1]
> {code}
> The TimeoutException is thrown by the KafkaConsumer's position() method,
> which is invoked in the WorkerSinkTask's HandleRebalance
> [code|https://github.com/apache/kafka/blob/3.2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L697]:
> {code:java}
>     private class HandleRebalance implements ConsumerRebalanceListener {
>         @Override
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
>             for (TopicPartition tp : partitions) {
>                 long pos = consumer.position(tp);
>                 lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
>                 currentOffsets.put(tp, new OffsetAndMetadata(pos));
>                 log.debug("{} Assigned topic partition {} with offset {}", WorkerSinkTask.this, tp, pos);
>             }
> {code}
> The exception is then treated as unrecoverable
> [here|https://github.com/a0x8o/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L210]:
> {code:java}
> Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR WorkerSinkTask{id=the-sink-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-the-sink-1]
> {code}
> Is that TimeoutException expected to kill the task, or should it ideally
> have been handled in the WorkerSinkTask's HandleRebalance code?
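A minimal sketch of one possible answer to that question, assuming the position lookup is simply retried a bounded number of times before the task is allowed to fail. In the real client, org.apache.kafka.common.errors.TimeoutException is a RetriableException, and position(TopicPartition) without an explicit timeout waits up to default.api.timeout.ms (60000 ms by default, which matches the log above). This is not the Connect code or a proposed patch: positionWithRetries and PositionTimeoutException are hypothetical names, and the consumer call is replaced by a simulated Supplier<Long> so the example runs with only the JDK.

```java
import java.util.function.Supplier;

public class RetryingPositionLookup {

    // Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException,
    // which is also an unchecked (RuntimeException) subtype in the real client.
    static class PositionTimeoutException extends RuntimeException {
        PositionTimeoutException(String message) {
            super(message);
        }
    }

    // Hypothetical helper: calls the lookup up to maxAttempts times, retrying
    // only on PositionTimeoutException and rethrowing the last one if every
    // attempt fails. Any other exception propagates immediately.
    static long positionWithRetries(Supplier<Long> lookup, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        PositionTimeoutException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (PositionTimeoutException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // non-null: at least one attempt ran and timed out
    }

    public static void main(String[] args) {
        // Simulated consumer.position(tp) that times out twice, then succeeds.
        int[] calls = {0};
        Supplier<Long> flakyPosition = () -> {
            calls[0]++;
            if (calls[0] <= 2) {
                throw new PositionTimeoutException(
                        "Timeout expired before the position could be determined");
            }
            return 42L;
        };
        long pos = positionWithRetries(flakyPosition, 3);
        System.out.println("position=" + pos + " after " + calls[0] + " attempts");
    }
}
```

Running main prints `position=42 after 3 attempts`. Bounding the attempts keeps a persistently unreachable partition from stalling the rebalance indefinitely, since each attempt can itself block for the full timeout; whether Connect should retry like this or fail fast and rely on a restart is the open question in this issue.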



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
