[ https://issues.apache.org/jira/browse/KAFKA-14520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17702118#comment-17702118 ]

Waleed Fateem commented on KAFKA-14520:
---------------------------------------

Hi [~sagarrao],

Thank you for the response. Honestly, I'm not sure whether the console consumer,
or any other consumers reading from that topic at the time, experienced the same
issue.

However, that wasn't my main concern. My focus was on the Kafka sink connector
and on understanding why a task was failing. In this particular environment
there wasn't a good way to monitor the different connectors and their tasks, so
the problem only became apparent when other users noticed that some data wasn't
reaching the destination.

So it sounds like this is expected behavior: it's not an issue with the Kafka
Connect runtime, and there's no reason to believe that an enhancement of some
sort would make sense. Is that a fair statement?
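If an enhancement were considered, one option would be for HandleRebalance to retry the consumer.position(tp) call (quoted in the issue below) a bounded number of times on TimeoutException, instead of failing the task on the first timeout. The following is a hypothetical sketch, not Kafka Connect's actual behavior: BoundedRetry and call() are illustrative names, and the code uses only the JDK so it stays self-contained. In the runtime, the retriable class would be org.apache.kafka.common.errors.TimeoutException and the operation would be () -> consumer.position(tp).

{code:java}
import java.util.function.Supplier;

/**
 * Hypothetical helper, NOT part of Kafka Connect: retries an operation that
 * may fail with a retriable runtime exception, up to a fixed number of attempts.
 */
final class BoundedRetry {

    private BoundedRetry() {
    }

    /**
     * Runs op; if it throws an instance of retriable, tries again, for at most
     * maxAttempts attempts in total. Non-retriable exceptions propagate at once;
     * after the last failed attempt, the last retriable exception is rethrown.
     */
    static <T> T call(Supplier<T> op,
                      Class<? extends RuntimeException> retriable,
                      int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.get();
            } catch (RuntimeException e) {
                if (!retriable.isInstance(e)) {
                    throw e; // not retriable: fail immediately
                }
                last = e; // remember the failure and try again
            }
        }
        throw last; // every attempt failed with a retriable exception
    }
}
{code}

In HandleRebalance this might look like long pos = BoundedRetry.<Long>call(() -> consumer.position(tp), TimeoutException.class, 3);. One trade-off: each position() attempt can block for up to default.api.timeout.ms (60000 ms by default, which matches the timeout in the log), and all of it happens inside the rebalance callback, so a large retry budget could stall the consumer's poll loop and risk exceeding max.poll.interval.ms.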

Is there anything you think a sink connector can or should do in a situation 
like this? My understanding is that there's not much we can do here, right?
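Since the runtime kills the task and, per the log quoted below, it "will not recover until manually restarted", one operator-side mitigation is to automate that restart through the Kafka Connect REST API. This is a minimal sketch, assuming a worker REST listener at http://localhost:8083 and Kafka 3.0 or later, where KIP-745 added POST /connectors/<name>/restart with the includeTasks and onlyFailed query parameters. ConnectRestarter and its methods are hypothetical names. Something (a scheduled job or an existing alerting system) would still need to trigger it, for example after GET /connectors/<name>/status reports a task in the FAILED state, and a restart only helps if the underlying broker-side condition has cleared.

{code:java}
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper: asks a Kafka Connect worker to restart only the failed
 * instances of a connector and its tasks (KIP-745 restart endpoint, Kafka 3.0+).
 */
final class ConnectRestarter {

    private ConnectRestarter() {
    }

    /** Builds POST {base}/connectors/{name}/restart?includeTasks=true&onlyFailed=true. */
    static HttpRequest restartFailedRequest(String workerBaseUrl, String connectorName) {
        // URLEncoder targets form encoding ('+' for spaces); a path segment needs %20.
        String segment = URLEncoder.encode(connectorName, StandardCharsets.UTF_8)
                .replace("+", "%20");
        URI uri = URI.create(workerBaseUrl + "/connectors/" + segment
                + "/restart?includeTasks=true&onlyFailed=true");
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    /** Sends the restart request and returns the HTTP status code (e.g. 202 when accepted). */
    static int restartFailed(HttpClient client, String workerBaseUrl, String connectorName)
            throws IOException, InterruptedException {
        HttpResponse<String> response = client.send(
                restartFailedRequest(workerBaseUrl, connectorName),
                HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
{code}

For the connector in this issue, a call would look like ConnectRestarter.restartFailed(HttpClient.newHttpClient(), "http://localhost:8083", "the-sink"). Using onlyFailed=true avoids bouncing tasks that are still healthy.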

> TimeoutException Raised by KafkaConsumer Leads to: User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14520
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14520
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.2.1
>            Reporter: Waleed Fateem
>            Priority: Minor
>
> I'm on the fence about whether this should actually be considered a bug, but
> decided to open it as one from the perspective of a sink developer. Even
> though there are signs of a potential issue on the Kafka broker side, we
> depend on Kafka Connect to provide a level of robustness so that we don't
> have to intervene manually to restart the connector.
> We don't have access to the Kafka broker cluster, so we don't know what
> underlying issue caused the following error during a rebalance:
> {code:java}
> Nov 21, 2022 @ 06:09:44.234","org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition topic-partition-2 could be determined
> {code}
> That leads to the following problem:
> {code:java}
> Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR [Consumer clientId=connector-consumer-the-sink-1, groupId=connect-the-sink] User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned for partitions [<list of partitions>] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-the-sink-1]
> {code}
> The TimeoutException comes from the KafkaConsumer position() call made in
> WorkerSinkTask's HandleRebalance
> [code|https://github.com/apache/kafka/blob/3.2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L697]:
> {code:java}
>     private class HandleRebalance implements ConsumerRebalanceListener {
>         @Override
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
>             for (TopicPartition tp : partitions) {
>                 // position() blocks for up to default.api.timeout.ms (60000 ms by default)
>                 // and throws TimeoutException if the position cannot be determined in time
>                 long pos = consumer.position(tp);
>                 lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
>                 currentOffsets.put(tp, new OffsetAndMetadata(pos));
>                 log.debug("{} Assigned topic partition {} with offset {}", WorkerSinkTask.this, tp, pos);
>             }
>             // ... (remainder of onPartitionsAssigned omitted)
>         }
>     }
> {code}
> The exception is then treated as an unrecoverable error
> [here|https://github.com/a0x8o/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L210]:
> {code:java}
> Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR WorkerSinkTask{id=the-sink-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-the-sink-1]
> {code}
> Do we expect that TimeoutException to cause the task to be killed, or should it
> ideally have been handled somehow in WorkerSinkTask's HandleRebalance code?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
