Zhanxiang (Patrick) Huang created KAFKA-8069:
------------------------------------------------
Summary: Committed offsets get cleaned up right after the
coordinator loading them back from __consumer_offsets in broker with old
inter-broker protocol version (< 2.2)
Key: KAFKA-8069
URL: https://issues.apache.org/jira/browse/KAFKA-8069
Project: Kafka
Issue Type: Bug
Reporter: Zhanxiang (Patrick) Huang
Assignee: Zhanxiang (Patrick) Huang
After the 2.1 release, if the broker hasn't been upgraded to the latest
inter-broker protocol version, the committed offsets stored in the
__consumer_offsets topic get cleaned up much earlier than they should be once
they are loaded back from the __consumer_offsets topic by the GroupCoordinator.
This loading happens during a leadership transition or after a broker bounce.
TL;DR
The V1 on-disk format of __consumer_offsets has an *expireTimestamp* field. If a
Kafka 2.1 broker runs with an inter-broker protocol (IBP) version prior to 2.1
(i.e. prior to
[KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets]),
the logic for getting the expired offsets looks like this:
{code:java}
def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long):
    Map[TopicPartition, OffsetAndMetadata] = {
  offsets.filter {
    case (topicPartition, commitRecordMetadataAndOffset) =>
      ... && {
        commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
          case None =>
            // current version with no per partition retention
            currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
          case Some(expireTimestamp) =>
            // older versions with explicit expire_timestamp field => old expiration semantics is used
            currentTimestamp >= expireTimestamp
        }
      }
  }....
}
{code}
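To make the two branches concrete, here is a minimal, self-contained sketch of the same decision for a single committed offset. It is not the GroupMetadataManager code: the `Offset` case class, the `ExpirationCheck.isExpired` function, and passing `currentTimestamp`/`offsetRetentionMs` as parameters are hypothetical simplifications. The base timestamp is assumed to be the commit timestamp, and the elided extra conditions (`...`) are omitted.

{code:scala}
// Hypothetical stand-in for OffsetAndMetadata / CommitRecordMetadataAndOffset.
final case class Offset(offset: Long, commitTimestamp: Long, expireTimestamp: Option[Long])

object ExpirationCheck {
  // Mirrors the two branches of getExpiredOffsets(...) for one offset (sketch only).
  def isExpired(o: Offset, currentTimestamp: Long, offsetRetentionMs: Long): Boolean =
    o.expireTimestamp match {
      case None =>
        // KIP-211 semantics: retention measured from a base timestamp
        // (assumed here to be the commit timestamp).
        currentTimestamp - o.commitTimestamp >= offsetRetentionMs
      case Some(expireTimestamp) =>
        // Old semantics: expire once the explicit deadline has been reached.
        currentTimestamp >= expireTimestamp
    }
}
{code}

Note that with `Some(-1L)` the second branch is true for any non-negative `currentTimestamp`, regardless of the retention setting.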
The expireTimestamp in the on-disk offset record can only be set when storing
the committed offset in the __consumer_offsets topic. However, the
GroupCoordinator also keeps an in-memory representation of the expireTimestamp
(see the code above), which can be set in the following two cases:
# When the GroupCoordinator receives an OffsetCommitRequest, the expireTimestamp
is set using the following logic:
{code:java}
expireTimestamp = offsetCommitRequest.retentionTime match {
  case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
  case retentionTime => Some(currentTimestamp + retentionTime)
}
{code}
In all recent client versions, the consumer sends the OffsetCommitRequest with
DEFAULT_RETENTION_TIME, so the expireTimestamp will always be None in this
case. *This means any committed offset set in this case will always hit the
"case None" branch in "getExpiredOffsets(...)" when the coordinator does the
cleanup, which is correct.*
# When the GroupCoordinator loads the committed offsets stored in the
__consumer_offsets topic from disk, the expireTimestamp is set using the
following logic if IBP < 2.1:
{code:java}
val expireTimestamp =
value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
{code}
and the logic to persist the expireTimestamp is:
{code:java}
// OffsetCommitRequest.DEFAULT_TIMESTAMP = -1
value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1,
  offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
{code}
Since the in-memory expireTimestamp will always be None in our case, as
mentioned in 1), we will always store -1 on disk. Therefore, when the offset is
loaded from the __consumer_offsets topic, the in-memory expireTimestamp will
always be set to -1, i.e. Some(-1). *This means any committed offset set in this
case will always hit "case Some(expireTimestamp)" in "getExpiredOffsets(...)"
when the coordinator does the cleanup. Because currentTimestamp >= -1 always
holds, we will always expire the committed offset on the first expiration check
(which runs shortly after it is loaded from the __consumer_offsets topic)*.
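The round trip described above can be reduced to a few lines. This is a hedged sketch that models only the expireTimestamp field, not the real schema classes; `BuggyRoundTrip`, `persist`, `load`, and `expiredUnderOldSemantics` are hypothetical names, and `DefaultTimestamp` stands in for OffsetCommitRequest.DEFAULT_TIMESTAMP (-1).

{code:scala}
object BuggyRoundTrip {
  // Stand-in for OffsetCommitRequest.DEFAULT_TIMESTAMP.
  val DefaultTimestamp: Long = -1L

  // Persist: an in-memory None is written to the V1 field as -1.
  def persist(inMemory: Option[Long]): Long = inMemory.getOrElse(DefaultTimestamp)

  // Load (IBP < 2.1), as described in the issue: the raw value always becomes Some(...).
  def load(onDisk: Long): Option[Long] = Some(onDisk)

  // The "case Some(expireTimestamp)" branch: expired once the deadline is reached.
  def expiredUnderOldSemantics(currentTimestamp: Long, expireTimestamp: Option[Long]): Boolean =
    expireTimestamp.exists(currentTimestamp >= _)
}
{code}

A freshly committed offset (None) never takes the old-semantics branch, but after `load(persist(None))` it becomes `Some(-1L)`, which is already expired at any non-negative timestamp.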
I am able to reproduce this bug on my local box with one broker, using 2.*, 1.*
and 0.11.* consumers. The consumer sees a null committed offset after the
broker is bounced.
This bug was introduced by [PR-5690|https://github.com/apache/kafka/pull/5690]
in the Kafka 2.1 release, and the fix is very straightforward: set the
expireTimestamp to None if it is -1 in the on-disk format.
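A minimal sketch of the fix described above, again modelling only the expireTimestamp field. `FixedLoad`, `persist`, and `load` are hypothetical names and this is an illustration of the idea, not the patch that was merged.

{code:scala}
object FixedLoad {
  // Stand-in for OffsetCommitRequest.DEFAULT_TIMESTAMP.
  val DefaultTimestamp: Long = -1L

  // Persist is unchanged: None is still written as -1.
  def persist(inMemory: Option[Long]): Long = inMemory.getOrElse(DefaultTimestamp)

  // Fix: treat the -1 sentinel as "no explicit expire timestamp" when loading.
  def load(onDisk: Long): Option[Long] =
    if (onDisk == DefaultTimestamp) None else Some(onDisk)
}
{code}

With the sentinel mapped back to None, a reloaded offset takes the same "case None" branch as a freshly committed one, while offsets committed by old clients with an explicit retention time keep their Some(expireTimestamp).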