rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112013804


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey())))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
+
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
+
+    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
+                                        Map<String, List<TopicPartition>> assignment) {
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+        assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
+            states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+                if (coPartitionedStates.size() > 1)
+                    assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment);
+                else {
+                    TopicAssignmentState state = coPartitionedStates.get(0);
+                    if (state.needsRackAwareAssignment)
+                        assignRanges(state, state::racksMatch, assignment);
+                }
+            });
+        });
+    }
+
+    private void assignCoPartitionedWithRackMatching(List<String> consumers,
+                                                     int numPartitions,
+                                                     Collection<TopicAssignmentState> assignmentStates,
+                                                     Map<String, List<TopicPartition>> assignment) {
+
+        List<String> remainingConsumers = new LinkedList<>(consumers);
+        for (int i = 0; i < numPartitions; i++) {
+            int p = i;
 
-            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
-            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
-                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
-                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
-                assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
+            Optional<String> matchingConsumer = remainingConsumers.stream()
+                    .filter(c -> assignmentStates.stream().allMatch(t -> t.racksMatch(c, new TopicPartition(t.topic, p)) && t.maxAssignable(c) > 0))
+                    .findFirst();
+            if (matchingConsumer.isPresent()) {
+                String consumer = matchingConsumer.get();
+                assignmentStates.forEach(t -> assign(consumer, Collections.singletonList(new TopicPartition(t.topic, p)), t, assignment));
+
+                if (assignmentStates.stream().noneMatch(t -> t.maxAssignable(consumer) > 0)) {
+                    remainingConsumers.remove(consumer);
+                    if (remainingConsumers.isEmpty())
+                        break;
+                }
             }
         }
-        return assignment;
+    }
+
+    private void assign(String consumer, List<TopicPartition> partitions, TopicAssignmentState assignmentState, Map<String, List<TopicPartition>> assignment) {
+        assignment.get(consumer).addAll(partitions);
+        assignmentState.onAssigned(consumer, partitions);
+    }
+
+    private class TopicAssignmentState {
+        private final String topic;
+        private final List<String> consumers;
+        private final boolean needsRackAwareAssignment;
+        private final Map<TopicPartition, Set<String>> partitionRacks;
+        private final Map<String, String> consumerRacks;

Review Comment:
   It was useful to have the consumer racks alongside the partition racks of each topic when matching racks, but we don't need to populate a separate map per topic. I have removed this map and instead included the rack in each topic's list of consumers.
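   
   For readers following the rack-matching logic, a minimal, hypothetical sketch of the shape this change points at: the rack travels with each consumer entry, so no per-topic `consumerRacks` map has to be populated. The names below (`ConsumerWithRack`, `TopicAssignmentStateSketch`) are illustrative only and are not the classes used in this PR.
   
   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;
   import java.util.Optional;
   import java.util.Set;
   
   // Illustrative only: a consumer entry that carries its rack (if any).
   final class ConsumerWithRack {
       final String memberId;
       final Optional<String> rack; // empty when the consumer did not supply a rack
   
       ConsumerWithRack(String memberId, Optional<String> rack) {
           this.memberId = memberId;
           this.rack = rack;
       }
   }
   
   // Illustrative only: per-topic state whose consumer list already includes racks,
   // so there is no separate Map<String, String> consumerRacks per topic.
   final class TopicAssignmentStateSketch {
       private final String topic;
       private final List<ConsumerWithRack> consumers;
       private final Map<Integer, Set<String>> partitionRacks; // partition -> racks of its replicas
   
       TopicAssignmentStateSketch(String topic,
                                  List<ConsumerWithRack> consumers,
                                  Map<Integer, Set<String>> partitionRacks) {
           this.topic = topic;
           this.consumers = consumers;
           this.partitionRacks = partitionRacks;
       }
   
       // Rack matching reads the rack straight off the consumer entry; a missing
       // consumer rack or unknown replica racks is treated here as "no constraint".
       boolean racksMatch(ConsumerWithRack consumer, int partition) {
           Set<String> replicaRacks = partitionRacks.getOrDefault(partition, Collections.emptySet());
           return !consumer.rack.isPresent() || replicaRacks.isEmpty() || replicaRacks.contains(consumer.rack.get());
       }
   }
   ```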


