dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1579102085


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ConsumerGroupSubscriptionModel.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+/**
+ * The subscription model followed by the consumer group.
+ *
+ * A homogeneous subscription model means that all the members
+ * of the group are subscribed to the same set of topics.
+ *
+ * The model is heterogeneous otherwise.
+ */
+public enum ConsumerGroupSubscriptionModel {

Review Comment:
   nit: Should we call it `SubscriptionType`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##########
@@ -90,17 +91,29 @@ private Map<Uuid, List<String>> membersPerTopic(final 
AssignmentSpec assignmentS
         Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
         Map<String, AssignmentMemberSpec> membersData = 
assignmentSpec.members();
 
-        membersData.forEach((memberId, memberMetadata) -> {
-            Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+        if (assignmentSpec.groupSubscriptionModel().equals(HOMOGENEOUS)) {
+            List<String> allMembers = new ArrayList<>(membersData.keySet());

Review Comment:
   I wonder if we could change the return type of the method from `Map<Uuid, 
List<String>>` to `Map<Uuid, Collection<String>>` and avoid this copy here. It 
seems possible because we only iterate over the member ids later on. This could 
be a nice performance improvement too while we are here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ConsumerGroupSubscriptionModel.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+/**
+ * The subscription model followed by the consumer group.
+ *
+ * A homogeneous subscription model means that all the members
+ * of the group are subscribed to the same set of topics.
+ *
+ * The model is heterogeneous otherwise.
+ */
+public enum ConsumerGroupSubscriptionModel {
+    HOMOGENEOUS("Homogeneous"),
+    HETEROGENEOUS("Heterogeneous");
+    private final String name;

Review Comment:
   nit: Let's add an empty line before this one.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -620,9 +637,9 @@ public Map<String, TopicMetadata> 
computeSubscriptionMetadata(
         TopicsImage topicsImage,
         ClusterImage clusterImage
     ) {
-        // Copy and update the current subscriptions.
+        // Copy and update the current subscription information.
         Map<String, Integer> subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
-        maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, 
newMember);
+        
maybeUpdateSubscribedTopicNamesAndGroupSubscriptionModel(subscribedTopicNames, 
oldMember, newMember);

Review Comment:
   Hum... We need to be careful here because  we are not suppose to update the 
internal state of the group on this code path. It may be better to keep it as 
it was before and to have another method to update the subscription type.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ConsumerGroupSubscriptionModel.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+/**
+ * The subscription model followed by the consumer group.
+ *
+ * A homogeneous subscription model means that all the members
+ * of the group are subscribed to the same set of topics.
+ *
+ * The model is heterogeneous otherwise.
+ */
+public enum ConsumerGroupSubscriptionModel {
+    HOMOGENEOUS("Homogeneous"),
+    HETEROGENEOUS("Heterogeneous");

Review Comment:
   nit: Could we add some javadoc to both of them? I would actually move the 
description about the two types from the top level javadoc to here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -148,6 +151,12 @@ public static class DeadlineAndEpoch {
      */
     private final TimelineHashMap<String, TopicMetadata> 
subscribedTopicMetadata;
 
+    /**
+     * The consumer group's subscription model.
+     * This value is set to Homogeneous by default.
+     */
+    private final TimelineObject<ConsumerGroupSubscriptionModel> 
groupSubscriptionModel;

Review Comment:
   nit: I think that we can drop the `group` prefix here. Based on my previous 
commit, we could also use `subscriptionType` here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -952,6 +971,31 @@ private static void maybeUpdateSubscribedTopicNames(
                 subscribedTopicCount.compute(topicName, 
ConsumerGroup::incValue)
             );
         }
+
+        maybeUpdateGroupSubscriptionModel();
+    }
+
+    /**
+     * Updates the subscription model type, iff necessary.
+     *
+     * If all members are subscribed to the same set of topics, the model is 
homogeneous.
+     * Otherwise, it is heterogeneous.
+     */
+    private void maybeUpdateGroupSubscriptionModel() {
+        int numOfMembers = members.size();
+        boolean isSubscriptionHomogeneous = true;
+        for (Map.Entry<String, Integer> entry : 
subscribedTopicNames.entrySet()) {
+            if (entry.getValue() != numOfMembers) {
+                isSubscriptionHomogeneous = false;
+                break;

Review Comment:
   nit: We don't need this variable in my opinion. How about the following?
   
   ```
   int numOfMembers = members.size();
   for (Map.Entry<String, Integer> entry : subscribedTopicNames.entrySet()) {
       if (entry.getValue() != numOfMembers) {
           groupSubscriptionModel.set(HETEROGENEOUS);
           return;
       }
   }
   groupSubscriptionModel.set(HOMOGENEOUS);
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -487,6 +497,13 @@ public boolean isSubscribedToTopic(String topic) {
         return subscribedTopicNames.containsKey(topic);
     }
 
+    /**
+     * @return The group's subscription model.
+     */
+    public ConsumerGroupSubscriptionModel groupSubscriptionModel() {

Review Comment:
   ditto about the naming.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##########
@@ -687,7 +688,7 @@ public void 
testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignme
             currentAssignmentForC
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members);
+        AssignmentSpec assignmentSpec = new AssignmentSpec(members, 
HETEROGENEOUS);

Review Comment:
   Is this change enough to cover the `HETEROGENEOUS` path?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##########
@@ -49,6 +50,7 @@ public class GeneralUniformAssignmentBuilderTest {
     private final String memberA = "A";
     private final String memberB = "B";
     private final String memberC = "C";
+    private final ConsumerGroupSubscriptionModel groupSubscriptionModel = 
HETEROGENEOUS;

Review Comment:
   nit: As we use the same value everywhere, I would directly inline 
`HETEROGENEOUS`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java:
##########
@@ -51,6 +52,7 @@ public class OptimizedUniformAssignmentBuilderTest {
     private final String memberA = "A";
     private final String memberB = "B";
     private final String memberC = "C";
+    private final ConsumerGroupSubscriptionModel groupSubscriptionModel = 
HOMOGENEOUS;

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to