Sam Cantero created KAFKA-15035:
-----------------------------------

             Summary: Consumer offsets can be deleted immediately if kafka does 
not detect a consumer as dead
                 Key: KAFKA-15035
                 URL: https://issues.apache.org/jira/browse/KAFKA-15035
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.7.2
            Reporter: Sam Cantero


We recently encountered a scenario where a consumer group had its committed 
offsets deleted shortly (around 3 minutes) after the consumer became 
inactive.

As per 
[KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets],
 committed offsets for an active (i.e., running) consumer group should not be 
deleted. However, if a consumer becomes inactive, {+}the deletion of committed 
offsets will not occur immediately{+}. Instead, the committed offsets will only 
be removed if the consumer remains inactive for at least the duration specified 
by 
[offsets.retention.minutes|https://kafka.apache.org/documentation/#brokerconfigs_offsets.retention.minutes].

In our case {{offsets.retention.minutes}} is set to 7 days and the consumer was 
only inactive for 5 minutes, so deletion should not have occurred.

Reading KIP-211 further, we find the following sentence:
{quote}If a group consumer unsubscribes from a topic but continues to consume 
from other subscribed topics, the offset information of that unsubscribed 
topic’s partitions should be deleted at the appropriate time.
{quote}
And later on:
{quote}If there are partitions the group has offset for but no longer consumes 
from, and offsets.retention.minutes has passed since their last commit 
timestamp, the corresponding offsets will be removed from the offset cache
{quote}
This implies, though {*}+this is what I want to confirm in this ticket+{*}, 
that Kafka employs two approaches for offset expiration:
 * The deletion timer starts when a consumer group enters the Empty state 
(i.e., not running). Once the timer exceeds the {{offsets.retention.minutes}} 
threshold, the committed offsets are deleted.
 * If a consumer group is in a "running" state (i.e., not in the Empty state) 
but no longer consumes from a topic, that topic's committed offsets are deleted 
once they are older than the {{offsets.retention.minutes}} duration.

Note that the second approach only takes into account the timestamp of the last 
committed offset.
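The two approaches described above can be sketched as a small decision function. This is only a model of the reading given in this ticket, not Kafka's broker code: the names {{CommittedOffset}}, {{is_expired}}, {{empty_since_ms}} and {{subscribed_topics}} are hypothetical, and the retention value assumes the 7-day setting mentioned earlier.

```python
from dataclasses import dataclass
from typing import Optional, Set

# Assumption: offsets.retention.minutes = 10080 (7 days), as in this ticket.
RETENTION_MS = 7 * 24 * 60 * 60 * 1000


@dataclass
class CommittedOffset:
    """Hypothetical record of one committed offset."""
    topic: str
    commit_timestamp_ms: int


def is_expired(offset: CommittedOffset,
               group_state: str,
               empty_since_ms: Optional[int],
               subscribed_topics: Set[str],
               now_ms: int,
               retention_ms: int = RETENTION_MS) -> bool:
    """Return True if the offset would be eligible for deletion under
    the two approaches as this ticket understands them."""
    if group_state == "Empty":
        # Approach 1: the timer starts when the group becomes Empty.
        if empty_since_ms is None:
            raise ValueError("empty_since_ms is required for an Empty group")
        return now_ms - empty_since_ms >= retention_ms
    if offset.topic not in subscribed_topics:
        # Approach 2: the group is still running but no longer consumes
        # this topic; only the last commit timestamp is considered.
        return now_ms - offset.commit_timestamp_ms >= retention_ms
    # The group is running and still subscribed: never expire.
    return False
```

In this model the second branch ignores how long the group has been inactive, so an offset whose last commit is older than the retention period becomes eligible as soon as its topic drops out of the subscription.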

Throughout this event, the affected consumer group didn’t transition into the 
Empty state. Based on the Kafka logs, the consumer group was never reported as 
Empty, indicating that, from Kafka's perspective, a consumer was still running. 
It’s unclear why Kafka didn’t detect this consumer group as Empty.
{noformat}
01:30:47 am - [GroupCoordinator 1]: Member consumer-mycg-1-uuid in group mycg 
has failed, removing it from the group

01:30:47 am - [GroupCoordinator 1]: Preparing to rebalance group mycg in state 
PreparingRebalance with old generation 432 (__consumer_offsets-16) (reason: 
removing member consumer-mycg-1-uuid on heartbeat expiration)

1:30:50 am - [GroupCoordinator 1]: Member consumer-mycg-2-uuid in group mycg 
has failed, removing it from the group

01:30:50 am - [GroupCoordinator 1]: Stabilized group mycg generation 433 
(__consumer_offsets-16)

01:30:50 am - [GroupCoordinator 1]: Assignment received from leader for group 
mycg for generation 433{noformat}
This suggests that Kafka might have followed the second approach, which would 
explain why it deleted the offsets about 3 minutes later.
{noformat}
1:33:17 am - 
[GroupMetadataManager brokerId=1] Removed 285 expired offsets in 8 
milliseconds.{noformat}
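To put the timestamps above in perspective, the following sketch computes the gap between the first member failure (01:30:47) and the offset removal (01:33:17), and compares it with the configured retention. The helper name {{elapsed_between}} is made up for illustration, and the 7-day retention is the value stated earlier in this ticket.

```python
from datetime import datetime, timedelta

TIME_FORMAT = "%H:%M:%S"

# Assumption: offsets.retention.minutes = 10080 (7 days), as stated in this ticket.
RETENTION = timedelta(minutes=10080)


def elapsed_between(start: str, end: str) -> timedelta:
    """Elapsed time between two same-day HH:MM:SS log timestamps."""
    return datetime.strptime(end, TIME_FORMAT) - datetime.strptime(start, TIME_FORMAT)


# Timestamps taken from the coordinator logs quoted above.
elapsed = elapsed_between("01:30:47", "01:33:17")
print(f"elapsed: {elapsed}, retention: {RETENTION}")
print("within retention window:", elapsed < RETENTION)
```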
For reference, the logs of a regular consumer join/startup look like this. The 
group is stabilised and the assignment from the leader is received.
{noformat}
 [GroupCoordinator 0]: Preparing to rebalance group mycg in state 
PreparingRebalance with old generation 6 (__consumer_offsets-22) (reason: 
Adding new member consumer-mycg-1-2b8ba689-fbaa-4829-82f5-dd2ed1d89d86 with 
group instance id None) (kafka.coordinator.group.GroupCoordinator)
 
[GroupCoordinator 0]: Stabilized group mycg generation 7 
(__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator)

[GroupCoordinator 0]: Assignment received from leader for group mycg for 
generation 7 (kafka.coordinator.group.GroupCoordinator){noformat}
For reference, the logs of a regular consumer leave/shutdown look like this. 
NOTE how the consumer group moves into the Empty state.

 
{noformat}
[GroupCoordinator 0]: Member[group.instance.id None, member.id 
consumer-mycg-1-eb77a142-5b64-476e-bc3d-2731c9b811a7] in group mycg has left, 
removing it from the group (kafka.coordinator.group.GroupCoordinator)

[GroupCoordinator 0]: Preparing to rebalance group mycg in state 
PreparingRebalance with old generation 8 (__consumer_offsets-22) (reason: 
removing member consumer-mycg-1-eb77a142-5b64-476e-bc3d-2731c9b811a7 on 
LeaveGroup) (kafka.coordinator.group.GroupCoordinator)

[GroupCoordinator 0]: Group mycg with generation 9 is now empty 
(__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator){noformat}
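The difference between the two log excerpts can be checked mechanically. The sketch below scans coordinator log lines and reports the last state it saw for a group. The function name {{last_observed_state}} is hypothetical, the regular expressions are derived only from the log messages quoted in this ticket, and each log entry is assumed to be on a single line (the excerpts above are wrapped by the email formatting).

```python
import re
from typing import Iterable, Optional

# Patterns based solely on the log messages quoted in this ticket.
EMPTY_RE = re.compile(r"Group (\S+) with generation (\d+) is now empty")
STABLE_RE = re.compile(r"Stabilized group (\S+) generation (\d+)")


def last_observed_state(lines: Iterable[str], group: str) -> Optional[str]:
    """Return "Empty" or "Stable" according to the last matching log line
    for the given group, or None if no matching line was seen."""
    state = None
    for line in lines:
        match = EMPTY_RE.search(line)
        if match and match.group(1) == group:
            state = "Empty"
            continue
        match = STABLE_RE.search(line)
        if match and match.group(1) == group:
            state = "Stable"
    return state


incident = [
    "[GroupCoordinator 1]: Member consumer-mycg-2-uuid in group mycg has failed, removing it from the group",
    "[GroupCoordinator 1]: Stabilized group mycg generation 433 (__consumer_offsets-16)",
]
clean_shutdown = [
    "[GroupCoordinator 0]: Group mycg with generation 9 is now empty (__consumer_offsets-22)",
]
print(last_observed_state(incident, "mycg"))
print(last_observed_state(clean_shutdown, "mycg"))
```

Applied to the excerpts in this ticket, the incident logs end in a stabilised group while the clean shutdown ends in an empty one, which is the discrepancy this report is about.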
 
As another piece of information, the consumer's underlying node experienced an 
unclean termination, which could have played a role in Kafka's failure to 
identify the consumer group as inactive.

In summary, when Kafka's expiration semantics for committed offsets combine 
with its failure to detect a dead consumer, committed offsets can be deleted 
well before {{offsets.retention.minutes}} has elapsed.
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
