showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r610325577
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,117 +160,95 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
*/
private Map<String, List<TopicPartition>> constrainedAssign(Map<String,
Integer> partitionsPerTopic,
Map<String,
List<TopicPartition>> consumerToOwnedPartitions) {
+ log.debug(String.format("performing constrained assign.
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+ partitionsPerTopic, consumerToOwnedPartitions));
+
SortedSet<TopicPartition> unassignedPartitions =
getTopicPartitions(partitionsPerTopic);
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
- // Each consumer should end up in exactly one of the below
- // the consumers not yet at capacity
+ // the consumers not yet at expected capacity
List<String> unfilledMembers = new LinkedList<>();
- // the members with exactly maxQuota partitions assigned
- Queue<String> maxCapacityMembers = new LinkedList<>();
- // the members with exactly minQuota partitions assigned
- Queue<String> minCapacityMembers = new LinkedList<>();
Review comment:
We don't need to keep the `maxCapacityMembers`/`minCapacityMembers`
anymore because we can precisely know how many members can have max capacity
now, by this
```
int numExpectedMaxCapacityMembers = unassignedPartitions.size() %
numberOfConsumers;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]