vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1354488025


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember(
         return member;
     }
 
+    /**
+     * Gets or creates a static member.
+     *
+     * @param memberId          The member id.
+     * @param instanceId        The group instance id.
+     * @param createIfNotExists Booleans indicating whether the member must be
+     *                          created if it does not exist.
+     *
+     * @return A ConsumerGroupMember.
+     */
+    public ConsumerGroupMember getOrMaybeCreateStaticMember(
+            String memberId,
+            String instanceId,
+            boolean createIfNotExists
+    ) {
+        ConsumerGroupMember member;
+        String existingMemberId = staticMemberId(instanceId);
+        if (!createIfNotExists) {
+            // The member joined with a non-zero epoch but we haven't 
registered this static member
+            // This could be an unknown member for the coordinator.
+            if (existingMemberId == null) {
+                throw Errors.UNKNOWN_MEMBER_ID.exception();
+            }
+            // We can't create a member at this point. If the 2 member-ids 
don't match,
+            // we will throw an error.
+            if (!existingMemberId.equals(memberId)) {
+                throw Errors.FENCED_INSTANCE_ID.exception();
+            }
+            member = getOrMaybeCreateMember(memberId, false);
+        } else {
+            // No existing member found against this instance id. Creating new.
+            if (existingMemberId == null) {
+                member = getOrMaybeCreateMember(memberId, true);
+                staticMembers.put(instanceId, memberId);

Review Comment:
   Makes sense, I have removed this direct update of states and moved it to 
`replay()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to