ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r627014485



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      * This constrainedAssign optimizes the assignment algorithm when all 
consumers were subscribed to same set of topics.
      * The method includes the following steps:
      *
-     * 1. Reassign as many previously owned partitions as possible, up to the 
maxQuota
-     * 2. Fill remaining members up to minQuota
-     * 3. If we ran out of unassigned partitions before filling all consumers, 
we need to start stealing partitions
-     *    from the over-full consumers at max capacity
-     * 4. Otherwise we may have run out of unfilled consumers before assigning 
all partitions, in which case we
-     *    should just distribute one partition each to all consumers at min 
capacity
+     * 1. Reassign previously owned partitions:
+     *   a. if owned less than minQuota partitions, just assign all owned 
partitions, and put the member into unfilled member list
+     *   b. if owned maxQuota or more, and we're still under the number of 
expected max capacity members, assign maxQuota partitions
+     *   c. if owned at least "minQuota" of partitions, assign minQuota 
partitions, and put the member into unfilled member list if
+     *     we're still under the number of expected max capacity members
+     * 2. Fill remaining members up to the expected numbers of maxQuota 
partitions, otherwise, to minQuota partitions
      *
      * @param partitionsPerTopic          The number of partitions for each 
subscribed topic
      * @param consumerToOwnedPartitions   Each consumer's previously owned and 
still-subscribed partitions
      *
-     * @return Map from each member to the list of partitions assigned to them.
+     * @return                            Map from each member to the list of 
partitions assigned to them.
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug("performing constrained assign. partitionsPerTopic: {}, 
consumerToOwnedPartitions: {}",
+                partitionsPerTopic, consumerToOwnedPartitions);
+        }
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int expectedNumMembersHavingMorePartitions = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMembersHavingMorePartitions = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> assignedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    assignedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions) {
+                // consumer owned the "maxQuota" of partitions or more, and 
we're still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the 
rest of the partitions
+                numMembersHavingMorePartitions++;
+                List<TopicPartition> maxQuotaPartitions = 
ownedPartitions.subList(0, maxQuota);
+                consumerAssignment.addAll(maxQuotaPartitions);
+                assignedPartitions.addAll(maxQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, 
ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max 
capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned at least "minQuota" of partitions
+                // so keep "minQuota" of the owned partitions, and revoke the 
rest of the partitions
+                List<TopicPartition> minQuotaPartitions = 
ownedPartitions.subList(0, minQuota);
+                consumerAssignment.addAll(minQuotaPartitions);
+                assignedPartitions.addAll(minQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, 
ownedPartitions.size()));
+                // this consumer is potential maxQuota candidate since we're 
still under the number of expected max capacity members

Review comment:
       nit (again, please address this in the other PR so I can merge this 
one): as Guozhang pointed out in another comment, in the case minQuota == 
maxQuota, this comment is a bit misleading as the number of expected max 
capacity members is technically all of them, but the variable 
`expectedNumMembersHavingMorePartitions` refers to the number of members who 
have more than the minQuota number of partitions, which in that case would 
actually be zero.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to