dongnuo123 commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1608907132
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4274,6 +4342,77 @@ private void validateClassicGroupHeartbeat(
         }
     }
+    /**
+     * Handle a classic group HeartbeatRequest to a consumer group. A response with
+     * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the
+     * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in
+     * UNRELEASED_PARTITIONS and all its partitions pending assignment are free.
+     *
+     * @param group The ConsumerGroup.
+     * @param context The request context.
+     * @param request The actual Heartbeat request.
+     *
+     * @return The coordinator result that contains the heartbeat response.
+     */
+    private CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeatToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        HeartbeatRequestData request
+    ) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+        ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId);
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId());
+
+        scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get());
+
+        Errors error = Errors.NONE;
+        if (member.memberEpoch() < group.groupEpoch() ||
+            member.state() == MemberState.UNREVOKED_PARTITIONS ||
+            (member.state() == MemberState.UNRELEASED_PARTITIONS && !group.hasUnreleasedPartitions(member))) {
+            error = Errors.REBALANCE_IN_PROGRESS;
+            scheduleConsumerGroupJoinTimeout(groupId, memberId, member.rebalanceTimeoutMs());

Review Comment:
> we cancel the join timeout when we first convert to consumer group

We don't cancel the timeout in case the conversion fails and the state needs to
be reverted. The classic group join timeout does nothing if the group is a consumer group.

> when we have a group epoch bump we tell the classic group member we're rebalancing and they should send a join request

Yes, correct, and the timeout here is for the member rather than for the whole group. For each member, the rebalance will be something like:
- heartbeat -- if there's an ongoing rebalance, schedule the join timeout
- join -- cancel the join timeout; schedule the sync timeout
- sync -- cancel the sync timeout; maybe schedule a join timeout if a new rebalance is ongoing
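
To make the per-member heartbeat / join / sync flow described in this comment concrete, here is a rough sketch of how the two timers hand off to each other. It is only an illustrative model: `ClassicMemberTimers`, its `Timer` enum, and all method names are hypothetical and are not part of `GroupMetadataManager`; the real code schedules timeouts through the coordinator timer rather than tracking them in a map.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the per-member timers; not the GroupMetadataManager API.
final class ClassicMemberTimers {
    enum Timer { JOIN, SYNC }

    // Timers currently armed for each member id (assumption: tracked in memory only).
    private final Map<String, EnumSet<Timer>> armed = new HashMap<>();

    private EnumSet<Timer> timersOf(String memberId) {
        return armed.computeIfAbsent(memberId, id -> EnumSet.noneOf(Timer.class));
    }

    // heartbeat: if a rebalance is ongoing, the member must rejoin, so arm the join timeout.
    void onHeartbeat(String memberId, boolean rebalanceInProgress) {
        if (rebalanceInProgress) {
            timersOf(memberId).add(Timer.JOIN);
        }
    }

    // join: the member rejoined in time; cancel the join timeout and arm the sync timeout.
    void onJoin(String memberId) {
        EnumSet<Timer> timers = timersOf(memberId);
        timers.remove(Timer.JOIN);
        timers.add(Timer.SYNC);
    }

    // sync: cancel the sync timeout; arm a new join timeout only if another rebalance started.
    void onSync(String memberId, boolean newRebalanceInProgress) {
        EnumSet<Timer> timers = timersOf(memberId);
        timers.remove(Timer.SYNC);
        if (newRebalanceInProgress) {
            timers.add(Timer.JOIN);
        }
    }

    boolean isArmed(String memberId, Timer timer) {
        return armed.containsKey(memberId) && armed.get(memberId).contains(timer);
    }
}
```

The point of the sketch is that the timers are keyed by member, so one member waiting on its join timeout does not affect any other member in the group.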