rajinisivaram commented on code in PR #13350: URL: https://github.com/apache/kafka/pull/13350#discussion_r1154530618
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ########## @@ -574,6 +697,44 @@ private void assignOwnedPartitions() { } } + // Round-Robin filling within racks for remaining members up to the expected numbers of maxQuota, + // otherwise, to minQuota + private void assignRackAwareRoundRobin(List<TopicPartition> unassignedPartitions) { + if (rackInfo.consumerRacks.isEmpty()) + return; + int nextUnfilledConsumerIndex = 0; + Iterator<TopicPartition> unassignedIter = unassignedPartitions.iterator(); + while (unassignedIter.hasNext()) { + TopicPartition unassignedPartition = unassignedIter.next(); + String consumer = null; + int nextIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithUnderMinQuotaPartitions, nextUnfilledConsumerIndex); + if (nextIndex >= 0) { + consumer = unfilledMembersWithUnderMinQuotaPartitions.get(nextIndex); + int assignmentCount = assignment.get(consumer).size() + 1; + if (assignmentCount >= minQuota) { + unfilledMembersWithUnderMinQuotaPartitions.remove(consumer); + if (assignmentCount < maxQuota) + unfilledMembersWithExactlyMinQuotaPartitions.add(consumer); + } else { + nextIndex++; + } + nextUnfilledConsumerIndex = unfilledMembersWithUnderMinQuotaPartitions.isEmpty() ? 0 : nextIndex % unfilledMembersWithUnderMinQuotaPartitions.size(); + } else if (!unfilledMembersWithExactlyMinQuotaPartitions.isEmpty()) { + int firstIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithExactlyMinQuotaPartitions, 0); + if (firstIndex >= 0) { + consumer = unfilledMembersWithExactlyMinQuotaPartitions.get(firstIndex); + if (assignment.get(consumer).size() + 1 == maxQuota) + unfilledMembersWithExactlyMinQuotaPartitions.remove(firstIndex); + } + } + if (consumer == null) + continue; Review Comment: Updated. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ########## @@ -835,7 +1004,7 @@ private List<TopicPartition> assignOwnedPartitions() { // if this topic partition of this consumer no longer exists, remove it from currentAssignment of the consumer partitionIter.remove(); currentPartitionConsumer.remove(partition); - } else if (!consumerSubscription.topics().contains(partition.topic())) { + } else if (!consumerSubscription.topics().contains(partition.topic()) || rackInfo.racksMismatch(consumer, partition)) { Review Comment: Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org