[ https://issues.apache.org/jira/browse/KAFKA-16665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lianet Magrans closed KAFKA-16665. ---------------------------------- > Fail to get partition's position from within onPartitionsAssigned callback in > new consumer > ------------------------------------------------------------------------------------------- > > Key: KAFKA-16665 > URL: https://issues.apache.org/jira/browse/KAFKA-16665 > Project: Kafka > Issue Type: Task > Components: clients, consumer > Affects Versions: 3.7.0 > Reporter: Lianet Magrans > Assignee: Lianet Magrans > Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > If we attempt to call consumer.position(tp) from within the > onPartitionsAssigned callback, the new consumer fails with a > TimeoutException. The expectation is that we should be able to retrieve the > position of newly assigned partitions, as it happens with the legacy > consumer, that allows this call. This is actually used from places within > Kafka itself (ex. Connect > [WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715]) > > The failure in the new consumer is because the partitions that are assigned > but awaiting the onPartitionsAssigned callback, are excluded from the list of > partitions that should initialize. We should allow the partitions to > initialize their positions, without allowing to fetch data from them (which > is already achieve based on the isFetchable flag in the subscription state). > Note that a partition position can be updated from 2 places: call to > consumer.position or call to consumer.poll. Both will attempt to > `updateFetchPositions` if there is no valid position yet, but even after > having a valid position after those calls, the partition will remain > non-fetchable until the onPartitionsAssigned callback completes (fetchable > considers that the partitions has a valid position AND is not awaiting the > callback) > -- This message was sent by Atlassian Jira (v8.20.10#820010)