vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1383617522


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1085,81 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void replaceStaticMemberInConsumerGroup(
+        List<Record> records,
+        ConsumerGroup group,
+        ConsumerGroupMember existingStaticMember
+    ) {
+        // Write tombstones for the departed static member.
+        removeMember(records, group.groupId(), existingStaticMember.memberId());
+        // Cancel all the timers of the departed static member.
+        cancelTimers(group.groupId(), existingStaticMember.memberId());
+    }
+
+    private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember existingStaticMember) {
+        return memberEpoch == 0 && existingStaticMember != null && existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+    }
+
     /**
      * Handles leave request from a consumer group member.
      * @param groupId       The group id from the request.
      * @param memberId      The member id from the request.
+     * @param memberEpoch   The member epoch from the request.
      *
      * @return A Result containing the ConsumerGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave(
         String groupId,
-        String memberId
+        String instanceId,
+        String memberId,
+        int memberEpoch
     ) throws ApiException {
         ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+        ConsumerGroupMember member = memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH ?
+                group.staticMember(instanceId) :
+                group.getOrMaybeCreateMember(memberId, false);
 
-        log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group.");
-
-        List<Record> records = consumerGroupFenceMember(group, member);
+        List<Record> records = new ArrayList<>();
+        if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            throwIfStaticMemberValidationFails(groupId, instanceId, member, memberEpoch, memberId);
+            log.info("[GroupId {}] Member {} with instance id {} is a static member and will not be fenced from the group",
+                    group.groupId(), member.memberId(), member.instanceId());

Review Comment:
   done.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1085,81 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void replaceStaticMemberInConsumerGroup(
+        List<Record> records,
+        ConsumerGroup group,
+        ConsumerGroupMember existingStaticMember
+    ) {
+        // Write tombstones for the departed static member.
+        removeMember(records, group.groupId(), existingStaticMember.memberId());
+        // Cancel all the timers of the departed static member.
+        cancelTimers(group.groupId(), existingStaticMember.memberId());
+    }
+
+    private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember existingStaticMember) {
+        return memberEpoch == 0 && existingStaticMember != null && existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+    }
+
     /**
      * Handles leave request from a consumer group member.
      * @param groupId       The group id from the request.
      * @param memberId      The member id from the request.
+     * @param memberEpoch   The member epoch from the request.
      *
      * @return A Result containing the ConsumerGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave(
         String groupId,
-        String memberId
+        String instanceId,
+        String memberId,
+        int memberEpoch
     ) throws ApiException {
         ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+        ConsumerGroupMember member = memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH ?
+                group.staticMember(instanceId) :
+                group.getOrMaybeCreateMember(memberId, false);
 
-        log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group.");
-
-        List<Record> records = consumerGroupFenceMember(group, member);
+        List<Record> records = new ArrayList<>();
+        if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to