dajac commented on code in PR #15785: URL: https://github.com/apache/kafka/pull/15785#discussion_r1582678791
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1307,13 +1307,14 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
             }
         }
 
+        Map<String, Integer> subscribedTopicsMemberCount = new HashMap<>();

Review Comment:
   This is incorrect, I think. We should initialize it with the current subscribed topic names. Otherwise, we will use an empty map later on. I wonder if we should also call it `subscribedTopicNames` to be consistent.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1350,6 +1351,11 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
                     .withMembers(group.members())
                     .withStaticMembers(group.staticMembers())
                     .withSubscriptionMetadata(subscriptionMetadata)
+                    .withSubscriptionType(ConsumerGroup.subscriptionType(
+                        subscribedTopicsMemberCount,
+                        group.numMembers(),

Review Comment:
   Note that the number of members may be incorrect here. For instance, when a new member joins, it is not yet accounted for in that number. I wonder if we could do the same without it. Otherwise, we need to adjust it based on whether the member is new or not.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -966,6 +982,61 @@ private static void maybeUpdateSubscribedTopicNames(
         }
     }
 
+    /**
+     * Updates the subscription count.
+     *
+     * @param oldMember             The old member.
+     * @param newMember             The new member.
+     *
+     * @return Copy of the map of topics to the count of number of subscribers.
+     */
+    public Map<String, Integer> updateSubscribedTopicNames(
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember
+    ) {
+        Map<String, Integer> subscribedTopicCount = new HashMap<>(this.subscribedTopicNames);
+        if (oldMember != null) {
+            oldMember.subscribedTopicNames().forEach(topicName ->
+                subscribedTopicCount.compute(topicName, ConsumerGroup::decValue)
+            );
+        }
+
+        if (newMember != null) {
+            newMember.subscribedTopicNames().forEach(topicName ->
+                subscribedTopicCount.compute(topicName, ConsumerGroup::incValue)
+            );
+        }
+

Review Comment:
   Could we reuse `maybeUpdateSubscribedTopicNames`?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1350,6 +1351,11 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
                     .withMembers(group.members())
                     .withStaticMembers(group.staticMembers())
                     .withSubscriptionMetadata(subscriptionMetadata)
+                    .withSubscriptionType(ConsumerGroup.subscriptionType(
+                        subscribedTopicsMemberCount,
+                        group.numMembers(),
+                        group.subscriptionType()
+                    ))

Review Comment:
   I would prefer to have this right after calling `updateSubscribedTopicNames` in order to keep all the updates together. I would also call it `computeSubscriptionType`.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1307,13 +1307,14 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
             }
         }
 
+        Map<String, Integer> subscribedTopicsMemberCount = new HashMap<>();
         if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+            subscribedTopicsMemberCount = group.updateSubscribedTopicNames(member, updatedMember);

Review Comment:
   nit: Should we call it `computeSubscribedTopicNames` to follow `computeSubscriptionMetadata`?
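
To make the copy-then-adjust idea in the hunk above concrete, here is a minimal stand-alone sketch. It is not the PR's code: the class `SubscribedTopicCounts` is hypothetical, members are reduced to plain `Set<String>` subscriptions, and the bodies of `decValue` and `incValue` are assumptions, since the diff only shows their names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-alone sketch; not the actual ConsumerGroup class.
final class SubscribedTopicCounts {

    // Assumed behaviour: returning null makes Map.compute remove the entry
    // once the last subscriber of a topic is gone.
    static Integer decValue(String topic, Integer count) {
        return (count == null || count <= 1) ? null : count - 1;
    }

    // Assumed behaviour: a missing entry starts at one subscriber.
    static Integer incValue(String topic, Integer count) {
        return count == null ? 1 : count + 1;
    }

    // Returns an adjusted copy of the counts; the input map is left untouched.
    static Map<String, Integer> computeSubscribedTopicNames(
        Map<String, Integer> current,
        Set<String> oldSubscription, // null when the member is new
        Set<String> newSubscription  // null when the member leaves
    ) {
        Map<String, Integer> counts = new HashMap<>(current);
        if (oldSubscription != null) {
            oldSubscription.forEach(topic ->
                counts.compute(topic, SubscribedTopicCounts::decValue));
        }
        if (newSubscription != null) {
            newSubscription.forEach(topic ->
                counts.compute(topic, SubscribedTopicCounts::incValue));
        }
        return counts;
    }
}
```

Because the result starts from a copy of the current counts, a caller that initializes its local variable from the group's existing map still sees the right values when the update branch is skipped.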
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -966,6 +982,61 @@ private static void maybeUpdateSubscribedTopicNames(
         }
     }
 
+    /**
+     * Updates the subscription count.
+     *
+     * @param oldMember             The old member.
+     * @param newMember             The new member.
+     *
+     * @return Copy of the map of topics to the count of number of subscribers.
+     */
+    public Map<String, Integer> updateSubscribedTopicNames(
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember
+    ) {
+        Map<String, Integer> subscribedTopicCount = new HashMap<>(this.subscribedTopicNames);
+        if (oldMember != null) {
+            oldMember.subscribedTopicNames().forEach(topicName ->
+                subscribedTopicCount.compute(topicName, ConsumerGroup::decValue)
+            );
+        }
+
+        if (newMember != null) {
+            newMember.subscribedTopicNames().forEach(topicName ->
+                subscribedTopicCount.compute(topicName, ConsumerGroup::incValue)
+            );
+        }
+
+        return subscribedTopicCount;
+    }
+
+    /**
+     * Compute the subscription type of the consumer group.
+     *
+     * If all members are subscribed to the same set of topics, the type is homogeneous.
+     * Otherwise, it is heterogeneous.
+     *
+     * @param subscribedTopicNames      A map of topic names to the count of members subscribed to each topic.
+     * @param numOfMembers              The total number of members in the group.
+     * @param subscriptionType          The current subscription type of the group.
+     * @return {@link SubscriptionType#HOMOGENEOUS} if all members are subscribed to exactly the same topics;
+     *         otherwise, {@link SubscriptionType#HETEROGENEOUS}.
+     */
+    public static SubscriptionType subscriptionType(
+        Map<String, Integer> subscribedTopicNames,
+        int numOfMembers,
+        SubscriptionType subscriptionType
+    ) {
+        if (subscribedTopicNames.isEmpty()) return subscriptionType;

Review Comment:
   I don't get this one. If the subscriptions are empty, isn't the group HOMOGENEOUS by definition?
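
One way to read that question is sketched below: an empty map is treated as homogeneous, and otherwise every topic must be subscribed to by every member. This is only an illustration, not the PR's implementation. The class `SubscriptionTypeSketch` and its nested enum are hypothetical, and it assumes the caller passes a `numOfMembers` that already accounts for a joining or leaving member.

```java
import java.util.Map;

// Hypothetical stand-alone sketch; not the actual ConsumerGroup class.
final class SubscriptionTypeSketch {

    enum SubscriptionType { HOMOGENEOUS, HETEROGENEOUS }

    // Assumes numOfMembers already reflects the member being added or removed.
    static SubscriptionType computeSubscriptionType(
        Map<String, Integer> subscribedTopicNames,
        int numOfMembers
    ) {
        // No subscriptions at all: nothing can differ between members.
        if (subscribedTopicNames.isEmpty()) {
            return SubscriptionType.HOMOGENEOUS;
        }
        // Any topic that is not subscribed to by every member means the
        // members' subscriptions differ.
        for (int subscriberCount : subscribedTopicNames.values()) {
            if (subscriberCount != numOfMembers) {
                return SubscriptionType.HETEROGENEOUS;
            }
        }
        return SubscriptionType.HOMOGENEOUS;
    }
}
```

With this shape the current subscription type no longer needs to be passed in, since the empty case has a definite answer.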
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1350,6 +1351,11 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
                     .withMembers(group.members())
                     .withStaticMembers(group.staticMembers())
                     .withSubscriptionMetadata(subscriptionMetadata)
+                    .withSubscriptionType(ConsumerGroup.subscriptionType(
+                        subscribedTopicsMemberCount,
+                        group.numMembers(),

Review Comment:
   An alternative may be to compare the counts within the Map without considering the group size. This could work because we do not accept empty subscriptions from a member.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1307,13 +1307,14 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
             }
         }
 
+        Map<String, Integer> subscribedTopicsMemberCount = new HashMap<>();
         if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+            subscribedTopicsMemberCount = group.updateSubscribedTopicNames(member, updatedMember);
             // The subscription metadata is updated in two cases:
             // 1) The member has updated its subscriptions;
             // 2) The refresh deadline has been reached.

Review Comment:
   nit: Let's keep these comments at the top of the if branch.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -948,7 +964,7 @@ private void maybeUpdateSubscribedTopicNames(
      * @param oldMember             The old member.
      * @param newMember             The new member.
      */
-    private static void maybeUpdateSubscribedTopicNames(
+    public static void maybeUpdateSubscribedTopicNames(

Review Comment:
   nit: Could we keep it private?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.