[ https://issues.apache.org/jira/browse/KAFKA-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16787609#comment-16787609 ]
Jason Gustafson commented on KAFKA-8069:
----------------------------------------

Great find and thanks for the detailed summary. I was able to confirm this on trunk. The only requirement is a 2.1+ broker with IBP set to an old version. I did the following:
# Start up the broker with IBP set to 2.0
# Start up a console consumer (on trunk) and verify it commits offsets
# Stop the consumer
# Restart the broker

We see the following in the logs:
{code:java}
[2019-03-07 22:49:25,774] DEBUG [GroupMetadataManager brokerId=0] Loaded group metadata GroupMetadata(groupId=blah, generation=4, protocolType=Some(consumer), currentState=Empty, members=Map()) with offsets Map(foo-0 -> CommitRecordMetadataAndOffset(Some(12),OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1552027741320, expireTimestamp=Some(-1)))) and pending offsets Map() (kafka.coordinator.group.GroupMetadataManager)
{code}
The key is *expireTimestamp=Some(-1)*. Shortly after, I see the coordinator expiring the offset.
> Committed offsets get cleaned up right after the coordinator loads them
> back from __consumer_offsets in a broker with an old inter-broker protocol
> version (< 2.2)
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8069
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8069
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.1.0, 2.2.0, 2.1.1, 2.1.2, 2.2.1
>            Reporter: Zhanxiang (Patrick) Huang
>            Assignee: Zhanxiang (Patrick) Huang
>            Priority: Critical
>             Fix For: 2.2.0
>
>
> After the 2.1 release, if the broker hasn't been upgraded to the latest
> inter-broker protocol version, the committed offsets stored in the
> __consumer_offsets topic will get cleaned up way earlier than they should be
> when the offsets are loaded back from the __consumer_offsets topic in the
> GroupCoordinator, which happens during leadership transitions or after a
> broker bounce.
>
> TL;DR
>
> For the V1 on-disk format of __consumer_offsets, we have the *expireTimestamp*
> field, and if the inter-broker protocol (IBP) version is prior to 2.1 (prior to
> [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets])
> on a Kafka 2.1 broker, the logic for getting the expired offsets looks like:
> {code:java}
> def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long): Map[TopicPartition, OffsetAndMetadata] = {
>   offsets.filter {
>     case (topicPartition, commitRecordMetadataAndOffset) =>
>       ...
>       && {
>         commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
>           case None =>
>             // current version with no per-partition retention
>             currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
>           case Some(expireTimestamp) =>
>             // older versions with explicit expire_timestamp field => old expiration semantics is used
>             currentTimestamp >= expireTimestamp
>         }
>       }
>   }....
> }
> {code}
> The expireTimestamp in the on-disk offset record can only be set when storing
> the committed offset in the __consumer_offsets topic. But the GroupCoordinator
> also has to keep an in-memory representation of the expireTimestamp (see the
> code above), which can be set in the following two cases:
> # Upon the GroupCoordinator receiving an OffsetCommitRequest, the
> expireTimestamp is set using the following logic:
> {code:java}
> expireTimestamp = offsetCommitRequest.retentionTime match {
>   case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
>   case retentionTime => Some(currentTimestamp + retentionTime)
> }
> {code}
> In all the latest client versions, the consumer will send out the
> OffsetCommitRequest with DEFAULT_RETENTION_TIME, so the expireTimestamp will
> always be None in this case.
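The commit-path mapping above can be sketched in Java as follows (an illustration only; the constant value and method shape are assumptions based on the Scala snippet, with `Optional` standing in for Scala's `Option`):

```java
import java.util.Optional;

public class CommitPath {
    // Modern clients send -1, meaning "use the broker's retention config".
    static final long DEFAULT_RETENTION_TIME = -1L;

    static Optional<Long> expireTimestamp(long retentionTime, long now) {
        // DEFAULT_RETENTION_TIME -> None (no explicit per-offset expiry);
        // otherwise an explicit expiry point is computed from "now".
        return retentionTime == DEFAULT_RETENTION_TIME
                ? Optional.empty()
                : Optional.of(now + retentionTime);
    }

    public static void main(String[] args) {
        long now = 1552027741320L;
        // Modern client: no explicit expiry is recorded in memory.
        System.out.println(expireTimestamp(DEFAULT_RETENTION_TIME, now));
        // Old client with an explicit 60s retention: expiry = now + 60000.
        System.out.println(expireTimestamp(60_000L, now));
    }
}
```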
> *This means any committed offset set in this case will always hit the
> "case None" branch in getExpiredOffsets(...) when the coordinator is doing
> the cleanup, which is correct.*
> # Upon the GroupCoordinator loading the committed offsets stored in the
> __consumer_offsets topic from disk, the expireTimestamp is set using the
> following logic if IBP < 2.1:
> {code:java}
> val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
> {code}
> and the logic to persist the expireTimestamp is:
> {code:java}
> // OffsetCommitRequest.DEFAULT_TIMESTAMP = -1
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
> {code}
> Since the in-memory expireTimestamp will always be None in our case, as
> mentioned in 1), we will always store -1 on disk. Therefore, when the offset
> is loaded from the __consumer_offsets topic, the in-memory expireTimestamp
> will always be set to -1. *This means any committed offset set in this case
> will always hit the "case Some(expireTimestamp)" branch in
> getExpiredOffsets(...) when the coordinator is doing the cleanup, which means
> we will always expire the committed offsets on the first expiration check
> (shortly after they are loaded from the __consumer_offsets topic)*.
>
> I was able to reproduce this bug on my local box with one broker using 2.x,
> 1.x and 0.11.x consumers. The consumer sees a null committed offset after the
> broker is bounced.
>
> This bug was introduced by [PR-5690|https://github.com/apache/kafka/pull/5690]
> in the Kafka 2.1 release, and the fix is straightforward: set the
> expireTimestamp to None if it is -1 in the on-disk format.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
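The serialize/load round trip described in the issue, and the proposed fix, can be sketched as follows (a Java illustration only; class and method names are hypothetical, with `Optional` standing in for Scala's `Option`):

```java
import java.util.Optional;

public class OffsetExpireRoundTrip {
    // OffsetCommitRequest.DEFAULT_TIMESTAMP in the Kafka source.
    static final long DEFAULT_TIMESTAMP = -1L;

    // Persist: None is encoded as -1 in the V1 on-disk format.
    static long toDisk(Optional<Long> expireTimestamp) {
        return expireTimestamp.orElse(DEFAULT_TIMESTAMP);
    }

    // 2.1 load path (buggy): the raw field is wrapped unconditionally,
    // so the sentinel -1 becomes Some(-1).
    static Optional<Long> loadBuggy(long onDisk) {
        return Optional.of(onDisk);
    }

    // Fixed load path: the sentinel -1 is mapped back to None.
    static Optional<Long> loadFixed(long onDisk) {
        return onDisk == DEFAULT_TIMESTAMP ? Optional.empty() : Optional.of(onDisk);
    }

    public static void main(String[] args) {
        long onDisk = toDisk(Optional.empty()); // modern client: no explicit expiry
        System.out.println(loadBuggy(onDisk));  // Some(-1) -> expires on first check
        System.out.println(loadFixed(onDisk));  // None -> retention semantics apply
    }
}
```

The asymmetry is the whole bug: `toDisk` treats -1 as a sentinel for None, but the buggy load path forgets to reverse that mapping.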