[ https://issues.apache.org/jira/browse/KAFKA-7018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16747366#comment-16747366 ]

ASF GitHub Bot commented on KAFKA-7018:
---------------------------------------

abbccdda commented on pull request #6177: KAFKA-7018 & KIP-345 part-one: Add static membership logic to JoinGroup protocol
URL: https://github.com/apache/kafka/pull/6177
 
 
   This is the first diff implementing the JoinGroup logic for static membership. This diff does the following:
   
   1. Add `group.instance.id`, a unique identifier for each consumer instance, provided by the end user.
   2. Modify the group coordinator to accept JoinGroupRequests with or without static membership, and refactor the logic for readability and code reuse.
   3. Add client-side support for static membership, including a new `group.instance.id` config, applying the stream thread client id by default in Streams, and new JoinGroup exception handling.
   4. Remove the `internal.leave.on.close` config in favor of checking whether `group.instance.id` is defined. In effect, only dynamic members send a LeaveGroupRequest, while static members expire only through the session timeout.
   5. Increase the maximum session timeout to 30 minutes, giving more flexibility to users who would rather tolerate partial unavailability than pay the cost of a rebalance.
   6. Add unit tests for each module change, especially the group coordinator logic, covering combinations of:
     6.1 Dynamic/Static member
     6.2 Known/Unknown member id
     6.3 Group stable/unstable
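The client-side rules in items 1, 4 and 5 above can be summarized in a small model. This is a minimal sketch, not Kafka's API. The `MemberSettings` class and its methods are hypothetical, and the 30-minute limit is assumed here to be the upper bound that is accepted for the session timeout.

```java
import java.util.Optional;

// Hypothetical model of the static/dynamic membership rules listed above; not Kafka code.
final class MemberSettings {
    // Item 5: the maximum session timeout is raised to 30 minutes
    // (assumed here to be the upper bound that is accepted).
    static final int MAX_SESSION_TIMEOUT_MS = 30 * 60 * 1000;

    // Item 1: the user-provided group.instance.id; empty means a dynamic member.
    final Optional<String> groupInstanceId;
    final int sessionTimeoutMs;

    MemberSettings(Optional<String> groupInstanceId, int sessionTimeoutMs) {
        if (sessionTimeoutMs <= 0 || sessionTimeoutMs > MAX_SESSION_TIMEOUT_MS) {
            throw new IllegalArgumentException("session timeout out of range: " + sessionTimeoutMs);
        }
        this.groupInstanceId = groupInstanceId;
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    boolean isStatic() {
        return groupInstanceId.isPresent();
    }

    // Item 4: replaces internal.leave.on.close. Only dynamic members send a
    // LeaveGroupRequest on close; a static member is removed only when its
    // session times out.
    boolean sendsLeaveGroupOnClose() {
        return !isStatic();
    }
}
```

Under this split, a static member that is restarted within its session timeout is never explicitly removed from the group. That is the motivation for allowing a larger session timeout.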
   
   The hope is to merge this logic before the 2.2 code freeze so that we at Pinterest can start experimenting with the core logic as soon as possible.
   
   The rest of the KIP-345 changes will be split into four separate diffs:
   
   1. Stop removing members when the rebalance timeout expires; remove them only on session timeout.
   2. Changes to the LeaveGroup logic, including a version bump and broker-side and client-side changes.
   3. Admin client changes to support removing static members in batches.
   4. Deprecate group.initial.rebalance.delay
   
   Let me know your thoughts, @guozhangwang @hachikuji @stanislavkozlovski @MayureshGharat @kkonstantine @lindong28 @Ishiihara @shawnsnguyen. Thanks!
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> persist memberId for consumer restart
> -------------------------------------
>
>                 Key: KAFKA-7018
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7018
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer, streams
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> In the group coordinator, there is logic that skips the rebalance for JoinGroup requests from existing follower consumers:
> {code:java}
> case Empty | Stable =>
>   if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
>     // if the member id is unknown, register the member to the group
>     addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
>   } else {
>     val member = group.get(memberId)
>     if (group.isLeader(memberId) || !member.matches(protocols)) {
>       // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
>       // The latter allows the leader to trigger rebalances for changes affecting assignment
>       // which do not affect the member metadata (such as topic metadata changes for the consumer)
>       updateMemberAndRebalance(group, member, protocols, responseCallback)
>     } else {
>       // for followers with no actual change to their metadata, just return group information
>       // for the current generation which will allow them to issue SyncGroup
>       responseCallback(JoinGroupResult(
>         members = Map.empty,
>         memberId = memberId,
>         generationId = group.generationId,
>         subProtocol = group.protocolOrNull,
>         leaderId = group.leaderOrNull,
>         error = Errors.NONE))
>     }
>   }
> {code}
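The branch quoted above can be modeled as a pure decision function, which makes the restart problem easy to see. A follower that rejoins with its old member id gets the current generation back. A restarted consumer that sends UNKNOWN_MEMBER_ID always forces a rebalance. This is a simplified sketch, not the coordinator's code. `JoinGroupDecision`, `JoinAction` and `GroupState` are hypothetical names, and protocol metadata is reduced to a set of strings. The `UNKNOWN_MEMBER_ERROR` outcome stands in for a check that the real coordinator is assumed to perform before reaching this branch.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical names; a simplified model of the Empty | Stable branch quoted above.
enum GroupState { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

enum JoinAction {
    ADD_MEMBER_AND_REBALANCE,    // addMemberAndRebalance(...)
    UPDATE_MEMBER_AND_REBALANCE, // updateMemberAndRebalance(...)
    RETURN_CURRENT_GENERATION,   // responseCallback(JoinGroupResult(...)) with Errors.NONE
    UNKNOWN_MEMBER_ERROR,        // assumed to be rejected before this branch in the real coordinator
    NOT_MODELED                  // other group states are out of scope here
}

final class JoinGroupDecision {
    // Same value as JoinGroupRequest.UNKNOWN_MEMBER_ID, which is the empty string.
    static final String UNKNOWN_MEMBER_ID = "";

    static JoinAction decide(GroupState state,
                             String leaderId,
                             Map<String, Set<String>> memberProtocols,
                             String memberId,
                             Set<String> protocols) {
        if (state != GroupState.EMPTY && state != GroupState.STABLE) {
            return JoinAction.NOT_MODELED;
        }
        if (UNKNOWN_MEMBER_ID.equals(memberId)) {
            // New (or restarted) consumer: register it and rebalance.
            return JoinAction.ADD_MEMBER_AND_REBALANCE;
        }
        Set<String> known = memberProtocols.get(memberId);
        if (known == null) {
            return JoinAction.UNKNOWN_MEMBER_ERROR;
        }
        boolean isLeader = Objects.equals(memberId, leaderId);
        if (isLeader || !known.equals(protocols)) {
            // A leader rejoin or changed metadata forces a rebalance.
            return JoinAction.UPDATE_MEMBER_AND_REBALANCE;
        }
        // Unchanged follower: hand back the current generation so it can SyncGroup.
        return JoinAction.RETURN_CURRENT_GENERATION;
    }
}
```

Consider a Stable group with leader `leader-1` and follower `follower-1`. If `follower-1` rejoins unchanged, the result is `RETURN_CURRENT_GENERATION`. If the same consumer is restarted and sends `""`, the result is `ADD_MEMBER_AND_REBALANCE`.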
> While looking at AbstractCoordinator, I found that the generation is hard-coded to NO_GENERATION on restart, so the first JoinGroup request carries UNKNOWN_MEMBER_ID. The coordinator therefore treats the restarted consumer as a new member and triggers a rebalance, while the old member stays in the group until its session times out.
> I'm trying to clarify the following things before we extend the discussion:
>  # Is my understanding of the above logic correct? (I hope [~mjsax] can help me double-check.)
>  # Does it make sense to persist the last memberId for consumers? We currently only need this in Streams applications, but it would do no harm to use it for consumers in general. It would be a nice-to-have on consumer restart when loading the previous memberId is enabled in the config. If loading fails, we simply fall back to UNKNOWN_MEMBER_ID.
>  # The behavior could also be changed on the broker side, but I suspect that is very risky. So far, a client-side change seems to be the least effort. The end goal is to avoid excessive rebalances when the same consumer restarts, so if you think a server-side change could also help, we can discuss it further.
> Thank you for helping out! [~mjsax] [~guozhang]
>  
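Point 2 of the issue description above could look roughly like this on the client side. This is a minimal sketch under stated assumptions: `MemberIdStore`, the file-based storage, and the `loadPrevious` switch are hypothetical and not part of Kafka. The `loadPrevious` switch stands in for the proposed loading-previous-memberId option. A persisted id only helps if the coordinator still knows that member when the consumer comes back. Otherwise the join is expected to be rejected with UNKNOWN_MEMBER_ID, and the client falls back to joining as a new member.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper that persists the last member id across restarts; not Kafka code.
final class MemberIdStore {
    // Same value as JoinGroupRequest.UNKNOWN_MEMBER_ID (the empty string).
    static final String UNKNOWN_MEMBER_ID = "";

    private final Path file;
    private final boolean loadPrevious; // stands in for the proposed loading-previous-memberId option

    MemberIdStore(Path file, boolean loadPrevious) {
        this.file = file;
        this.loadPrevious = loadPrevious;
    }

    // Called after a successful JoinGroup so that the id survives a restart.
    void save(String memberId) throws IOException {
        Files.write(file, memberId.getBytes(StandardCharsets.UTF_8));
    }

    // Member id to send in the first JoinGroup request after a restart.
    String memberIdForFirstJoin() {
        if (!loadPrevious) {
            return UNKNOWN_MEMBER_ID;
        }
        try {
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        } catch (IOException e) {
            // Missing or unreadable file: join as a new member.
            return UNKNOWN_MEMBER_ID;
        }
    }
}
```

Falling back to UNKNOWN_MEMBER_ID instead of failing startup keeps the feature strictly optional. The worst case is then the current behavior: one rebalance per restart.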



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
