showuon commented on a change in pull request #10985: URL: https://github.com/apache/kafka/pull/10985#discussion_r665772469
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -238,32 +272,50 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
         // Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota
         for (TopicPartition unassignedPartition : unassignedPartitions) {
-            if (!unfilledConsumerIter.hasNext()) {
-                if (unfilledMembers.isEmpty()) {
-                    // Should not enter here since we have calculated the exact number to assign to each consumer
-                    // There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners.
+            String consumer;
+            if (unfilledConsumerIter.hasNext()) {
+                consumer = unfilledConsumerIter.next();
+            } else {
+                if (unfilledMembers.isEmpty() && potentiallyUnfilledMembersAtMinQuota.isEmpty()) {
+                    // Should not enter here since we have calculated the exact number to assign to each consumer.
+                    // This indicates issues in the assignment algorithm
                     int currentPartitionIndex = unassignedPartitions.indexOf(unassignedPartition);
                     log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}",
-                        unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size()));
+                              unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size()));
                     throw new IllegalStateException("No more unfilled consumers to be assigned.");
+                } else if (unfilledMembers.isEmpty()) {
+                    consumer = potentiallyUnfilledMembersAtMinQuota.poll();
+                } else {
+                    unfilledConsumerIter = unfilledMembers.iterator();
+                    consumer = unfilledConsumerIter.next();
                 }
-                unfilledConsumerIter = unfilledMembers.iterator();
             }
-            String consumer = unfilledConsumerIter.next();
+
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
             consumerAssignment.add(unassignedPartition);
 
             // We already assigned all possible ownedPartitions, so we know this must be newly assigned to this consumer
-            if (allRevokedPartitions.contains(unassignedPartition))
+            // or else the partition was actually claimed by multiple previous owners and had to be invalidated from all
+            // members claimed ownedPartitions
+            if (allRevokedPartitions.contains(unassignedPartition) || partitionsWithMultiplePreviousOwners.contains(unassignedPartition))
                 partitionsTransferringOwnership.put(unassignedPartition, consumer);

Review comment:
   I think we could add a test for the package-private `partitionsTransferringOwnership` in `AbstractStickyAssignorTest`. As far as I can tell, it is not covered by any existing test. The test could verify that both the doubly assigned partitions and the other revoked partitions are put into `partitionsTransferringOwnership` correctly.
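   A minimal sketch of the kind of check suggested above. It does not use the real `AbstractStickyAssignor` or `AbstractStickyAssignorTest`: `TransferringOwnershipSketch`, `fillUnassigned`, and `example` are hypothetical names, partitions are plain strings instead of `TopicPartition`, and the quota bookkeeping that removes members from `unfilledMembers` is assumed away. Only the consumer-selection branches and the `partitionsTransferringOwnership` condition mirror the diff.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical, simplified stand-in for the fill loop in the diff (not Kafka code).
class TransferringOwnershipSketch {

    // Assigns each unassigned partition to a consumer and returns the
    // partitions that count as "transferring ownership".
    static Map<String, String> fillUnassigned(
            List<String> unassignedPartitions,
            List<String> unfilledMembers,
            Queue<String> potentiallyUnfilledMembersAtMinQuota,
            Set<String> allRevokedPartitions,
            Set<String> partitionsWithMultiplePreviousOwners,
            Map<String, List<String>> assignment) {
        // LinkedHashMap keeps insertion order so the printed result is stable.
        Map<String, String> partitionsTransferringOwnership = new LinkedHashMap<>();
        Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
        for (String partition : unassignedPartitions) {
            String consumer;
            if (unfilledConsumerIter.hasNext()) {
                consumer = unfilledConsumerIter.next();
            } else if (unfilledMembers.isEmpty() && potentiallyUnfilledMembersAtMinQuota.isEmpty()) {
                throw new IllegalStateException("No more unfilled consumers to be assigned.");
            } else if (unfilledMembers.isEmpty()) {
                consumer = potentiallyUnfilledMembersAtMinQuota.poll();
            } else {
                // Restart the round-robin over the members that are still unfilled.
                unfilledConsumerIter = unfilledMembers.iterator();
                consumer = unfilledConsumerIter.next();
            }
            assignment.computeIfAbsent(consumer, c -> new ArrayList<>()).add(partition);
            // Revoked partitions and partitions claimed by several previous
            // owners are both recorded as transferring ownership.
            if (allRevokedPartitions.contains(partition)
                    || partitionsWithMultiplePreviousOwners.contains(partition)) {
                partitionsTransferringOwnership.put(partition, consumer);
            }
        }
        return partitionsTransferringOwnership;
    }

    // Scenario: t0-0 was claimed by two owners, t0-1 was revoked, t0-2 was never owned.
    static Map<String, String> example() {
        List<String> unassigned = Arrays.asList("t0-0", "t0-1", "t0-2");
        List<String> unfilled = Arrays.asList("c1", "c2");
        Set<String> revoked = new HashSet<>(Arrays.asList("t0-1"));
        Set<String> multiOwner = new HashSet<>(Arrays.asList("t0-0"));
        return fillUnassigned(unassigned, unfilled, new ArrayDeque<>(),
                revoked, multiOwner, new HashMap<>());
    }

    public static void main(String[] args) {
        System.out.println(example()); // prints {t0-0=c1, t0-1=c2}
    }
}
```

   In this scenario `t0-2` is assigned to `c1` after the iterator restarts, but it is left out of the returned map because it was neither revoked nor claimed by multiple owners. A real test would presumably run the assignor itself and assert on its package-private field instead.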