vamossagar12 commented on code in PR #14327: URL: https://github.com/apache/kafka/pull/14327#discussion_r1376038092
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -669,7 +668,12 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid( throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining."); } if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) { - throw new InvalidRequestException("SubscribedTopicNames must be set in first request."); + if (request.subscribedTopicRegex() == null || request.subscribedTopicRegex().isEmpty()) { Review Comment: I would recommend creating 2 separate methods along the lines of `isSubscribedTopicRegexEmpty()` and `isSubscribedTopicNamesEmpty()`, or something similar, and moving these if conditions outside. We can use them in the next line as well, where we check if only one of them is set. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -669,7 +668,12 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid( throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining."); } if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) { - throw new InvalidRequestException("SubscribedTopicNames must be set in first request."); + if (request.subscribedTopicRegex() == null || request.subscribedTopicRegex().isEmpty()) { + throw new InvalidRequestException("SubscribedTopicNames or SubscribedTopicRegex must be set in first request."); + } + } + if (request.subscribedTopicRegex() != null && !request.subscribedTopicNames().isEmpty() && !isEmpty(request.subscribedTopicRegex())) { Review Comment: Please check whether we can extract these conditions into a separate method, so that the negations of those can be used here. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -669,7 +668,12 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid( throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining."); } if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) { - throw new InvalidRequestException("SubscribedTopicNames must be set in first request."); + if (request.subscribedTopicRegex() == null || request.subscribedTopicRegex().isEmpty()) { + throw new InvalidRequestException("SubscribedTopicNames or SubscribedTopicRegex must be set in first request."); + } + } + if (request.subscribedTopicRegex() != null && !request.subscribedTopicNames().isEmpty() && !isEmpty(request.subscribedTopicRegex())) { + throw new InvalidRequestException("SubscribedTopicNames or SubscribedTopicRegex should not be set at the same time."); Review Comment: nit: `Both SubscribedTopicNames and SubscribedTopicRegex should not be set at the same time.` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -669,7 +668,12 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid( throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining."); } if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) { - throw new InvalidRequestException("SubscribedTopicNames must be set in first request."); + if (request.subscribedTopicRegex() == null || request.subscribedTopicRegex().isEmpty()) { + throw new InvalidRequestException("SubscribedTopicNames or SubscribedTopicRegex must be set in first request."); Review Comment: nit: `Either SubscribedTopicNames or SubscribedTopicRegex must be set in first request.` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -709,38 +713,53 @@ private static void 
maybeUpdateServerAssignors( /** * Updates the subscribed topic names count. * + * @param topicsImage The current metadata for all available topics. * @param oldMember The old member. * @param newMember The new member. */ private void maybeUpdateSubscribedTopicNames( - ConsumerGroupMember oldMember, - ConsumerGroupMember newMember + TopicsImage topicsImage, + ConsumerGroupMember oldMember, + ConsumerGroupMember newMember ) { - maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, newMember); + maybeUpdateSubscribedTopicNames(topicsImage, subscribedTopicNames, oldMember, newMember); } /** * Updates the subscription count. * + * @param topicsImage The current metadata for all available topics. * @param subscribedTopicCount The map to update. * @param oldMember The old member. * @param newMember The new member. */ private static void maybeUpdateSubscribedTopicNames( + TopicsImage topicsImage, Map<String, Integer> subscribedTopicCount, ConsumerGroupMember oldMember, ConsumerGroupMember newMember ) { if (oldMember != null) { - oldMember.subscribedTopicNames().forEach(topicName -> - subscribedTopicCount.compute(topicName, ConsumerGroup::decValue) - ); + processMemberSubscribedTopics(oldMember, ConsumerGroup::decValue); } - if (newMember != null) { - newMember.subscribedTopicNames().forEach(topicName -> - subscribedTopicCount.compute(topicName, ConsumerGroup::incValue) - ); + processMemberSubscribedTopics(newMember, ConsumerGroup::incValue); Review Comment: I think this and https://github.com/apache/kafka/pull/14327/files#diff-74fddde890b14d5626cb56e3eda5feb9d09ae0a4f469901517fed7eb1b11f34fR743 will cause a compilation error because the method `processMemberSubscribedTopics` accepts 4 parameters and we are passing only 2. I see the Jenkins build is also failing for the same reason. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -288,14 +291,15 @@ public ConsumerGroupMember getOrMaybeCreateMember( /** * Updates the member. * + * @param topicsImage The current metadata for all available topics. Review Comment: The order of the parameters must be inverted, i.e. `newMember` must be specified first and then `topicsImage`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -304,11 +308,12 @@ public void updateMember(ConsumerGroupMember newMember) { /** * Remove the member from the group. * + * @param topicsImage The current metadata for all available topics. Review Comment: Same comment here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org