vamossagar12 commented on code in PR #14432: URL: https://github.com/apache/kafka/pull/14432#discussion_r1383612141
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -898,31 +987,44 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch); } - // 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The - // delta between the existing and the new target assignment is persisted to the partition. + // 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member + // replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition. int targetAssignmentEpoch = group.assignmentEpoch(); Assignment targetAssignment = group.targetAssignment(memberId); - if (groupEpoch > targetAssignmentEpoch) { + if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) { String preferredServerAssignor = group.computePreferredServerAssignor( member, updatedMember ).orElse(defaultAssignor.name()); try { - TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = - new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor)) + TargetAssignmentBuilder assignmentResultBuilder = + new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor)); + TargetAssignmentBuilder.TargetAssignmentResult assignmentResult; + // A new static member is replacing an older one with the same subscriptions. + // We just need to remove the older member and add the newer one. The new member can + // reuse the target assignment of the older member. + if (staticMemberReplaced && groupEpoch == targetAssignmentEpoch) { + targetAssignment = group.targetAssignment(existingStaticMember.memberId()); + assignmentResult = assignmentResultBuilder + .removeMember(existingStaticMember.memberId()) + .addOrUpdateMember(memberId, updatedMember) + .build(); + records.addAll(assignmentResult.records()); Review Comment: I have added members and target assignment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org