[ https://issues.apache.org/jira/browse/KAFKA-14520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Waleed Fateem resolved KAFKA-14520.
-----------------------------------
    Resolution: Duplicate

> TimeoutException Raised by KafkaConsumer Leads to: User provided listener
> org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on
> invocation of onPartitionsAssigned
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-14520
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14520
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.2.1
>            Reporter: Waleed Fateem
>            Priority: Minor
>
> I'm on the fence on whether or not this should actually be considered a bug,
> but decided to open it as such from the perspective of a sink developer. Even
> though there's a sign of a potential issue on the Kafka broker's side, we're
> dependent on Kafka Connect to provide a level of robustness so we don't have
> to manually intervene to restart the connector.
> We don't have access to the Kafka broker cluster, so we don't know what
> underlying issue caused the following error during a rebalance:
> {code:java}
> Nov 21, 2022 @ 06:09:44.234","org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition topic-partition-2 could be determined
> {code}
> That leads to the following problem:
> {code:java}
> Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR [Consumer clientId=connector-consumer-the-sink-1, groupId=connect-the-sink] User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned for partitions [<list of partitions>] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-the-sink-1]
> {code}
> The KafkaConsumer position() call that raises this TimeoutException is made
> in the WorkerSinkTask's HandleRebalance
> [code|https://github.com/apache/kafka/blob/3.2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L697]:
> {code:java}
> private class HandleRebalance implements ConsumerRebalanceListener {
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>         log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
>         for (TopicPartition tp : partitions) {
>             long pos = consumer.position(tp);
>             lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
>             currentOffsets.put(tp, new OffsetAndMetadata(pos));
>             log.debug("{} Assigned topic partition {} with offset {}", WorkerSinkTask.this, tp, pos);
>         }
> {code}
> The exception is then treated as an unrecoverable error
> [here|https://github.com/a0x8o/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L210]:
> {code:java}
> Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR WorkerSinkTask{id=the-sink-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-the-sink-1]
> {code}
> Do we expect that TimeoutException to kill the task, or should it ideally
> have been handled somehow in the WorkerSinkTask's HandleRebalance code?
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
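
To make the closing question concrete, here is a minimal, self-contained sketch of one possible way to handle it: retrying the position lookup a bounded number of times before giving up. This is not Kafka Connect code and not a proposed patch. `PositionRetrySketch`, `positionWithRetry`, and `LookupTimeoutException` are hypothetical names, and the `LongSupplier` stands in for a call such as `consumer.position(tp)`. Whether blocking and retrying inside a rebalance callback is acceptable at all is an assumption that this issue leaves open.

```java
import java.util.function.LongSupplier;

/** Sketch only: every name here is hypothetical, not a Kafka Connect API. */
final class PositionRetrySketch {

    /** Stand-in for org.apache.kafka.common.errors.TimeoutException (also unchecked). */
    static final class LookupTimeoutException extends RuntimeException {
        LookupTimeoutException(String message) {
            super(message);
        }
    }

    /**
     * Calls lookup up to maxAttempts times, sleeping backoffMs between failed
     * attempts. Rethrows the last timeout once the attempts are exhausted, or
     * immediately if the thread is interrupted while backing off.
     */
    static long positionWithRetry(LongSupplier lookup, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        LookupTimeoutException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.getAsLong();
            } catch (LookupTimeoutException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMs);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // preserve interrupt status
                        throw e; // give up early and surface the timeout
                    }
                }
            }
        }
        throw last; // non-null here: the loop ran at least once and every attempt timed out
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated lookup: times out twice, then returns offset 42.
        LongSupplier flaky = () -> {
            if (++calls[0] < 3) {
                throw new LookupTimeoutException("position could not be determined");
            }
            return 42L;
        };
        long pos = positionWithRetry(flaky, 5, 10L);
        System.out.println("position=" + pos + " after " + calls[0] + " attempts");
    }
}
```

With the simulated lookup above, the third attempt succeeds. If every attempt times out, the last exception still propagates, so the caller keeps today's failure behaviour as a fallback. Any real change would also have to account for how long the callback is allowed to block during a rebalance.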