rajinisivaram commented on code in PR #13350:
URL: https://github.com/apache/kafka/pull/13350#discussion_r1154530618


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -574,6 +697,44 @@ private void assignOwnedPartitions() {
             }
         }
 
+        // Round-Robin filling within racks for remaining members up to the 
expected numbers of maxQuota,
+        // otherwise, to minQuota
+        private void assignRackAwareRoundRobin(List<TopicPartition> 
unassignedPartitions) {
+            if (rackInfo.consumerRacks.isEmpty())
+                return;
+            int nextUnfilledConsumerIndex = 0;
+            Iterator<TopicPartition> unassignedIter = 
unassignedPartitions.iterator();
+            while (unassignedIter.hasNext()) {
+                TopicPartition unassignedPartition = unassignedIter.next();
+                String consumer = null;
+                int nextIndex = rackInfo.nextRackConsumer(unassignedPartition, 
unfilledMembersWithUnderMinQuotaPartitions, nextUnfilledConsumerIndex);
+                if (nextIndex >= 0) {
+                    consumer = 
unfilledMembersWithUnderMinQuotaPartitions.get(nextIndex);
+                    int assignmentCount = assignment.get(consumer).size() + 1;
+                    if (assignmentCount >= minQuota) {
+                        
unfilledMembersWithUnderMinQuotaPartitions.remove(consumer);
+                        if (assignmentCount < maxQuota)
+                            
unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
+                    } else {
+                        nextIndex++;
+                    }
+                    nextUnfilledConsumerIndex = 
unfilledMembersWithUnderMinQuotaPartitions.isEmpty() ? 0 : nextIndex % 
unfilledMembersWithUnderMinQuotaPartitions.size();
+                } else if 
(!unfilledMembersWithExactlyMinQuotaPartitions.isEmpty()) {
+                    int firstIndex = 
rackInfo.nextRackConsumer(unassignedPartition, 
unfilledMembersWithExactlyMinQuotaPartitions, 0);
+                    if (firstIndex >= 0) {
+                        consumer = 
unfilledMembersWithExactlyMinQuotaPartitions.get(firstIndex);
+                        if (assignment.get(consumer).size() + 1 == maxQuota)
+                            
unfilledMembersWithExactlyMinQuotaPartitions.remove(firstIndex);
+                    }
+                }
+                if (consumer == null)
+                    continue;

Review Comment:
    Updated.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -835,7 +1004,7 @@ private List<TopicPartition> assignOwnedPartitions() {
                             // if this topic partition of this consumer no 
longer exists, remove it from currentAssignment of the consumer
                             partitionIter.remove();
                             currentPartitionConsumer.remove(partition);
-                        } else if 
(!consumerSubscription.topics().contains(partition.topic())) {
+                        } else if 
(!consumerSubscription.topics().contains(partition.topic()) || 
rackInfo.racksMismatch(consumer, partition)) {

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to