[ https://issues.apache.org/jira/browse/KAFKA-13636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836455#comment-17836455 ]
zhangzhisheng commented on KAFKA-13636:
---------------------------------------

Kafka broker version is 2.7.2 and client version is 2.5.2, with auto.offset.reset=earliest. There have been instances where the group's committed offsets were deleted and the consumer reset its position:

{code:java}
org.apache.kafka.clients.consumer.internals.SubscriptionState SubscriptionState.java:397 [trace=,span=] - [Consumer clientId=app1_a2acc55fc0bbac81, groupId=gid-app1] Resetting offset for partition topic-4 to offset 3577938321.
{code}

> Committed offsets could be deleted during a rebalance if a group did not commit for a while
> -------------------------------------------------------------------------------------------
>
> Key: KAFKA-13636
> URL: https://issues.apache.org/jira/browse/KAFKA-13636
> Project: Kafka
> Issue Type: Bug
> Components: core, offset manager
> Affects Versions: 2.4.0, 2.5.1, 2.6.2, 2.7.2, 2.8.1, 3.0.0
> Reporter: Damien Gasparina
> Assignee: Prince Mahajan
> Priority: Major
> Fix For: 3.0.1, 2.8.2, 3.2.0, 3.1.1
>
>
> The group coordinator might incorrectly delete offsets during a group rebalance.
> During a rebalance, the coordinator relies on the last commit timestamp ({{offsetAndMetadata.commitTimestamp}}) instead of the last state modification timestamp ({{currentStateTimestamp}}) to detect expired offsets.
>
> This is relatively easy to reproduce by playing with {{group.initial.rebalance.delay.ms}}, {{offsets.retention.minutes}} and {{offsets.retention.check.interval.ms}}. I uploaded an example at:
> [https://github.com/Dabz/kafka-example/tree/master/docker/offsets-retention]
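As a worked illustration of why the choice of timestamp matters, here is a minimal Java sketch. The helper {{isExpired}} is hypothetical (not Kafka code), and the timings are assumptions loosely modeled on the reproduction script: last commit at t = 0 s, last group state change at t = 185 s, and an expiration check at t = 195 s with a 2-minute retention.

{code:java}
import java.time.Duration;

public class OffsetExpiryTimestamps {

    // Hypothetical helper (not Kafka code): an offset counts as expired once
    // (now - base) reaches the retention period, where "base" is whichever
    // timestamp the age is measured from.
    static boolean isExpired(long nowMs, long baseMs, Duration retention) {
        return nowMs - baseMs >= retention.toMillis();
    }

    public static void main(String[] args) {
        Duration retention = Duration.ofMinutes(2);              // offsets.retention.minutes=2
        long lastCommitMs = 0L;                                   // assumed: last commit at t = 0 s
        long stateChangeMs = Duration.ofSeconds(185).toMillis();  // assumed: last state change at t = 185 s
        long checkMs = Duration.ofSeconds(195).toMillis();        // assumed: expiration check at t = 195 s

        // Age measured from the last commit: 195 s >= 120 s, so the offset is expired.
        // prints: based on commitTimestamp:       expired=true
        System.out.println("based on commitTimestamp:       expired="
                + isExpired(checkMs, lastCommitMs, retention));

        // Age measured from the last state change: 10 s < 120 s, so the offset is kept.
        // prints: based on currentStateTimestamp: expired=false
        System.out.println("based on currentStateTimestamp: expired="
                + isExpired(checkMs, stateChangeMs, retention));
    }
}
{code}

The same committed offset is kept or deleted depending only on which timestamp its age is measured from.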
>
> This script does the following:
> * Start a broker with offsets.retention.minutes=2, offsets.retention.check.interval.ms=1000 and group.initial.rebalance.delay.ms=20000
> * Produce 10 messages
> * Create a consumer group that consumes the 10 messages, with {{enable.auto.commit=false}} so that it commits only a few times
> * Wait 3 minutes, then kill the consumer with {{kill -9}}
> * Restart the consumer after a few seconds
> * The consumer restarts from {{auto.offset.reset}} because its committed offset was removed
>
> The cause is in GroupMetadata.scala:
> * When the group becomes empty, {{subscribedTopics}} is set to {{Set.empty}} ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L520-L521])
> * When a new member joins, it is added to the group right away, BUT {{subscribedTopics}} is only updated once the rebalance completes (in {{initNextGeneration}}), which can take a while because of {{group.initial.rebalance.delay.ms}}
> * When the periodic expired-offset check runs, {{subscribedTopics.isDefined}} returns true, because {{Set.empty != None}} (the underlying condition)
> * Thus we enter [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L782-L785] with an empty {{subscribedTopics}} list and rely on {{commitTimestamp}}, ignoring {{currentStateTimestamp}}
>
> This seems to be a regression introduced by KIP-496
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets#KIP496:AdministrativeAPItodeleteconsumeroffsets-ProposedChanges
> (KAFKA-8338, KAFKA-8370)

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
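The branch selection described in the cause analysis of KAFKA-13636 can be modeled in a few lines of Java. This is a hypothetical, simplified sketch, not the coordinator's Scala code: the {{State}} enum, the {{Committed}} class and {{expiredOffsets}} are illustrative names, only consumer-protocol groups are modeled, and the timings are assumed. Scala's {{Option}} is mapped to {{java.util.Optional}}, so {{Optional.of(Set.of())}} plays the role of {{Some(Set.empty)}}.

{code:java}
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class ExpiredOffsetsModel {

    enum State { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }

    // Hypothetical, simplified committed offset (not Kafka's OffsetAndMetadata).
    static final class Committed {
        final String topic;
        final int partition;
        final long commitTimestampMs;

        Committed(String topic, int partition, long commitTimestampMs) {
            this.topic = topic;
            this.partition = partition;
            this.commitTimestampMs = commitTimestampMs;
        }

        String key() { return topic + "-" + partition; }
    }

    // Simplified model of the consumer-group branches described in the ticket.
    // Returns the "topic-partition" keys that would be expired.
    static Set<String> expiredOffsets(State state,
                                      Optional<Set<String>> subscribedTopics,
                                      Optional<Long> currentStateTimestampMs,
                                      List<Committed> offsets,
                                      long nowMs,
                                      long retentionMs) {
        if (state == State.EMPTY) {
            // No members: age is measured from the last state change when it is known.
            return offsets.stream()
                    .filter(c -> nowMs - currentStateTimestampMs.orElse(c.commitTimestampMs) >= retentionMs)
                    .map(Committed::key)
                    .collect(Collectors.toSet());
        }
        if (subscribedTopics.isPresent()) {
            // Members exist and the subscription looks "known": offsets of subscribed
            // topics are kept, all others age from their last commit timestamp.
            // Optional.of(Set.of()) also passes this check, which is the bug.
            Set<String> subscribed = subscribedTopics.get();
            return offsets.stream()
                    .filter(c -> !subscribed.contains(c.topic))
                    .filter(c -> nowMs - c.commitTimestampMs >= retentionMs)
                    .map(Committed::key)
                    .collect(Collectors.toSet());
        }
        // Subscription unknown: nothing is expired.
        return Set.of();
    }

    public static void main(String[] args) {
        List<Committed> offsets = List.of(new Committed("topic", 4, 0L)); // assumed: committed at t = 0 s
        long now = 195_000L;        // assumed: check runs at t = 195 s
        long retention = 120_000L;  // offsets.retention.minutes=2

        // A member has re-joined, but subscribedTopics still holds the empty set
        // left over from the Empty state, so the offset is expired.
        System.out.println(expiredOffsets(State.PREPARING_REBALANCE, Optional.of(Set.of()),
                Optional.of(185_000L), offsets, now, retention)); // prints [topic-4]

        // Once the new generation has computed the subscription, the offset is kept.
        System.out.println(expiredOffsets(State.STABLE, Optional.of(Set.of("topic")),
                Optional.of(195_000L), offsets, now, retention)); // prints []
    }
}
{code}

In the first call the group already has a member again, yet the offset is expired: the check falls through to the commit-timestamp branch with an empty subscription and never consults {{currentStateTimestamp}}.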