[ 
https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-17066:
---------------------------------

    Assignee: Kirk True

> New consumer updateFetchPositions should perform all operations in background 
> thread
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-17066
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17066
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.8.0
>            Reporter: Lianet Magrans
>            Assignee: Kirk True
>            Priority: Blocker
>              Labels: consumer-threading-refactor, kip-848-client-support
>             Fix For: 3.9.0
>
>
> The updateFetchPositions function in the new consumer performs several 
> actions based on the assigned partitions from the subscriptionState. As 
> currently implemented, it fetches committed offsets for partitions that 
> require a position (retrieved from the subscription state in the app thread), 
> and then resets positions for the partitions still needing one (retrieved 
> from the subscription state, but in the background thread). 
> This is problematic, given that the assignment/subscriptionState may change 
> in the background thread at any time (e.g. new partitions reconciled), so we 
> could end up resetting positions to the partition offsets for a partition for 
> which we never even attempted to retrieve committed offsets.  
> Consider this sequence for a consumer that owns partition tp0:
>  * consumer owns tp0
>  * app thread -> updateFetchPositions triggers 
> initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned 
> partitions requiring a position (taking them from 
> subscriptions.initializingPartitions()). This will fetch committed offsets 
> for tp0 only.
>  * background thread -> receives new partition tp1 and completes 
> reconciliation (adds it to the subscription state as INITIALIZING, requires a 
> position)
>  * app thread -> updateFetchPositions resets positions for all partitions 
> that still don't have a valid position after initWithCommittedOffsetsIfNeeded 
> (taking them from subscriptionState.partitionsNeedingReset). This will 
> mistakenly consider that it should reset tp1 to the partition offsets, when 
> in reality it never even tried fetching the committed offsets for it because 
> it wasn't assigned when initWithCommittedOffsetsIfNeeded happened. 
> We should consider moving updateFetchPositions to the background thread as a 
> single event, which would safely use the subscriptionState object and apply 
> all actions involved in updateFetchPositions to the same consistent set of 
> partitions assigned at that moment. 
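
The race described above can be illustrated with a small, self-contained Java
sketch. It does not use the real Kafka client classes: ToySubscriptionState,
resetWithoutLookupTwoReads and resetWithoutLookupSingleSnapshot are
hypothetical names, and the reconciliation callback only simulates the
background thread adding a partition between the two steps. The sketch also
assumes no committed offsets are found, so every partition that still needs a
position ends up being reset.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class UpdateFetchPositionsSketch {

    /** Toy stand-in for the consumer's subscription state (hypothetical, not the Kafka class). */
    static final class ToySubscriptionState {
        private final Set<String> initializing = new LinkedHashSet<>();

        synchronized void assign(String partition) {
            initializing.add(partition);
        }

        /** Returns a copy of the partitions that still need a position. */
        synchronized Set<String> initializingPartitions() {
            return new LinkedHashSet<>(initializing);
        }
    }

    /**
     * Problematic shape: the state is read twice, and reconciliation may run in between.
     * Returns the partitions that are reset without a committed-offset lookup.
     */
    static Set<String> resetWithoutLookupTwoReads(ToySubscriptionState state, Runnable reconciliation) {
        Set<String> lookedUp = state.initializingPartitions(); // read 1: committed-offset lookup
        reconciliation.run();                                  // simulated background reconciliation
        Set<String> reset = state.initializingPartitions();    // read 2: partitions to reset
        reset.removeAll(lookedUp);
        return reset;
    }

    /**
     * Proposed shape (sketch): both steps work on one snapshot, so a partition
     * added later is simply left for the next pass.
     */
    static Set<String> resetWithoutLookupSingleSnapshot(ToySubscriptionState state, Runnable reconciliation) {
        Set<String> snapshot = state.initializingPartitions(); // single read used by both steps
        reconciliation.run();                                  // a late assignment does not affect the snapshot
        Set<String> reset = new LinkedHashSet<>(snapshot);     // reset targets come from the same snapshot
        reset.removeAll(snapshot);                             // every reset target was looked up first
        return reset;
    }

    public static void main(String[] args) {
        ToySubscriptionState racy = new ToySubscriptionState();
        racy.assign("tp0");
        System.out.println(resetWithoutLookupTwoReads(racy, () -> racy.assign("tp1"))); // prints [tp1]

        ToySubscriptionState safe = new ToySubscriptionState();
        safe.assign("tp0");
        System.out.println(resetWithoutLookupSingleSnapshot(safe, () -> safe.assign("tp1"))); // prints []
    }
}
```

In the single-snapshot variant tp1 is not lost: it stays in the toy state as
needing a position and would be handled, lookup first, on the next pass.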



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
