vamossagar12 commented on code in PR #14327:
URL: https://github.com/apache/kafka/pull/14327#discussion_r1376038092


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -669,7 +668,12 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
                 throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining.");
             }
             if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) {
-                throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
+                if (request.subscribedTopicRegex() == null || request.subscribedTopicRegex().isEmpty()) {

Review Comment:
   I would recommend extracting these conditions into two separate methods, 
along the lines of `isSubscribedTopicRegexEmpty()` and 
`isSubscribedTopicNamesEmpty()` or something similar. We can reuse them in the 
next check as well, where we verify that only one of the two is set.
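
A minimal sketch of what such helpers could look like. This is only an illustration: `HeartbeatRequestSketch` is a hypothetical stand-in for the real request class, `validateFirstRequest` is a made-up name, and `IllegalArgumentException` stands in for `InvalidRequestException` so that the snippet compiles on its own. Because both helpers are null-safe, the "both set" check no longer calls `isEmpty()` on a possibly null list.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the heartbeat request; only the two accessors
// discussed in this review are modelled.
final class HeartbeatRequestSketch {
    private final List<String> subscribedTopicNames;
    private final String subscribedTopicRegex;

    HeartbeatRequestSketch(List<String> subscribedTopicNames, String subscribedTopicRegex) {
        this.subscribedTopicNames = subscribedTopicNames;
        this.subscribedTopicRegex = subscribedTopicRegex;
    }

    List<String> subscribedTopicNames() { return subscribedTopicNames; }
    String subscribedTopicRegex() { return subscribedTopicRegex; }
}

final class SubscriptionValidationSketch {
    // True when no topic names were supplied (null or empty list).
    static boolean isSubscribedTopicNamesEmpty(HeartbeatRequestSketch request) {
        return request.subscribedTopicNames() == null
            || request.subscribedTopicNames().isEmpty();
    }

    // True when no regex was supplied (null or empty string).
    static boolean isSubscribedTopicRegexEmpty(HeartbeatRequestSketch request) {
        return request.subscribedTopicRegex() == null
            || request.subscribedTopicRegex().isEmpty();
    }

    // Assumed shape of the two checks once the helpers exist.
    // IllegalArgumentException stands in for InvalidRequestException.
    static void validateFirstRequest(HeartbeatRequestSketch request) {
        boolean namesEmpty = isSubscribedTopicNamesEmpty(request);
        boolean regexEmpty = isSubscribedTopicRegexEmpty(request);
        if (namesEmpty && regexEmpty) {
            throw new IllegalArgumentException(
                "Either SubscribedTopicNames or SubscribedTopicRegex must be set in first request.");
        }
        if (!namesEmpty && !regexEmpty) {
            throw new IllegalArgumentException(
                "Both SubscribedTopicNames and SubscribedTopicRegex should not be set at the same time.");
        }
    }

    public static void main(String[] args) {
        // Only names set: passes validation without throwing.
        validateFirstRequest(new HeartbeatRequestSketch(Arrays.asList("orders"), null));
        try {
            // Names and regex both set: rejected by the second check.
            validateFirstRequest(new HeartbeatRequestSketch(Arrays.asList("orders"), "ord.*"));
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```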



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -669,7 +668,12 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
                 throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining.");
             }
             if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) {
-                throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
+                if (request.subscribedTopicRegex() == null || request.subscribedTopicRegex().isEmpty()) {
+                    throw new InvalidRequestException("SubscribedTopicNames or SubscribedTopicRegex must be set in first request.");
+                }
+            }
+            if (request.subscribedTopicRegex() != null && !request.subscribedTopicNames().isEmpty()  && !isEmpty(request.subscribedTopicRegex())) {

Review Comment:
   Please check whether these conditions can be extracted into separate 
methods, so that their negations can be used here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -669,7 +668,12 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
                 throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining.");
             }
             if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) {
-                throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
+                if (request.subscribedTopicRegex() == null || request.subscribedTopicRegex().isEmpty()) {
+                    throw new InvalidRequestException("SubscribedTopicNames or SubscribedTopicRegex must be set in first request.");
+                }
+            }
+            if (request.subscribedTopicRegex() != null && !request.subscribedTopicNames().isEmpty()  && !isEmpty(request.subscribedTopicRegex())) {
+                throw new InvalidRequestException("SubscribedTopicNames or SubscribedTopicRegex should not be set at the same time.");

Review Comment:
   nit: `Both SubscribedTopicNames and SubscribedTopicRegex should not be set 
at the same time.`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -669,7 +668,12 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
                 throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining.");
             }
             if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) {
-                throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
+                if (request.subscribedTopicRegex() == null || request.subscribedTopicRegex().isEmpty()) {
+                    throw new InvalidRequestException("SubscribedTopicNames or SubscribedTopicRegex must be set in first request.");

Review Comment:
   nit: `Either SubscribedTopicNames or SubscribedTopicRegex must be set in 
first request.`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -709,38 +713,53 @@ private static void maybeUpdateServerAssignors(
     /**
      * Updates the subscribed topic names count.
      *
+     * @param topicsImage The current metadata for all available topics.
      * @param oldMember The old member.
      * @param newMember The new member.
      */
     private void maybeUpdateSubscribedTopicNames(
-        ConsumerGroupMember oldMember,
-        ConsumerGroupMember newMember
+            TopicsImage topicsImage,
+            ConsumerGroupMember oldMember,
+            ConsumerGroupMember newMember
     ) {
-        maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, newMember);
+        maybeUpdateSubscribedTopicNames(topicsImage, subscribedTopicNames, oldMember, newMember);
     }
 
     /**
      * Updates the subscription count.
      *
+     * @param topicsImage            The current metadata for all available topics.
      * @param subscribedTopicCount  The map to update.
      * @param oldMember             The old member.
      * @param newMember             The new member.
      */
     private static void maybeUpdateSubscribedTopicNames(
+        TopicsImage topicsImage,
         Map<String, Integer> subscribedTopicCount,
         ConsumerGroupMember oldMember,
         ConsumerGroupMember newMember
     ) {
         if (oldMember != null) {
-            oldMember.subscribedTopicNames().forEach(topicName ->
-                subscribedTopicCount.compute(topicName, ConsumerGroup::decValue)
-            );
+            processMemberSubscribedTopics(oldMember, ConsumerGroup::decValue);
         }
-
         if (newMember != null) {
-            newMember.subscribedTopicNames().forEach(topicName ->
-                subscribedTopicCount.compute(topicName, ConsumerGroup::incValue)
-            );
+            processMemberSubscribedTopics(newMember, ConsumerGroup::incValue);

Review Comment:
   I think this call and the one at 
https://github.com/apache/kafka/pull/14327/files#diff-74fddde890b14d5626cb56e3eda5feb9d09ae0a4f469901517fed7eb1b11f34fR743
 will fail to compile, because the method `processMemberSubscribedTopics` 
accepts 4 parameters and only 2 are passed here. The Jenkins build also appears 
to be failing for the same reason.
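
For illustration only, here is a rough sketch of a call site that supplies all four arguments. The real signature of `processMemberSubscribedTopics` is not visible in this hunk, so everything here is an assumption: the parameter order, the `MemberSketch` type, the use of a plain `Set<String>` in place of `TopicsImage`, the regex expansion via `java.util.regex`, and the bodies of `incValue`/`decValue`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.regex.Pattern;

final class SubscriptionCountSketch {
    // Hypothetical stand-in for ConsumerGroupMember.
    static final class MemberSketch {
        final List<String> topicNames;
        final String topicRegex; // may be null or empty

        MemberSketch(List<String> topicNames, String topicRegex) {
            this.topicNames = topicNames;
            this.topicRegex = topicRegex;
        }
    }

    // Assumed counter semantics: increment, creating the entry at 1 if absent.
    static Integer incValue(String key, Integer value) {
        return value == null ? 1 : value + 1;
    }

    // Assumed counter semantics: decrement, returning null at zero so that
    // Map.compute removes the entry.
    static Integer decValue(String key, Integer value) {
        if (value == null) return null;
        return value == 1 ? null : value - 1;
    }

    // Hypothetical 4-parameter helper: resolves the member's topics (explicit
    // names plus known topics matching its regex) and applies the updater.
    static void processMemberSubscribedTopics(
        Set<String> knownTopics,                  // stands in for TopicsImage
        Map<String, Integer> subscribedTopicCount,
        MemberSketch member,
        BiFunction<String, Integer, Integer> updater
    ) {
        Set<String> topics = new HashSet<>(member.topicNames);
        if (member.topicRegex != null && !member.topicRegex.isEmpty()) {
            Pattern pattern = Pattern.compile(member.topicRegex);
            for (String topic : knownTopics) {
                if (pattern.matcher(topic).matches()) {
                    topics.add(topic);
                }
            }
        }
        topics.forEach(topic -> subscribedTopicCount.compute(topic, updater));
    }

    public static void main(String[] args) {
        Set<String> knownTopics = new HashSet<>(Arrays.asList("orders", "payments", "audit"));
        Map<String, Integer> counts = new HashMap<>();
        MemberSketch newMember = new MemberSketch(Arrays.asList("orders"), "pay.*");

        // All four arguments are supplied, unlike the 2-argument call in the diff.
        processMemberSubscribedTopics(knownTopics, counts, newMember, SubscriptionCountSketch::incValue);
        System.out.println(counts);
    }
}
```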



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -288,14 +291,15 @@ public ConsumerGroupMember getOrMaybeCreateMember(
     /**
      * Updates the member.
      *
+     * @param topicsImage The current metadata for all available topics.

Review Comment:
   The order of the parameters must be inverted, i.e. `newMember` must be 
specified first, followed by `topicsImage`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -304,11 +308,12 @@ public void updateMember(ConsumerGroupMember newMember) {
     /**
      * Remove the member from the group.
      *
+     * @param topicsImage The current metadata for all available topics.

Review Comment:
   Same comment here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.