dajac commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1609835255


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4274,6 +4343,81 @@ private void validateClassicGroupHeartbeat(
         }
     }
 
+    /**
+     * Handle a classic group HeartbeatRequest to a consumer group. A response with
+     * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the
+     * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in
+     * UNRELEASED_PARTITIONS and all its partitions pending assignment are free.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual Heartbeat request.
+     *
+     * @return The coordinator result that contains the heartbeat response.
+     */
+    private CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeatToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        HeartbeatRequestData request
+    ) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+        ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId);
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId());
+
+        scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get());
+
+        Errors error = Errors.NONE;
+        // The member should rejoin if any of the following conditions is met.
+        // 1) The group epoch is bumped so the member need to rejoin to catch up.
+        // 2) The member needs to revoke some partitions and rejoin to reconcile with the new epoch.
+        // 3) The member's partitions pending assignment are free, so it can rejoin to get the complete assignment.
+        if (member.memberEpoch() < group.groupEpoch() ||
+            member.state() == MemberState.UNREVOKED_PARTITIONS ||
+            (member.state() == MemberState.UNRELEASED_PARTITIONS && !group.waitingOnUnreleasedPartition(member))) {
+            error = Errors.REBALANCE_IN_PROGRESS;
+            scheduleConsumerGroupJoinTimeoutIfAbsent(groupId, memberId, member.rebalanceTimeoutMs());
+        }
+
+        return new CoordinatorResult<>(
+            Collections.emptyList(),
+            new HeartbeatResponseData().setErrorCode(error.code())
+        );
+    }
+
+    /**
+     * Validates that (1) the instance id exists and is mapped to the member id
+     * if the group instance id is provided; and (2) the member id exists in the group.
+     *
+     * @param group             The consumer group.
+     * @param memberId          The member id.
+     * @param instanceId        The instance id.
+     *
+     * @return The ConsumerGruopMember.

Review Comment:
   nit: `The ConsumerGroupMember`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4209,31 +4241,67 @@ private void removePendingSyncMember(
      * @param context        The request context.
      * @param request        The actual Heartbeat request.
      *
-     * @return The Heartbeat response.
+     * @return The coordinator result that contains the heartbeat response.
      */
-    public HeartbeatResponseData classicGroupHeartbeat(
+    public CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeat(

Review Comment:
   Yeah. This is correct. With the classic group type, the group state is not 
based on timeline data structures so using a read operation was fine.
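
A toy sketch of the distinction, not Kafka's actual timeline classes: `TimelineSketch`, `VersionedValue`, `readAt`, and `readLatest` are hypothetical names used only for illustration. It assumes that a read operation observes timeline-backed state as of the last committed offset, whereas state held in plain fields always shows the latest write, so for plain state the choice of operation type does not change what is seen.

```java
import java.util.Map;
import java.util.TreeMap;

public class TimelineSketch {
    /** Hypothetical versioned value: keeps every write keyed by the log offset that produced it. */
    public static final class VersionedValue<T> {
        private final TreeMap<Long, T> versions = new TreeMap<>();

        /** Records the value written at the given offset. */
        public void write(long offset, T value) {
            versions.put(offset, value);
        }

        /** Returns the value visible at the given offset, or null if nothing was written yet. */
        public T readAt(long offset) {
            Map.Entry<Long, T> entry = versions.floorEntry(offset);
            return entry == null ? null : entry.getValue();
        }

        /** Returns the most recent value, committed or not. */
        public T readLatest() {
            return versions.isEmpty() ? null : versions.lastEntry().getValue();
        }
    }

    public static void main(String[] args) {
        long lastCommittedOffset = 10L;

        // Timeline-style state: the write at offset 20 is not committed yet.
        VersionedValue<Integer> groupEpoch = new VersionedValue<>();
        groupEpoch.write(10L, 1);
        groupEpoch.write(20L, 2);

        // Plain state: a field simply overwritten by each write.
        int plainGeneration = 1;
        plainGeneration = 2;

        System.out.println("timeline, read at committed offset: " + groupEpoch.readAt(lastCommittedOffset));
        System.out.println("timeline, latest state: " + groupEpoch.readLatest());
        System.out.println("plain field, any read: " + plainGeneration);
    }
}
```

In this sketch the read at the committed offset lags behind the latest write, while the plain field has no committed view to lag behind.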



