jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1244485648
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -727,6 +800,80 @@ public void replay( + " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone."); } consumerGroup.removeMember(memberId); + updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames()); + } + } + + /** + * @return The set of groups subscribed to the topic. + */ + public Set<String> groupsSubscribedToTopic(String topicName) { + Set<String> groups = groupsByTopics.get(topicName); + return groups != null ? groups : Collections.emptySet(); + } + + /** + * Subscribes a group to a topic. + * + * @param groupId The group id. + * @param topicName The topic name. + */ + private void subscribeGroupToTopic( + String groupId, + String topicName + ) { + groupsByTopics + .computeIfAbsent(topicName, __ -> new TimelineHashSet<>(snapshotRegistry, 1)) + .add(groupId); + } + + /** + * Unsubscribes a group from a topic. + * + * @param groupId The group id. + * @param topicName The topic name. + */ + private void unsubscribeGroupFromTopic( Review Comment: Should the return here be of the groupsIds type? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org