dajac commented on code in PR #14432: URL: https://github.com/apache/kafka/pull/14432#discussion_r1377557816
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -869,6 +954,10 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr groupId, memberId, updatedMember.subscribedTopicRegex()); bumpGroupEpoch = true; } + } else { + if (staticMemberReplaced) { + records.add(newMemberSubscriptionRecord(groupId, updatedMember)); + } Review Comment: If we rely on `member` and `updatedMember` then we don't need this because `!updatedMember.equals(member)` will catch the new member. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -829,20 +885,49 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr // Get or create the member. if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString(); - final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists); + ConsumerGroupMember member; + ConsumerGroupMember existingStaticMember = null; + if (instanceId == null) { + member = group.getOrMaybeCreateMember(memberId, createIfNotExists); + } else { + existingStaticMember = group.staticMember(instanceId); + throwIfStaticMemberValidationFails(groupId, instanceId, existingStaticMember, memberEpoch, memberId); + member = group.getOrMaybeCreateMember(memberId, createIfNotExists); + } Review Comment: I wonder if we could simplify it even more. For instance, would it be possible to have something like the following: ``` ConsumerGroupMember member; ConsumerGroupMember updatedMember; if (instanceId == null) { member = group.getOrMaybeCreateMember(memberId, createIfNotExists); throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions); log.info(...); updatedMember = new ConsumerGroupMember.Builder(member) .... } else { // the new logic. // member is the current static member. // updatedMember is the updated current member or the new one. } ``` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -983,27 +1085,81 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + private void replaceStaticMemberInConsumerGroup( + List<Record> records, + ConsumerGroup group, + ConsumerGroupMember existingStaticMember + ) { + // Write tombstones for the departed static member. + removeMember(records, group.groupId(), existingStaticMember.memberId()); + // Cancel all the timers of the departed static member. + cancelTimers(group.groupId(), existingStaticMember.memberId()); + } + + private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember existingStaticMember) { + return memberEpoch == 0 && existingStaticMember != null && existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH; + } + /** * Handles leave request from a consumer group member. * @param groupId The group id from the request. * @param memberId The member id from the request. + * @param memberEpoch The member epoch from the request. * * @return A Result containing the ConsumerGroupHeartbeat response and * a list of records to update the state machine. */ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave( String groupId, - String memberId + String instanceId, + String memberId, + int memberEpoch ) throws ApiException { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); - ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + ConsumerGroupMember member = memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH ? + group.staticMember(instanceId) : + group.getOrMaybeCreateMember(memberId, false); - log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group."); - - List<Record> records = consumerGroupFenceMember(group, member); + List<Record> records = new ArrayList<>(); + if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + throwIfStaticMemberValidationFails(groupId, instanceId, member, memberEpoch, memberId); + log.info("[GroupId {}] Member {} with instance id {} is a static member and will not be fenced from the group", + group.groupId(), member.memberId(), member.instanceId()); Review Comment: nit: `"[GroupId {}] Static member {} with member id {} left the consumer group."`? I would also use a similar logging structure for the other log messages. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -983,27 +1085,81 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); Review Comment: Don't we need to also force the step 3.? If we don't do it, we don't write the current assignment record for the new member and we don't reconcile him. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -983,27 +1085,81 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + private void replaceStaticMemberInConsumerGroup( + List<Record> records, + ConsumerGroup group, + ConsumerGroupMember existingStaticMember + ) { + // Write tombstones for the departed static member. + removeMember(records, group.groupId(), existingStaticMember.memberId()); + // Cancel all the timers of the departed static member. + cancelTimers(group.groupId(), existingStaticMember.memberId()); + } + + private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember existingStaticMember) { + return memberEpoch == 0 && existingStaticMember != null && existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH; + } + /** * Handles leave request from a consumer group member. * @param groupId The group id from the request. * @param memberId The member id from the request. + * @param memberEpoch The member epoch from the request. * * @return A Result containing the ConsumerGroupHeartbeat response and * a list of records to update the state machine. */ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave( String groupId, - String memberId + String instanceId, + String memberId, + int memberEpoch ) throws ApiException { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); - ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + ConsumerGroupMember member = memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH ? + group.staticMember(instanceId) : + group.getOrMaybeCreateMember(memberId, false); - log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group."); - - List<Record> records = consumerGroupFenceMember(group, member); + List<Record> records = new ArrayList<>(); + if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + throwIfStaticMemberValidationFails(groupId, instanceId, member, memberEpoch, memberId); + log.info("[GroupId {}] Member {} with instance id {} is a static member and will not be fenced from the group", + group.groupId(), member.memberId(), member.instanceId()); + records.addAll(consumerGroupStaticMemberGroupLeave(group, member, memberId)); + } else { + log.info("[GroupId {}] Member {} left the consumer group.", groupId, memberId); + records.addAll(consumerGroupFenceMember(group, member)); + } return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)); + .setMemberEpoch(memberEpoch)); + } + + /** + * Handles the case when a static member decides to leave the group. + * The member is not actually fenced from the group, and instead it's + * member epoch is updated to -2 to reflect that a member using the given + * instance id decided to leave the group and would be back within session + * timeout. + * + * @param group The group. + * @param existingStaticMember The member. + * + * @return A list of records to be applied to the state. + */ + private List<Record> consumerGroupStaticMemberGroupLeave( + ConsumerGroup group, + ConsumerGroupMember existingStaticMember, Review Comment: nit: `member`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -287,6 +305,18 @@ public ConsumerGroupMember getOrMaybeCreateMember( return member; } + /** + * Gets a static member. + * + * @param instanceId The group instance id. Review Comment: nit: We can reduce the space between the name and the desc. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -983,27 +1085,81 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + private void replaceStaticMemberInConsumerGroup( Review Comment: nit: removeMemberAndCancelTimers? The logic is not tight to static members. I would also directly pass the groupId and the memberId as this is all it needs. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -898,31 +987,44 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch); } - // 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The - // delta between the existing and the new target assignment is persisted to the partition. + // 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member + // replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition. int targetAssignmentEpoch = group.assignmentEpoch(); Assignment targetAssignment = group.targetAssignment(memberId); - if (groupEpoch > targetAssignmentEpoch) { + if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) { String preferredServerAssignor = group.computePreferredServerAssignor( member, updatedMember ).orElse(defaultAssignor.name()); try { - TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = - new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor)) + TargetAssignmentBuilder assignmentResultBuilder = + new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor)); + TargetAssignmentBuilder.TargetAssignmentResult assignmentResult; + // A new static member is replacing an older one with the same subscriptions. + // We just need to remove the older member and add the newer one. The new member can + // reuse the target assignment of the older member. + if (staticMemberReplaced && groupEpoch == targetAssignmentEpoch) { + targetAssignment = group.targetAssignment(existingStaticMember.memberId()); + assignmentResult = assignmentResultBuilder + .removeMember(existingStaticMember.memberId()) + .addOrUpdateMember(memberId, updatedMember) + .build(); + records.addAll(assignmentResult.records()); + } else { + assignmentResult = assignmentResultBuilder .withMembers(group.members()) .withSubscriptionMetadata(subscriptionMetadata) .withTargetAssignment(group.targetAssignment()) .addOrUpdateMember(memberId, updatedMember) Review Comment: When the `TargetAssignmentBuilder` builds the spec for the assignor, it must use the target assignment of the previous static member for the new static member. How do we ensure this? We may have to update the `TargetAssignmentBuilder` to understand that a static member is replaced. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -898,31 +987,44 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch); } - // 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The - // delta between the existing and the new target assignment is persisted to the partition. + // 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member + // replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition. int targetAssignmentEpoch = group.assignmentEpoch(); Assignment targetAssignment = group.targetAssignment(memberId); - if (groupEpoch > targetAssignmentEpoch) { + if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) { String preferredServerAssignor = group.computePreferredServerAssignor( member, updatedMember ).orElse(defaultAssignor.name()); try { - TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = - new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor)) + TargetAssignmentBuilder assignmentResultBuilder = + new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor)); + TargetAssignmentBuilder.TargetAssignmentResult assignmentResult; + // A new static member is replacing an older one with the same subscriptions. + // We just need to remove the older member and add the newer one. The new member can + // reuse the target assignment of the older member. + if (staticMemberReplaced && groupEpoch == targetAssignmentEpoch) { + targetAssignment = group.targetAssignment(existingStaticMember.memberId()); + assignmentResult = assignmentResultBuilder + .removeMember(existingStaticMember.memberId()) + .addOrUpdateMember(memberId, updatedMember) + .build(); + records.addAll(assignmentResult.records()); Review Comment: I don't fully understand how this would work because the members and the target assignment are not set. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -983,27 +1085,81 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + private void replaceStaticMemberInConsumerGroup( + List<Record> records, + ConsumerGroup group, + ConsumerGroupMember existingStaticMember + ) { + // Write tombstones for the departed static member. + removeMember(records, group.groupId(), existingStaticMember.memberId()); + // Cancel all the timers of the departed static member. + cancelTimers(group.groupId(), existingStaticMember.memberId()); + } + + private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember existingStaticMember) { + return memberEpoch == 0 && existingStaticMember != null && existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH; + } + /** * Handles leave request from a consumer group member. * @param groupId The group id from the request. * @param memberId The member id from the request. + * @param memberEpoch The member epoch from the request. * * @return A Result containing the ConsumerGroupHeartbeat response and * a list of records to update the state machine. */ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave( String groupId, - String memberId + String instanceId, + String memberId, + int memberEpoch ) throws ApiException { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); - ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + ConsumerGroupMember member = memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH ? + group.staticMember(instanceId) : + group.getOrMaybeCreateMember(memberId, false); - log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group."); - - List<Record> records = consumerGroupFenceMember(group, member); + List<Record> records = new ArrayList<>(); + if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) { Review Comment: nit: We check this twice. Once here and once earlier to lookup the member. Could we combine them? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -983,27 +1085,81 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + private void replaceStaticMemberInConsumerGroup( + List<Record> records, + ConsumerGroup group, + ConsumerGroupMember existingStaticMember + ) { + // Write tombstones for the departed static member. + removeMember(records, group.groupId(), existingStaticMember.memberId()); + // Cancel all the timers of the departed static member. + cancelTimers(group.groupId(), existingStaticMember.memberId()); + } + + private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember existingStaticMember) { + return memberEpoch == 0 && existingStaticMember != null && existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH; + } + /** * Handles leave request from a consumer group member. * @param groupId The group id from the request. * @param memberId The member id from the request. + * @param memberEpoch The member epoch from the request. * * @return A Result containing the ConsumerGroupHeartbeat response and * a list of records to update the state machine. */ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave( String groupId, - String memberId + String instanceId, + String memberId, + int memberEpoch ) throws ApiException { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); - ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + ConsumerGroupMember member = memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH ? + group.staticMember(instanceId) : + group.getOrMaybeCreateMember(memberId, false); - log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group."); - - List<Record> records = consumerGroupFenceMember(group, member); + List<Record> records = new ArrayList<>(); + if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + throwIfStaticMemberValidationFails(groupId, instanceId, member, memberEpoch, memberId); Review Comment: This is actually executed twice. Once here and once in `consumerGroupStaticMemberGroupLeave`. I also wonder if we need to full validation here. I suppose that ensuring that the member id is correct would be enough, no? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -983,27 +1085,81 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + private void replaceStaticMemberInConsumerGroup( + List<Record> records, + ConsumerGroup group, + ConsumerGroupMember existingStaticMember + ) { + // Write tombstones for the departed static member. + removeMember(records, group.groupId(), existingStaticMember.memberId()); + // Cancel all the timers of the departed static member. + cancelTimers(group.groupId(), existingStaticMember.memberId()); + } + + private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember existingStaticMember) { + return memberEpoch == 0 && existingStaticMember != null && existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH; + } + /** * Handles leave request from a consumer group member. * @param groupId The group id from the request. * @param memberId The member id from the request. + * @param memberEpoch The member epoch from the request. * * @return A Result containing the ConsumerGroupHeartbeat response and * a list of records to update the state machine. */ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave( String groupId, - String memberId + String instanceId, + String memberId, + int memberEpoch ) throws ApiException { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); - ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + ConsumerGroupMember member = memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH ? + group.staticMember(instanceId) : + group.getOrMaybeCreateMember(memberId, false); - log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group."); - - List<Record> records = consumerGroupFenceMember(group, member); + List<Record> records = new ArrayList<>(); + if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + throwIfStaticMemberValidationFails(groupId, instanceId, member, memberEpoch, memberId); + log.info("[GroupId {}] Member {} with instance id {} is a static member and will not be fenced from the group", + group.groupId(), member.memberId(), member.instanceId()); + records.addAll(consumerGroupStaticMemberGroupLeave(group, member, memberId)); + } else { + log.info("[GroupId {}] Member {} left the consumer group.", groupId, memberId); + records.addAll(consumerGroupFenceMember(group, member)); + } return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)); + .setMemberEpoch(memberEpoch)); + } + + /** + * Handles the case when a static member decides to leave the group. + * The member is not actually fenced from the group, and instead it's + * member epoch is updated to -2 to reflect that a member using the given + * instance id decided to leave the group and would be back within session + * timeout. + * + * @param group The group. + * @param existingStaticMember The member. Review Comment: nit: Let's align the description of the params. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -671,6 +688,10 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid( if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) { throw new InvalidRequestException("SubscribedTopicNames must be set in first request."); } + } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + throwIfEmptyString(request.memberId(), "MemberId can't be empty. GroupId: " + request.groupId()); + throwIfNull(request.instanceId(), "InstanceId can't be null for Static Member. GroupId: " Review Comment: I think that the instance id cannot be null and cannot be empty as well. Then let's use `InstanceId can't be null or empty.` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -983,27 +1085,81 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + private void replaceStaticMemberInConsumerGroup( + List<Record> records, + ConsumerGroup group, + ConsumerGroupMember existingStaticMember + ) { + // Write tombstones for the departed static member. + removeMember(records, group.groupId(), existingStaticMember.memberId()); + // Cancel all the timers of the departed static member. + cancelTimers(group.groupId(), existingStaticMember.memberId()); + } + + private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember existingStaticMember) { + return memberEpoch == 0 && existingStaticMember != null && existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH; + } + /** * Handles leave request from a consumer group member. * @param groupId The group id from the request. * @param memberId The member id from the request. + * @param memberEpoch The member epoch from the request. * * @return A Result containing the ConsumerGroupHeartbeat response and * a list of records to update the state machine. */ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave( String groupId, - String memberId + String instanceId, + String memberId, + int memberEpoch ) throws ApiException { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); - ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + ConsumerGroupMember member = memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH ? + group.staticMember(instanceId) : + group.getOrMaybeCreateMember(memberId, false); - log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group."); - - List<Record> records = consumerGroupFenceMember(group, member); + List<Record> records = new ArrayList<>(); + if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + throwIfStaticMemberValidationFails(groupId, instanceId, member, memberEpoch, memberId); + log.info("[GroupId {}] Member {} with instance id {} is a static member and will not be fenced from the group", + group.groupId(), member.memberId(), member.instanceId()); + records.addAll(consumerGroupStaticMemberGroupLeave(group, member, memberId)); + } else { + log.info("[GroupId {}] Member {} left the consumer group.", groupId, memberId); + records.addAll(consumerGroupFenceMember(group, member)); Review Comment: nit: The `addAll` does not seem necessary here. Could we avoid it? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -671,6 +688,10 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid( if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) { throw new InvalidRequestException("SubscribedTopicNames must be set in first request."); } + } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) { + throwIfEmptyString(request.memberId(), "MemberId can't be empty. GroupId: " + request.groupId()); Review Comment: nit: Let's use `"MemberId can't be empty."` to be consistent with the previous errors. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -731,6 +752,36 @@ private void throwIfConsumerGroupIsFull( } } + private void throwIfStaticMemberValidationFails( + String groupId, + String instanceId, + ConsumerGroupMember existingStaticMember, + int memberEpoch, + String memberId + ) { + if (memberEpoch != 0) { + // The member joined with a non-zero epoch but we haven't registered this static member + // This could be an unknown member for the coordinator. + if (existingStaticMember == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception("No existing static member found against instance id: " + instanceId); + } + // There already exists a different member id for the same instance id. + if (!existingStaticMember.memberId().equals(memberId)) { + log.info("Request memberId={} for static member with groupInstanceId={} " + + "is fenced by existing memberId={}", + memberId, instanceId, existingStaticMember.memberId()); + throw Errors.FENCED_INSTANCE_ID.exception(); Review Comment: Could we add something to the exception? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org