dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1377557816


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -869,6 +954,10 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
                     groupId, memberId, updatedMember.subscribedTopicRegex());
                 bumpGroupEpoch = true;
             }
+        } else {
+            if (staticMemberReplaced) {
+                records.add(newMemberSubscriptionRecord(groupId, 
updatedMember));
+            }

Review Comment:
   If we rely on `member` and `updatedMember` then we don't need this because 
`!updatedMember.equals(member)` will catch the new member.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -829,20 +885,49 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
 
         // Get or create the member.
         if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-        final ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, createIfNotExists);
+        ConsumerGroupMember member;
+        ConsumerGroupMember existingStaticMember = null;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+        } else {
+            existingStaticMember = group.staticMember(instanceId);
+            throwIfStaticMemberValidationFails(groupId, instanceId, 
existingStaticMember, memberEpoch, memberId);
+            member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+        }

Review Comment:
   I wonder if we could simplify it even more. For instance, would it be 
possible to have something like the following:
   
   ```
   ConsumerGroupMember member;
   ConsumerGroupMember updatedMember;
   
   if (instanceId == null) {
     member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
     throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
     log.info(...);
     updatedMember = new ConsumerGroupMember.Builder(member)
        ....
   } else {
     // the new logic.
     // member is the current static member.
     // updatedMember is the updated current member or the new one.
   }
   ```
   
   



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1085,81 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void replaceStaticMemberInConsumerGroup(
+        List<Record> records,
+        ConsumerGroup group,
+        ConsumerGroupMember existingStaticMember
+    ) {
+        // Write tombstones for the departed static member.
+        removeMember(records, group.groupId(), 
existingStaticMember.memberId());
+        // Cancel all the timers of the departed static member.
+        cancelTimers(group.groupId(), existingStaticMember.memberId());
+    }
+
+    private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember 
existingStaticMember) {
+        return memberEpoch == 0 && existingStaticMember != null && 
existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+    }
+
     /**
      * Handles leave request from a consumer group member.
      * @param groupId       The group id from the request.
      * @param memberId      The member id from the request.
+     * @param memberEpoch   The member epoch from the request.
      *
      * @return A Result containing the ConsumerGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
consumerGroupLeave(
         String groupId,
-        String memberId
+        String instanceId,
+        String memberId,
+        int memberEpoch
     ) throws ApiException {
         ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, 
false);
+        ConsumerGroupMember member = memberEpoch == 
LEAVE_GROUP_STATIC_MEMBER_EPOCH ?
+                group.staticMember(instanceId) :
+                group.getOrMaybeCreateMember(memberId, false);
 
-        log.info("[GroupId " + groupId + "] Member " + memberId + " left the 
consumer group.");
-
-        List<Record> records = consumerGroupFenceMember(group, member);
+        List<Record> records = new ArrayList<>();
+        if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            throwIfStaticMemberValidationFails(groupId, instanceId, member, 
memberEpoch, memberId);
+            log.info("[GroupId {}] Member {} with instance id {} is a static 
member and will not be fenced from the group",
+                    group.groupId(), member.memberId(), member.instanceId());

Review Comment:
   nit: `"[GroupId {}] Static member {} with member id {} left the consumer 
group."`? I would also use a similar logging structure for the other log 
messages.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1085,81 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);

Review Comment:
   Don't we need to also force the step 3.? If we don't do it, we don't write 
the current assignment record for the new member and we don't reconcile him.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1085,81 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void replaceStaticMemberInConsumerGroup(
+        List<Record> records,
+        ConsumerGroup group,
+        ConsumerGroupMember existingStaticMember
+    ) {
+        // Write tombstones for the departed static member.
+        removeMember(records, group.groupId(), 
existingStaticMember.memberId());
+        // Cancel all the timers of the departed static member.
+        cancelTimers(group.groupId(), existingStaticMember.memberId());
+    }
+
+    private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember 
existingStaticMember) {
+        return memberEpoch == 0 && existingStaticMember != null && 
existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+    }
+
     /**
      * Handles leave request from a consumer group member.
      * @param groupId       The group id from the request.
      * @param memberId      The member id from the request.
+     * @param memberEpoch   The member epoch from the request.
      *
      * @return A Result containing the ConsumerGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
consumerGroupLeave(
         String groupId,
-        String memberId
+        String instanceId,
+        String memberId,
+        int memberEpoch
     ) throws ApiException {
         ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, 
false);
+        ConsumerGroupMember member = memberEpoch == 
LEAVE_GROUP_STATIC_MEMBER_EPOCH ?
+                group.staticMember(instanceId) :
+                group.getOrMaybeCreateMember(memberId, false);
 
-        log.info("[GroupId " + groupId + "] Member " + memberId + " left the 
consumer group.");
-
-        List<Record> records = consumerGroupFenceMember(group, member);
+        List<Record> records = new ArrayList<>();
+        if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            throwIfStaticMemberValidationFails(groupId, instanceId, member, 
memberEpoch, memberId);
+            log.info("[GroupId {}] Member {} with instance id {} is a static 
member and will not be fenced from the group",
+                    group.groupId(), member.memberId(), member.instanceId());
+            records.addAll(consumerGroupStaticMemberGroupLeave(group, member, 
memberId));
+        } else {
+            log.info("[GroupId {}] Member {} left the consumer group.", 
groupId, memberId);
+            records.addAll(consumerGroupFenceMember(group, member));
+        }
         return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
             .setMemberId(memberId)
-            .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
+            .setMemberEpoch(memberEpoch));
+    }
+
+    /**
+     * Handles the case when a static member decides to leave the group.
+     * The member is not actually fenced from the group, and instead it's
+     * member epoch is updated to -2 to reflect that a member using the given
+     * instance id decided to leave the group and would be back within session
+     * timeout.
+     *
+     * @param group       The group.
+     * @param existingStaticMember      The member.
+     *
+     * @return A list of records to be applied to the state.
+     */
+    private List<Record> consumerGroupStaticMemberGroupLeave(
+        ConsumerGroup group,
+        ConsumerGroupMember existingStaticMember,

Review Comment:
   nit: `member`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -287,6 +305,18 @@ public ConsumerGroupMember getOrMaybeCreateMember(
         return member;
     }
 
+    /**
+     * Gets a static member.
+     *
+     * @param instanceId        The group instance id.

Review Comment:
   nit: We can reduce the space between the name and the desc.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1085,81 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void replaceStaticMemberInConsumerGroup(

Review Comment:
   nit: removeMemberAndCancelTimers? The logic is not tight to static members. 
I would also directly pass the groupId and the memberId as this is all it needs.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -898,31 +987,44 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
             group.setMetadataRefreshDeadline(currentTimeMs + 
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
         }
 
-        // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch. The
-        // delta between the existing and the new target assignment is 
persisted to the partition.
+        // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member
+        // replaces an existing static member. The delta between the existing 
and the new target assignment is persisted to the partition.
         int targetAssignmentEpoch = group.assignmentEpoch();
         Assignment targetAssignment = group.targetAssignment(memberId);
-        if (groupEpoch > targetAssignmentEpoch) {
+        if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
             String preferredServerAssignor = 
group.computePreferredServerAssignor(
                 member,
                 updatedMember
             ).orElse(defaultAssignor.name());
 
             try {
-                TargetAssignmentBuilder.TargetAssignmentResult 
assignmentResult =
-                    new TargetAssignmentBuilder(groupId, groupEpoch, 
assignors.get(preferredServerAssignor))
+                TargetAssignmentBuilder assignmentResultBuilder =
+                    new TargetAssignmentBuilder(groupId, groupEpoch, 
assignors.get(preferredServerAssignor));
+                TargetAssignmentBuilder.TargetAssignmentResult 
assignmentResult;
+                // A new static member is replacing an older one with the same 
subscriptions.
+                // We just need to remove the older member and add the newer 
one. The new member can
+                // reuse the target assignment of the older member.
+                if (staticMemberReplaced && groupEpoch == 
targetAssignmentEpoch) {
+                    targetAssignment = 
group.targetAssignment(existingStaticMember.memberId());
+                    assignmentResult = assignmentResultBuilder
+                        .removeMember(existingStaticMember.memberId())
+                        .addOrUpdateMember(memberId, updatedMember)
+                        .build();
+                    records.addAll(assignmentResult.records());
+                } else {
+                    assignmentResult = assignmentResultBuilder
                         .withMembers(group.members())
                         .withSubscriptionMetadata(subscriptionMetadata)
                         .withTargetAssignment(group.targetAssignment())
                         .addOrUpdateMember(memberId, updatedMember)

Review Comment:
   When the `TargetAssignmentBuilder` builds the spec for the assignor, it must 
use the target assignment of the previous static member for the new static 
member. How do we ensure this? We may have to update the 
`TargetAssignmentBuilder` to understand that a static member is replaced.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -898,31 +987,44 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
             group.setMetadataRefreshDeadline(currentTimeMs + 
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
         }
 
-        // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch. The
-        // delta between the existing and the new target assignment is 
persisted to the partition.
+        // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member
+        // replaces an existing static member. The delta between the existing 
and the new target assignment is persisted to the partition.
         int targetAssignmentEpoch = group.assignmentEpoch();
         Assignment targetAssignment = group.targetAssignment(memberId);
-        if (groupEpoch > targetAssignmentEpoch) {
+        if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
             String preferredServerAssignor = 
group.computePreferredServerAssignor(
                 member,
                 updatedMember
             ).orElse(defaultAssignor.name());
 
             try {
-                TargetAssignmentBuilder.TargetAssignmentResult 
assignmentResult =
-                    new TargetAssignmentBuilder(groupId, groupEpoch, 
assignors.get(preferredServerAssignor))
+                TargetAssignmentBuilder assignmentResultBuilder =
+                    new TargetAssignmentBuilder(groupId, groupEpoch, 
assignors.get(preferredServerAssignor));
+                TargetAssignmentBuilder.TargetAssignmentResult 
assignmentResult;
+                // A new static member is replacing an older one with the same 
subscriptions.
+                // We just need to remove the older member and add the newer 
one. The new member can
+                // reuse the target assignment of the older member.
+                if (staticMemberReplaced && groupEpoch == 
targetAssignmentEpoch) {
+                    targetAssignment = 
group.targetAssignment(existingStaticMember.memberId());
+                    assignmentResult = assignmentResultBuilder
+                        .removeMember(existingStaticMember.memberId())
+                        .addOrUpdateMember(memberId, updatedMember)
+                        .build();
+                    records.addAll(assignmentResult.records());

Review Comment:
   I don't fully understand how this would work because the members and the 
target assignment are not set.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1085,81 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void replaceStaticMemberInConsumerGroup(
+        List<Record> records,
+        ConsumerGroup group,
+        ConsumerGroupMember existingStaticMember
+    ) {
+        // Write tombstones for the departed static member.
+        removeMember(records, group.groupId(), 
existingStaticMember.memberId());
+        // Cancel all the timers of the departed static member.
+        cancelTimers(group.groupId(), existingStaticMember.memberId());
+    }
+
+    private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember 
existingStaticMember) {
+        return memberEpoch == 0 && existingStaticMember != null && 
existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+    }
+
     /**
      * Handles leave request from a consumer group member.
      * @param groupId       The group id from the request.
      * @param memberId      The member id from the request.
+     * @param memberEpoch   The member epoch from the request.
      *
      * @return A Result containing the ConsumerGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
consumerGroupLeave(
         String groupId,
-        String memberId
+        String instanceId,
+        String memberId,
+        int memberEpoch
     ) throws ApiException {
         ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, 
false);
+        ConsumerGroupMember member = memberEpoch == 
LEAVE_GROUP_STATIC_MEMBER_EPOCH ?
+                group.staticMember(instanceId) :
+                group.getOrMaybeCreateMember(memberId, false);
 
-        log.info("[GroupId " + groupId + "] Member " + memberId + " left the 
consumer group.");
-
-        List<Record> records = consumerGroupFenceMember(group, member);
+        List<Record> records = new ArrayList<>();
+        if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {

Review Comment:
   nit: We check this twice. Once here and once earlier to lookup the member. 
Could we combine them?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1085,81 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void replaceStaticMemberInConsumerGroup(
+        List<Record> records,
+        ConsumerGroup group,
+        ConsumerGroupMember existingStaticMember
+    ) {
+        // Write tombstones for the departed static member.
+        removeMember(records, group.groupId(), 
existingStaticMember.memberId());
+        // Cancel all the timers of the departed static member.
+        cancelTimers(group.groupId(), existingStaticMember.memberId());
+    }
+
+    private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember 
existingStaticMember) {
+        return memberEpoch == 0 && existingStaticMember != null && 
existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+    }
+
     /**
      * Handles leave request from a consumer group member.
      * @param groupId       The group id from the request.
      * @param memberId      The member id from the request.
+     * @param memberEpoch   The member epoch from the request.
      *
      * @return A Result containing the ConsumerGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
consumerGroupLeave(
         String groupId,
-        String memberId
+        String instanceId,
+        String memberId,
+        int memberEpoch
     ) throws ApiException {
         ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, 
false);
+        ConsumerGroupMember member = memberEpoch == 
LEAVE_GROUP_STATIC_MEMBER_EPOCH ?
+                group.staticMember(instanceId) :
+                group.getOrMaybeCreateMember(memberId, false);
 
-        log.info("[GroupId " + groupId + "] Member " + memberId + " left the 
consumer group.");
-
-        List<Record> records = consumerGroupFenceMember(group, member);
+        List<Record> records = new ArrayList<>();
+        if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            throwIfStaticMemberValidationFails(groupId, instanceId, member, 
memberEpoch, memberId);

Review Comment:
   This is actually executed twice. Once here and once in 
`consumerGroupStaticMemberGroupLeave`. I also wonder if we need to full 
validation here. I suppose that ensuring that the member id is correct would be 
enough, no?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1085,81 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void replaceStaticMemberInConsumerGroup(
+        List<Record> records,
+        ConsumerGroup group,
+        ConsumerGroupMember existingStaticMember
+    ) {
+        // Write tombstones for the departed static member.
+        removeMember(records, group.groupId(), 
existingStaticMember.memberId());
+        // Cancel all the timers of the departed static member.
+        cancelTimers(group.groupId(), existingStaticMember.memberId());
+    }
+
+    private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember 
existingStaticMember) {
+        return memberEpoch == 0 && existingStaticMember != null && 
existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+    }
+
     /**
      * Handles leave request from a consumer group member.
      * @param groupId       The group id from the request.
      * @param memberId      The member id from the request.
+     * @param memberEpoch   The member epoch from the request.
      *
      * @return A Result containing the ConsumerGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
consumerGroupLeave(
         String groupId,
-        String memberId
+        String instanceId,
+        String memberId,
+        int memberEpoch
     ) throws ApiException {
         ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, 
false);
+        ConsumerGroupMember member = memberEpoch == 
LEAVE_GROUP_STATIC_MEMBER_EPOCH ?
+                group.staticMember(instanceId) :
+                group.getOrMaybeCreateMember(memberId, false);
 
-        log.info("[GroupId " + groupId + "] Member " + memberId + " left the 
consumer group.");
-
-        List<Record> records = consumerGroupFenceMember(group, member);
+        List<Record> records = new ArrayList<>();
+        if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            throwIfStaticMemberValidationFails(groupId, instanceId, member, 
memberEpoch, memberId);
+            log.info("[GroupId {}] Member {} with instance id {} is a static 
member and will not be fenced from the group",
+                    group.groupId(), member.memberId(), member.instanceId());
+            records.addAll(consumerGroupStaticMemberGroupLeave(group, member, 
memberId));
+        } else {
+            log.info("[GroupId {}] Member {} left the consumer group.", 
groupId, memberId);
+            records.addAll(consumerGroupFenceMember(group, member));
+        }
         return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
             .setMemberId(memberId)
-            .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
+            .setMemberEpoch(memberEpoch));
+    }
+
+    /**
+     * Handles the case when a static member decides to leave the group.
+     * The member is not actually fenced from the group, and instead it's
+     * member epoch is updated to -2 to reflect that a member using the given
+     * instance id decided to leave the group and would be back within session
+     * timeout.
+     *
+     * @param group       The group.
+     * @param existingStaticMember      The member.

Review Comment:
   nit: Let's align the description of the params.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -671,6 +688,10 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
             if (request.subscribedTopicNames() == null || 
request.subscribedTopicNames().isEmpty()) {
                 throw new InvalidRequestException("SubscribedTopicNames must 
be set in first request.");
             }
+        } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            throwIfEmptyString(request.memberId(), "MemberId can't be empty. 
GroupId: " + request.groupId());
+            throwIfNull(request.instanceId(), "InstanceId can't be null for 
Static Member. GroupId: "

Review Comment:
   I think that the instance id cannot be null and cannot be empty as well. 
Then let's use `InstanceId can't be null or empty.`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1085,81 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void replaceStaticMemberInConsumerGroup(
+        List<Record> records,
+        ConsumerGroup group,
+        ConsumerGroupMember existingStaticMember
+    ) {
+        // Write tombstones for the departed static member.
+        removeMember(records, group.groupId(), 
existingStaticMember.memberId());
+        // Cancel all the timers of the departed static member.
+        cancelTimers(group.groupId(), existingStaticMember.memberId());
+    }
+
+    private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember 
existingStaticMember) {
+        return memberEpoch == 0 && existingStaticMember != null && 
existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+    }
+
     /**
      * Handles leave request from a consumer group member.
      * @param groupId       The group id from the request.
      * @param memberId      The member id from the request.
+     * @param memberEpoch   The member epoch from the request.
      *
      * @return A Result containing the ConsumerGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
consumerGroupLeave(
         String groupId,
-        String memberId
+        String instanceId,
+        String memberId,
+        int memberEpoch
     ) throws ApiException {
         ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, 
false);
+        ConsumerGroupMember member = memberEpoch == 
LEAVE_GROUP_STATIC_MEMBER_EPOCH ?
+                group.staticMember(instanceId) :
+                group.getOrMaybeCreateMember(memberId, false);
 
-        log.info("[GroupId " + groupId + "] Member " + memberId + " left the 
consumer group.");
-
-        List<Record> records = consumerGroupFenceMember(group, member);
+        List<Record> records = new ArrayList<>();
+        if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            throwIfStaticMemberValidationFails(groupId, instanceId, member, 
memberEpoch, memberId);
+            log.info("[GroupId {}] Member {} with instance id {} is a static 
member and will not be fenced from the group",
+                    group.groupId(), member.memberId(), member.instanceId());
+            records.addAll(consumerGroupStaticMemberGroupLeave(group, member, 
memberId));
+        } else {
+            log.info("[GroupId {}] Member {} left the consumer group.", 
groupId, memberId);
+            records.addAll(consumerGroupFenceMember(group, member));

Review Comment:
   nit: The `addAll` does not seem necessary here. Could we avoid it?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -671,6 +688,10 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
             if (request.subscribedTopicNames() == null || 
request.subscribedTopicNames().isEmpty()) {
                 throw new InvalidRequestException("SubscribedTopicNames must 
be set in first request.");
             }
+        } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            throwIfEmptyString(request.memberId(), "MemberId can't be empty. 
GroupId: " + request.groupId());

Review Comment:
   nit: Let's use `"MemberId can't be empty."` to be consistent with the 
previous errors.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -731,6 +752,36 @@ private void throwIfConsumerGroupIsFull(
         }
     }
 
+    private void throwIfStaticMemberValidationFails(
+        String groupId,
+        String instanceId,
+        ConsumerGroupMember existingStaticMember,
+        int memberEpoch,
+        String memberId
+    ) {
+        if (memberEpoch != 0) {
+            // The member joined with a non-zero epoch but we haven't 
registered this static member
+            // This could be an unknown member for the coordinator.
+            if (existingStaticMember == null) {
+                throw Errors.UNKNOWN_MEMBER_ID.exception("No existing static 
member found against instance id: " +  instanceId);
+            }
+            // There already exists a different member id for the same 
instance id.
+            if (!existingStaticMember.memberId().equals(memberId)) {
+                log.info("Request memberId={} for static member with 
groupInstanceId={} " +
+                        "is fenced by existing memberId={}",
+                    memberId, instanceId, existingStaticMember.memberId());
+                throw Errors.FENCED_INSTANCE_ID.exception();

Review Comment:
   Could we add something to the exception?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to