vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1354488025
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember(
         return member;
     }
+    /**
+     * Gets or creates a static member.
+     *
+     * @param memberId The member id.
+     * @param instanceId The group instance id.
+     * @param createIfNotExists Booleans indicating whether the member must be
+     * created if it does not exist.
+     *
+     * @return A ConsumerGroupMember.
+     */
+    public ConsumerGroupMember getOrMaybeCreateStaticMember(
+        String memberId,
+        String instanceId,
+        boolean createIfNotExists
+    ) {
+        ConsumerGroupMember member;
+        String existingMemberId = staticMemberId(instanceId);
+        if (!createIfNotExists) {
+            // The member joined with a non-zero epoch but we haven't registered this static member
+            // This could be an unknown member for the coordinator.
+            if (existingMemberId == null) {
+                throw Errors.UNKNOWN_MEMBER_ID.exception();
+            }
+            // We can't create a member at this point. If the 2 member-ids don't match,
+            // we will throw an error.
+            if (!existingMemberId.equals(memberId)) {
+                throw Errors.FENCED_INSTANCE_ID.exception();
+            }
+            member = getOrMaybeCreateMember(memberId, false);
+        } else {
+            // No existing member found against this instance id. Creating new.
+            if (existingMemberId == null) {
+                member = getOrMaybeCreateMember(memberId, true);
+                staticMembers.put(instanceId, memberId);

Review Comment:
   Makes sense, I have removed this direct update of states and moved it to `replay()`.
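
For readers following the thread, the idea of moving the `staticMembers` update out of the request path and into `replay()` can be illustrated roughly as below. This is only a minimal sketch under stated assumptions: `SketchGroup`, `StaticMemberRecord`, and `join` are hypothetical names invented for illustration and are not the classes or methods in this PR; only `staticMembers`, `staticMemberId`, and `replay()` echo names that appear in the diff and comment above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a coordinator record (not Kafka's actual record type).
final class StaticMemberRecord {
    final String instanceId;
    final String memberId;

    StaticMemberRecord(String instanceId, String memberId) {
        this.instanceId = instanceId;
        this.memberId = memberId;
    }
}

// Hypothetical, simplified group state; it only shows where the mutation happens.
final class SketchGroup {
    private final Map<String, String> staticMembers = new HashMap<>();

    // Request path: describes the intended change as a record
    // and leaves staticMembers untouched.
    StaticMemberRecord join(String instanceId, String memberId) {
        return new StaticMemberRecord(instanceId, memberId);
    }

    // Replay path: the only place where the in-memory mapping is updated.
    void replay(StaticMemberRecord record) {
        staticMembers.put(record.instanceId, record.memberId);
    }

    // Returns the member id registered for the instance id, or null if none.
    String staticMemberId(String instanceId) {
        return staticMembers.get(instanceId);
    }
}
```

In this sketch, the mapping becomes visible through `staticMemberId` only after the record returned by `join` has been passed to `replay`, so the in-memory state cannot get ahead of the records that describe it.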