jolshan commented on code in PR #14067: URL: https://github.com/apache/kafka/pull/14067#discussion_r1274177883
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ########## @@ -738,6 +742,84 @@ public String generateMemberId(String clientId, Optional<String> groupInstanceId .orElseGet(() -> clientId + MEMBER_ID_DELIMITER + UUID.randomUUID()); } + /** + * Validates that (1) the group instance id exists and is mapped to the member id + * if the group instance id is provided; and (2) the member id exists in the group. + * + * @param memberId The member id. + * @param groupInstanceId The group instance id. + * @param operation The operation. + * + * @throws UnknownMemberIdException + * @throws FencedInstanceIdException + */ + public void validateMember( + String memberId, + String groupInstanceId, + String operation + ) throws UnknownMemberIdException, FencedInstanceIdException { + if (groupInstanceId != null) { + String existingMemberId = staticMemberId(groupInstanceId); + if (existingMemberId == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } else if (!existingMemberId.equals(memberId)) { + log.info("Request memberId={} for static member with groupInstanceId={} " + + "is fenced by existing memberId={} during operation {}", + memberId, groupInstanceId, existingMemberId, operation); + throw Errors.FENCED_INSTANCE_ID.exception(); + } + } + + if (!hasMemberId(memberId)) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + } + + /** + * Validates the OffsetCommit request. + * + * @param memberId The member id. + * @param groupInstanceId The group instance id. + * @param generationId The generation id. + */ + @Override + public void validateOffsetCommit( + String memberId, + String groupInstanceId, + int generationId + ) throws CoordinatorNotAvailableException, UnknownMemberIdException, IllegalGenerationException, FencedInstanceIdException { + if (isInState(DEAD)) { + throw Errors.COORDINATOR_NOT_AVAILABLE.exception(); + } + + if (generationId < 0 && isInState(EMPTY)) { + // When the generation id is -1, the request comes from either the admin client + // or a consumer which does not use the group management facility. In this case, + // the request can commit offsets if the group is empty. + return; + } + + if (generationId >= 0 || !memberId.isEmpty() || groupInstanceId != null) { + validateMember(memberId, groupInstanceId, "offset-commit"); Review Comment: Do we not care to specify txn offset commits/do we expect to do them elsewhere? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org