dajac commented on code in PR #14432: URL: https://github.com/apache/kafka/pull/14432#discussion_r1348699907
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -107,6 +107,11 @@ public static class DeadlineAndEpoch { */ private final TimelineHashMap<String, ConsumerGroupMember> members; + /** + * The static group members. + */ + private final Map<String, String> staticMembers; Review Comment: I suppose that this must be a `TimelineHashMap`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -564,6 +564,22 @@ private void throwIfNotNull( } } + /** + * Throws an InvalidRequestException if the value is null. + * + * @param value The value. + * @param error The error message. + * @throws InvalidRequestException + */ + private void throwIfNull( + Object value, + String error Review Comment: nit: Indentation should be 4 spaces. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember( return member; } + /** + * Gets or creates a static member. + * + * @param memberId The member id. + * @param instanceId The group instance id. + * @param createIfNotExists Booleans indicating whether the member must be + * created if it does not exist. + * + * @return A ConsumerGroupMember. + */ + public ConsumerGroupMember getOrMaybeCreateStaticMember( + String memberId, + String instanceId, + boolean createIfNotExists + ) { + ConsumerGroupMember member; + String existingMemberId = staticMemberId(instanceId); + if (!createIfNotExists) { + // The member joined with a non-zero epoch but we haven't registered this static member + // This could be an unknown member for the coordinator. + if (existingMemberId == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + // We can't create a member at this point. If the 2 member-ids don't match, + // we will throw an error. 
+ if (!existingMemberId.equals(memberId)) { + throw Errors.FENCED_INSTANCE_ID.exception(); + } + member = getOrMaybeCreateMember(memberId, false); + } else { + // No existing member found against this instance id. Creating new. + if (existingMemberId == null) { + member = getOrMaybeCreateMember(memberId, true); + staticMembers.put(instanceId, memberId); + return member; + } else { + // Get the details of the existing member + ConsumerGroupMember existingMember = getOrMaybeCreateMember(existingMemberId, false); + int currentMemberEpoch = existingMember.memberEpoch(); + // A new member with a used instance id joined but the previous member using the same instance id + // hasn't requested leaving the group. + if (currentMemberEpoch != -2 && !existingMemberId.equals(memberId)) { + throw Errors.UNRELEASED_INSTANCE_ID.exception(); + } + // A new static member is trying to take the place of a departed static member. We will + // provide the assignments of the old member to the new one. + member = new ConsumerGroupMember.Builder(memberId, existingMember) + .setMemberEpoch(existingMember.targetMemberEpoch()) + .setPreviousMemberEpoch(0) + .setTargetMemberEpoch(existingMember.targetMemberEpoch()) + .build(); + updateMember(member); + staticMembers.put(instanceId, memberId); Review Comment: As said previously, the state should not be updated here but in replay. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -750,7 +770,9 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr // Get or create the member. if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString(); - final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists); + final ConsumerGroupMember member = instanceId == null ? 
+ group.getOrMaybeCreateMember(memberId, createIfNotExists) : + group.getOrMaybeCreateStaticMember(memberId, instanceId, createIfNotExists); Review Comment: nit: indentation. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -908,23 +936,40 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr * Handles leave request from a consumer group member. * @param groupId The group id from the request. * @param memberId The member id from the request. + * @param memberEpoch The member epoch from the request. * * @return A Result containing the ConsumerGroupHeartbeat response and * a list of records to update the state machine. */ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave( String groupId, - String memberId + String instanceId, + String memberId, + int memberEpoch ) throws ApiException { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); - ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); - - log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group."); + ConsumerGroupMember member = memberEpoch == -2 ? + group.getOrMaybeCreateStaticMember(memberId, instanceId, false) : + group.getOrMaybeCreateMember(memberId, false); - List<Record> records = consumerGroupFenceMember(group, member); + List<Record> records = new ArrayList<>(); + // The departing member is a static one. We don't need to fence this member because it is + // expected to come back within session timeout + if (memberEpoch == -2) { Review Comment: I would prefer to have a separate method for handling a static member leaving the group.
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1127,7 +1176,9 @@ public void replay( Set<String> oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames()); if (value != null) { - ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, true); + ConsumerGroupMember oldMember = value.instanceId() != null ? + consumerGroup.getOrMaybeCreateStaticMember(memberId, value.instanceId(), true) : + consumerGroup.getOrMaybeCreateMember(memberId, true); consumerGroup.updateMember(new ConsumerGroupMember.Builder(oldMember) Review Comment: Here, I think we need to update the static id mapping in `updateMember` and `removeMember`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember( return member; } + /** + * Gets or creates a static member. + * + * @param memberId The member id. + * @param instanceId The group instance id. + * @param createIfNotExists Booleans indicating whether the member must be + * created if it does not exist. + * + * @return A ConsumerGroupMember. + */ + public ConsumerGroupMember getOrMaybeCreateStaticMember( + String memberId, + String instanceId, + boolean createIfNotExists + ) { + ConsumerGroupMember member; + String existingMemberId = staticMemberId(instanceId); + if (!createIfNotExists) { + // The member joined with a non-zero epoch but we haven't registered this static member + // This could be an unknown member for the coordinator. + if (existingMemberId == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); Review Comment: Could we add a custom message to all the exceptions raised in this method?
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember( return member; } + /** + * Gets or creates a static member. + * + * @param memberId The member id. + * @param instanceId The group instance id. + * @param createIfNotExists Booleans indicating whether the member must be + * created if it does not exist. + * + * @return A ConsumerGroupMember. + */ + public ConsumerGroupMember getOrMaybeCreateStaticMember( + String memberId, + String instanceId, + boolean createIfNotExists + ) { + ConsumerGroupMember member; + String existingMemberId = staticMemberId(instanceId); + if (!createIfNotExists) { + // The member joined with a non-zero epoch but we haven't registered this static member + // This could be an unknown member for the coordinator. + if (existingMemberId == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + // We can't create a member at this point. If the 2 member-ids don't match, + // we will throw an error. + if (!existingMemberId.equals(memberId)) { + throw Errors.FENCED_INSTANCE_ID.exception(); + } + member = getOrMaybeCreateMember(memberId, false); + } else { + // No existing member found against this instance id. Creating new. + if (existingMemberId == null) { + member = getOrMaybeCreateMember(memberId, true); + staticMembers.put(instanceId, memberId); Review Comment: The state should not be updated like this. All the updates are handled in the `replay()` methods. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember( return member; } + /** + * Gets or creates a static member. + * + * @param memberId The member id. + * @param instanceId The group instance id. + * @param createIfNotExists Booleans indicating whether the member must be + * created if it does not exist. 
+ * + * @return A ConsumerGroupMember. + */ + public ConsumerGroupMember getOrMaybeCreateStaticMember( + String memberId, + String instanceId, + boolean createIfNotExists + ) { + ConsumerGroupMember member; + String existingMemberId = staticMemberId(instanceId); + if (!createIfNotExists) { + // The member joined with a non-zero epoch but we haven't registered this static member + // This could be an unknown member for the coordinator. + if (existingMemberId == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + // We can't create a member at this point. If the 2 member-ids don't match, + // we will throw an error. + if (!existingMemberId.equals(memberId)) { + throw Errors.FENCED_INSTANCE_ID.exception(); + } + member = getOrMaybeCreateMember(memberId, false); + } else { + // No existing member found against this instance id. Creating new. + if (existingMemberId == null) { + member = getOrMaybeCreateMember(memberId, true); + staticMembers.put(instanceId, memberId); + return member; + } else { + // Get the details of the existing member + ConsumerGroupMember existingMember = getOrMaybeCreateMember(existingMemberId, false); + int currentMemberEpoch = existingMember.memberEpoch(); + // A new member with a used instance id joined but the previous member using the same instance id + // hasn't requested leaving the group. + if (currentMemberEpoch != -2 && !existingMemberId.equals(memberId)) { Review Comment: I wonder if we really need the second part of the condition here. What was your thinking about it? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember( return member; } + /** + * Gets or creates a static member. + * + * @param memberId The member id. + * @param instanceId The group instance id. + * @param createIfNotExists Booleans indicating whether the member must be + * created if it does not exist. 
+ * + * @return A ConsumerGroupMember. + */ + public ConsumerGroupMember getOrMaybeCreateStaticMember( + String memberId, + String instanceId, + boolean createIfNotExists + ) { + ConsumerGroupMember member; + String existingMemberId = staticMemberId(instanceId); + if (!createIfNotExists) { + // The member joined with a non-zero epoch but we haven't registered this static member + // This could be an unknown member for the coordinator. + if (existingMemberId == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + // We can't create a member at this point. If the 2 member-ids don't match, + // we will throw an error. + if (!existingMemberId.equals(memberId)) { + throw Errors.FENCED_INSTANCE_ID.exception(); + } + member = getOrMaybeCreateMember(memberId, false); + } else { + // No existing member found against this instance id. Creating new. + if (existingMemberId == null) { + member = getOrMaybeCreateMember(memberId, true); + staticMembers.put(instanceId, memberId); + return member; + } else { + // Get the details of the existing member + ConsumerGroupMember existingMember = getOrMaybeCreateMember(existingMemberId, false); + int currentMemberEpoch = existingMember.memberEpoch(); + // A new member with a used instance id joined but the previous member using the same instance id + // hasn't requested leaving the group. + if (currentMemberEpoch != -2 && !existingMemberId.equals(memberId)) { + throw Errors.UNRELEASED_INSTANCE_ID.exception(); + } + // A new static member is trying to take the place of a departed static member. We will + // provide the assignments of the old member to the new one. + member = new ConsumerGroupMember.Builder(memberId, existingMember) + .setMemberEpoch(existingMember.targetMemberEpoch()) + .setPreviousMemberEpoch(0) + .setTargetMemberEpoch(existingMember.targetMemberEpoch()) + .build(); Review Comment: This is a bit weird because you pass `existingMember` to the builder and then you still have to override other fields. 
Would it be better to do `new ConsumerGroupMember.Builder(existingMember)` and then override the fields? I think that we only need to set the new member id. nit: The indentation should be four spaces. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember( return member; } + /** + * Gets or creates a static member. + * + * @param memberId The member id. + * @param instanceId The group instance id. + * @param createIfNotExists Booleans indicating whether the member must be + * created if it does not exist. + * + * @return A ConsumerGroupMember. + */ + public ConsumerGroupMember getOrMaybeCreateStaticMember( + String memberId, + String instanceId, + boolean createIfNotExists Review Comment: nit: Indentation should be four spaces. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -790,6 +812,12 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr groupId, memberId, updatedMember.subscribedTopicRegex()); bumpGroupEpoch = true; } + } else { + // A new static member replaces an older one with the same instance id, assignments etc. + // We will create a new member subscription record for this new member. + if (instanceId != null) { + records.add(newMemberSubscriptionRecord(groupId, updatedMember)); + } Review Comment: This does not seem correct because we will write a record whenever the member is not updated and we have an instance id. I think that it would be better to capture the fact that we have a new static member in the condition at L801. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -908,23 +936,40 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr * Handles leave request from a consumer group member. 
* @param groupId The group id from the request. * @param memberId The member id from the request. + * @param memberEpoch The member epoch from the request. * * @return A Result containing the ConsumerGroupHeartbeat response and * a list of records to update the state machine. */ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave( String groupId, - String memberId + String instanceId, + String memberId, + int memberEpoch ) throws ApiException { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); - ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); - - log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group."); + ConsumerGroupMember member = memberEpoch == -2 ? + group.getOrMaybeCreateStaticMember(memberId, instanceId, false) : + group.getOrMaybeCreateMember(memberId, false); - List<Record> records = consumerGroupFenceMember(group, member); + List<Record> records = new ArrayList<>(); + // The departing member is a static one. We don't need to fence this member because it is + // expected to come back within session timeout + if (memberEpoch == -2) { + log.info("[GroupId {}] Member {} with instance id {} is a static member and will not be fenced from the group", + groupId, memberId, member.instanceId()); + // We will write a member epoch of -2 for this departing static member. + ConsumerGroupMember leavingStaticMember = new ConsumerGroupMember.Builder(member) + .setMemberEpoch(-2) + .build(); Review Comment: nit: indentation. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -750,7 +770,9 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr // Get or create the member. 
if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString(); - final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists); + final ConsumerGroupMember member = instanceId == null ? + group.getOrMaybeCreateMember(memberId, createIfNotExists) : + group.getOrMaybeCreateStaticMember(memberId, instanceId, createIfNotExists); throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions); if (memberEpoch == 0) { Review Comment: I just thought about something else. When a static member is replaced, we need to write records to erase the state of the previous member. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -750,7 +770,9 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr // Get or create the member. if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString(); - final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists); + final ConsumerGroupMember member = instanceId == null ? + group.getOrMaybeCreateMember(memberId, createIfNotExists) : + group.getOrMaybeCreateStaticMember(memberId, instanceId, createIfNotExists); throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions); if (memberEpoch == 0) { Review Comment: I wonder if we could log something here as well when a static member is replaced. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1084,10 +1129,14 @@ public CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGro ) throws ApiException { throwIfConsumerGroupHeartbeatRequestIsInvalid(request); - if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH) { + if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH || request.memberEpoch() == -2) { Review Comment: nit: Should we introduce a constant for -2 as well? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1127,7 +1176,9 @@ public void replay( Set<String> oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames()); if (value != null) { - ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, true); + ConsumerGroupMember oldMember = value.instanceId() != null ? + consumerGroup.getOrMaybeCreateStaticMember(memberId, value.instanceId(), true) : + consumerGroup.getOrMaybeCreateMember(memberId, true); Review Comment: We don't need to use getOrMaybeCreateStaticMember here as we only want to look up the member by its id. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org