showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r610325987



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,117 +160,95 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
+        log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+            partitionsPerTopic, consumerToOwnedPartitions));
+
         SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
         int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
         int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = unassignedPartitions.size() % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                consumerAssignment.addAll(ownedPartitions);
+                unassignedPartitions.removeAll(ownedPartitions);
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and we 
still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the 
rest of the partitions
+                consumerAssignment.addAll(ownedPartitions.subList(0, 
maxQuota));
+                unassignedPartitions.removeAll(ownedPartitions.subList(0, 
maxQuota));
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, 
ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max 
capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned the "minQuota" of partitions or more
+                // so keep "minQuota" of the owned partitions, and revoke the 
rest of the partitions
+                consumerAssignment.addAll(ownedPartitions.subList(0, 
minQuota));
+                unassignedPartitions.removeAll(ownedPartitions.subList(0, 
minQuota));
+                allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, 
ownedPartitions.size()));
             }
         }
 
+        log.debug(String.format(
+            "After reassigning previously owned partitions, unfilled members: 
%s, unassigned partitions: %s, " +
+                "current assignment: %s", unfilledMembers, 
unassignedPartitions, assignment));
+
         Collections.sort(unfilledMembers);
         Iterator<TopicPartition> unassignedPartitionsIter = 
unassignedPartitions.iterator();
 
-        // Fill remaining members up to minQuota
+        // Fill remaining members up to the numExpectedMaxCapacityMembers 
numbers of maxQuota, otherwise, to minQuota
         while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
             Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
 
             while (unfilledConsumerIter.hasNext()) {
                 String consumer = unfilledConsumerIter.next();
                 List<TopicPartition> consumerAssignment = 
assignment.get(consumer);
-
-                if (unassignedPartitionsIter.hasNext()) {
-                    TopicPartition tp = unassignedPartitionsIter.next();
-                    consumerAssignment.add(tp);
-                    unassignedPartitionsIter.remove();
-                    // We already assigned all possible ownedPartitions, so we 
know this must be newly to this consumer
-                    if (allRevokedPartitions.contains(tp))
-                        partitionsTransferringOwnership.put(tp, consumer);
-                } else {
-                    break;
-                }
-
-                if (consumerAssignment.size() == minQuota) {
-                    minCapacityMembers.add(consumer);
-                    unfilledConsumerIter.remove();
-                }
-            }
-        }
-
-        // If we ran out of unassigned partitions before filling all 
consumers, we need to start stealing partitions
-        // from the over-full consumers at max capacity

Review comment:
       Remove step 3 and step 4 below.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to