rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1115773500


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -74,45 +105,194 @@ public String name() {
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
 
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);

Review Comment:
   @dajac Thanks for the review. This `sort` is still done; I just moved it to TopicAssignmentState, where the members are processed. All the tests in RangeAssignorTest are also run in rack-aware mode, with all partitions having replicas on all racks, to verify that the new algorithm gives the same result as the old one. RangeAssignorTest.testStaticMemberRangeAssignmentPersistentAfterMemberIdChanges tests the scenario you described. I have added another version of the same test with a lower replication factor to verify the case where each rack hosts only a subset of the partitions.
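The two-pass approach in the Javadoc above (rack-aligned partitions first, then standard range assignment for the rest) and the equivalence check described in this reply can be illustrated with a simplified, single-topic sketch. It is not Kafka's RangeAssignor: the class RackAwareRangeSketch, its methods, the integer partition ids, and the fixed per-consumer quota (the first numPartitions % numConsumers consumers in sorted order get one extra partition) are assumptions for illustration, and co-partitioning across topics is ignored.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BiPredicate;

// Hypothetical, simplified sketch of rack-aware range assignment for ONE topic.
// Not Kafka's RangeAssignor: no co-partitioning, fixed quotas, integer partition ids.
public class RackAwareRangeSketch {

    // consumerRacks: consumer id -> rack id (null when the consumer has no rack).
    // partitionRacks: partition id -> racks hosting a replica of that partition.
    public static Map<String, List<Integer>> assign(int numPartitions,
                                                    Map<String, String> consumerRacks,
                                                    Map<Integer, Set<String>> partitionRacks) {
        Map<String, List<Integer>> assignment = new TreeMap<>();
        List<String> consumers = new ArrayList<>(consumerRacks.keySet());
        if (consumers.isEmpty())
            return assignment;
        // Range assignment relies on a stable, sorted consumer order.
        Collections.sort(consumers);

        // Assumed quota, as in plain range assignment: the first
        // (numPartitions % n) consumers receive one extra partition.
        int n = consumers.size();
        Map<String, Integer> quota = new HashMap<>();
        for (int i = 0; i < n; i++) {
            String consumer = consumers.get(i);
            quota.put(consumer, numPartitions / n + (i < numPartitions % n ? 1 : 0));
            assignment.put(consumer, new ArrayList<>());
        }

        TreeSet<Integer> unassigned = new TreeSet<>();
        for (int p = 0; p < numPartitions; p++)
            unassigned.add(p);

        // Pass 1: only partitions with a replica on the consumer's rack.
        assignPass(consumers, quota, unassigned, assignment, (consumer, partition) -> {
            String rack = consumerRacks.get(consumer);
            return rack != null
                    && partitionRacks.getOrDefault(partition, Collections.emptySet()).contains(rack);
        });
        // Pass 2: fill remaining quotas with leftover partitions (racks may be misaligned).
        assignPass(consumers, quota, unassigned, assignment, (consumer, partition) -> true);

        // Keep each consumer's partitions in ascending order.
        assignment.values().forEach(Collections::sort);
        return assignment;
    }

    // Gives each consumer, in sorted order, the lowest-numbered unassigned partitions
    // accepted by mayAssign, up to that consumer's remaining quota.
    private static void assignPass(List<String> consumers,
                                   Map<String, Integer> quota,
                                   TreeSet<Integer> unassigned,
                                   Map<String, List<Integer>> assignment,
                                   BiPredicate<String, Integer> mayAssign) {
        for (String consumer : consumers) {
            List<Integer> owned = assignment.get(consumer);
            int remaining = quota.get(consumer) - owned.size();
            Iterator<Integer> it = unassigned.iterator();
            while (remaining > 0 && it.hasNext()) {
                Integer partition = it.next();
                if (mayAssign.test(consumer, partition)) {
                    owned.add(partition);
                    it.remove();
                    remaining--;
                }
            }
        }
    }
}
```

For example, with consumers c0, c1, c2 on racks a, b, c and seven partitions replicated on all three racks, pass 1 already produces the contiguous ranges {c0=[0, 1, 2], c1=[3, 4], c2=[5, 6]}, the same result as running with no racks at all. With a lower replication factor, pass 1 places only the rack-aligned partitions and pass 2 assigns whatever is left.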


