[ https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lianet Magrans reassigned KAFKA-17066:
--------------------------------------

    Assignee: Lianet Magrans  (was: Kirk True)

> New consumer updateFetchPositions should perform all operations in background thread
> ------------------------------------------------------------------------------------
>
>              Key: KAFKA-17066
>              URL: https://issues.apache.org/jira/browse/KAFKA-17066
>          Project: Kafka
>       Issue Type: Bug
>       Components: clients, consumer
> Affects Versions: 3.8.0
>         Reporter: Lianet Magrans
>         Assignee: Lianet Magrans
>         Priority: Blocker
>           Labels: consumer-threading-refactor, kip-848-client-support
>          Fix For: 3.9.0
>
>
> The updateFetchPositions function in the new consumer performs several actions
> based on the assigned partitions from the subscriptionState. As currently
> implemented, it fetches committed offsets for partitions that require a
> position (retrieved from the subscription state in the app thread), and then
> resets positions for the partitions that still need one (retrieved from the
> subscription state, but in the background thread).
> This is problematic because the assignment/subscriptionState may change in the
> background thread at any time (e.g. when new partitions are reconciled), so we
> could end up resetting positions to the partition offsets for a partition for
> which we never even attempted to retrieve committed offsets.
> Consider this sequence for a consumer that owns partition tp0:
> * consumer owns tp0
> * app thread -> updateFetchPositions triggers initWithCommittedOffsetsIfNeeded
>   to retrieve committed offsets for assigned partitions requiring a position
>   (taking them from subscriptions.initializingPartitions()). This will fetch
>   committed offsets for tp0 only.
> * background thread -> receives new partition tp1 and completes reconciliation
>   (adds it to the subscription state as INITIALIZING, requiring a position)
> * app thread -> updateFetchPositions resets positions for all partitions that
>   still don't have a valid position after initWithCommittedOffsetsIfNeeded
>   (taking them from subscriptionState.partitionsNeedingReset). This will
>   mistakenly decide to reset tp1 to the partition offsets, when in reality it
>   never even tried fetching the committed offsets for tp1, because tp1 wasn't
>   assigned when initWithCommittedOffsetsIfNeeded ran.
> We should consider moving updateFetchPositions to the background thread as a
> single event, which would safely use the subscriptionState object and apply
> all actions involved in updateFetchPositions to the same consistent set of
> partitions assigned at that moment.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
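The race in this ticket and the proposed single-event fix can be illustrated with a small, self-contained Java model. This is a hedged sketch, not the Kafka client code. The class FetchPositionsSketch and all of its methods are hypothetical simplifications:

* A LinkedHashMap stands in for subscriptionState, with a null position meaning "needs a position".
* COMMITTED stands in for the group's committed offsets.
* RESET_OFFSET assumes an earliest-style reset to offset 0.

A single-thread ExecutorService plays the background thread. Each task is awaited with get(), so the problematic interleaving happens deterministically rather than by chance.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, simplified model; not the Kafka consumer's real classes.
public class FetchPositionsSketch {
    // Assumed committed offsets for the group.
    static final Map<String, Long> COMMITTED = Map.of("tp0", 42L, "tp1", 17L);
    // Assumed reset target (e.g. an earliest reset where the log starts at 0).
    static final long RESET_OFFSET = 0L;

    // Stand-in for subscriptionState: partition -> position, null = needs a position.
    final Map<String, Long> positions = new LinkedHashMap<>();

    List<String> partitionsNeedingPosition() {
        List<String> out = new ArrayList<>();
        positions.forEach((tp, pos) -> {
            if (pos == null) out.add(tp);
        });
        return out;
    }

    // Step 1 (like initWithCommittedOffsetsIfNeeded): use committed offsets where they exist.
    void initWithCommittedOffsets(List<String> partitions) {
        for (String tp : partitions) {
            Long committed = COMMITTED.get(tp);
            if (committed != null) positions.put(tp, committed);
        }
    }

    // Step 2: reset the given partitions to RESET_OFFSET.
    void resetPositions(List<String> partitions) {
        for (String tp : partitions) positions.put(tp, RESET_OFFSET);
    }

    // Proposed shape: both steps in one task, over one snapshot of partitions.
    void updateFetchPositions() {
        List<String> initializing = partitionsNeedingPosition();
        initWithCommittedOffsets(initializing);
        List<String> stillMissing = new ArrayList<>();
        for (String tp : initializing) {
            if (positions.get(tp) == null) stillMissing.add(tp);
        }
        resetPositions(stillMissing);
    }

    // Current shape: the app thread runs the steps separately, re-reading state in between.
    public static Map<String, Long> twoStepOnAppThread() throws Exception {
        ExecutorService background = Executors.newSingleThreadExecutor();
        FetchPositionsSketch s = new FetchPositionsSketch();
        s.positions.put("tp0", null);
        try {
            s.initWithCommittedOffsets(s.partitionsNeedingPosition()); // sees tp0 only
            // Background reconciles tp1 between the two steps (forced here via get()).
            background.submit(() -> { s.positions.put("tp1", null); }).get();
            s.resetPositions(s.partitionsNeedingPosition()); // sees tp1: wrongly reset
        } finally {
            background.shutdown();
        }
        return s.positions;
    }

    public static Map<String, Long> singleBackgroundEvent() throws Exception {
        ExecutorService background = Executors.newSingleThreadExecutor();
        FetchPositionsSketch s = new FetchPositionsSketch();
        s.positions.put("tp0", null);
        try {
            background.submit(s::updateFetchPositions).get();
            // Reconciliation runs on the same thread, so it cannot land mid-update.
            background.submit(() -> { s.positions.put("tp1", null); }).get();
            background.submit(s::updateFetchPositions).get(); // next poll handles tp1
        } finally {
            background.shutdown();
        }
        return s.positions;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("two-step on app thread: " + twoStepOnAppThread());
        System.out.println("single background event: " + singleBackgroundEvent());
    }
}
```

In this model, the two-step variant ends with {tp0=42, tp1=0}: tp1 is reset even though a committed offset of 17 exists. The single-event variant ends with {tp0=42, tp1=17}. In the single-event variant only the background thread mutates the map. As a result, a reconciliation can land only before or after a whole updateFetchPositions pass, never between its two steps.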