rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1121729089
##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -74,45 +105,194 @@ public String name() {
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());

Review Comment:
   Good idea, updated.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org