[ 
https://issues.apache.org/jira/browse/KAFKA-16665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16665.
----------------------------------

> Fail to get partition's position from within onPartitionsAssigned callback in 
> new consumer 
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16665
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16665
>             Project: Kafka
>          Issue Type: Task
>          Components: clients, consumer
>    Affects Versions: 3.7.0
>            Reporter: Lianet Magrans
>            Assignee: Lianet Magrans
>            Priority: Major
>              Labels: kip-848-client-support
>             Fix For: 3.8.0
>
>
> If we attempt to call consumer.position(tp) from within the 
> onPartitionsAssigned callback, the new consumer fails with a 
> TimeoutException. The expectation is that we should be able to retrieve the 
> position of newly assigned partitions, as is the case with the legacy 
> consumer, which allows this call. This is actually used from places within 
> Kafka itself (e.g. Connect 
> [WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
>  
> The failure in the new consumer occurs because partitions that are assigned 
> but still awaiting the onPartitionsAssigned callback are excluded from the 
> list of partitions whose positions should be initialized. We should allow 
> these partitions to initialize their positions without allowing data to be 
> fetched from them (which is already achieved by the isFetchable flag in the 
> subscription state).
> Note that a partition's position can be updated from 2 places: a call to 
> consumer.position or a call to consumer.poll. Both will attempt to 
> `updateFetchPositions` if there is no valid position yet, but even once a 
> valid position exists after those calls, the partition will remain 
> non-fetchable until the onPartitionsAssigned callback completes (a partition 
> is fetchable only if it has a valid position AND is not awaiting the 
> callback).
>   
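
The distinction drawn above between initializing a position and being fetchable can be illustrated with a small toy model. This is only a sketch of the described behavior, not Kafka's actual SubscriptionState code: the class PartitionStateSketch and every method and field name in it are hypothetical, and partitions are represented as plain strings so that the example runs without the Kafka client library.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: all names here are hypothetical and are NOT Kafka's SubscriptionState API.
class PartitionStateSketch {

    // Per-partition flags mirroring the two conditions named in the description.
    static final class State {
        boolean hasValidPosition = false;        // set once a position has been initialized
        boolean awaitingAssignedCallback = true; // true until onPartitionsAssigned completes
    }

    private final Map<String, State> assigned = new LinkedHashMap<>();

    // A newly assigned partition starts with no position and a pending callback.
    void assign(String partition) {
        assigned.put(partition, new State());
    }

    // Proposed behavior: partitions awaiting the callback are NOT excluded here,
    // so a position lookup made from inside the callback can initialize them.
    List<String> partitionsNeedingPosition() {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, State> e : assigned.entrySet()) {
            if (!e.getValue().hasValidPosition) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    // Stand-in for the position initialization triggered by position() or poll().
    void initializePositions() {
        for (String partition : partitionsNeedingPosition()) {
            assigned.get(partition).hasValidPosition = true;
        }
    }

    // Marks the onPartitionsAssigned callback as completed for this partition.
    void callbackCompleted(String partition) {
        State s = assigned.get(partition);
        if (s != null) {
            s.awaitingAssignedCallback = false;
        }
    }

    boolean hasValidPosition(String partition) {
        State s = assigned.get(partition);
        return s != null && s.hasValidPosition;
    }

    // Fetchable = has a valid position AND is not awaiting the callback.
    boolean isFetchable(String partition) {
        State s = assigned.get(partition);
        return s != null && s.hasValidPosition && !s.awaitingAssignedCallback;
    }

    public static void main(String[] args) {
        PartitionStateSketch state = new PartitionStateSketch();
        state.assign("topic-0");

        // Inside the callback: the position can be initialized...
        state.initializePositions();
        System.out.println("valid position: " + state.hasValidPosition("topic-0"));
        // ...but the partition must not be fetched from yet.
        System.out.println("fetchable in callback: " + state.isFetchable("topic-0"));

        // After the callback completes, fetching is allowed.
        state.callbackCompleted("topic-0");
        System.out.println("fetchable after callback: " + state.isFetchable("topic-0"));
    }
}
```

In this model the pending-callback flag only gates fetching, never position initialization, which is the separation the description asks for.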



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
