[ https://issues.apache.org/jira/browse/KAFKA-19794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ravindranath Kakarla updated KAFKA-19794:
-----------------------------------------
Summary: MirrorCheckpointTask causes consumers on target cluster to rewind
offsets or skip messages due to erroneous offset commits (was:
MirrorCheckpointTask causes consumers on target cluster to rewind offsets or
skip messages due to unsafe offset commits)
> MirrorCheckpointTask causes consumers on target cluster to rewind offsets or
> skip messages due to erroneous offset commits
> --------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-19794
> URL: https://issues.apache.org/jira/browse/KAFKA-19794
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 3.5.2
> Reporter: Ravindranath Kakarla
> Priority: Major
>
> h2. *Description*
> The {{MirrorCheckpointTask}} in MirrorMaker 2 commits offsets for active
> consumer groups on the target cluster, causing consumers to rewind their
> offsets or skip messages. Brokers normally reject offset commits for
> consumer groups that are not in the {{EMPTY}} state by throwing
> {{UnknownMemberIdException}}. In addition, {{MirrorCheckpointTask}} has
> logic to avoid committing offsets older than the target's current offsets
> for {{EMPTY}} consumer groups. However, due to a bug in
> {{MirrorCheckpointTask}}, this check is not enforced, and the task also
> attempts to commit offsets for {{STABLE}} consumer groups. These commits can
> succeed if the consumers are momentarily disconnected, which moves the group
> to the {{EMPTY}} state. As a result, the consumers' committed offsets are
> reset to older values. If those offsets are no longer available on the
> target broker (due to retention), the consumers are reset to {{earliest}} or
> {{latest}} (per {{auto.offset.reset}}), and therefore read duplicates or
> skip messages.
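> To make the impact concrete, here is a minimal, hypothetical model (plain Java, no Kafka dependency) of where a target-cluster consumer resumes after its committed offset has been overwritten. It assumes the standard {{auto.offset.reset}} behaviour: an in-range committed offset is used as-is, and an out-of-range one is replaced by the log-start offset ({{earliest}}) or the log-end offset ({{latest}}). The class, method, and parameter names are illustrative only. The offsets in {{main}} are made-up example values.

```java
import java.util.Objects;

/** Hypothetical model: where does a consumer resume after its committed offset was overwritten? */
final class RewindModel {

    enum ResetPolicy { EARLIEST, LATEST }

    /** Resume position plus how many records are re-read (duplicates) or jumped over (skipped). */
    record Outcome(long resumeOffset, long duplicates, long skipped) {}

    /**
     * @param position  next offset the consumer would have fetched before it disconnected
     * @param committed offset written by the erroneous commit (read by the consumer on rejoin)
     * @param logStart  earliest offset still retained on the target partition
     * @param logEnd    offset of the next record to be written on the target partition
     */
    static Outcome resume(long position, long committed, long logStart, long logEnd, ResetPolicy policy) {
        Objects.requireNonNull(policy);
        long resume;
        if (committed >= logStart && committed <= logEnd) {
            resume = committed;   // in range: the rewound offset is used as-is
        } else if (policy == ResetPolicy.EARLIEST) {
            resume = logStart;    // out of range: reset to the earliest retained offset
        } else {
            resume = logEnd;      // out of range: reset to the end of the log
        }
        // Resuming below the real position re-reads records; resuming above it skips records.
        long duplicates = Math.max(0, position - resume);
        long skipped = Math.max(0, resume - position);
        return new Outcome(resume, duplicates, skipped);
    }

    public static void main(String[] args) {
        // Example values: consumer was at 900, the commit rewound it to 100, retention deleted offsets < 500.
        System.out.println(resume(900, 100, 500, 1_000, ResetPolicy.EARLIEST));
        System.out.println(resume(900, 100, 500, 1_000, ResetPolicy.LATEST));
    }
}
```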
> h2. *Bug location*
> 1. In [MirrorCheckpointTask|#L305], we only update the latest target cluster
> offsets ({{idleConsumerGroupsOffset}}) if the target consumer group state is
> {{EMPTY}}.
> 2. When {{syncGroupOffset}} is called, we check whether the target consumer
> group is present in {{idleConsumerGroupsOffset}}. An active group is never
> present there, because only {{EMPTY}} groups are recorded. The code assumes
> that the group is new and starts syncing its offsets to the target. These
> calls fail with:
> _{{Unable to sync offsets for consumer group XYZ. This is likely caused by consumers currently using this group in the target cluster. (org.apache.kafka.connect.mirror.MirrorCheckpointTask)}}_
> When consumers have failed over, the logs typically contain many of these
> messages. However, the calls can succeed if the consumers are momentarily
> disconnected, for example during restarts. The code should not treat the
> absence of a consumer group from the {{idleConsumerGroupsOffset}} map as
> evidence that the group is new.
> 3. The same erroneous behavior can also be triggered when calls to
> {{describeConsumerGroups}} or {{listConsumerGroupOffsets}} in the
> {{refreshIdleConsumerGroupOffset}} method fail due to transient timeouts,
> because the group is then also missing from {{idleConsumerGroupsOffset}}.
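> The failure mode in items 1 to 3 can be reproduced with a deliberately simplified, hypothetical model of the two steps. This is not the actual {{MirrorCheckpointTask}} code. Group states are plain strings, offsets are keyed by a partition name, and {{refresh}} and {{planSync}} are illustrative stand-ins for {{refreshIdleConsumerGroupOffset}} and {{syncGroupOffset}}. The model shows that an active group and a group whose lookup failed both end up absent from the idle-group map, so both are synced unconditionally.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of the buggy refresh/sync flow (not the real MirrorCheckpointTask). */
final class BuggySyncModel {

    /** Stand-in for idleConsumerGroupsOffset: only groups observed as EMPTY are recorded. */
    final Map<String, Map<String, Long>> idleGroupOffsets = new HashMap<>();

    /** Step 1: record target offsets, but only for EMPTY groups; a failed lookup records nothing. */
    void refresh(String group, String targetState, Map<String, Long> targetOffsets, boolean lookupFailed) {
        if (lookupFailed) {
            return; // e.g. a transient timeout: the group is silently missing from the map
        }
        if ("EMPTY".equals(targetState)) {
            idleGroupOffsets.put(group, targetOffsets);
        }
    }

    /** Step 2: decide what to commit. Absence from the map is (wrongly) treated as "new group". */
    Map<String, Long> planSync(String group, Map<String, Long> convertedUpstream) {
        Map<String, Long> known = idleGroupOffsets.get(group);
        if (known == null) {
            return new HashMap<>(convertedUpstream); // unconditional commit: may rewind an active group
        }
        Map<String, Long> plan = new HashMap<>();
        convertedUpstream.forEach((partition, offset) -> {
            Long target = known.get(partition);
            if (target == null || offset > target) {
                plan.put(partition, offset); // idle group: only move offsets forward
            }
        });
        return plan;
    }

    public static void main(String[] args) {
        BuggySyncModel model = new BuggySyncModel();
        // Example values: both groups are at 900 on the target; upstream translates to 100.
        model.refresh("active-group", "STABLE", Map.of("orders-0", 900L), false);
        model.refresh("idle-group", "EMPTY", Map.of("orders-0", 900L), false);
        System.out.println(model.planSync("active-group", Map.of("orders-0", 100L))); // rewind is planned
        System.out.println(model.planSync("idle-group", Map.of("orders-0", 100L)));   // nothing is planned
    }
}
```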
> h2. *Fix*
> A potential fix is to add an explicit check so that offsets are synced only
> for consumer groups that are {{EMPTY}} (or absent or {{DEAD}} on the target).
> We should also skip offset syncing for consumer groups whose offsets could
> not be refreshed.
>
> {code:java}
> // Fixed code adds state checking:
> ConsumerGroupState groupStateOnTarget =
>         targetConsumerGroupStates.get(consumerGroupId);
> if (!isGroupPresentOnTarget || groupStateOnTarget == ConsumerGroupState.DEAD) {
>     // Safe to sync: new or dead group
>     syncGroupOffset(consumerGroupId, convertedUpstreamOffset);
> } else if (groupStateOnTarget == ConsumerGroupState.EMPTY) {
>     // Safe to sync: idle group
>     // ... existing offset comparison logic
> } else {
>     // Skip active groups (STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE)
>     log.info("Consumer group: {} with state: {} is being actively consumed on "
>             + "the target, skipping sync.", consumerGroupId, groupStateOnTarget);
> }
> {code}
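> Below is a self-contained sketch of the proposed guard. Like the model of the bug, it is written against simplified, hypothetical types rather than the real MirrorMaker 2 classes. {{TargetState}} stands in for {{ConsumerGroupState}}, and {{ABSENT}} marks a group that does not exist on the target. A failed refresh is passed in explicitly and always causes a skip, which covers the second part of the proposed fix. For {{EMPTY}} groups, offsets are only moved forward.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the proposed state-aware guard (simplified; not the real MirrorCheckpointTask). */
final class GuardedSyncModel {

    /** Stand-in for ConsumerGroupState; ABSENT means the group does not exist on the target. */
    enum TargetState { ABSENT, DEAD, EMPTY, STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE }

    /**
     * @param refreshSucceeded  false if describing/listing the target group failed (e.g. a timeout)
     * @param state             group state observed on the target
     * @param targetOffsets     offsets currently committed on the target (partition -> offset)
     * @param convertedUpstream upstream offsets translated to the target (partition -> offset)
     * @return offsets to commit; an empty map means "skip this group"
     */
    static Map<String, Long> planSync(boolean refreshSucceeded, TargetState state,
                                      Map<String, Long> targetOffsets, Map<String, Long> convertedUpstream) {
        if (!refreshSucceeded) {
            return Map.of(); // state unknown: do not guess, retry on the next sync round
        }
        switch (state) {
            case ABSENT:
            case DEAD:
                return new HashMap<>(convertedUpstream); // new or dead group: nothing to overwrite
            case EMPTY: {
                Map<String, Long> plan = new HashMap<>();
                convertedUpstream.forEach((partition, offset) -> {
                    Long current = targetOffsets.get(partition);
                    if (current == null || offset > current) {
                        plan.put(partition, offset); // idle group: only move offsets forward
                    }
                });
                return plan;
            }
            default:
                return Map.of(); // STABLE or rebalancing: actively consumed, skip
        }
    }

    public static void main(String[] args) {
        Map<String, Long> target = Map.of("orders-0", 900L);
        System.out.println(planSync(true, TargetState.STABLE, target, Map.of("orders-0", 100L)));
        System.out.println(planSync(true, TargetState.EMPTY, target, Map.of("orders-0", 950L)));
        System.out.println(planSync(false, TargetState.ABSENT, Map.of(), Map.of("orders-0", 100L)));
    }
}
```

> The broker-side rejection described above still applies on top of this guard. A group that becomes active between the state lookup and the commit is therefore rejected rather than rewound.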
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)