rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1115773500
########## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ########## @@ -74,45 +105,194 @@ public String name() { private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) { Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>(); - for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) { - String consumerId = subscriptionEntry.getKey(); - MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId()); - for (String topic : subscriptionEntry.getValue().topics()) { - put(topicToConsumers, topic, memberInfo); - } - } + consumerMetadata.forEach((consumerId, subscription) -> { + MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId()); + subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo)); + }); return topicToConsumers; } + /** + * Performs range assignment of the specified partitions for the consumers with the provided subscriptions. + * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign + * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and + * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned + * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks. 
+ */ @Override - public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, - Map<String, Subscription> subscriptions) { + public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic, + Map<String, Subscription> subscriptions) { Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions); + Map<String, String> consumerRacks = consumerRacks(subscriptions); + List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream() + .filter(e -> !e.getValue().isEmpty()) + .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks)) + .collect(Collectors.toList()); Map<String, List<TopicPartition>> assignment = new HashMap<>(); - for (String memberId : subscriptions.keySet()) - assignment.put(memberId, new ArrayList<>()); + subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>())); + + boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment); + if (useRackAware) + assignWithRackMatching(topicAssignmentStates, assignment); + + topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment)); + + if (useRackAware) + assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR)); + return assignment; + } + + // This method is not used, but retained for compatibility with any custom assignors that extend this class. 
+ @Override + public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, + Map<String, Subscription> subscriptions) { + return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions); + } - for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) { - String topic = topicEntry.getKey(); - List<MemberInfo> consumersForTopic = topicEntry.getValue(); + private void assignRanges(TopicAssignmentState assignmentState, + BiFunction<String, TopicPartition, Boolean> mayAssign, + Map<String, List<TopicPartition>> assignment) { + for (String consumer : assignmentState.consumers.keySet()) { + if (assignmentState.unassignedPartitions.isEmpty()) + break; + List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream() + .filter(tp -> mayAssign.apply(consumer, tp)) + .collect(Collectors.toList()); - Integer numPartitionsForTopic = partitionsPerTopic.get(topic); - if (numPartitionsForTopic == null) + int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size()); + if (maxAssignable <= 0) continue; - Collections.sort(consumersForTopic); Review Comment: @dajac Thanks for the review. This `sort` is still done; I just moved it to `TopicAssignmentState`, where the members are processed. All the tests in `RangeAssignorTest` are also run in rack-aware mode, with every partition replicated on all racks, to verify that the new algorithm gives the same result as the old one. `RangeAssignorTest.testStaticMemberRangeAssignmentPersistentAfterMemberIdChanges` tests the scenario you described. I have added another version of that test with a lower replication factor to cover the case where each rack hosts only a subset of the partitions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org