[jira] [Updated] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread

2024-08-09 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-17066:
-
Fix Version/s: 4.0.0
   (was: 3.9.0)

> New consumer updateFetchPositions should perform all operations in background 
> thread
> 
>
> Key: KAFKA-17066
> URL: https://issues.apache.org/jira/browse/KAFKA-17066
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.0.0
>
>
> The updateFetchPositions function in the new consumer performs several actions
> based on the assigned partitions from the subscriptionState. As currently
> implemented, it fetches committed offsets for partitions that require a
> position (retrieved from the subscription state in the app thread), and then
> resets positions for the partitions still needing one (retrieved from the
> subscription state, but in the background thread).
> This is problematic because the assignment/subscriptionState may change in
> the background thread at any time (e.g. when new partitions are reconciled),
> so we could end up resetting positions to the partition offsets for a
> partition for which we never even attempted to retrieve committed offsets.
> Consider this sequence for a consumer that owns partition tp0:
>  * consumer owns tp0
>  * app thread -> updateFetchPositions triggers
> initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned
> partitions requiring a position (taking them from
> subscriptions.initializingPartitions()). This will fetch committed offsets
> for tp0 only.
>  * background thread -> receives new partition tp1 and completes
> reconciliation (adds it to the subscription state as INITIALIZING, requiring
> a position)
>  * app thread -> updateFetchPositions resets positions for all partitions
> that still don't have a valid position after initWithCommittedOffsetsIfNeeded
> (taking them from subscriptionState.partitionsNeedingReset). This will
> mistakenly conclude that it should reset tp1 to the partition offsets, when
> in reality it never even tried fetching the committed offsets for it, because
> it wasn't assigned when initWithCommittedOffsetsIfNeeded happened.
> We should consider moving updateFetchPositions to the background thread as
> a single event, which would safely use the subscriptionState object and
> apply all actions involved in updateFetchPositions to the same consistent
> set of partitions assigned at that moment.
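A minimal, single-file Java sketch of the interleaving described in the issue and of the proposed single-event shape, under heavy simplifying assumptions: `SubscriptionState`, `COMMITTED`, `LOG_END`, `racyUpdateFetchPositions` and `updateFetchPositionsEvent` are hypothetical stand-ins, not Kafka's actual classes or methods; partitions are plain strings; and the reset policy is assumed to be "latest" (jump to an assumed log-end offset). The racy variant injects the background reconciliation deterministically at the interleaving point from the sequence, rather than racing real threads. The single-event variant runs every step on a single-threaded executor standing in for the background thread, so reconciliation can only happen between events.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FetchPositionsRaceSketch {

    // Hypothetical, heavily simplified stand-in for the consumer's subscription
    // state (NOT Kafka's SubscriptionState): a partition "requires a position"
    // until it has an entry in `positions`.
    static final class SubscriptionState {
        final Set<String> assigned = new LinkedHashSet<>();
        final Map<String, Long> positions = new TreeMap<>();

        synchronized void assignInitializing(String tp) {
            assigned.add(tp);
        }

        synchronized Set<String> initializingPartitions() {
            Set<String> result = new LinkedHashSet<>();
            for (String tp : assigned) {
                if (!positions.containsKey(tp)) {
                    result.add(tp);
                }
            }
            return result;
        }

        synchronized void seek(String tp, long offset) {
            positions.put(tp, offset);
        }
    }

    // Assumed broker-side data: committed offsets and log-end offsets.
    static final Map<String, Long> COMMITTED = Map.of("tp0", 42L, "tp1", 7L);
    static final Map<String, Long> LOG_END = Map.of("tp0", 100L, "tp1", 100L);

    static void fetchCommitted(SubscriptionState s, Set<String> partitions) {
        for (String tp : partitions) {
            Long committed = COMMITTED.get(tp);
            if (committed != null) {
                s.seek(tp, committed);
            }
        }
    }

    // Assumes a "latest" reset policy: jump to the log-end offset.
    static void resetToLatest(SubscriptionState s, Set<String> partitions) {
        for (String tp : partitions) {
            s.seek(tp, LOG_END.get(tp));
        }
    }

    // Racy shape: the two steps read the partition set at different moments,
    // with reconciliation injected deterministically in between.
    static void racyUpdateFetchPositions(SubscriptionState s, Runnable reconcileInBetween) {
        fetchCommitted(s, s.initializingPartitions()); // scenario: {tp0}
        reconcileInBetween.run();                      // scenario: tp1 is added
        resetToLatest(s, s.initializingPartitions());  // scenario: {tp1}, committed offset never fetched
    }

    // Single-event shape: take one snapshot and apply both steps to it only.
    static void updateFetchPositionsEvent(SubscriptionState s) {
        Set<String> snapshot = s.initializingPartitions();
        fetchCommitted(s, snapshot);
        Set<String> stillMissing = new LinkedHashSet<>(snapshot);
        stillMissing.retainAll(s.initializingPartitions());
        resetToLatest(s, stillMissing);
    }

    public static void main(String[] args) throws Exception {
        SubscriptionState racy = new SubscriptionState();
        racy.assignInitializing("tp0");
        racyUpdateFetchPositions(racy, () -> racy.assignInitializing("tp1"));
        System.out.println("racy:  " + racy.positions);

        // A single-threaded executor stands in for the background thread, so
        // reconciliation can only run between events, never in the middle of one.
        SubscriptionState fixed = new SubscriptionState();
        ExecutorService background = Executors.newSingleThreadExecutor();
        background.submit(() -> fixed.assignInitializing("tp0"));
        background.submit(() -> updateFetchPositionsEvent(fixed));
        background.submit(() -> fixed.assignInitializing("tp1"));
        background.submit(() -> updateFetchPositionsEvent(fixed)).get();
        background.shutdown();
        System.out.println("fixed: " + fixed.positions);
    }
}
```

In this model the racy variant ends with tp1 reset to the assumed log-end offset (100) even though it has a committed offset (7), while in the single-event variant tp1 arrives between events and the next event positions it at its committed offset.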



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread

2024-07-08 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-17066:
---
Labels: consumer-threading-refactor kip-848-client-support  (was: )

> New consumer updateFetchPositions should perform all operations in background 
> thread
> 
>
> Key: KAFKA-17066
> URL: https://issues.apache.org/jira/browse/KAFKA-17066
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.9.0


[jira] [Updated] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread

2024-07-03 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17066:
--
Component/s: clients

> New consumer updateFetchPositions should perform all operations in background 
> thread
> 
>
> Key: KAFKA-17066
> URL: https://issues.apache.org/jira/browse/KAFKA-17066
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Priority: Blocker
> Fix For: 3.9.0


[jira] [Updated] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread

2024-07-02 Thread Lin Siyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lin Siyuan updated KAFKA-17066:
---
Description: 
The updateFetchPositions function in the new consumer performs several actions
based on the assigned partitions from the subscriptionState. As currently
implemented, it fetches committed offsets for partitions that require a
position (retrieved from the subscription state in the app thread), and then
resets positions for the partitions still needing one (retrieved from the
subscription state, but in the background thread).
This is problematic because the assignment/subscriptionState may change in the
background thread at any time (e.g. when new partitions are reconciled), so we
could end up resetting positions to the partition offsets for a partition for
which we never even attempted to retrieve committed offsets.

Consider this sequence for a consumer that owns partition tp0:
 * consumer owns tp0
 * app thread -> updateFetchPositions triggers
initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned
partitions requiring a position (taking them from
subscriptions.initializingPartitions()). This will fetch committed offsets for
tp0 only.
 * background thread -> receives new partition tp1 and completes reconciliation
(adds it to the subscription state as INITIALIZING, requiring a position)
 * app thread -> updateFetchPositions resets positions for all partitions that
still don't have a valid position after initWithCommittedOffsetsIfNeeded
(taking them from subscriptionState.partitionsNeedingReset). This will
mistakenly conclude that it should reset tp1 to the partition offsets, when in
reality it never even tried fetching the committed offsets for it, because it
wasn't assigned when initWithCommittedOffsetsIfNeeded happened.

We should consider moving updateFetchPositions to the background thread as a
single event, which would safely use the subscriptionState object and apply
all actions involved in updateFetchPositions to the same consistent set of
partitions assigned at that moment.

  was:
{color:red}colored text{color}The updateFetchPositions function in the new
consumer performs several actions based on the assigned partitions from the
subscriptionState. As currently implemented, it fetches committed offsets for
partitions that require a position (retrieved from the subscription state in
the app thread), and then resets positions for the partitions still needing
one (retrieved from the subscription state, but in the background thread).
This is problematic because the assignment/subscriptionState may change in the
background thread at any time (e.g. when new partitions are reconciled), so we
could end up resetting positions to the partition offsets for a partition for
which we never even attempted to retrieve committed offsets.

Consider this sequence for a consumer that owns partition tp0:
 * consumer owns tp0
 * app thread -> updateFetchPositions triggers
initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned
partitions requiring a position (taking them from
subscriptions.initializingPartitions()). This will fetch committed offsets for
tp0 only.
 * background thread -> receives new partition tp1 and completes reconciliation
(adds it to the subscription state as INITIALIZING, requiring a position)
 * app thread -> updateFetchPositions resets positions for all partitions that
still don't have a valid position after initWithCommittedOffsetsIfNeeded
(taking them from subscriptionState.partitionsNeedingReset). This will
mistakenly conclude that it should reset tp1 to the partition offsets, when in
reality it never even tried fetching the committed offsets for it, because it
wasn't assigned when initWithCommittedOffsetsIfNeeded happened.

We should consider moving updateFetchPositions to the background thread as a
single event, which would safely use the subscriptionState object and apply
all actions involved in updateFetchPositions to the same consistent set of
partitions assigned at that moment.


> New consumer updateFetchPositions should perform all operations in background 
> thread
> 
>
> Key: KAFKA-17066
> URL: https://issues.apache.org/jira/browse/KAFKA-17066
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Priority: Blocker
> Fix For: 3.9.0

[jira] [Updated] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread

2024-07-02 Thread Lin Siyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lin Siyuan updated KAFKA-17066:
---
Description: 
{color:red}colored text{color}The updateFetchPositions function in the new
consumer performs several actions based on the assigned partitions from the
subscriptionState. As currently implemented, it fetches committed offsets for
partitions that require a position (retrieved from the subscription state in
the app thread), and then resets positions for the partitions still needing
one (retrieved from the subscription state, but in the background thread).
This is problematic because the assignment/subscriptionState may change in the
background thread at any time (e.g. when new partitions are reconciled), so we
could end up resetting positions to the partition offsets for a partition for
which we never even attempted to retrieve committed offsets.

Consider this sequence for a consumer that owns partition tp0:
 * consumer owns tp0
 * app thread -> updateFetchPositions triggers
initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned
partitions requiring a position (taking them from
subscriptions.initializingPartitions()). This will fetch committed offsets for
tp0 only.
 * background thread -> receives new partition tp1 and completes reconciliation
(adds it to the subscription state as INITIALIZING, requiring a position)
 * app thread -> updateFetchPositions resets positions for all partitions that
still don't have a valid position after initWithCommittedOffsetsIfNeeded
(taking them from subscriptionState.partitionsNeedingReset). This will
mistakenly conclude that it should reset tp1 to the partition offsets, when in
reality it never even tried fetching the committed offsets for it, because it
wasn't assigned when initWithCommittedOffsetsIfNeeded happened.

We should consider moving updateFetchPositions to the background thread as a
single event, which would safely use the subscriptionState object and apply
all actions involved in updateFetchPositions to the same consistent set of
partitions assigned at that moment.

  was:
The updateFetchPositions function in the new consumer performs several actions
based on the assigned partitions from the subscriptionState. As currently
implemented, it fetches committed offsets for partitions that require a
position (retrieved from the subscription state in the app thread), and then
resets positions for the partitions still needing one (retrieved from the
subscription state, but in the background thread).
This is problematic because the assignment/subscriptionState may change in the
background thread at any time (e.g. when new partitions are reconciled), so we
could end up resetting positions to the partition offsets for a partition for
which we never even attempted to retrieve committed offsets.

Consider this sequence for a consumer that owns partition tp0:
 * consumer owns tp0
 * app thread -> updateFetchPositions triggers
initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned
partitions requiring a position (taking them from
subscriptions.initializingPartitions()). This will fetch committed offsets for
tp0 only.
 * background thread -> receives new partition tp1 and completes reconciliation
(adds it to the subscription state as INITIALIZING, requiring a position)
 * app thread -> updateFetchPositions resets positions for all partitions that
still don't have a valid position after initWithCommittedOffsetsIfNeeded
(taking them from subscriptionState.partitionsNeedingReset). This will
mistakenly conclude that it should reset tp1 to the partition offsets, when in
reality it never even tried fetching the committed offsets for it, because it
wasn't assigned when initWithCommittedOffsetsIfNeeded happened.

We should consider moving updateFetchPositions to the background thread as a
single event, which would safely use the subscriptionState object and apply
all actions involved in updateFetchPositions to the same consistent set of
partitions assigned at that moment.


> New consumer updateFetchPositions should perform all operations in background 
> thread
> 
>
> Key: KAFKA-17066
> URL: https://issues.apache.org/jira/browse/KAFKA-17066
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Priority: Blocker
> Fix For: 3.9.0

[jira] [Updated] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread

2024-07-02 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-17066:
---
Priority: Blocker  (was: Major)

> New consumer updateFetchPositions should perform all operations in background 
> thread
> 
>
> Key: KAFKA-17066
> URL: https://issues.apache.org/jira/browse/KAFKA-17066
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Priority: Blocker
> Fix For: 3.9.0