jeffkbkim commented on code in PR #14147:
URL: https://github.com/apache/kafka/pull/14147#discussion_r1316322730


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2904,6 +2908,135 @@ private void validateGenericGroupHeartbeat(
         }
     }
 
+    /**
+     * Handle a generic group LeaveGroup request.
+     *
+     * @param context        The request context.
+     * @param request        The actual LeaveGroup request.
+     *
+     * @return The LeaveGroup response and the GroupMetadata record to append 
if the group
+     *         no longer has any member.
+     */
+    public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave(
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), 
false);
+        if (group.isInState(DEAD)) {
+            return new CoordinatorResult<>(Collections.emptyList(),
+                new LeaveGroupResponseData()
+                    .setErrorCode(COORDINATOR_NOT_AVAILABLE.code())
+            );
+        }
+
+        CoordinatorResult<Void, Record> coordinatorResult = EMPTY_RESULT;
+        List<MemberResponse> memberResponses = new ArrayList<>();
+
+            for (MemberIdentity member : request.members()) {
+                // The LeaveGroup API allows administrative removal of members 
by GroupInstanceId
+                // in which case we expect the MemberId to be undefined.
+                if (member.memberId().equals(UNKNOWN_MEMBER_ID)) {
+                    if (member.groupInstanceId() != null && 
group.staticMemberId(member.groupInstanceId()) != null) {
+                        coordinatorResult = 
removeCurrentMemberFromGenericGroup(
+                            group,
+                            group.staticMemberId(member.groupInstanceId()),
+                            member.reason()
+                        );
+                        memberResponses.add(
+                            new MemberResponse()
+                                .setMemberId(member.memberId())
+                                .setGroupInstanceId(member.groupInstanceId())
+                        );
+                    } else {
+                        memberResponses.add(
+                            new MemberResponse()
+                                .setMemberId(member.memberId())
+                                .setGroupInstanceId(member.groupInstanceId())
+                                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+                        );
+                    }
+                } else if (group.isPendingMember(member.memberId())) {
+                    coordinatorResult = 
removePendingMemberAndUpdateGenericGroup(group, member.memberId());
+                    timer.cancel(genericGroupHeartbeatKey(group.groupId(), 
member.memberId()));
+                    log.info("Pending member {} has left group {} through 
explicit `LeaveGroup` request.",
+                        member.memberId(), group.groupId());
+
+                    memberResponses.add(
+                        new MemberResponse()
+                            .setMemberId(member.memberId())
+                            .setGroupInstanceId(member.groupInstanceId())
+                    );
+                } else {
+                    try {
+                        group.validateMember(member.memberId(), 
member.groupInstanceId(), "leave-group");
+                        coordinatorResult = 
removeCurrentMemberFromGenericGroup(
+                            group,
+                            member.memberId(),
+                            member.reason()
+                        );
+                        memberResponses.add(
+                            new MemberResponse()
+                                .setMemberId(member.memberId())
+                                .setGroupInstanceId(member.groupInstanceId())
+                        );
+                    } catch (KafkaException e) {
+                        memberResponses.add(
+                            new MemberResponse()
+                                .setMemberId(member.memberId())
+                                .setGroupInstanceId(member.groupInstanceId())
+                                .setErrorCode(Errors.forException(e).code())
+                        );
+                    }
+                }
+            }
+            return new CoordinatorResult<>(
+                coordinatorResult.records(),
+                new LeaveGroupResponseData()
+                    .setMembers(memberResponses),
+                coordinatorResult.appendFuture()
+            );
+    }
+
+    /**
+     * Remove a member from the group. Cancel member's heartbeat, and prepare 
rebalance
+     * or complete the join phase if necessary.
+     * 
+     * @param group     The generic group.
+     * @param memberId  The member id.
+     * @param reason    The reason for the LeaveGroup request.
+     *
+     * @return The GroupMetadata record and the append future to be completed 
once the record is
+     *         appended to the log (and replicated).
+     */
+    private CoordinatorResult<Void, Record> 
removeCurrentMemberFromGenericGroup(
+        GenericGroup group,
+        String memberId,
+        String reason
+    ) {
+        GenericGroupMember member = group.member(memberId);
+        reason = reason != null ? reason : "not provided";
+        timer.cancel(genericGroupHeartbeatKey(group.groupId(), memberId));
+        log.info("[Group {}] Member {} has left group through explicit 
`LeaveGroup` request; client reason: {}",
+            group.groupId(), memberId, reason);
+
+        group.completeJoinFuture(member,
+            new JoinGroupResponseData()
+                .setMemberId(UNKNOWN_MEMBER_ID)
+                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+        );
+        group.remove(member.memberId());
+
+        switch (group.currentState()) {
+            case STABLE:
+            case COMPLETING_REBALANCE:
+                return maybePrepareRebalanceOrCompleteJoin(group, reason);
+            case PREPARING_REBALANCE:
+                timer.cancel(genericGroupJoinKey(group.groupId()));

Review Comment:
   Thanks for the catch. I updated this to `maybeCompleteJoinPhase(group)` but 
one difference is that the existing code (`GroupCoordinator#tryCompleteJoin`) 
completes the join phase even if it's the initial rebalance whereas 
`maybeCompleteJoinPhase` does not.
   
   Do you think this will be an issue? In the case that it is the initial 
rebalance, then the timer will retry after the timeout.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to