[ 
https://issues.apache.org/jira/browse/KAFKA-19794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ravindranath Kakarla updated KAFKA-19794:
-----------------------------------------
    Summary: MirrorCheckpointTask causes consumers on target cluster to rewind 
offsets or skip messages due to erroneous offset commits  (was: 
MirrorCheckpointTask causes consumers on target cluster to rewind offsets or 
skip messages due to unsafe offset commits)

> MirrorCheckpointTask causes consumers on target cluster to rewind offsets or 
> skip messages due to erroneous offset commits
> --------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19794
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19794
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.5.2
>            Reporter: Ravindranath Kakarla
>            Priority: Major
>
> h2. *Description*
> The {{MirrorCheckpointTask}} in MirrorMaker 2 commits offsets for active
> consumer groups on the target cluster, causing consumers to rewind their
> offsets or skip messages. Brokers normally reject offset commits for
> consumer groups that are not in the {{EMPTY}} state with an
> {{UnknownMemberIdException}}. In addition, {{MirrorCheckpointTask}} has
> logic to avoid committing offsets that are older than the target's current
> offsets for {{EMPTY}} consumer groups. However, due to a bug in
> {{MirrorCheckpointTask}}, this check is not enforced, and the task attempts to
> commit offsets for {{STABLE}} consumer groups as well. These commits can
> succeed if the consumers are momentarily disconnected, which moves the group
> to the {{EMPTY}} state. As a result, the consumers' offsets are reset to older
> values. If those offsets are no longer available on the target broker (due to
> retention), the consumers fall back to {{earliest}} or {{latest}} according to
> their {{auto.offset.reset}} setting, and so read duplicates or skip messages.
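The sequence in the description can be replayed with a small model. This is a sketch under stated assumptions, not Kafka code: {{RewindModel}}, {{TargetGroup}}, {{adminCommit}}, and {{replay}} are hypothetical names, and {{IllegalStateException}} stands in for the broker's {{UnknownMemberIdException}}. It shows why the broker-side guard is not enough on its own: the guard looks only at the group's state at the moment of the commit, so a stale checkpoint that arrives during a brief {{EMPTY}} window replaces a newer offset.

```java
// Hypothetical, stdlib-only model of the target-side guard; these are not Kafka APIs.
class RewindModel {
    enum GroupState { EMPTY, STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE, DEAD }

    static final class TargetGroup {
        GroupState state;
        long committedOffset;

        TargetGroup(GroupState state, long committedOffset) {
            this.state = state;
            this.committedOffset = committedOffset;
        }

        // Models an admin-style offset commit (no member id): rejected unless the group is EMPTY.
        // IllegalStateException stands in for the broker's UnknownMemberIdException.
        void adminCommit(long offset) {
            if (state != GroupState.EMPTY) {
                throw new IllegalStateException("group is " + state + "; commit rejected");
            }
            // The guard checks only the state, so an older offset silently replaces a newer one.
            committedOffset = offset;
        }
    }

    // Replays the sequence from the description and returns the committed offset afterwards.
    static long replay(long consumerProgress, long staleCheckpoint) {
        TargetGroup group = new TargetGroup(GroupState.STABLE, consumerProgress);
        try {
            group.adminCommit(staleCheckpoint);   // rejected while consumers are active
        } catch (IllegalStateException expected) {
            // MM2 logs "Unable to sync offsets ..." and tries again on a later sync
        }
        group.state = GroupState.EMPTY;           // consumers restart and briefly leave the group
        group.adminCommit(staleCheckpoint);       // accepted: the group is EMPTY at this instant
        group.state = GroupState.STABLE;          // consumers rejoin and resume from the stale offset
        return group.committedOffset;
    }

    public static void main(String[] args) {
        long after = replay(1_000L, 400L);
        System.out.println("committed offset after restart: " + after);
    }
}
```

Running {{main}} prints {{committed offset after restart: 400}}: the stale checkpoint is rejected while the group is {{STABLE}} but accepted during the restart window, rewinding the group by 600 records.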
> h2. *Bug location*
> 1. In [MirrorCheckpointTask|#L305], we only update the latest target cluster
> offsets ({{idleConsumerGroupsOffset}}) if the target consumer group is in the
> {{EMPTY}} state.
> 2. When {{syncGroupOffset}} is called, we check whether the target consumer
> group is present in {{idleConsumerGroupsOffset}}. An active group is not
> present, so we assume it is a new group and start syncing its offsets to the
> target. These calls fail with
> {{Unable to sync offsets for consumer group XYZ. This is likely caused by consumers currently using this group in the target cluster. (org.apache.kafka.connect.mirror.MirrorCheckpointTask)}}.
> When consumers have failed over, the logs typically contain many of these
> messages. The calls can succeed, however, if the consumers are momentarily
> disconnected, for example during restarts. The code should not treat a
> consumer group's absence from {{idleConsumerGroupsOffset}} as evidence that
> the group is new.
> 3. The same erroneous behavior can also be triggered when the
> {{describeConsumerGroups}} or {{listConsumerGroupOffsets}} calls in the
> {{refreshIdleConsumerGroupOffset}} method fail due to transient timeouts.
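Points 1 to 3 reduce to a single decision, which the following stdlib-only model paraphrases. It is a hypothetical simplification, not the actual {{MirrorCheckpointTask}} source: the map name mirrors {{idleConsumerGroupsOffset}} from the report, while {{BuggySyncModel}}, {{refresh}}, and {{wouldCommit}} are illustrative names, and partitions are collapsed into a single offset per group. It shows that an active group (never recorded, per point 1) and a group whose refresh failed (point 3) both fall through to the "new group" path from point 2.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the behavior in points 1-3; not the real MirrorCheckpointTask code.
// Partitions are collapsed into a single offset per group for brevity.
class BuggySyncModel {
    enum State { EMPTY, STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE, DEAD }

    // Stands in for idleConsumerGroupsOffset: group id -> committed offset on the target.
    final Map<String, Long> idleConsumerGroupsOffset = new HashMap<>();

    // Points 1 and 3: a group is recorded only if the refresh succeeded and the group is EMPTY.
    void refresh(String groupId, boolean refreshSucceeded, State state, long targetOffset) {
        if (refreshSucceeded && state == State.EMPTY) {
            idleConsumerGroupsOffset.put(groupId, targetOffset);
        } else {
            idleConsumerGroupsOffset.remove(groupId);
        }
    }

    // Point 2: absence from the map is read as "new group", so the checkpoint is committed.
    boolean wouldCommit(String groupId, long convertedUpstreamOffset) {
        Long targetOffset = idleConsumerGroupsOffset.get(groupId);
        if (targetOffset == null) {
            return true; // the bug: also true for STABLE groups and for groups whose refresh failed
        }
        // EMPTY groups: commit only if the checkpoint is ahead of the target's offset.
        return convertedUpstreamOffset > targetOffset;
    }

    public static void main(String[] args) {
        BuggySyncModel model = new BuggySyncModel();
        model.refresh("active", true, State.STABLE, 1_000L);
        model.refresh("idle", true, State.EMPTY, 1_000L);
        model.refresh("timed-out", false, State.EMPTY, 1_000L);
        // A stale checkpoint (offset 400) is attempted for the active and timed-out groups only.
        System.out.println(model.wouldCommit("active", 400L));    // true
        System.out.println(model.wouldCommit("idle", 400L));      // false
        System.out.println(model.wouldCommit("timed-out", 400L)); // true
    }
}
```

With a stale checkpoint of 400, only the idle group is protected by the offset comparison; the active group and the group whose refresh failed are both treated as new.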
> h2. *Fix*
> A potential fix is to add an explicit check so that offsets are synced only
> for consumer groups that are {{EMPTY}} on the target (or that do not exist
> there yet or are {{DEAD}}). We should also skip offset syncing for consumer
> groups whose offsets could not be refreshed.
>  
> {code:java}
> // Fixed code adds state checking:
> ConsumerGroupState groupStateOnTarget = targetConsumerGroupStates.get(consumerGroupId);
> if (!isGroupPresentOnTarget || groupStateOnTarget == ConsumerGroupState.DEAD) {
>     // Safe to sync: the group is new or dead on the target
>     syncGroupOffset(consumerGroupId, convertedUpstreamOffset);
> } else if (groupStateOnTarget == ConsumerGroupState.EMPTY) {
>     // Safe to sync: the group is idle
>     // ... existing offset comparison logic
> } else {
>     // Skip active groups (STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE).
>     // A present group with an UNKNOWN or missing (null) state also lands here and is skipped.
>     log.info("Consumer group: {} with state: {} is being actively consumed on the target, skipping sync.",
>              consumerGroupId, groupStateOnTarget);
> }
> {code}
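The fix above can be exercised in isolation with a self-contained model. This is a sketch under assumptions, not Kafka code: the nested {{State}} enum mirrors a subset of the constants of Kafka's {{ConsumerGroupState}}, {{SafeSyncModel}}, {{TargetView}}, {{decide}}, and the {{Decision}} values are hypothetical names, and partitions are collapsed into one offset. Unlike the snippet above, it fills in the {{EMPTY}} branch with the "never commit an older offset" comparison from the description and skips groups whose refresh failed.

```java
// Hypothetical, stdlib-only model of the proposed fix; names are illustrative, not Kafka APIs.
// State mirrors a subset of Kafka's ConsumerGroupState; partitions are collapsed into one offset.
class SafeSyncModel {
    enum State { UNKNOWN, EMPTY, STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE, DEAD }

    enum Decision { SYNC, SKIP_NOT_REFRESHED, SKIP_ACTIVE, SKIP_NOT_AHEAD }

    // What the task learned about a group on the target during the last refresh.
    static final class TargetView {
        final boolean refreshed;      // false if describe/list calls failed (e.g. timeouts)
        final boolean present;        // whether the group exists on the target
        final State state;            // meaningful only when refreshed && present
        final Long committedOffset;   // null if the group has no committed offset

        TargetView(boolean refreshed, boolean present, State state, Long committedOffset) {
            this.refreshed = refreshed;
            this.present = present;
            this.state = state;
            this.committedOffset = committedOffset;
        }
    }

    static Decision decide(TargetView view, long convertedUpstreamOffset) {
        if (!view.refreshed) {
            return Decision.SKIP_NOT_REFRESHED;      // unknown state: never guess "new group"
        }
        if (!view.present || view.state == State.DEAD) {
            return Decision.SYNC;                    // new or dead group on the target
        }
        if (view.state == State.EMPTY) {
            // Idle group: never move the committed offset backwards.
            if (view.committedOffset == null || convertedUpstreamOffset > view.committedOffset) {
                return Decision.SYNC;
            }
            return Decision.SKIP_NOT_AHEAD;
        }
        return Decision.SKIP_ACTIVE;                 // STABLE, rebalancing, or UNKNOWN
    }

    public static void main(String[] args) {
        TargetView active = new TargetView(true, true, State.STABLE, 1_000L);
        TargetView idle = new TargetView(true, true, State.EMPTY, 1_000L);
        TargetView timedOut = new TargetView(false, true, null, null);
        System.out.println(decide(active, 400L));    // SKIP_ACTIVE
        System.out.println(decide(idle, 400L));      // SKIP_NOT_AHEAD
        System.out.println(decide(idle, 1_200L));    // SYNC
        System.out.println(decide(timedOut, 400L));  // SKIP_NOT_REFRESHED
    }
}
```

Checking {{refreshed}} first is the key design choice: a group the task knows nothing about is skipped rather than mistaken for a new group, which is the failure mode described in points 2 and 3.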
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
