dajac commented on code in PR #12914:
URL: https://github.com/apache/kafka/pull/12914#discussion_r1037881208

##########
clients/src/main/resources/common/message/ConsumerProtocolAssignment.json:
##########
@@ -23,6 +23,7 @@
   // that new versions cannot remove or reorder any of the existing fields.
   //
   // Version 2 is to support a new field "GenerationId" in ConsumerProtocolSubscription.
+  // Version 3 adds rack id to ConsumerProtocolSubscription.
   "validVersions": "0-2",

Review Comment:
   We need to bump the version here, i.e. `validVersions` should become "0-3".

##########
clients/src/main/resources/common/message/ConsumerProtocolSubscription.json:
##########
@@ -24,6 +24,7 @@
   // Version 1 added the "OwnedPartitions" field to allow assigner know what partitions each member owned
   // Version 2 added a new field "GenerationId" to indicate if the member has out-of-date ownedPartitions.
+  // Version 3 adds rack id to enable rack-aware assignment.
   "validVersions": "0-2",

Review Comment:
   We need to bump the version here as well.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +88,145 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new TopicAssignmentState(e.getValue(), consumersPerTopic.get(e.getKey()))));
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
+        for (Map.Entry<String, TopicAssignmentState> topicEntry : topicAssignmentStates.entrySet()) {

Review Comment:
   I have a general question here. One of the key promises of the range assignor is that it co-partitions partitions: if you have two topics (foo and bar) with three partitions each and two consumers, the first consumer gets foo-0, bar-0, foo-1, bar-1 and the second one gets foo-2, bar-2. I am trying to understand whether we still maintain this property with the rack-aware assignment. Do we? I suppose that we do when all the partitions have the same racks. I am not sure about the case where, for instance, foo has no rack or only a subset of the racks (e.g. because some brokers are offline).

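To make the co-partitioning property in the question concrete, here is a minimal sketch of the classic (non-rack-aware) range logic, reusing the `start`/`length` arithmetic from the removed lines of `RangeAssignor`. It assumes every consumer subscribes to every topic and models partitions as plain strings; the class name is hypothetical and this is not Kafka's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of classic range assignment; topic partitions are plain strings.
final class RangeCoPartitioningSketch {

    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            List<String> consumers) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String consumer : sorted)
            assignment.put(consumer, new ArrayList<>());
        if (sorted.isEmpty())
            return assignment;

        for (Map.Entry<String, Integer> entry : partitionsPerTopic.entrySet()) {
            String topic = entry.getKey();
            int numPartitions = entry.getValue();
            // Every consumer gets numPartitions / n partitions, and the first
            // numPartitions % n consumers (in sorted order) get one extra.
            int perConsumer = numPartitions / sorted.size();
            int withExtra = numPartitions % sorted.size();
            for (int i = 0; i < sorted.size(); i++) {
                int start = perConsumer * i + Math.min(i, withExtra);
                int length = perConsumer + (i < withExtra ? 1 : 0);
                for (int p = start; p < start + length; p++)
                    assignment.get(sorted.get(i)).add(topic + "-" + p);
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("foo", 3);
        topics.put("bar", 3);
        // Prints {c1=[foo-0, foo-1, bar-0, bar-1], c2=[foo-2, bar-2]}
        System.out.println(assign(topics, List.of("c2", "c1")));
    }
}
```

Because the split depends only on the sorted consumer list and the partition count, partition i of foo and partition i of bar always land on the same consumer. A rack-first pass that looks at each topic's replica racks can break that whenever the two topics are placed on different racks.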
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -268,6 +304,47 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
         Collections.sort(unfilledMembersWithUnderMinQuotaPartitions);
         Collections.sort(unfilledMembersWithExactlyMinQuotaPartitions);
 
+        unassignedPartitions = rackInfo.sortPartitionsByRackConsumers(unassignedPartitions);
+
+        // Round-Robin filling within racks for remaining members up to the expected numbers of maxQuota,
+        // otherwise, to minQuota
+        int nextUnfilledConsumerIndex = 0;
+        Iterator<TopicPartition> unassignedIter = unassignedPartitions.iterator();
+        while (!rackInfo.consumerRacks.isEmpty() && unassignedIter.hasNext()) {
+            TopicPartition unassignedPartition = unassignedIter.next();
+            String consumer = null;
+            int nextIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithUnderMinQuotaPartitions, nextUnfilledConsumerIndex);
+            if (nextIndex >= 0) {
+                consumer = unfilledMembersWithUnderMinQuotaPartitions.get(nextIndex);
+                int assignmentCount = assignment.get(consumer).size() + 1;
+                if (assignmentCount >= minQuota) {
+                    unfilledMembersWithUnderMinQuotaPartitions.remove(consumer);
+                    if (assignmentCount < maxQuota)
+                        unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
+                } else
+                    nextIndex++;

Review Comment:
   nit: I would put curly braces around the `else` branch to be consistent with the `if` branch.

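The round-robin block above is dense, so here is a simplified, hypothetical sketch of the idea as read from the diff: each partition goes to the next consumer, in round-robin order, whose rack holds one of the partition's replicas and who is still below quota; partitions with no such consumer are left for a later rack-agnostic pass. It collapses the two lists `unfilledMembersWithUnderMinQuotaPartitions` and `unfilledMembersWithExactlyMinQuotaPartitions` into a single `quota`, and `partitionRacks`/`consumerRack` stand in for whatever `rackInfo` holds. None of these names are Kafka APIs, and both branches use braces, in line with the nit.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified sketch of "round-robin filling within racks"; not Kafka code.
final class RackRoundRobinSketch {

    // Gives each partition to the next consumer (round-robin from a cursor) whose rack is one
    // of the partition's replica racks and who is still below `quota`. Partitions with no such
    // consumer are returned for a later rack-agnostic pass. `assignment` must have a list per consumer.
    static List<String> assignWithinRacks(List<String> partitions,
                                          Map<String, Set<String>> partitionRacks,
                                          List<String> consumers,
                                          Map<String, String> consumerRack,
                                          int quota,
                                          Map<String, List<String>> assignment) {
        List<String> leftover = new ArrayList<>();
        int next = 0; // round-robin cursor into `consumers`
        for (String partition : partitions) {
            Set<String> racks = partitionRacks.getOrDefault(partition, Set.of());
            int chosen = -1;
            for (int k = 0; k < consumers.size(); k++) {
                int idx = (next + k) % consumers.size();
                String consumer = consumers.get(idx);
                String rack = consumerRack.get(consumer);
                if (rack != null && racks.contains(rack) && assignment.get(consumer).size() < quota) {
                    chosen = idx;
                    break;
                }
            }
            if (chosen < 0) {
                leftover.add(partition);
            } else {
                assignment.get(consumers.get(chosen)).add(partition);
                // Resume after the chosen consumer so the load spreads out.
                next = (chosen + 1) % consumers.size();
            }
        }
        return leftover;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("c1", "c2");
        Map<String, String> consumerRack = Map.of("c1", "a", "c2", "b");
        Map<String, Set<String>> partitionRacks = Map.of(
                "p0", Set.of("a"), "p1", Set.of("b"), "p2", Set.of("a"), "p3", Set.of("c"));
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String c : consumers)
            assignment.put(c, new ArrayList<>());
        List<String> leftover = assignWithinRacks(
                List.of("p0", "p1", "p2", "p3"), partitionRacks, consumers, consumerRack, 2, assignment);
        // Prints {c1=[p0, p2], c2=[p1]} leftover=[p3]
        System.out.println(assignment + " leftover=" + leftover);
    }
}
```

In the example, p3 has no replica on a consumer's rack, so it is handed back instead of being forced onto a remote consumer; the diff presumably places such partitions in the step that follows this loop.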
##########
clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java:
##########
@@ -46,20 +53,20 @@ public AbstractStickyAssignor createAssignor() {
     }
 
     @Override
-    public Subscription buildSubscriptionV0(List<String> topics, List<TopicPartition> partitions, int generationId) {
+    public Subscription buildSubscriptionV0(List<String> topics, List<TopicPartition> partitions, int generationId, int consumerIndex) {
         // cooperative sticky assignor only supports ConsumerProtocolSubscription V1 or above
         return null;
     }
 
     @Override
-    public Subscription buildSubscriptionV1(List<String> topics, List<TopicPartition> partitions, int generationId) {
+    public Subscription buildSubscriptionV1(List<String> topics, List<TopicPartition> partitions, int generationId, int consumerIndex) {
         assignor.onAssignment(new ConsumerPartitionAssignor.Assignment(partitions), new ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty()));
-        return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions, DEFAULT_GENERATION);
+        return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions, DEFAULT_GENERATION, consumerRackId(consumerIndex));

Review Comment:
   It seems that the rack should not be set here if this method simulates Subscription V1.

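The two version-bump comments and this test comment rest on the same rule: a field introduced in version 3 should only be present when the subscription is exchanged at version 3 or later. A minimal sketch of that rule follows. `SubscriptionData` is a hypothetical stand-in, not Kafka's generated `ConsumerProtocolSubscription` or `ConsumerPartitionAssignor.Subscription`; the version-3 cut-off comes from the JSON comments in the diff, and modeling the missing field as `Optional.empty()` is an assumption. In this model, leaving `validVersions` at "0-2" means nothing is ever exchanged at version 3, so the rack would always be dropped.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: models a rack id field that only exists from version 3 onward.
final class VersionedSubscriptionSketch {

    // Assumption taken from the JSON comments in the diff ("Version 3 adds rack id").
    static final int RACK_ID_MIN_VERSION = 3;

    // Simplified stand-in, not Kafka's ConsumerPartitionAssignor.Subscription.
    static final class SubscriptionData {
        final List<String> topics;
        final Optional<String> rackId;

        SubscriptionData(List<String> topics, Optional<String> rackId) {
            this.topics = topics;
            this.rackId = rackId;
        }
    }

    // What survives when the subscription is exchanged at `version`: below version 3
    // there is no rack field, so the rack is modeled as absent.
    static SubscriptionData asVersion(SubscriptionData data, int version) {
        Optional<String> rack = version >= RACK_ID_MIN_VERSION ? data.rackId : Optional.empty();
        return new SubscriptionData(data.topics, rack);
    }

    // A test helper that simulates an old version should not attach a rack in the first place.
    static SubscriptionData buildSubscriptionV1(List<String> topics) {
        return new SubscriptionData(topics, Optional.empty());
    }

    public static void main(String[] args) {
        SubscriptionData withRack = new SubscriptionData(List.of("foo"), Optional.of("rack-a"));
        System.out.println(asVersion(withRack, 1).rackId); // Optional.empty
        System.out.println(asVersion(withRack, 3).rackId); // Optional[rack-a]
        System.out.println(buildSubscriptionV1(List.of("foo")).rackId); // Optional.empty
    }
}
```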
##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +88,145 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new TopicAssignmentState(e.getValue(), consumersPerTopic.get(e.getKey()))));
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
+        for (Map.Entry<String, TopicAssignmentState> topicEntry : topicAssignmentStates.entrySet()) {
             String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+            TopicAssignmentState topicAssignmentState = topicEntry.getValue();
+
+            List<PartitionInfo> partitionsForTopic = partitionsPerTopic.get(topic);
+            if (partitionsForTopic == null || partitionsForTopic.isEmpty())
+                continue;
+
+            // Assign based on racks first, but limit to each consumer's quota since we
+            // prioritize balanced assignment over locality.
+            for (String rackId : topicAssignmentState.racks) {
+                List<String> consumersForRack = topicAssignmentState.consumersByRack.get(rackId);
+                List<TopicPartition> partitionsForRack = topicAssignmentState.partitionsByRack.get(rackId);
+                doAssign(consumersForRack, partitionsForRack, assignment, topicAssignmentState);
+            }
+
+            // Assign any remaining partitions without matching racks, still maintaining balanced assignment
+            doAssign(topicAssignmentState.consumers, new ArrayList<>(topicAssignmentState.unassignedPartitions), assignment, topicAssignmentState);
+        }
+        return assignment;
+    }
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+    private void doAssign(List<String> consumers,
+                                                    List<TopicPartition> assignablePartitions,
+                                                    Map<String, List<TopicPartition>> assignment,
+                                                    TopicAssignmentState assignmentState) {

Review Comment:
   nit: The alignment of the arguments is off.

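The new `assignPartitions` describes two passes per topic: a per-rack pass capped by each consumer's quota, then a rack-agnostic pass over whatever is left. A simplified, self-contained sketch of that flow for a single topic follows. Partitions are plain ints, every consumer is assumed to have a rack, and the quota rule in `maxAssignable` is an assumption, since its body is not part of the quoted diff; all class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical single-topic sketch of "rack-first, then the rest" range assignment.
final class RackFirstRangeSketch {

    // Per-topic bookkeeping, loosely modeled on TopicAssignmentState in the diff.
    static final class TopicState {
        final Set<Integer> unassigned = new LinkedHashSet<>();
        final Map<String, Integer> numAssigned = new LinkedHashMap<>();
        final int perConsumer;
        int consumersWithExtra;

        TopicState(int numPartitions, List<String> consumers) {
            for (int p = 0; p < numPartitions; p++)
                unassigned.add(p);
            for (String c : consumers)
                numAssigned.put(c, 0);
            perConsumer = consumers.isEmpty() ? 0 : numPartitions / consumers.size();
            consumersWithExtra = consumers.isEmpty() ? 0 : numPartitions % consumers.size();
        }

        // Assumed quota rule: perConsumer, plus one while "extra" slots remain.
        int maxAssignable(String consumer) {
            return Math.max(0, perConsumer + (consumersWithExtra > 0 ? 1 : 0) - numAssigned.get(consumer));
        }

        void onAssigned(String consumer, List<Integer> partitions) {
            int total = numAssigned.merge(consumer, partitions.size(), Integer::sum);
            if (total > perConsumer)
                consumersWithExtra--; // this consumer used one of the extra slots
            unassigned.removeAll(partitions);
        }
    }

    // Hands out contiguous chunks of the still-unassigned `candidates`, capped by quota.
    static void doAssign(List<String> consumers, List<Integer> candidates,
                         Map<String, List<Integer>> assignment, TopicState state) {
        List<Integer> assignable = new ArrayList<>(candidates);
        assignable.retainAll(state.unassigned);
        int start = 0;
        for (String consumer : consumers) {
            // Cap by what is left after `start` so subList never runs past the end.
            int n = Math.min(state.maxAssignable(consumer), assignable.size() - start);
            if (n <= 0)
                continue;
            List<Integer> chunk = new ArrayList<>(assignable.subList(start, start + n));
            assignment.get(consumer).addAll(chunk);
            state.onAssigned(consumer, chunk);
            start += n;
        }
    }

    // consumerRacks: consumer -> rack; partitionRacks: partition -> racks of its replicas.
    static Map<String, List<Integer>> assignTopic(int numPartitions,
                                                  Map<Integer, Set<String>> partitionRacks,
                                                  Map<String, String> consumerRacks) {
        List<String> consumers = new ArrayList<>(consumerRacks.keySet());
        Collections.sort(consumers);
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers)
            assignment.put(c, new ArrayList<>());
        TopicState state = new TopicState(numPartitions, consumers);
        Map<String, List<String>> consumersByRack = new TreeMap<>();
        for (String c : consumers)
            consumersByRack.computeIfAbsent(consumerRacks.get(c), r -> new ArrayList<>()).add(c);
        // Pass 1: within each rack, only partitions with a replica on that rack.
        for (Map.Entry<String, List<String>> e : consumersByRack.entrySet()) {
            List<Integer> partitionsForRack = new ArrayList<>();
            for (int p = 0; p < numPartitions; p++)
                if (partitionRacks.getOrDefault(p, Set.of()).contains(e.getKey()))
                    partitionsForRack.add(p);
            doAssign(e.getValue(), partitionsForRack, assignment, state);
        }
        // Pass 2: everything still unassigned, across all consumers.
        doAssign(consumers, new ArrayList<>(state.unassigned), assignment, state);
        return assignment;
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> partitionRacks = Map.of(
                0, Set.of("b"), 1, Set.of("b"), 2, Set.of("a"), 3, Set.of("a"));
        // Prints {c1=[2, 3], c2=[0, 1]}
        System.out.println(assignTopic(4, partitionRacks, Map.of("c1", "a", "c2", "b")));
    }
}
```

Without racks, the classic range logic would give c1 partitions 0 and 1; here the rack pass gives c1 partitions 2 and 3 because they share rack a. That is exactly why co-partitioning across topics can no longer be taken for granted once two topics have different rack placements.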
##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +88,145 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new TopicAssignmentState(e.getValue(), consumersPerTopic.get(e.getKey()))));
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
+        for (Map.Entry<String, TopicAssignmentState> topicEntry : topicAssignmentStates.entrySet()) {
             String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+            TopicAssignmentState topicAssignmentState = topicEntry.getValue();
+
+            List<PartitionInfo> partitionsForTopic = partitionsPerTopic.get(topic);
+            if (partitionsForTopic == null || partitionsForTopic.isEmpty())
+                continue;
+
+            // Assign based on racks first, but limit to each consumer's quota since we
+            // prioritize balanced assignment over locality.
+            for (String rackId : topicAssignmentState.racks) {
+                List<String> consumersForRack = topicAssignmentState.consumersByRack.get(rackId);
+                List<TopicPartition> partitionsForRack = topicAssignmentState.partitionsByRack.get(rackId);
+                doAssign(consumersForRack, partitionsForRack, assignment, topicAssignmentState);
+            }
+
+            // Assign any remaining partitions without matching racks, still maintaining balanced assignment
+            doAssign(topicAssignmentState.consumers, new ArrayList<>(topicAssignmentState.unassignedPartitions), assignment, topicAssignmentState);
+        }
+        return assignment;
+    }
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+    private void doAssign(List<String> consumers,
+                                                    List<TopicPartition> assignablePartitions,
+                                                    Map<String, List<TopicPartition>> assignment,
+                                                    TopicAssignmentState assignmentState) {
+        assignablePartitions.removeIf(tp -> !assignmentState.unassignedPartitions.contains(tp));
+        if (consumers.isEmpty() || assignmentState.unassignedPartitions.isEmpty() || assignablePartitions.isEmpty())
+            return;
+
+        int start = 0;
+        for (String consumer : consumers) {
+            List<TopicPartition> consumerAssignment = assignment.get(consumer);
+            int numAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (numAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            List<TopicPartition> partitionsToAssign = assignablePartitions.subList(start, start + numAssignable);
+            consumerAssignment.addAll(partitionsToAssign);
+            assignmentState.onAssigned(consumer, partitionsToAssign);
+            start += numAssignable;
+            if (start >= assignablePartitions.size())
+                break;
+        }
+    }
+
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
+
+    private class TopicAssignmentState {
+        private final List<String> consumers;
+        private final List<String> racks;
+        private final Map<String, List<TopicPartition>> partitionsByRack;
+        private final Map<String, List<String>> consumersByRack;
+
+        private final List<TopicPartition> unassignedPartitions;
+        private final Map<String, Integer> numAssignedByConsumer;
+        private final int numPartitionsPerConsumer;
+        private final int numConsumersWithExtraPartition;
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+        public TopicAssignmentState(List<PartitionInfo> partitionInfos, List<MemberInfo> membersOrNull) {
+            List<MemberInfo> members = membersOrNull == null ? Collections.emptyList() : membersOrNull;
+            Collections.sort(members);
+            consumers = members.stream().map(c -> c.memberId).collect(Collectors.toList());
 
-            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
-            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
-                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
-                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
-                assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
+            this.unassignedPartitions = partitionInfos.stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                    .collect(Collectors.toCollection(LinkedList::new));
+            this.numAssignedByConsumer = consumers.stream().collect(Collectors.toMap(Function.identity(), c -> 0));
+            numPartitionsPerConsumer = consumers.isEmpty() ? 0 : partitionInfos.size() / consumers.size();
+            numConsumersWithExtraPartition = consumers.isEmpty() ? 0 : partitionInfos.size() % consumers.size();
+
+            Map<String, List<String>> consumersByRack = new HashMap<>();
+            members.forEach(consumer -> put(consumersByRack, consumer.rackId.orElse(""), consumer.memberId));
+            consumersByRack.remove("");

Review Comment:
   nit: Did you consider adding an `if` statement in the `forEach` instead of removing `""` afterwards? That would be a bit more explicit, in my opinion.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java:
##########
@@ -84,13 +110,65 @@ protected static List<TopicPartition> partitions(String topic, int numPartitions
         return partitions;
     }
 
+    protected static Map<String, List<PartitionInfo>> partitionInfosWithoutRacks(Map<String, Integer> partitionsPerTopic) {
+        return partitionsPerTopic.entrySet().stream().collect(Collectors.toMap(Entry::getKey, e -> {
+            String topic = e.getKey();
+            int numPartitions = e.getValue();
+            List<PartitionInfo> partitionInfos = new ArrayList<>(numPartitions);
+            for (int i = 0; i < numPartitions; i++)
+                partitionInfos.add(new PartitionInfo(topic, i, Node.noNode(), NO_NODES, NO_NODES));
+            return partitionInfos;
+        }));
+    }
+
+    protected static Map<String, List<TopicPartition>> partitionsByRack(List<PartitionInfo> partitionInfos, Map<TopicPartition, Set<String>> partitionRacks) {

Review Comment:
   I find this method a bit unintuitive. It is named `partitionsByRack` and computes `partitionsByRack`, but it also gives you `partitionRacks` at the same time. Have you combined them for performance reasons?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -173,18 +202,23 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
      *    we're still under the number of expected max capacity members
      * 2. Fill remaining members up to the expected numbers of maxQuota partitions, otherwise, to minQuota partitions
      *
-     * @param partitionsPerTopic The number of partitions for each subscribed topic
+     * For rack-aware algorithm, only owned partitions with matching racks are allocated by 1). And 2) allocates
+     * partitions with matching racks first before allocating remaining without rack-matching.
+     *
+     * @param partitionsPerTopic The partitions for each subscribed topic
      * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions
      * @param partitionsWithMultiplePreviousOwners The partitions being claimed in the previous assignment of multiple consumers
+     * @param rackInfo Rack information for consumers and racks
      *
      * @return Map from each member to the list of partitions assigned to them.
      */
-    private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
+    private Map<String, List<TopicPartition>> constrainedAssign(Map<String, List<PartitionInfo>> partitionsPerTopic,

Review Comment:
   Wow. This method is getting long...

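Two of the nits above, the explicit `if` in the `consumersByRack` grouping and separating `partitionsByRack` from `partitionRacks`, might look like this in isolation. This is a hypothetical sketch on plain strings with `Optional<String>` racks (mirroring `MemberInfo.rackId` in the diff), not the PR's code; computing `partitionRacks` itself is assumed to live in a separate helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helpers on plain strings; not the PR's code.
final class RackGroupingSketch {

    // Groups consumers by rack, skipping rack-less consumers with an explicit `if`
    // rather than grouping them under "" and removing that key afterwards.
    static Map<String, List<String>> consumersByRack(Map<String, Optional<String>> consumerRacks) {
        Map<String, List<String>> byRack = new TreeMap<>();
        consumerRacks.forEach((consumer, rack) -> {
            if (rack.isPresent())
                byRack.computeIfAbsent(rack.get(), r -> new ArrayList<>()).add(consumer);
        });
        return byRack;
    }

    // Single-purpose inversion of partition -> replica racks into rack -> partitions.
    static Map<String, List<String>> partitionsByRack(Map<String, Set<String>> partitionRacks) {
        Map<String, List<String>> byRack = new TreeMap<>();
        partitionRacks.forEach((partition, racks) ->
                racks.forEach(rack -> byRack.computeIfAbsent(rack, r -> new ArrayList<>()).add(partition)));
        return byRack;
    }

    public static void main(String[] args) {
        Map<String, Optional<String>> consumers = new TreeMap<>();
        consumers.put("c1", Optional.of("a"));
        consumers.put("c2", Optional.empty());
        consumers.put("c3", Optional.of("a"));
        System.out.println(consumersByRack(consumers)); // {a=[c1, c3]}

        Map<String, Set<String>> partitionRacks = new TreeMap<>();
        partitionRacks.put("foo-0", Set.of("a", "b"));
        partitionRacks.put("foo-1", Set.of("b"));
        System.out.println(partitionsByRack(partitionRacks)); // {a=[foo-0], b=[foo-0, foo-1]}
    }
}
```

Keeping each helper to a single output also makes it easier to pull pieces out of long methods such as `constrainedAssign`.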
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -60,31 +71,49 @@ static final class ConsumerGenerationPair {
     public static final class MemberData {
         public final List<TopicPartition> partitions;
         public final Optional<Integer> generation;
-        public MemberData(List<TopicPartition> partitions, Optional<Integer> generation) {
+        public final Optional<String> rackId;
+        public MemberData(List<TopicPartition> partitions, Optional<Integer> generation, Optional<String> rackId) {
             this.partitions = partitions;
             this.generation = generation;
+            this.rackId = rackId;
+        }
+
+        public MemberData(List<TopicPartition> partitions, Optional<Integer> generation) {
+            this(partitions, generation, Optional.empty());
         }
     }
 
     abstract protected MemberData memberData(Subscription subscription);
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
         Set<TopicPartition> partitionsWithMultiplePreviousOwners = new HashSet<>();
-        if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners)) {
+
+        List<PartitionInfo> allPartitions = new ArrayList<>();
+        NavigableMap<String, List<PartitionInfo>> sortedPartitionsPerTopic = new TreeMap<>(partitionsPerTopic);

Review Comment:
   Why do we need to do this?

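One possible reason for the `TreeMap` copy (an assumption; only the PR author can confirm) is to make the flattened `allPartitions` list independent of the iteration order of whatever map the caller passes in. A minimal sketch of that effect, using topic names and partition counts instead of `PartitionInfo`; `allPartitions` here is a hypothetical helper, not the PR's method.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: flattening topics in a deterministic (sorted) order.
final class SortedTopicsSketch {

    // Flattens topic -> partition count into "topic-partition" names. Copying the input into
    // a TreeMap orders the result by topic name, regardless of the caller's map implementation.
    static List<String> allPartitions(Map<String, Integer> partitionsPerTopic) {
        NavigableMap<String, Integer> sorted = new TreeMap<>(partitionsPerTopic);
        List<String> all = new ArrayList<>();
        sorted.forEach((topic, n) -> {
            for (int p = 0; p < n; p++)
                all.add(topic + "-" + p);
        });
        return all;
    }

    public static void main(String[] args) {
        Map<String, Integer> input = new HashMap<>();
        input.put("orders", 2);
        input.put("clicks", 1);
        input.put("payments", 1);
        System.out.println(allPartitions(input)); // [clicks-0, orders-0, orders-1, payments-0]
    }
}
```

If determinism is not actually needed downstream, iterating `partitionsPerTopic` directly would avoid the extra copy; which one applies depends on how `allPartitions` is consumed later in the method.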
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -268,6 +304,47 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
         Collections.sort(unfilledMembersWithUnderMinQuotaPartitions);
         Collections.sort(unfilledMembersWithExactlyMinQuotaPartitions);
 
+        unassignedPartitions = rackInfo.sortPartitionsByRackConsumers(unassignedPartitions);
+
+        // Round-Robin filling within racks for remaining members up to the expected numbers of maxQuota,
+        // otherwise, to minQuota

Review Comment:
   Could we expand this comment a little to give more details about the logic implemented in this block? It is not easy to dive into.


-- 
This is an automated message from the Apache Git Service.