[ https://issues.apache.org/jira/browse/KAFKA-13636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Olson updated KAFKA-13636:
---------------------------------
    Description: 
The group coordinator might incorrectly delete committed offsets during a group
rebalance. During a rebalance, the coordinator relies on the last commit
timestamp ({_}offsetAndMetadata.commitTimestamp{_}) instead of the last state
modification timestamp ({_}currentStateTimestamp{_}) to detect expired offsets.
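A minimal sketch of why the choice of base timestamp matters, assuming an offset expires once its base timestamp plus the retention period has been reached (our reading of the comparison done when collecting expired offsets). The function {{is_expired}} and all timings are hypothetical and illustrative: last commit at t=0 s, group becoming empty at t=180 s, check at t=185 s, {{offsets.retention.minutes=2}}.

```python
# Hypothetical model of offset expiration; not Kafka's actual code.
RETENTION_MS = 2 * 60 * 1000  # offsets.retention.minutes=2


def is_expired(base_timestamp_ms: int, now_ms: int, retention_ms: int = RETENTION_MS) -> bool:
    """An offset expires once base timestamp + retention has been reached."""
    return base_timestamp_ms + retention_ms <= now_ms


commit_timestamp_ms = 0               # last offset commit (assumed)
current_state_timestamp_ms = 180_000  # group became Empty after kill -9 (assumed)
now_ms = 185_000                      # expiration check a few seconds later (assumed)

# Rule applied during the rebalance (the problematic path): last commit time.
expired_by_commit = is_expired(commit_timestamp_ms, now_ms)
# Rule one would expect after a recent state change: last state change time.
expired_by_state = is_expired(current_state_timestamp_ms, now_ms)

print(expired_by_commit, expired_by_state)  # True False
```

With the same offset and the same clock, only the base timestamp decides whether the offset survives.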

 

This is relatively easy to reproduce by playing with
{{group.initial.rebalance.delay.ms}}, {{offsets.retention.minutes}} and
{{offsets.retention.check.interval.ms}}. I uploaded an example at:
[https://github.com/Dabz/kafka-example/tree/master/docker/offsets-retention]

This script does the following:
 * Start a broker with {{offsets.retention.minutes=2}},
{{offsets.retention.check.interval.ms=1000}} and
{{group.initial.rebalance.delay.ms=20000}}
 * Produce 10 messages
 * Create a consumer group that consumes the 10 messages, with auto commit
disabled so that offsets are committed only a few times
 * Wait 3 minutes, then kill the consumer with {{kill -9}}
 * Restart the consumer after a few seconds
 * The consumer restarts from the position given by {{auto.offset.reset}}:
the committed offset was removed
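The reproduction steps can be turned into a rough timeline. This sketch assumes the last commit happens at t=0 s, the consumer rejoins at t=185 s, the group then waits the full {{group.initial.rebalance.delay.ms}} before {{initNewGeneration}} runs, and expiration checks line up with the rejoin time; these timings are illustrative, not measured from the script. It counts the checks that run during that window, when offsets are aged from their last commit.

```python
# Illustrative timeline for the reproduction; all timings are assumptions.
RETENTION_MS = 2 * 60 * 1000         # offsets.retention.minutes=2
CHECK_INTERVAL_MS = 1_000            # offsets.retention.check.interval.ms=1000
INITIAL_REBALANCE_DELAY_MS = 20_000  # group.initial.rebalance.delay.ms=20000

last_commit_ms = 0
rejoin_ms = 185_000  # consumer restarted a few seconds after kill -9
rebalance_end_ms = rejoin_ms + INITIAL_REBALANCE_DELAY_MS

# Expiration checks that run while the new member waits in the rebalance.
checks_during_rebalance = list(range(rejoin_ms, rebalance_end_ms, CHECK_INTERVAL_MS))

# In that window the offset's age is measured from its last commit,
# so any check past the retention period would delete it.
deleting_checks = [t for t in checks_during_rebalance if t - last_commit_ms >= RETENTION_MS]

print(len(checks_during_rebalance), len(deleting_checks))  # 20 20
```

Because the last commit is already older than the retention period, the very first check inside the delay window is enough to remove the offset.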

 

The cause lies in GroupMetadata.scala:
 * When the group becomes empty, {{subscribedTopics}} is set to an empty set
({{Some(Set.empty)}})
([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L520-L521])
 * When a new member joins, it is added to the group right away, BUT
{{subscribedTopics}} is only updated once the join phase is over (in
{{initNewGeneration}}), which can take a while because of
{{group.initial.rebalance.delay.ms}}
 * When the periodic offset expiration task runs (every
{{offsets.retention.check.interval.ms}}), {{subscribedTopics.isDefined}}
returns true, because an empty set is not {{None}} (the underlying condition)
 * Thus we enter
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L782-L785]
 with an empty {{subscribedTopics}} set: no topic counts as subscribed, so every
offset is aged from its {{commitTimestamp}}, regardless of the
{{currentStateTimestamp}}
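The branch selection described above can be modelled in a few lines. This is a simplified Python sketch, not Kafka's Scala code: Python {{None}} stands in for Scala's {{None}}, a Python {{set}} for {{Some(...)}}, and {{expiration_rule}} / {{expirable_topics}} are hypothetical names. It only covers a consumer-protocol group and assumes the {{Empty}} check is evaluated before the {{subscribedTopics.isDefined}} check.

```python
from typing import Optional, Set, Tuple

# Simplified model of expiration rule selection for a consumer group.
# Names and structure are illustrative assumptions, not Kafka's code.


def expiration_rule(state: str, subscribed_topics: Optional[Set[str]]) -> Tuple[str, Optional[Set[str]]]:
    """Return (timestamp used for expiry, topics whose offsets are protected)."""
    if state == "Empty":
        # Empty group: offsets age from the last state change.
        return ("currentStateTimestamp", None)
    if subscribed_topics is not None:  # Scala: subscribedTopics.isDefined
        # Active group: offsets of unsubscribed topics age from their last commit.
        return ("commitTimestamp", subscribed_topics)
    return ("none", None)  # assumed: nothing expires in this case


def expirable_topics(committed: Set[str], protected: Optional[Set[str]]) -> Set[str]:
    """Topics whose offsets are candidates for expiry under the chosen rule."""
    return set(committed) if protected is None else committed - protected


committed = {"orders"}

# After the last member leaves, subscribedTopics becomes an empty set.
subscribed: Set[str] = set()

# A new member joins: the group is no longer Empty, but subscribedTopics
# stays empty until initNewGeneration runs. An empty set is still "defined".
rule, protected = expiration_rule("PreparingRebalance", subscribed)
print(rule, expirable_topics(committed, protected))  # commitTimestamp {'orders'}
```

Note that the check must be {{is not None}} rather than truthiness: like Scala's {{isDefined}}, it treats an empty set as present, which is exactly what sends the group down the commit-timestamp path.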

 

This seems to be a regression introduced by KIP-496
https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets#KIP496:AdministrativeAPItodeleteconsumeroffsets-ProposedChanges
 (KAFKA-8338, KAFKA-8370).



> Committed offsets could be deleted during a rebalance if a group did not commit for a while
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13636
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13636
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, offset manager
>    Affects Versions: 2.4.0, 2.5.1, 2.6.2, 2.7.2, 2.8.1, 3.0.0
>            Reporter: Damien Gasparina
>            Assignee: Prince Mahajan
>            Priority: Major
>             Fix For: 3.0.1, 2.8.2, 3.2.0, 3.1.1
>
>



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
