[ https://issues.apache.org/jira/browse/KAFKA-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17452494#comment-17452494 ]

David Jacot commented on KAFKA-13435:
-------------------------------------

Yeah, I do agree that this solution does not fly if we can't assume that the 
assignor is deterministic and idempotent. I think that it is OK for internal 
assignors but as you said we don't know about external ones.

I have been playing with an alternative approach. To summarise, the issue is 
that the re-joining leader does not know that it is the leader, because the 
group coordinator deliberately does not inform it, so as to avoid doing an 
assignment. As a side effect, the leader is no longer able to trigger a 
rebalance when the metadata is updated (e.g. partitions added), and at the 
moment only the leader is able to do this. What if we relax this constraint?

For subscription changes, every member of the group already does the check 
locally, but in the end only the leader known by the group coordinator can 
trigger a rebalance. One could argue that this wastes some resources, because 
all the non-leaders do the work for nothing. I wonder if we could do the same 
for metadata in our case: all the members would check for metadata changes and 
would try to trigger a rebalance, but only the leader known by the group 
coordinator would succeed.

Of all the options, this seems to be a reasonable approach that is not too 
disruptive.
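
A minimal sketch of this idea, under stated assumptions: `SketchCoordinator` and `SketchMember` are hypothetical stand-ins, not Kafka classes; metadata is reduced to a map of topic name to partition count; and the coordinator is assumed to track the rejoined static member internally as the leader even though it does not tell that member. Every member compares its snapshot with fresh metadata and asks for a rebalance, and the coordinator only honours the request coming from its recorded leader.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the proposal; these are not Kafka classes.
class SketchCoordinator {
    // Leader as recorded by the coordinator (assumed to be the rejoined
    // static member's new id, even though that member is not told).
    private final String leaderId;
    private int generation = 1;

    SketchCoordinator(String leaderId) {
        this.leaderId = leaderId;
    }

    // Accepts a rebalance request only from the recorded leader.
    boolean requestRebalance(String memberId) {
        if (!leaderId.equals(memberId)) {
            return false; // request from a non-leader is ignored
        }
        generation++; // stand-in for starting a new rebalance
        return true;
    }

    int generation() {
        return generation;
    }
}

class SketchMember {
    private final String memberId;
    // Topic name -> partition count seen at the last assignment.
    private final Map<String, Integer> snapshot;

    SketchMember(String memberId, Map<String, Integer> snapshot) {
        this.memberId = memberId;
        this.snapshot = new HashMap<>(snapshot);
    }

    // Every member, not just the leader, checks for metadata changes and
    // tries to trigger a rebalance; returns true if the coordinator accepted.
    boolean onMetadataUpdate(Map<String, Integer> metadata, SketchCoordinator coordinator) {
        if (snapshot.equals(metadata)) {
            return false; // nothing changed, nothing to request
        }
        return coordinator.requestRebalance(memberId);
    }
}
```

With a coordinator whose recorded leader is `"leader-new"`, a member with any other id that sees a partition increase gets `false` back, while `"leader-new"` gets `true` and the generation is bumped once. The cost is the one already accepted for subscriptions: non-leaders do the comparison and send a request that is simply rejected.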

> Group won't consume partitions added after static member restart
> ----------------------------------------------------------------
>
>                 Key: KAFKA-13435
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13435
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 3.0.0
>            Reporter: Ryan Leslie
>            Assignee: David Jacot
>            Priority: Critical
>
> When using consumer groups with static membership, if the consumer marked as 
> leader has restarted, then metadata changes such as partition increase are 
> not triggering expected rebalances.
> To reproduce this issue, simply:
>  # Create a static consumer subscribed to a single topic
>  # Close the consumer and create a new one with the same group instance id
>  # Increase partitions for the topic
>  # Observe that no rebalance occurs and the new partitions are not assigned
> I have only tested this in 2.7, but it may apply to newer versions as well.
> h3. Analysis
> In {_}ConsumerCoordinator{_}, one responsibility of the leader consumer is to 
> track metadata and trigger a rebalance if there are changes such as new 
> partitions added:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]
> {code:java}
> if (assignmentSnapshot != null && 
> !assignmentSnapshot.matches(metadataSnapshot)) {
>     ...
>     requestRejoinIfNecessary(reason);
>     return true;
> }
> {code}
> Note that _assignmentSnapshot_ is currently only set if the consumer is the 
> leader:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]
> {code:java}
> // Only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
> if (!isLeader)
>     assignmentSnapshot = null;
> {code}
> And _isLeader_ is only true after an assignment is performed during a 
> rebalance:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]
> That is, when a consumer group forms, exactly one consumer in the group 
> should have _isLeader == true_ and be responsible for triggering rebalances 
> on metadata changes.
> However, in the case of static membership, if the leader has been restarted 
> and rejoined the group, the group essentially no longer has a current leader. 
> Even though the metadata changes are fetched, no rebalance will be triggered. 
> That is, _isLeader_ will be false for all members.
> The issue persists until an actual group change triggers a proper 
> rebalance. To safely increase partitions when using static membership, 
> consumers must be stopped and allowed to time out, or be forcibly removed 
> with {_}AdminClient.removeMembersFromConsumerGroup(){_}.
> Correcting this in the client probably also requires help from the broker. 
> Currently, when a static consumer that is leader is restarted, the 
> coordinator does recognize the change:
> e.g. leader _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted
> {noformat}
> [2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member 
> Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test 
> with unknown member id rejoins, assigning new member id 
> 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74, 
> while old member id 
> 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff 
> will be removed. (kafka.coordinator.group.GroupCoordinator){noformat}
> However, it does not attempt to update the leader id since this isn't a new 
> rebalance, and JOIN_GROUP will continue returning the now stale member id as 
> leader:
> {noformat}
> 2021-11-04 13:53:13,490 DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer 
> instanceId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, 
> clientId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, 
> groupId=ryan_test] Received successful JoinGroup response: 
> JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=40, 
> protocolType='consumer', protocolName='range', 
> leader='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff', 
> memberId='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74', 
> members=[]){noformat}
> This means that it's not easy for any particular restarted member to identify 
> that it should consider itself leader and handle metadata changes.
> KAFKA-7728 mentions the difficulty of leader restarts, but its focus seemed 
> to be mainly on avoiding needless rebalances for static members. That goal 
> was accomplished, but this issue seems to be a side effect of both not 
> rebalancing AND not having the rejoined member reclaim its leadership status.
> Also, I have not verified whether it is strictly related or valid, but I 
> noticed that this ticket has been opened too: KAFKA-12759.
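
To make the failure mode in the issue concrete, here is a minimal sketch of the client-side logic it describes, assuming metadata can be reduced to a map of topic name to partition count. `LeaderCheckSketch` and its methods are hypothetical and only echo the `ConsumerCoordinator` field names; the member ids are shortened from the log excerpts. A member whose own id comes back as `leader` keeps a snapshot and detects the partition increase, while the restarted member receives the stale leader id, keeps no snapshot, and never asks to rejoin.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the behaviour described in the issue.
// Field names echo ConsumerCoordinator, but this is not Kafka code.
class LeaderCheckSketch {
    private boolean isLeader;
    // Topic name -> partition count at the time of the last assignment.
    private Map<String, Integer> assignmentSnapshot;

    // A member treats itself as leader only if the JoinGroup response names
    // its own member id; only the leader keeps an assignment snapshot.
    void onJoinComplete(String leaderInResponse, String ownMemberId,
                        Map<String, Integer> metadataAtAssignment) {
        isLeader = leaderInResponse.equals(ownMemberId);
        assignmentSnapshot = isLeader ? new HashMap<>(metadataAtAssignment) : null;
    }

    // A rebalance is requested only when a snapshot exists and no longer
    // matches the current metadata.
    boolean rejoinNeeded(Map<String, Integer> currentMetadata) {
        return assignmentSnapshot != null && !assignmentSnapshot.equals(currentMetadata);
    }

    boolean isLeader() {
        return isLeader;
    }
}
```

Calling `onJoinComplete("bbfcb930", "af88ecf2", ...)` for the restarted member leaves `isLeader()` false and `rejoinNeeded(...)` false even after the partition count grows. Since every other member also receives an id that is not its own, no member in the group ends up requesting the rebalance.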



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
