vamossagar12 commented on code in PR #14327:
URL: https://github.com/apache/kafka/pull/14327#discussion_r1371564208


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -727,20 +728,42 @@ private void maybeUpdateSubscribedTopicNames(
      * @param newMember             The new member.
      */
     private static void maybeUpdateSubscribedTopicNames(
+        TopicsImage topics,
         Map<String, Integer> subscribedTopicCount,
         ConsumerGroupMember oldMember,
         ConsumerGroupMember newMember
     ) {
         if (oldMember != null) {
-            oldMember.subscribedTopicNames().forEach(topicName ->
-                subscribedTopicCount.compute(topicName, 
ConsumerGroup::decValue)
-            );
+            String oldMemberRegex = oldMember.subscribedTopicRegex();
+            if (oldMemberRegex == null || oldMemberRegex.isEmpty()) {
+                oldMember.subscribedTopicNames().forEach(topicName ->
+                        subscribedTopicCount.compute(topicName, 
ConsumerGroup::decValue)
+                );
+            } else {
+                Pattern pattern = Pattern.compile(oldMemberRegex);
+                oldMember.subscribedTopicNames()

Review Comment:
   Do we need this condition? Eventually for the old member's subscribed topic 
we need to decrement the counts. Assuming that any topic subscribed by virtue 
of regex based subscriptions as well will increment the count, we don't need to 
run pattern matching like this I feel.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -556,6 +564,56 @@ public void testUpdateSubscriptionMetadata() {
                 image.cluster()
             )
         );
+
+        // Removing member1 results in returning bar and zar.
+        assertEquals(
+                mkMap(
+                        mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2))),
+                        mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, 
mkMapOfPartitionRacks(3)))
+                ),
+                consumerGroup.computeSubscriptionMetadata(
+                        member1,
+                        null,
+                        image.topics(),
+                        image.cluster()
+                )
+        );
+
+        // Updating group with removal of member1.
+        consumerGroup.removeMember(member1.memberId());
+
+        // Compute while taking into account member 4
+        assertEquals(
+                mkMap(
+                        mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2))),
+                        mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, 
mkMapOfPartitionRacks(3))),
+                        mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1))),
+                        mkEntry("food", new TopicMetadata(foodTopicId, "food", 
4, mkMapOfPartitionRacks(4)))
+                ),
+                consumerGroup.computeSubscriptionMetadata(
+                        null,
+                        member4,
+                        image.topics(),
+                        image.cluster()
+                )
+        );
+
+        // Updating group with member5.
+        consumerGroup.updateMember(member5);

Review Comment:
   Don't we need to update member4 as well here? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to