dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1582678791


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1307,13 +1307,14 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
             }
         }
 
+        Map<String, Integer> subscribedTopicsMemberCount = new HashMap<>();

Review Comment:
   This is incorrect, I think. We should initialize it with the current 
subscribed topic names. Otherwise, we will use an empty map later on. I wonder 
if we should also call it `subscribedTopicNames` to be consistent.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1350,6 +1351,11 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
                         .withMembers(group.members())
                         .withStaticMembers(group.staticMembers())
                         .withSubscriptionMetadata(subscriptionMetadata)
+                        .withSubscriptionType(ConsumerGroup.subscriptionType(
+                            subscribedTopicsMemberCount,
+                            group.numMembers(),

Review Comment:
   Note that the number of members may be incorrect here. For instance, when a 
new member joins, it is not counted yet. I wonder if we could do the same 
without it. Otherwise, we need to adjust it based on whether the member is new 
or not.
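
   The second option could look roughly like the sketch below. It is only an 
illustration, not the PR's code: `MemberCountSketch`, `effectiveMemberCount`, 
and the plain set of member ids are hypothetical stand-ins for the group state.

```java
import java.util.Set;

// Hypothetical stand-in, not Kafka's API: the group's known members are
// modelled as a plain set of member ids.
final class MemberCountSketch {

    // Returns the member count to use while a heartbeat is being processed.
    // A member that is not part of the group yet is counted on top of the
    // existing members.
    static int effectiveMemberCount(Set<String> currentMemberIds, String heartbeatingMemberId) {
        int count = currentMemberIds.size();
        if (!currentMemberIds.contains(heartbeatingMemberId)) {
            count++; // The joining member has not been added to the group yet.
        }
        return count;
    }

    public static void main(String[] args) {
        Set<String> members = Set.of("m1", "m2");
        System.out.println(effectiveMemberCount(members, "m2")); // existing member
        System.out.println(effectiveMemberCount(members, "m3")); // new member
    }
}
```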



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -966,6 +982,61 @@ private static void maybeUpdateSubscribedTopicNames(
         }
     }
 
+    /**
+     * Updates the subscription count.
+     *
+     * @param oldMember             The old member.
+     * @param newMember             The new member.
+     *
+     * @return Copy of the map of topics to the count of number of subscribers.
+     */
+    public Map<String, Integer> updateSubscribedTopicNames(
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember
+    ) {
+        Map<String, Integer> subscribedTopicCount = new HashMap<>(this.subscribedTopicNames);
+        if (oldMember != null) {
+            oldMember.subscribedTopicNames().forEach(topicName ->
+                subscribedTopicCount.compute(topicName, ConsumerGroup::decValue)
+            );
+        }
+
+        if (newMember != null) {
+            newMember.subscribedTopicNames().forEach(topicName ->
+                subscribedTopicCount.compute(topicName, ConsumerGroup::incValue)
+            );
+            );
+        }
+

Review Comment:
   Could we reuse `maybeUpdateSubscribedTopicNames`?
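
   A rough sketch of what such reuse might look like, assuming the static 
helper takes the map to mutate as its first argument. The class 
`SubscribedTopicNamesSketch` is hypothetical and models members as plain sets 
of topic names rather than `ConsumerGroupMember`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of the group: only the per-topic
// subscriber counts are kept.
final class SubscribedTopicNamesSketch {
    private final Map<String, Integer> subscribedTopicNames = new HashMap<>();

    // Shared helper: mutates the given map in place.
    private static void maybeUpdateSubscribedTopicNames(
        Map<String, Integer> counts,
        Set<String> oldSubscription,
        Set<String> newSubscription
    ) {
        if (oldSubscription != null) {
            oldSubscription.forEach(topic ->
                counts.compute(topic, SubscribedTopicNamesSketch::decValue));
        }
        if (newSubscription != null) {
            newSubscription.forEach(topic ->
                counts.compute(topic, SubscribedTopicNamesSketch::incValue));
        }
    }

    // Applies the update to the group's own state.
    void updateMember(Set<String> oldSubscription, Set<String> newSubscription) {
        maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldSubscription, newSubscription);
    }

    // Returns an updated copy and leaves the group's own state untouched.
    Map<String, Integer> computeSubscribedTopicNames(
        Set<String> oldSubscription,
        Set<String> newSubscription
    ) {
        Map<String, Integer> copy = new HashMap<>(subscribedTopicNames);
        maybeUpdateSubscribedTopicNames(copy, oldSubscription, newSubscription);
        return copy;
    }

    // Decrements a count; returning null removes the entry from the map.
    private static Integer decValue(String key, Integer value) {
        if (value == null) return null;
        return value == 1 ? null : value - 1;
    }

    // Increments a count, starting at 1 for a topic that is not present yet.
    private static Integer incValue(String key, Integer value) {
        return value == null ? 1 : value + 1;
    }
}
```

   With this shape, the copy-and-update method is a thin wrapper and the 
counting logic lives in one place.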



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1350,6 +1351,11 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
                         .withMembers(group.members())
                         .withStaticMembers(group.staticMembers())
                         .withSubscriptionMetadata(subscriptionMetadata)
+                        .withSubscriptionType(ConsumerGroup.subscriptionType(
+                            subscribedTopicsMemberCount,
+                            group.numMembers(),
+                            group.subscriptionType()
+                        ))

Review Comment:
   I would prefer to have this right after calling `updateSubscribedTopicNames` 
in order to keep all the updates together. I would also call it 
`computeSubscriptionType`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1307,13 +1307,14 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
             }
         }
 
+        Map<String, Integer> subscribedTopicsMemberCount = new HashMap<>();
         if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+            subscribedTopicsMemberCount = group.updateSubscribedTopicNames(member, updatedMember);

Review Comment:
   nit: Should we call it `computeSubscribedTopicNames` to follow 
`computeSubscriptionMetadata`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -966,6 +982,61 @@ private static void maybeUpdateSubscribedTopicNames(
         }
     }
 
+    /**
+     * Updates the subscription count.
+     *
+     * @param oldMember             The old member.
+     * @param newMember             The new member.
+     *
+     * @return Copy of the map of topics to the count of number of subscribers.
+     */
+    public Map<String, Integer> updateSubscribedTopicNames(
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember
+    ) {
+        Map<String, Integer> subscribedTopicCount = new HashMap<>(this.subscribedTopicNames);
+        if (oldMember != null) {
+            oldMember.subscribedTopicNames().forEach(topicName ->
+                subscribedTopicCount.compute(topicName, ConsumerGroup::decValue)
+            );
+        }
+
+        if (newMember != null) {
+            newMember.subscribedTopicNames().forEach(topicName ->
+                subscribedTopicCount.compute(topicName, ConsumerGroup::incValue)
+            );
+            );
+        }
+
+        return subscribedTopicCount;
+    }
+
+    /**
+     * Compute the subscription type of the consumer group.
+     *
+     * If all members are subscribed to the same set of topics, the type is homogeneous.
+     * Otherwise, it is heterogeneous.
+     *
+     * @param subscribedTopicNames      A map of topic names to the count of members subscribed to each topic.
+     * @param numOfMembers              The total number of members in the group.
+     * @param subscriptionType          The current subscription type of the group.
+     * @return {@link SubscriptionType#HOMOGENEOUS} if all members are subscribed to exactly the same topics;
+     *         otherwise, {@link SubscriptionType#HETEROGENEOUS}.
+     */
+    public static SubscriptionType subscriptionType(
+        Map<String, Integer> subscribedTopicNames,
+        int numOfMembers,
+        SubscriptionType subscriptionType
+    ) {
+        if (subscribedTopicNames.isEmpty()) return subscriptionType;

Review Comment:
   I don't get this one. If the subscriptions are empty, isn't the group 
HOMOGENEOUS by definition?
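
   Under that reading, the empty case would not need the previous type at all. 
A minimal sketch, assuming the counts map and the member count are consistent 
with each other; `SubscriptionTypeSketch` and its nested enum are hypothetical 
stand-ins, not the PR's classes:

```java
import java.util.Map;

// Hypothetical, self-contained stand-in for the subscription type computation.
final class SubscriptionTypeSketch {
    enum SubscriptionType { HOMOGENEOUS, HETEROGENEOUS }

    static SubscriptionType computeSubscriptionType(
        Map<String, Integer> subscribedTopicNames,
        int numOfMembers
    ) {
        // No subscriptions at all: every member trivially has the same
        // (empty) subscription.
        if (subscribedTopicNames.isEmpty()) {
            return SubscriptionType.HOMOGENEOUS;
        }

        // If any topic is subscribed to by fewer members than the group has,
        // at least two members have different subscriptions.
        for (int subscriberCount : subscribedTopicNames.values()) {
            if (subscriberCount != numOfMembers) {
                return SubscriptionType.HETEROGENEOUS;
            }
        }
        return SubscriptionType.HOMOGENEOUS;
    }
}
```

   The third parameter from the diff is dropped here on purpose, since the 
result no longer depends on the current type.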



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1350,6 +1351,11 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
                         .withMembers(group.members())
                         .withStaticMembers(group.staticMembers())
                         .withSubscriptionMetadata(subscriptionMetadata)
+                        .withSubscriptionType(ConsumerGroup.subscriptionType(
+                            subscribedTopicsMemberCount,
+                            group.numMembers(),

Review Comment:
   An alternative may be to compare the counts within the Map without 
considering the group size. This could work because we do not accept empty 
subscriptions from a member.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1307,13 +1307,14 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
             }
         }
 
+        Map<String, Integer> subscribedTopicsMemberCount = new HashMap<>();
         if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+            subscribedTopicsMemberCount = group.updateSubscribedTopicNames(member, updatedMember);
             // The subscription metadata is updated in two cases:
             // 1) The member has updated its subscriptions;
             // 2) The refresh deadline has been reached.

Review Comment:
   nit: Let's keep these comments at the top of the if branch.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -948,7 +964,7 @@ private void maybeUpdateSubscribedTopicNames(
      * @param oldMember             The old member.
      * @param newMember             The new member.
      */
-    private static void maybeUpdateSubscribedTopicNames(
+    public static void maybeUpdateSubscribedTopicNames(

Review Comment:
   nit: Could we keep it private?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
