showuon commented on a change in pull request #10552: URL: https://github.com/apache/kafka/pull/10552#discussion_r637939950
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ########## @@ -384,37 +326,39 @@ private boolean allSubscriptionsEqual(Set<String> allTopics, * * @param partitionsPerTopic The number of partitions for each subscribed topic. * @param subscriptions Map from the member id to their respective topic subscription + * @param currentAssignment Each consumer's previously owned and still-subscribed partitions * * @return Map from each member to the list of partitions assigned to them. */ private Map<String, List<TopicPartition>> generalAssign(Map<String, Integer> partitionsPerTopic, - Map<String, Subscription> subscriptions) { - Map<String, List<TopicPartition>> currentAssignment = new HashMap<>(); + Map<String, Subscription> subscriptions, + Map<String, List<TopicPartition>> currentAssignment) { + if (log.isDebugEnabled()) { + log.debug("performing general assign. partitionsPerTopic: {}, subscriptions: {}, currentAssignment: {}", + partitionsPerTopic, subscriptions, currentAssignment); + } + Map<TopicPartition, ConsumerGenerationPair> prevAssignment = new HashMap<>(); partitionMovements = new PartitionMovements(); - prepopulateCurrentAssignments(subscriptions, currentAssignment, prevAssignment); + prepopulateCurrentAssignments(subscriptions, prevAssignment); - // a mapping of all topic partitions to all consumers that can be assigned to them - final Map<TopicPartition, List<String>> partition2AllPotentialConsumers = new HashMap<>(); - // a mapping of all consumers to all potential topic partitions that can be assigned to them - final Map<String, List<TopicPartition>> consumer2AllPotentialPartitions = new HashMap<>(); + // a mapping of all topics to all consumers that can be assigned to them + final Map<String, List<String>> topic2AllPotentialConsumers = new HashMap<>(partitionsPerTopic.keySet().size()); + // a mapping of all consumers to all potential topics that can be assigned to them + final Map<String, 
List<String>> consumer2AllPotentialTopics = new HashMap<>(subscriptions.keySet().size()); - // initialize partition2AllPotentialConsumers and consumer2AllPotentialPartitions in the following two for loops - for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) { - for (int i = 0; i < entry.getValue(); ++i) - partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<>()); - } + // initialize topic2AllPotentialConsumers and consumer2AllPotentialTopics + partitionsPerTopic.keySet().stream().forEach( + topicName -> topic2AllPotentialConsumers.put(topicName, new ArrayList<>())); for (Entry<String, Subscription> entry: subscriptions.entrySet()) { String consumerId = entry.getKey(); - consumer2AllPotentialPartitions.put(consumerId, new ArrayList<>()); + List<String> subscribedTopics = new ArrayList<>(entry.getValue().topics().size()); + consumer2AllPotentialTopics.put(consumerId, subscribedTopics); entry.getValue().topics().stream().filter(topic -> partitionsPerTopic.get(topic) != null).forEach(topic -> { - for (int i = 0; i < partitionsPerTopic.get(topic); ++i) { - TopicPartition topicPartition = new TopicPartition(topic, i); - consumer2AllPotentialPartitions.get(consumerId).add(topicPartition); - partition2AllPotentialConsumers.get(topicPartition).add(consumerId); - } + subscribedTopics.add(topic); Review comment: No, it just creates an empty List with an initial capacity of `topics().size()`. We cannot simply create the List from all topics directly, because we need to filter out the topics that are not in `partitionsPerTopic`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org