[ https://issues.apache.org/jira/browse/KAFKA-7018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505706#comment-16505706 ]

Boyang Chen commented on KAFKA-7018:
------------------------------------

Thanks Guozhang! This could be done on our side by persisting a local file
that maps client-id to member-id (provided the client-id stays the same
across restarts), as long as we agree on the design. Hey [~hachikuji], could
you share more thoughts here?
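A minimal sketch of the local mapping idea, assuming the client-id is stable across restarts. `MemberIdStore`, the properties-file layout and the fallback behavior are hypothetical illustrations, not existing Kafka APIs. The only Kafka detail assumed is that `JoinGroupRequest.UNKNOWN_MEMBER_ID` is the empty string.

```scala
import java.nio.file.{Files, Path}
import java.util.Properties
import scala.util.Try

// Hypothetical helper, not a Kafka API: persists the last member id the
// coordinator assigned to each client.id in a local properties file.
final class MemberIdStore(file: Path) {
  import MemberIdStore.UnknownMemberId

  // Read the current mapping; an absent file yields an empty mapping.
  private def load(): Properties = {
    val props = new Properties()
    if (Files.exists(file)) {
      val in = Files.newInputStream(file)
      try props.load(in) finally in.close()
    }
    props
  }

  // Record the member id returned in the latest successful JoinGroup response.
  def save(clientId: String, memberId: String): Unit = {
    val props = load()
    props.setProperty(clientId, memberId)
    val out = Files.newOutputStream(file)
    try props.store(out, "client.id -> last member id") finally out.close()
  }

  // Member id to send in the first JoinGroup after a restart. Falls back to
  // the unknown-member sentinel if nothing was stored or the file is unreadable.
  def lookup(clientId: String): String =
    Try(load().getProperty(clientId, UnknownMemberId)).getOrElse(UnknownMemberId)
}

object MemberIdStore {
  // Assumed to match JoinGroupRequest.UNKNOWN_MEMBER_ID (the empty string).
  val UnknownMemberId: String = ""
}
```

On restart the consumer would call `lookup(clientId)` and send that id in its first JoinGroup, then call `save` after each successful join. If the coordinator no longer recognizes the stored id, the client would still need to fall back to UNKNOWN_MEMBER_ID and rejoin.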

cc [~ishiihara]

> persist memberId for consumer restart
> -------------------------------------
>
>                 Key: KAFKA-7018
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7018
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer, streams
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> In the group coordinator, there is logic that avoids a rebalance for JoinGroup
> requests from existing follower consumers whose metadata has not changed:
> {code:java}
> case Empty | Stable =>
>   if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
>     // if the member id is unknown, register the member to the group
>     addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId,
>       clientHost, protocolType, protocols, group, responseCallback)
>   } else {
>     val member = group.get(memberId)
>     if (group.isLeader(memberId) || !member.matches(protocols)) {
>       // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
>       // The latter allows the leader to trigger rebalances for changes affecting assignment
>       // which do not affect the member metadata (such as topic metadata changes for the consumer)
>       updateMemberAndRebalance(group, member, protocols, responseCallback)
>     } else {
>       // for followers with no actual change to their metadata, just return group information
>       // for the current generation which will allow them to issue SyncGroup
>       responseCallback(JoinGroupResult(
>         members = Map.empty,
>         memberId = memberId,
>         generationId = group.generationId,
>         subProtocol = group.protocolOrNull,
>         leaderId = group.leaderOrNull,
>         error = Errors.NONE))
>     }
>   }
> {code}
> While looking at the AbstractCoordinator, I found that the generation is
> hard-coded as NO_GENERATION on restart, which means we send UNKNOWN_MEMBER_ID
> in the first JoinGroup request. As a result, the coordinator treats the
> restarted consumer as a new member and triggers a rebalance, which may not
> complete until the old member's session times out.
> I'm trying to clarify the following things before we extend the discussion:
>  # Whether my understanding of the above logic is right (hoping [~mjsax] can
> help me double-check).
>  # Whether it makes sense to persist the last memberId for consumers. We
> currently only need this feature in Streams applications, but it would do no
> harm to use it for consumers in general. It would be a nice-to-have on
> consumer restart when loading-previous-memberId is configured to true. If
> loading fails, we simply fall back to UNKNOWN_MEMBER_ID.
>  # The behavior could also be changed on the broker side, but I suspect that
> is very risky. So far a client-side change seems to be the least effort. The
> end goal is to avoid excessive rebalances when the same consumer restarts, so
> if you feel a server-side change could also help, we can discuss further.
> Thank you for helping out! [~mjsax] [~guozhang]
>  
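To make the restart problem concrete, here is a simplified, hypothetical model of the Empty/Stable branch quoted in the issue. `JoinOutcome`, `JoinModel.onJoin` and its parameters are illustrative names, not coordinator code. The sketch also assumes, as in the real coordinator, that a non-empty member id the group does not recognize is rejected with an UNKNOWN_MEMBER_ID error before this branch runs.

```scala
// Simplified, hypothetical model of JoinGroup handling for a group in the
// Empty or Stable state. Not the real GroupCoordinator.
sealed trait JoinOutcome
case object AddMemberAndRebalance extends JoinOutcome    // unknown id: new member, rebalance
case object RejectUnknownMemberId extends JoinOutcome    // stale id: client must reset and retry
case object UpdateMemberAndRebalance extends JoinOutcome // leader, or metadata changed
case object ReturnCurrentGeneration extends JoinOutcome  // unchanged follower: no rebalance

object JoinModel {
  // Assumed to match JoinGroupRequest.UNKNOWN_MEMBER_ID (the empty string).
  val UnknownMemberId: String = ""

  def onJoin(
      memberId: String,
      groupMembers: Set[String],
      leaderId: Option[String],
      metadataChanged: Boolean
  ): JoinOutcome =
    if (memberId == UnknownMemberId) AddMemberAndRebalance
    else if (!groupMembers.contains(memberId)) RejectUnknownMemberId
    else if (leaderId.contains(memberId) || metadataChanged) UpdateMemberAndRebalance
    else ReturnCurrentGeneration
}
```

A restarted follower that sends the empty id always lands in the first branch; if it could send its previous, still-registered member id with unchanged metadata, it would land in the last branch and avoid the rebalance.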



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
