dajac commented on code in PR #12914:
URL: https://github.com/apache/kafka/pull/12914#discussion_r1037881208


##########
clients/src/main/resources/common/message/ConsumerProtocolAssignment.json:
##########
@@ -23,6 +23,7 @@
   // that new versions cannot remove or reorder any of the existing fields.
   //
   // Version 2 is to support a new field "GenerationId" in ConsumerProtocolSubscription.
+  // Version 3 adds rack id to ConsumerProtocolSubscription.
   "validVersions": "0-2",

Review Comment:
   We need to bump the version here.
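   I.e. I would expect `"validVersions": "0-3"` here to match the new "Version 3" comment above.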



##########
clients/src/main/resources/common/message/ConsumerProtocolSubscription.json:
##########
@@ -24,6 +24,7 @@
 
   // Version 1 added the "OwnedPartitions" field to allow assigner know what partitions each member owned
   // Version 2 added a new field "GenerationId" to indicate if the member has out-of-date ownedPartitions.
+  // Version 3 adds rack id to enable rack-aware assignment.
   "validVersions": "0-2",

Review Comment:
   We need to bump the version here as well.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +88,145 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new TopicAssignmentState(e.getValue(), consumersPerTopic.get(e.getKey()))));
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
+        for (Map.Entry<String, TopicAssignmentState> topicEntry : topicAssignmentStates.entrySet()) {

Review Comment:
   I have a general question here. One of the key promises of the range assignor is that it co-partitions topics: if you have two topics (foo and bar) with three partitions each and two consumers, the first consumer will get foo-0, bar-0, foo-1, bar-1 and the second one will get foo-2, bar-2. I am trying to understand whether we still maintain this property with the rack-aware assignment. Do we?
   
   I suppose that we do when all the partitions have the same racks. I am not sure about the case where, for instance, foo would have no rack or only a subset of the racks (e.g. because brokers are offline).
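   For reference, this is roughly the property I have in mind, written as a test sketch (hypothetical test name, imports elided; it uses the existing `assign(Map<String, Integer>, Map<String, Subscription>)` overload):
   ```java
   @Test
   public void rangeAssignorCoPartitionsTopics() {
       RangeAssignor assignor = new RangeAssignor();

       Map<String, Integer> partitionsPerTopic = new HashMap<>();
       partitionsPerTopic.put("foo", 3);
       partitionsPerTopic.put("bar", 3);

       // Both consumers subscribe to both topics.
       Map<String, Subscription> subscriptions = new HashMap<>();
       subscriptions.put("consumer1", new Subscription(Arrays.asList("foo", "bar")));
       subscriptions.put("consumer2", new Subscription(Arrays.asList("foo", "bar")));

       Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);

       // Co-partitioning: consumer1 gets foo-0, foo-1, bar-0, bar-1; consumer2 gets foo-2, bar-2.
       assertEquals(new HashSet<>(Arrays.asList(
               new TopicPartition("foo", 0), new TopicPartition("foo", 1),
               new TopicPartition("bar", 0), new TopicPartition("bar", 1))),
           new HashSet<>(assignment.get("consumer1")));
       assertEquals(new HashSet<>(Arrays.asList(
               new TopicPartition("foo", 2), new TopicPartition("bar", 2))),
           new HashSet<>(assignment.get("consumer2")));
   }
   ```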



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -268,6 +304,47 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
 
         Collections.sort(unfilledMembersWithUnderMinQuotaPartitions);
         Collections.sort(unfilledMembersWithExactlyMinQuotaPartitions);
+        unassignedPartitions = rackInfo.sortPartitionsByRackConsumers(unassignedPartitions);
+
+        // Round-Robin filling within racks for remaining members up to the expected numbers of maxQuota,
+        // otherwise, to minQuota
+        int nextUnfilledConsumerIndex = 0;
+        Iterator<TopicPartition> unassignedIter = unassignedPartitions.iterator();
+        while (!rackInfo.consumerRacks.isEmpty() && unassignedIter.hasNext()) {
+            TopicPartition unassignedPartition = unassignedIter.next();
+            String consumer = null;
+            int nextIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithUnderMinQuotaPartitions, nextUnfilledConsumerIndex);
+            if (nextIndex >= 0) {
+                consumer = unfilledMembersWithUnderMinQuotaPartitions.get(nextIndex);
+                int assignmentCount = assignment.get(consumer).size() + 1;
+                if (assignmentCount >= minQuota) {
+                    unfilledMembersWithUnderMinQuotaPartitions.remove(consumer);
+                    if (assignmentCount < maxQuota)
+                        unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
+                } else
+                    nextIndex++;

Review Comment:
   nit: I would put curly braces around the `else` statement to be consistent with the `if` statement.
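   i.e. roughly:
   ```java
                if (assignmentCount >= minQuota) {
                    unfilledMembersWithUnderMinQuotaPartitions.remove(consumer);
                    if (assignmentCount < maxQuota)
                        unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
                } else {
                    nextIndex++;
                }
   ```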



##########
clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java:
##########
@@ -46,20 +53,20 @@ public AbstractStickyAssignor createAssignor() {
     }
 
     @Override
-    public Subscription buildSubscriptionV0(List<String> topics, List<TopicPartition> partitions, int generationId) {
+    public Subscription buildSubscriptionV0(List<String> topics, List<TopicPartition> partitions, int generationId, int consumerIndex) {
         // cooperative sticky assignor only supports ConsumerProtocolSubscription V1 or above
         return null;
     }
 
     @Override
-    public Subscription buildSubscriptionV1(List<String> topics, List<TopicPartition> partitions, int generationId) {
+    public Subscription buildSubscriptionV1(List<String> topics, List<TopicPartition> partitions, int generationId, int consumerIndex) {
         assignor.onAssignment(new ConsumerPartitionAssignor.Assignment(partitions), new ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty()));
-        return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions, DEFAULT_GENERATION);
+        return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions, DEFAULT_GENERATION, consumerRackId(consumerIndex));

Review Comment:
   It seems that the rack should not be set here since this method simulates Subscription V1.
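   In other words, I would have expected something like this (a sketch, just reverting to the constructor without the rack id):
   ```java
    @Override
    public Subscription buildSubscriptionV1(List<String> topics, List<TopicPartition> partitions, int generationId, int consumerIndex) {
        assignor.onAssignment(new ConsumerPartitionAssignor.Assignment(partitions), new ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty()));
        // V1 predates the rack id field, so don't set it here.
        return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions, DEFAULT_GENERATION);
    }
   ```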



##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +88,145 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new TopicAssignmentState(e.getValue(), consumersPerTopic.get(e.getKey()))));
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
+        for (Map.Entry<String, TopicAssignmentState> topicEntry : topicAssignmentStates.entrySet()) {
             String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+            TopicAssignmentState topicAssignmentState = topicEntry.getValue();
+
+            List<PartitionInfo> partitionsForTopic = partitionsPerTopic.get(topic);
+            if (partitionsForTopic == null || partitionsForTopic.isEmpty())
+                continue;
+
+            // Assign based on racks first, but limit to each consumer's quota since we
+            // prioritize balanced assignment over locality.
+            for (String rackId : topicAssignmentState.racks) {
+                List<String> consumersForRack = topicAssignmentState.consumersByRack.get(rackId);
+                List<TopicPartition> partitionsForRack = topicAssignmentState.partitionsByRack.get(rackId);
+                doAssign(consumersForRack, partitionsForRack, assignment, topicAssignmentState);
+            }
+
+            // Assign any remaining partitions without matching racks, still maintaining balanced assignment
+            doAssign(topicAssignmentState.consumers, new ArrayList<>(topicAssignmentState.unassignedPartitions), assignment, topicAssignmentState);
+        }
+        return assignment;
+    }
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+    private void doAssign(List<String> consumers,
+                         List<TopicPartition> assignablePartitions,
+                         Map<String, List<TopicPartition>> assignment,
+                         TopicAssignmentState assignmentState) {

Review Comment:
   nit: Alignment of arguments is off.
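   i.e.:
   ```java
    private void doAssign(List<String> consumers,
                          List<TopicPartition> assignablePartitions,
                          Map<String, List<TopicPartition>> assignment,
                          TopicAssignmentState assignmentState) {
   ```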



##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +88,145 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new TopicAssignmentState(e.getValue(), consumersPerTopic.get(e.getKey()))));
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
+        for (Map.Entry<String, TopicAssignmentState> topicEntry : topicAssignmentStates.entrySet()) {
             String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+            TopicAssignmentState topicAssignmentState = topicEntry.getValue();
+
+            List<PartitionInfo> partitionsForTopic = partitionsPerTopic.get(topic);
+            if (partitionsForTopic == null || partitionsForTopic.isEmpty())
+                continue;
+
+            // Assign based on racks first, but limit to each consumer's quota since we
+            // prioritize balanced assignment over locality.
+            for (String rackId : topicAssignmentState.racks) {
+                List<String> consumersForRack = topicAssignmentState.consumersByRack.get(rackId);
+                List<TopicPartition> partitionsForRack = topicAssignmentState.partitionsByRack.get(rackId);
+                doAssign(consumersForRack, partitionsForRack, assignment, topicAssignmentState);
+            }
+
+            // Assign any remaining partitions without matching racks, still maintaining balanced assignment
+            doAssign(topicAssignmentState.consumers, new ArrayList<>(topicAssignmentState.unassignedPartitions), assignment, topicAssignmentState);
+        }
+        return assignment;
+    }
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+    private void doAssign(List<String> consumers,
+                         List<TopicPartition> assignablePartitions,
+                         Map<String, List<TopicPartition>> assignment,
+                         TopicAssignmentState assignmentState) {
+        assignablePartitions.removeIf(tp -> !assignmentState.unassignedPartitions.contains(tp));
+        if (consumers.isEmpty() || assignmentState.unassignedPartitions.isEmpty() || assignablePartitions.isEmpty())
+            return;
+
+        int start = 0;
+        for (String consumer : consumers) {
+            List<TopicPartition> consumerAssignment = assignment.get(consumer);
+            int numAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (numAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            List<TopicPartition> partitionsToAssign = assignablePartitions.subList(start, start + numAssignable);
+            consumerAssignment.addAll(partitionsToAssign);
+            assignmentState.onAssigned(consumer, partitionsToAssign);
+            start += numAssignable;
+            if (start >= assignablePartitions.size())
+                break;
+        }
+    }
+
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
+
+    private class TopicAssignmentState {
+        private final List<String> consumers;
+        private final List<String> racks;
+        private final Map<String, List<TopicPartition>> partitionsByRack;
+        private final Map<String, List<String>> consumersByRack;
+
+        private final List<TopicPartition> unassignedPartitions;
+        private final Map<String, Integer> numAssignedByConsumer;
+        private final int numPartitionsPerConsumer;
+        private final int numConsumersWithExtraPartition;
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+        public TopicAssignmentState(List<PartitionInfo> partitionInfos, List<MemberInfo> membersOrNull) {
+            List<MemberInfo> members = membersOrNull == null ? Collections.emptyList() : membersOrNull;
+            Collections.sort(members);
+            consumers = members.stream().map(c -> c.memberId).collect(Collectors.toList());
 
-            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
-            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
-                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
-                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
-                assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
+            this.unassignedPartitions = partitionInfos.stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                    .collect(Collectors.toCollection(LinkedList::new));
+            this.numAssignedByConsumer = consumers.stream().collect(Collectors.toMap(Function.identity(), c -> 0));
+            numPartitionsPerConsumer = consumers.isEmpty() ? 0 : partitionInfos.size() / consumers.size();
+            numConsumersWithExtraPartition = consumers.isEmpty() ? 0 : partitionInfos.size() % consumers.size();
+
+            Map<String, List<String>> consumersByRack = new HashMap<>();
+            members.forEach(consumer -> put(consumersByRack, consumer.rackId.orElse(""), consumer.memberId));
+            consumersByRack.remove("");

Review Comment:
   nit: Did you consider adding an `if` statement in the `forEach` instead of 
removing `""` afterwards? This would be a bit more explicit in my opinion.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java:
##########
@@ -84,13 +110,65 @@ protected static List<TopicPartition> partitions(String topic, int numPartitions
         return partitions;
     }
 
+    protected static Map<String, List<PartitionInfo>> partitionInfosWithoutRacks(Map<String, Integer> partitionsPerTopic) {
+        return partitionsPerTopic.entrySet().stream().collect(Collectors.toMap(Entry::getKey, e -> {
+            String topic = e.getKey();
+            int numPartitions = e.getValue();
+            List<PartitionInfo> partitionInfos = new ArrayList<>(numPartitions);
+            for (int i = 0; i < numPartitions; i++)
+                partitionInfos.add(new PartitionInfo(topic, i, Node.noNode(), NO_NODES, NO_NODES));
+            return partitionInfos;
+        }));
+    }
+
+    protected static Map<String, List<TopicPartition>> partitionsByRack(List<PartitionInfo> partitionInfos, Map<TopicPartition, Set<String>> partitionRacks) {

Review Comment:
   I find this method a bit unintuitive. It is named `partitionsByRack` and 
computes `partitionsByRack` but also gives you `partitionRacks` at the same 
time. Have you combined them for performance reasons?
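   If it is not for performance, maybe the rack set per partition could be computed by a separate helper, roughly like this (a sketch only, assuming the racks come from `PartitionInfo#replicas()` / `Node#rack()`):
   ```java
    protected static Map<TopicPartition, Set<String>> racksPerPartition(List<PartitionInfo> partitionInfos) {
        Map<TopicPartition, Set<String>> partitionRacks = new HashMap<>();
        for (PartitionInfo partitionInfo : partitionInfos) {
            // Racks of all replicas hosting the partition; replicas without a rack are skipped.
            Set<String> racks = Arrays.stream(partitionInfo.replicas())
                    .map(Node::rack)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toSet());
            partitionRacks.put(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), racks);
        }
        return partitionRacks;
    }
   ```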



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -173,18 +202,23 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
      *     we're still under the number of expected max capacity members
      * 2. Fill remaining members up to the expected numbers of maxQuota partitions, otherwise, to minQuota partitions
      *
-     * @param partitionsPerTopic                   The number of partitions for each subscribed topic
+     * For rack-aware algorithm, only owned partitions with matching racks are allocated by 1). And 2) allocates
+     * partitions with matching racks first before allocating remaining without rack-matching.
+     *
+     * @param partitionsPerTopic                   The partitions for each subscribed topic
      * @param consumerToOwnedPartitions            Each consumer's previously owned and still-subscribed partitions
      * @param partitionsWithMultiplePreviousOwners The partitions being claimed in the previous assignment of multiple consumers
+     * @param rackInfo                             Rack information for consumers and racks
      *
      * @return                                     Map from each member to the list of partitions assigned to them.
      */
-    private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
+    private Map<String, List<TopicPartition>> constrainedAssign(Map<String, List<PartitionInfo>> partitionsPerTopic,

Review Comment:
   Wow. This method is getting long...



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -60,31 +71,49 @@ static final class ConsumerGenerationPair {
     public static final class MemberData {
         public final List<TopicPartition> partitions;
         public final Optional<Integer> generation;
-        public MemberData(List<TopicPartition> partitions, Optional<Integer> generation) {
+        public final Optional<String> rackId;
+        public MemberData(List<TopicPartition> partitions, Optional<Integer> generation, Optional<String> rackId) {
             this.partitions = partitions;
             this.generation = generation;
+            this.rackId = rackId;
+        }
+
+        public MemberData(List<TopicPartition> partitions, Optional<Integer> generation) {
+            this(partitions, generation, Optional.empty());
         }
     }
 
     abstract protected MemberData memberData(Subscription subscription);
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
         Set<TopicPartition> partitionsWithMultiplePreviousOwners = new HashSet<>();
-        if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners)) {
+
+        List<PartitionInfo> allPartitions = new ArrayList<>();
+        NavigableMap<String, List<PartitionInfo>> sortedPartitionsPerTopic = new TreeMap<>(partitionsPerTopic);

Review Comment:
   Why do we need to do this?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -268,6 +304,47 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
 
         Collections.sort(unfilledMembersWithUnderMinQuotaPartitions);
         Collections.sort(unfilledMembersWithExactlyMinQuotaPartitions);
+        unassignedPartitions = rackInfo.sortPartitionsByRackConsumers(unassignedPartitions);
+
+        // Round-Robin filling within racks for remaining members up to the expected numbers of maxQuota,
+        // otherwise, to minQuota

Review Comment:
   Could we expand this comment a little more to give more details about the logic implemented in this block? It is not easy to dive into it.
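   For example, something along these lines (based on my reading of the block, so please correct whatever I got wrong):
   ```java
        // Rack-aware round-robin: walk the unassigned partitions (ordered by sortPartitionsByRackConsumers)
        // and, for each partition, let nextRackConsumer pick the next consumer from
        // unfilledMembersWithUnderMinQuotaPartitions whose rack matches the partition, starting at
        // nextUnfilledConsumerIndex. Once a consumer reaches minQuota it is removed from that list and,
        // if it can still take up to maxQuota, moved to unfilledMembersWithExactlyMinQuotaPartitions.
        // Partitions with no rack-matching consumer are left for the non-rack-aware filling below.
   ```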



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
