rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112025710


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> 
consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : 
consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, 
subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, 
subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> 
partitionsPerTopic,
-                                                    Map<String, Subscription> 
subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, 
List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, 
Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = 
consumersPerTopic(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = 
partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), 
consumersPerTopic.get(e.getKey())))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : 
consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> 
t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, 
assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> 
list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom 
assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> 
partitionsPerTopic,
+                                                    Map<String, Subscription> 
subscriptions) {
+        return 
assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
+
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> 
mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = 
assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = 
Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), 
assignmentState, assignment);
+        }
+    }
+
+    private void assignWithRackMatching(Collection<TopicAssignmentState> 
assignmentStates,
+                                        Map<String, List<TopicPartition>> 
assignment) {
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / 
consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % 
consumersForTopic.size();
+        assignmentStates.stream().collect(Collectors.groupingBy(t -> 
t.consumers)).forEach((consumers, states) -> {
+            states.stream().collect(Collectors.groupingBy(t -> 
t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+                if (coPartitionedStates.size() > 1)
+                    assignCoPartitionedWithRackMatching(consumers, 
numPartitions, states, assignment);
+                else {
+                    TopicAssignmentState state = coPartitionedStates.get(0);
+                    if (state.needsRackAwareAssignment)
+                        assignRanges(state, state::racksMatch, assignment);
+                }
+            });
+        });
+    }
+
+    private void assignCoPartitionedWithRackMatching(List<String> consumers,
+                                                     int numPartitions,
+                                                     
Collection<TopicAssignmentState> assignmentStates,
+                                                     Map<String, 
List<TopicPartition>> assignment) {
+
+        List<String> remainingConsumers = new LinkedList<>(consumers);
+        for (int i = 0; i < numPartitions; i++) {

Review Comment:
   I thought about these types of edge cases and felt that the logic is already 
much more complicated than it used to be. Handling edge cases would make it 
even more complex in terms of both implementation and testing. For example, in 
this particular scenario, we have n topics with p partitions, and the n*p 
partitions could have replicas on different numbers of racks, but we want to 
co-partition. The current implementation works in the typical case where 
partition and consumer racks are uniform. We could extend further in follow-on 
PRs if we find that the edge cases are likely scenarios. What do you think?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java:
##########
@@ -302,10 +339,151 @@ public void 
testStaticMemberRangeAssignmentPersistentAfterMemberIdChanges() {
         assertEquals(staticAssignment, newStaticAssignment);
     }
 
-    static Map<String, List<TopicPartition>> 
checkStaticAssignment(AbstractPartitionAssignor assignor,
-                                                                   Map<String, 
Integer> partitionsPerTopic,
-                                                                   Map<String, 
Subscription> consumers) {
-        Map<String, List<TopicPartition>> assignmentByMemberId = 
assignor.assign(partitionsPerTopic, consumers);
+    @Test
+    public void testRackAwareAssignmentWithUniformSubscription() {
+        Map<String, Integer> topics = mkMap(mkEntry("t1", 6), mkEntry("t2", 
7), mkEntry("t3", 2));
+        List<String> allTopics = asList("t1", "t2", "t3");
+        List<List<String>> consumerTopics = asList(allTopics, allTopics, 
allTopics);
+        List<String> nonRackAwareAssignment = asList(
+                "t1-0, t1-1, t2-0, t2-1, t2-2, t3-0",
+                "t1-2, t1-3, t2-3, t2-4, t3-1",
+                "t1-4, t1-5, t2-5, t2-6"
+        );
+
+        // Verify combinations where rack-aware logic is not used.
+        verifyNonRackAwareAssignment(topics, consumerTopics, 
nonRackAwareAssignment);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to