vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1354491971


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember(
         return member;
     }
 
+    /**
+     * Gets or creates a static member.
+     *
+     * @param memberId          The member id.
+     * @param instanceId        The group instance id.
+     * @param createIfNotExists Booleans indicating whether the member must be
+     *                          created if it does not exist.
+     *
+     * @return A ConsumerGroupMember.
+     */
+    public ConsumerGroupMember getOrMaybeCreateStaticMember(
+            String memberId,
+            String instanceId,
+            boolean createIfNotExists
+    ) {
+        ConsumerGroupMember member;
+        String existingMemberId = staticMemberId(instanceId);
+        if (!createIfNotExists) {
+            // The member joined with a non-zero epoch but we haven't registered this static member
+            // This could be an unknown member for the coordinator.
+            if (existingMemberId == null) {
+                throw Errors.UNKNOWN_MEMBER_ID.exception();
+            }
+            // We can't create a member at this point. If the 2 member-ids don't match,
+            // we will throw an error.
+            if (!existingMemberId.equals(memberId)) {
+                throw Errors.FENCED_INSTANCE_ID.exception();
+            }
+            member = getOrMaybeCreateMember(memberId, false);
+        } else {
+            // No existing member found against this instance id. Creating new.
+            if (existingMemberId == null) {
+                member = getOrMaybeCreateMember(memberId, true);
+                staticMembers.put(instanceId, memberId);
+                return member;
+            } else {
+                // Get the details of the existing member
+                ConsumerGroupMember existingMember = 
getOrMaybeCreateMember(existingMemberId, false);
+                int currentMemberEpoch = existingMember.memberEpoch();
+                // A new member with a used instance id joined but the previous member using the same instance id
+                // hasn't requested leaving the group.
+                if (currentMemberEpoch != -2 && 
!existingMemberId.equals(memberId)) {

Review Comment:
   Actually, this is no longer required; I have removed it.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember(
         return member;
     }
 
+    /**
+     * Gets or creates a static member.
+     *
+     * @param memberId          The member id.
+     * @param instanceId        The group instance id.
+     * @param createIfNotExists Booleans indicating whether the member must be
+     *                          created if it does not exist.
+     *
+     * @return A ConsumerGroupMember.
+     */
+    public ConsumerGroupMember getOrMaybeCreateStaticMember(
+            String memberId,
+            String instanceId,
+            boolean createIfNotExists
+    ) {
+        ConsumerGroupMember member;
+        String existingMemberId = staticMemberId(instanceId);
+        if (!createIfNotExists) {
+            // The member joined with a non-zero epoch but we haven't registered this static member
+            // This could be an unknown member for the coordinator.
+            if (existingMemberId == null) {
+                throw Errors.UNKNOWN_MEMBER_ID.exception();
+            }
+            // We can't create a member at this point. If the 2 member-ids don't match,
+            // we will throw an error.
+            if (!existingMemberId.equals(memberId)) {
+                throw Errors.FENCED_INSTANCE_ID.exception();
+            }
+            member = getOrMaybeCreateMember(memberId, false);
+        } else {
+            // No existing member found against this instance id. Creating new.
+            if (existingMemberId == null) {
+                member = getOrMaybeCreateMember(memberId, true);
+                staticMembers.put(instanceId, memberId);
+                return member;
+            } else {
+                // Get the details of the existing member
+                ConsumerGroupMember existingMember = 
getOrMaybeCreateMember(existingMemberId, false);
+                int currentMemberEpoch = existingMember.memberEpoch();
+                // A new member with a used instance id joined but the previous member using the same instance id
+                // hasn't requested leaving the group.
+                if (currentMemberEpoch != -2 && 
!existingMemberId.equals(memberId)) {
+                    throw Errors.UNRELEASED_INSTANCE_ID.exception();
+                }
+                // A new static member is trying to take the place of a departed static member. We will
+                // provide the assignments of the old member to the new one.
+                member = new ConsumerGroupMember.Builder(memberId, 
existingMember)
+                        .setMemberEpoch(existingMember.targetMemberEpoch())
+                        .setPreviousMemberEpoch(0)
+                        
.setTargetMemberEpoch(existingMember.targetMemberEpoch())
+                        .build();
+                updateMember(member);
+                staticMembers.put(instanceId, memberId);

Review Comment:
   Ack.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to