jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274177883


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -738,6 +742,84 @@ public String generateMemberId(String clientId, 
Optional<String> groupInstanceId
             .orElseGet(() -> clientId + MEMBER_ID_DELIMITER + 
UUID.randomUUID());
     }
 
+    /**
+     * Validates that (1) the group instance id exists and is mapped to the 
member id
+     * if the group instance id is provided; and (2) the member id exists in 
the group.
+     *
+     * @param memberId          The member id.
+     * @param groupInstanceId   The group instance id.
+     * @param operation         The operation.
+     *
+     * @throws UnknownMemberIdException
+     * @throws FencedInstanceIdException
+     */
+    public void validateMember(
+        String memberId,
+        String groupInstanceId,
+        String operation
+    ) throws UnknownMemberIdException, FencedInstanceIdException {
+        if (groupInstanceId != null) {
+            String existingMemberId = staticMemberId(groupInstanceId);
+            if (existingMemberId == null) {
+                throw Errors.UNKNOWN_MEMBER_ID.exception();
+            } else if (!existingMemberId.equals(memberId)) {
+                log.info("Request memberId={} for static member with 
groupInstanceId={} " +
+                         "is fenced by existing memberId={} during operation 
{}",
+                    memberId, groupInstanceId, existingMemberId, operation);
+                throw Errors.FENCED_INSTANCE_ID.exception();
+            }
+        }
+
+        if (!hasMemberId(memberId)) {
+            throw Errors.UNKNOWN_MEMBER_ID.exception();
+        }
+    }
+
+    /**
+     * Validates the OffsetCommit request.
+     *
+     * @param memberId          The member id.
+     * @param groupInstanceId   The group instance id.
+     * @param generationId      The generation id.
+     */
+    @Override
+    public void validateOffsetCommit(
+        String memberId,
+        String groupInstanceId,
+        int generationId
+    ) throws CoordinatorNotAvailableException, UnknownMemberIdException, 
IllegalGenerationException, FencedInstanceIdException {
+        if (isInState(DEAD)) {
+            throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
+        }
+
+        if (generationId < 0 && isInState(EMPTY)) {
+            // When the generation id is -1, the request comes from either the 
admin client
+            // or a consumer which does not use the group management facility. 
In this case,
+            // the request can commit offsets if the group is empty.
+            return;
+        }
+
+        if (generationId >= 0 || !memberId.isEmpty() || groupInstanceId != 
null) {
+            validateMember(memberId, groupInstanceId, "offset-commit");

Review Comment:
   Do we not care to specify txn offset commits/do we expect to do them 
elsewhere?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to