ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668410517



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -238,32 +272,50 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
         Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
         // Round-Robin filling remaining members up to the expected numbers of 
maxQuota, otherwise, to minQuota
         for (TopicPartition unassignedPartition : unassignedPartitions) {
-            if (!unfilledConsumerIter.hasNext()) {
-                if (unfilledMembers.isEmpty()) {
-                    // Should not enter here since we have calculated the 
exact number to assign to each consumer
-                    // There might be issues in the assigning algorithm, or 
maybe assigning the same partition to two owners.
+            String consumer;
+            if (unfilledConsumerIter.hasNext()) {
+                consumer = unfilledConsumerIter.next();
+            } else {
+                if (unfilledMembers.isEmpty() && 
potentiallyUnfilledMembersAtMinQuota.isEmpty()) {
+                    // Should not enter here since we have calculated the 
exact number to assign to each consumer.
+                    // This indicates issues in the assignment algorithm
                     int currentPartitionIndex = 
unassignedPartitions.indexOf(unassignedPartition);
                     log.error("No more unfilled consumers to be assigned. The 
remaining unassigned partitions are: {}",
-                        unassignedPartitions.subList(currentPartitionIndex, 
unassignedPartitions.size()));
+                              
unassignedPartitions.subList(currentPartitionIndex, 
unassignedPartitions.size()));
                     throw new IllegalStateException("No more unfilled 
consumers to be assigned.");
+                } else if (unfilledMembers.isEmpty()) {
+                    consumer = potentiallyUnfilledMembersAtMinQuota.poll();
+                } else {
+                    unfilledConsumerIter = unfilledMembers.iterator();
+                    consumer = unfilledConsumerIter.next();
                 }
-                unfilledConsumerIter = unfilledMembers.iterator();
             }
-            String consumer = unfilledConsumerIter.next();
+
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
             consumerAssignment.add(unassignedPartition);
 
             // We already assigned all possible ownedPartitions, so we know 
this must be newly assigned to this consumer
-            if (allRevokedPartitions.contains(unassignedPartition))
+            // or else the partition was actually claimed by multiple previous 
owners and had to be invalidated from all
+            // members claimed ownedPartitions
+            if (allRevokedPartitions.contains(unassignedPartition) || 
partitionsWithMultiplePreviousOwners.contains(unassignedPartition))
                 partitionsTransferringOwnership.put(unassignedPartition, 
consumer);
 
             int currentAssignedCount = consumerAssignment.size();
-            int expectedAssignedCount = numMembersAssignedOverMinQuota < 
expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota;
-            if (currentAssignedCount == expectedAssignedCount) {
-                if (currentAssignedCount == maxQuota) {
-                    numMembersAssignedOverMinQuota++;
-                }
+            if (currentAssignedCount == minQuota) {
                 unfilledConsumerIter.remove();
+                potentiallyUnfilledMembersAtMinQuota.add(consumer);
+            } else if (currentAssignedCount == maxQuota) {
+                numMembersAssignedOverMinQuota++;
+                if (numMembersAssignedOverMinQuota == 
expectedNumMembersAssignedOverMinQuota) {
+                    // We only start to iterate over the "potentially 
unfilled" members at minQuota after we've filled
+                    // all members up to at least minQuota, so once the last 
minQuota member reaches maxQuota, we
+                    // should be done. But in case of some algorithmic error, 
just log a warning and continue to
+                    // assign any remaining partitions within the assignment 
constraints
+                    if (unassignedPartitions.indexOf(unassignedPartition) != 
unassignedPartitions.size() - 1) {
+                        log.warn("Filled the last member up to maxQuota but 
still had partitions remaining to assign, "

Review comment:
       I responded to the above comment as well, but specifically here I think 
that to just check on that condition requires us to make assumptions about the 
algorithm's correctness up to this point (and the correctness of its 
assumptions). But if those are all correct then we would never reach this to 
begin with, so it's better to directly look for any remaining 
`unassignedPartitions` -- it's a sanity check. 
   
   But ack on bumping to ERROR




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to