[ 
https://issues.apache.org/jira/browse/KAFKA-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17440816#comment-17440816
 ] 

Guozhang Wang commented on KAFKA-13435:
---------------------------------------

Hello [~rleslie], thanks for your reported issue. I looked through the code and 
I agree with you that this is a bug on the broker side --- unfortunately, since 
it is broker-side we need to fix it and upgrade the broker version in order to 
avoid hitting that again --- would you like to submit a PR that fixes on the 
broker? I think the fix should be inside {{updateStaticMemberAndRebalance}}, 
where we update the {{group.leader}} based on the oldMemberId.

> Group won't consume partitions added after static member restart
> ----------------------------------------------------------------
>
>                 Key: KAFKA-13435
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13435
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 2.7.0
>            Reporter: Ryan Leslie
>            Priority: Major
>
> When using consumer groups with static membership, if the consumer marked as 
> leader has restarted, then metadata changes such as partition increase are 
> not triggering expected rebalances.
> To reproduce this issue, simply:
>  # Create a static consumer subscribed to a single topic
>  # Close the consumer and create a new one with the same group instance id
>  # Increase partitions for the topic
>  # Observe that no rebalance occurs and the new partitions are not assigned
> I have only tested this in 2.7, but it may apply to newer versions as well.
> h3. Analysis
> In {_}ConsumerCoordinator{_}, one responsibility of the leader consumer is to 
> track metadata and trigger a rebalance if there are changes such as new 
> partitions added:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]
> {code:java}
> if (assignmentSnapshot != null && 
> !assignmentSnapshot.matches(metadataSnapshot)) {
>     ...
>     requestRejoinIfNecessary(reason);
>     return true;
> }
> {code}
> Note thatĀ _assignmentSnapshot_ is currently only set if the consumer is the 
> leader:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]
> {code:java}
> // Only the leader is responsible for monitoring for metadata changes (i.e. 
> partition changes)
> if (!isLeader)
>     assignmentSnapshot = null;
> {code}
> And _isLeader_ is only true after an assignment is performed during a 
> rebalance:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]
> That is, when a consumer group forms, exactly one consumer in the group 
> should haveĀ _isLeader == True_ and be responsible for triggering rebalances 
> on metadata changes.
> However, in the case of static membership, if the leader has been restarted 
> and rejoined the group, the group essentially no longer has a current leader. 
> Even though the metadata changes are fetched, no rebalance will be triggered. 
> That is, _isLeader_ will be false for all members.
> This issue does not resolve until after an actual group change that causes a 
> proper rebalance. In order to safely make a partition increase when using 
> static membership, consumers must be stopped and have timed out, or forcibly 
> removed with {_}AdminClient.removeMembersFromConsumerGroup(){_}.
> Correcting this in the client probably also requires help from the broker. 
> Currently, when a static consumer that is leader is restarted, the 
> coordinator does recognize the change:
> e.g. leader _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted
> {noformat}
> [2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member 
> Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test 
> with unknown member id rejoins, assigning new member id 
> 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-
> 6ebf-47da-95ef-c54fef17ab74, while old member id 
> 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff
>  will be removed. (
> kafka.coordinator.group.GroupCoordinator){noformat}
> However, it does not attempt to update the leader id since this isn't a new 
> rebalance, and JOIN_GROUP will continue returning the now stale member id as 
> leader:
> {noformat}
> 2021-11-04 13:53:13,490 DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer 
> instanceId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, 
> clientId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, 
> groupId=ryan_test] Received successful JoinGroup response: 
> JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=40, 
> protocolType='consumer', protocolName='range', 
> leader='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff',
>  
> memberId='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74',
>  members=[]){noformat}
> This means that it's not easy for any particular restarted member to identify 
> that it should consider itself leader and handle metadata changes.
> There is reference to the difficulty of leader restarts in KAFKA-7728 but the 
> focus seemed mainly on avoiding needless rebalances for static members. That 
> goal was accomplished, but this issue seems to be a side effect of both not 
> rebalancing AND not having the rejoined member reclaim its leadership status.
> Also, I have not verified if it's strictly related or valid, but noticed this 
> ticket has been opened too: KAFKA-12759.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to