rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1121729089


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -74,45 +105,194 @@ public String name() {
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, 
Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : 
consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, 
subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, 
subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, 
topic, memberInfo));
+        });
         return topicToConsumers;
     }
 
+    /**
+     * Performs range assignment of the specified partitions for the consumers 
with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform 
rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining 
the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The 
remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result 
in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> 
partitionsPerTopic,
-                                                    Map<String, Subscription> 
subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, 
List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, 
Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = 
consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = 
partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), 
consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, 
new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> 
t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, 
assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> 
list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom 
assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> 
partitionsPerTopic,
+                                                    Map<String, Subscription> 
subscriptions) {
+        return 
assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : 
consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> 
mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = 
assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());

Review Comment:
   Good idea, updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to