showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r637939950



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -384,37 +326,39 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      *
      * @param partitionsPerTopic         The number of partitions for each 
subscribed topic.
      * @param subscriptions              Map from the member id to their 
respective topic subscription
+     * @param currentAssignment          Each consumer's previously owned and 
still-subscribed partitions
      *
      * @return                           Map from each member to the list of 
partitions assigned to them.
      */
     private Map<String, List<TopicPartition>> generalAssign(Map<String, 
Integer> partitionsPerTopic,
-                                                            Map<String, 
Subscription> subscriptions) {
-        Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
+                                                            Map<String, 
Subscription> subscriptions,
+                                                            Map<String, 
List<TopicPartition>> currentAssignment) {
+        if (log.isDebugEnabled()) {
+            log.debug("performing general assign. partitionsPerTopic: {}, 
subscriptions: {}, currentAssignment: {}",
+                partitionsPerTopic, subscriptions, currentAssignment);
+        }
+
         Map<TopicPartition, ConsumerGenerationPair> prevAssignment = new 
HashMap<>();
         partitionMovements = new PartitionMovements();
 
-        prepopulateCurrentAssignments(subscriptions, currentAssignment, 
prevAssignment);
+        prepopulateCurrentAssignments(subscriptions, prevAssignment);
 
-        // a mapping of all topic partitions to all consumers that can be 
assigned to them
-        final Map<TopicPartition, List<String>> 
partition2AllPotentialConsumers = new HashMap<>();
-        // a mapping of all consumers to all potential topic partitions that 
can be assigned to them
-        final Map<String, List<TopicPartition>> 
consumer2AllPotentialPartitions = new HashMap<>();
+        // a mapping of all topics to all consumers that can be assigned to 
them
+        final Map<String, List<String>> topic2AllPotentialConsumers = new 
HashMap<>(partitionsPerTopic.keySet().size());
+        // a mapping of all consumers to all potential topics that can be 
assigned to them
+        final Map<String, List<String>> consumer2AllPotentialTopics = new 
HashMap<>(subscriptions.keySet().size());
 
-        // initialize partition2AllPotentialConsumers and 
consumer2AllPotentialPartitions in the following two for loops
-        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
-            for (int i = 0; i < entry.getValue(); ++i)
-                partition2AllPotentialConsumers.put(new 
TopicPartition(entry.getKey(), i), new ArrayList<>());
-        }
+        // initialize topic2AllPotentialConsumers and 
consumer2AllPotentialTopics
+        partitionsPerTopic.keySet().stream().forEach(
+            topicName -> topic2AllPotentialConsumers.put(topicName, new 
ArrayList<>()));
 
         for (Entry<String, Subscription> entry: subscriptions.entrySet()) {
             String consumerId = entry.getKey();
-            consumer2AllPotentialPartitions.put(consumerId, new ArrayList<>());
+            List<String> subscribedTopics = new 
ArrayList<>(entry.getValue().topics().size());
+            consumer2AllPotentialTopics.put(consumerId, subscribedTopics);
             entry.getValue().topics().stream().filter(topic -> 
partitionsPerTopic.get(topic) != null).forEach(topic -> {
-                for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
-                    TopicPartition topicPartition = new TopicPartition(topic, 
i);
-                    
consumer2AllPotentialPartitions.get(consumerId).add(topicPartition);
-                    
partition2AllPotentialConsumers.get(topicPartition).add(consumerId);
-                }
+                subscribedTopics.add(topic);

Review comment:
       No, it just creates a List with an initial capacity of `topics().size()`. We 
cannot simply create the List from all topics directly, because we need to filter 
out the topics that are not in `partitionsPerTopic`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to