jeffkbkim commented on code in PR #14147: URL: https://github.com/apache/kafka/pull/14147#discussion_r1316322730
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2904,6 +2908,135 @@ private void validateGenericGroupHeartbeat( } } + /** + * Handle a generic group LeaveGroup request. + * + * @param context The request context. + * @param request The actual LeaveGroup request. + * + * @return The LeaveGroup response and the GroupMetadata record to append if the group + * no longer has any member. + */ + public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave( + RequestContext context, + LeaveGroupRequestData request + ) throws UnknownMemberIdException, GroupIdNotFoundException { + GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false); + if (group.isInState(DEAD)) { + return new CoordinatorResult<>(Collections.emptyList(), + new LeaveGroupResponseData() + .setErrorCode(COORDINATOR_NOT_AVAILABLE.code()) + ); + } + + CoordinatorResult<Void, Record> coordinatorResult = EMPTY_RESULT; + List<MemberResponse> memberResponses = new ArrayList<>(); + + for (MemberIdentity member : request.members()) { + // The LeaveGroup API allows administrative removal of members by GroupInstanceId + // in which case we expect the MemberId to be undefined. 
+ if (member.memberId().equals(UNKNOWN_MEMBER_ID)) { + if (member.groupInstanceId() != null && group.staticMemberId(member.groupInstanceId()) != null) { + coordinatorResult = removeCurrentMemberFromGenericGroup( + group, + group.staticMemberId(member.groupInstanceId()), + member.reason() + ); + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } else { + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + ); + } + } else if (group.isPendingMember(member.memberId())) { + coordinatorResult = removePendingMemberAndUpdateGenericGroup(group, member.memberId()); + timer.cancel(genericGroupHeartbeatKey(group.groupId(), member.memberId())); + log.info("Pending member {} has left group {} through explicit `LeaveGroup` request.", + member.memberId(), group.groupId()); + + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } else { + try { + group.validateMember(member.memberId(), member.groupInstanceId(), "leave-group"); + coordinatorResult = removeCurrentMemberFromGenericGroup( + group, + member.memberId(), + member.reason() + ); + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } catch (KafkaException e) { + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + .setErrorCode(Errors.forException(e).code()) + ); + } + } + } + return new CoordinatorResult<>( + coordinatorResult.records(), + new LeaveGroupResponseData() + .setMembers(memberResponses), + coordinatorResult.appendFuture() + ); + } + + /** + * Remove a member from the group. Cancel member's heartbeat, and prepare rebalance + * or complete the join phase if necessary. 
+ * + * @param group The generic group. + * @param memberId The member id. + * @param reason The reason for the LeaveGroup request. + * + * @return The GroupMetadata record and the append future to be completed once the record is + * appended to the log (and replicated). + */ + private CoordinatorResult<Void, Record> removeCurrentMemberFromGenericGroup( + GenericGroup group, + String memberId, + String reason + ) { + GenericGroupMember member = group.member(memberId); + reason = reason != null ? reason : "not provided"; + timer.cancel(genericGroupHeartbeatKey(group.groupId(), memberId)); + log.info("[Group {}] Member {} has left group through explicit `LeaveGroup` request; client reason: {}", + group.groupId(), memberId, reason); + + group.completeJoinFuture(member, + new JoinGroupResponseData() + .setMemberId(UNKNOWN_MEMBER_ID) + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + ); + group.remove(member.memberId()); + + switch (group.currentState()) { + case STABLE: + case COMPLETING_REBALANCE: + return maybePrepareRebalanceOrCompleteJoin(group, reason); + case PREPARING_REBALANCE: + timer.cancel(genericGroupJoinKey(group.groupId())); Review Comment: Thanks for the catch. I updated this to `maybeCompleteJoinPhase(group)`, but one difference is that the existing code (`GroupCoordinator#tryCompleteJoin`) completes the join phase even if it's the initial rebalance, whereas `maybeCompleteJoinPhase` does not. Do you think this will be an issue? If it is the initial rebalance, the timer will retry after the timeout. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org