dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1397287511
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -829,21 +902,50 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         // Get or create the member.
         if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-        final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
-        throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-        if (memberEpoch == 0) {
-            log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        ConsumerGroupMember updatedMember;
+        boolean staticMemberReplaced = false;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+            throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+            if (createIfNotExists) {
+                log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+            }
+            updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+        } else {
+            member = group.staticMember(instanceId);

Review Comment:
   I still find this logic quite complex to follow, and I wonder if we could be a little more explicit. I think the complexity comes from `throwIfStaticMemberValidationFails`, which hides quite a lot of the logic. I wonder if something like the following would be better. I am not sure... What do you think?

   ```
   ConsumerGroupMember existingMember = group.staticMember(instanceId);
   if (memberEpoch == 0) {
       // A new static member joins or the existing static member rejoins.
       if (existingMember == null) {
           // New static member.
           member = group.getOrMaybeCreateMember(memberId, true);
           updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
       } else {
           // Static member rejoins with a different member id, so it should replace
           // the previous instance iff the previous instance has epoch -2.
           if (existingMember.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
               // The new member can't join.
               throw Errors.UNRELEASED_INSTANCE_ID.exception(...);
           } else {
               // Replace the current member.
               staticMemberReplaced = true;
               member = existingMember;
               updatedMemberBuilder = new ConsumerGroupMember.Builder(group.getOrMaybeCreateMember(memberId, true));
               removeMemberAndCancelTimers(records, group.groupId(), member.memberId());
           }
       }
   } else {
       // Check member id or throw FENCED_INSTANCE_ID
       // Check epoch with throwIfMemberEpochIsInvalid
   }
   ```

   Note that I just wrote this without testing it, so the code is likely not 100% correct :).
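To make the proposed branching easier to reason about in isolation, here is a minimal, self-contained sketch of the same decision table as a pure function. Everything in it is hypothetical and simplified for illustration: `StaticMemberJoinSketch`, `Member`, `Outcome` and `decide` are not Kafka classes or methods, the value `-2` for `LEAVE_GROUP_STATIC_MEMBER_EPOCH` is taken from the comment in the suggestion above, and the `UNSPECIFIED` outcome marks a case the suggestion does not address. It models only which branch is taken, not the record writes or timer handling.

```java
/** Hypothetical, simplified model of the static-member branch; not the Kafka implementation. */
final class StaticMemberJoinSketch {
    // Assumed sentinel: the suggestion says a departed static member "has -2".
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    /** Which branch of the suggested logic a heartbeat would take. */
    enum Outcome {
        NEW_STATIC_MEMBER,       // no member holds the instance id yet
        REPLACE_EXISTING,        // previous holder left with the static-leave epoch
        UNRELEASED_INSTANCE_ID,  // previous holder is still active
        FENCED_INSTANCE_ID,      // heartbeat comes from a different member id
        VALIDATE_EPOCH,          // continue with the regular epoch validation
        UNSPECIFIED              // case not covered by the suggestion
    }

    /** Minimal stand-in for ConsumerGroupMember: only the two fields the branching reads. */
    static final class Member {
        final String memberId;
        final int memberEpoch;

        Member(String memberId, int memberEpoch) {
            this.memberId = memberId;
            this.memberEpoch = memberEpoch;
        }
    }

    /**
     * @param existingMember the member currently registered under the instance id, or null
     * @param memberId       the member id sent in the heartbeat
     * @param memberEpoch    the member epoch sent in the heartbeat (0 means "join")
     */
    static Outcome decide(Member existingMember, String memberId, int memberEpoch) {
        if (memberEpoch == 0) {
            // A new static member joins or an existing static member rejoins.
            if (existingMember == null) {
                return Outcome.NEW_STATIC_MEMBER;
            }
            if (existingMember.memberEpoch != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
                return Outcome.UNRELEASED_INSTANCE_ID;
            }
            return Outcome.REPLACE_EXISTING;
        }
        if (existingMember == null) {
            // Assumption: the suggestion does not say what happens here.
            return Outcome.UNSPECIFIED;
        }
        if (!existingMember.memberId.equals(memberId)) {
            return Outcome.FENCED_INSTANCE_ID;
        }
        return Outcome.VALIDATE_EPOCH;
    }

    public static void main(String[] args) {
        Member departed = new Member("old-id", LEAVE_GROUP_STATIC_MEMBER_EPOCH);
        System.out.println(decide(departed, "new-id", 0));
    }
}
```

Separating the decision from its side effects this way is only one possible arrangement. Its appeal is that each outcome can be unit-tested without a group or a timer in place, and the caller then performs the matching action (create, replace, throw) in a single `switch`.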