jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244485648


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -727,6 +800,80 @@ public void replay(
                     + " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone.");
             }
             consumerGroup.removeMember(memberId);
+            updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames());
+        }
+    }
+
+    /**
+     * @return The set of groups subscribed to the topic.
+     */
+    public Set<String> groupsSubscribedToTopic(String topicName) {
+        Set<String> groups = groupsByTopics.get(topicName);
+        return groups != null ? groups : Collections.emptySet();
+    }
+
+    /**
+     * Subscribes a group to a topic.
+     *
+     * @param groupId   The group id.
+     * @param topicName The topic name.
+     */
+    private void subscribeGroupToTopic(
+        String groupId,
+        String topicName
+    ) {
+        groupsByTopics
+            .computeIfAbsent(topicName, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
+            .add(groupId);
+    }
+
+    /**
+     * Unsubscribes a group from a topic.
+     *
+     * @param groupId   The group id.
+     * @param topicName The topic name.
+     */
+    private void unsubscribeGroupFromTopic(

Review Comment:
   Should the return type be boolean here?
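
   To make the question concrete: the body of unsubscribeGroupFromTopic is not part of the quoted hunk, so the following is only a minimal sketch of what a boolean-returning variant might look like. It assumes plain HashMap/HashSet in place of the timeline collections and snapshotRegistry, and the class name GroupsByTopicsSketch is hypothetical. It is not the PR's implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the groupsByTopics bookkeeping in the hunk above.
// Plain java.util collections replace the timeline-backed ones (assumption).
public class GroupsByTopicsSketch {
    private final Map<String, Set<String>> groupsByTopics = new HashMap<>();

    /** @return The set of groups subscribed to the topic. */
    public Set<String> groupsSubscribedToTopic(String topicName) {
        Set<String> groups = groupsByTopics.get(topicName);
        return groups != null ? groups : Collections.emptySet();
    }

    /** Subscribes a group to a topic. */
    public void subscribeGroupToTopic(String groupId, String topicName) {
        groupsByTopics
            .computeIfAbsent(topicName, __ -> new HashSet<>())
            .add(groupId);
    }

    /**
     * Unsubscribes a group from a topic.
     *
     * @return true if the group was subscribed and has been removed,
     *         false if there was nothing to remove.
     */
    public boolean unsubscribeGroupFromTopic(String groupId, String topicName) {
        boolean[] removed = {false};
        groupsByTopics.computeIfPresent(topicName, (__, groups) -> {
            removed[0] = groups.remove(groupId);
            // Returning null drops the topic entry once no group is left.
            return groups.isEmpty() ? null : groups;
        });
        return removed[0];
    }
}
```

   With a boolean result, a caller could tell a real removal from a no-op, for example to skip follow-up work when the group was never subscribed. Whether any caller in the PR needs that distinction is not visible from this hunk.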



