dongnuo123 commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1608907132


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4274,6 +4342,77 @@ private void validateClassicGroupHeartbeat(
         }
     }
 
+    /**
+     * Handle a classic group HeartbeatRequest to a consumer group. A response 
with
+     * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller 
than the
+     * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member 
is in
+     * UNRELEASED_PARTITIONS and all its partitions pending assignment are 
free.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual Heartbeat request.
+     *
+     * @return The coordinator result that contains the heartbeat response.
+     */
+    private CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> 
classicGroupHeartbeatToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        HeartbeatRequestData request
+    ) throws UnknownMemberIdException, FencedInstanceIdException, 
IllegalGenerationException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+        ConsumerGroupMember member = validateConsumerGroupMember(group, 
memberId, instanceId);
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), 
request.generationId());
+
+        scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicProtocolSessionTimeout().get());
+
+        Errors error = Errors.NONE;
+        if (member.memberEpoch() < group.groupEpoch() ||
+            member.state() == MemberState.UNREVOKED_PARTITIONS ||
+            (member.state() == MemberState.UNRELEASED_PARTITIONS && 
!group.hasUnreleasedPartitions(member))) {
+            error = Errors.REBALANCE_IN_PROGRESS;
+            scheduleConsumerGroupJoinTimeout(groupId, memberId, 
member.rebalanceTimeoutMs());

Review Comment:
   > we cancel the join timeout when we first convert to consumer group
   
   We don't cancel the timeout in case the conversion fails and the state needs 
to be reverted. The classic group join timeout does nothing if the group is a 
consumer group.
   
   > when we have a group epoch bump we tell the classic group member we're 
rebalancing and they should send a join request
   
   Yes correct, and the timeout here is for the member instead of the whole 
group. For each member, the rebalance will be something like
   - heartbeat -- if there's an ongoing rebalance, schedule the join timeout
   - join -- cancel the join timeout; schedule the sync timeout
   - sync -- cancel the sync timeout; maybe schedule a join timeout if a new 
rebalance ongoing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to