dajac commented on code in PR #12914:
URL: https://github.com/apache/kafka/pull/12914#discussion_r1037881208
##
clients/src/main/resources/common/message/ConsumerProtocolAssignment.json:
##
@@ -23,6 +23,7 @@
// that new versions cannot remove or reorder any of the existing fields.
//
// Version 2 is to support a new field "GenerationId" in ConsumerProtocolSubscription.
+ // Version 3 adds rack id to ConsumerProtocolSubscription.
"validVersions": "0-2",
Review Comment:
We need to bump the version here ("validVersions": "0-3").
##
clients/src/main/resources/common/message/ConsumerProtocolSubscription.json:
##
@@ -24,6 +24,7 @@
// Version 1 added the "OwnedPartitions" field to allow assigner know what partitions each member owned
// Version 2 added a new field "GenerationId" to indicate if the member has out-of-date ownedPartitions.
+ // Version 3 adds rack id to enable rack-aware assignment.
"validVersions": "0-2",
Review Comment:
We need to bump the version here as well ("validVersions": "0-3").
##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -76,43 +88,145 @@ private Map>
consumersPerTopic(Map> topicToConsumers = new HashMap<>();
for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) {
String consumerId = subscriptionEntry.getKey();
-MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-for (String topic : subscriptionEntry.getValue().topics()) {
+Subscription subscription = subscriptionEntry.getValue();
+MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+for (String topic : subscription.topics()) {
put(topicToConsumers, topic, memberInfo);
}
}
return topicToConsumers;
}
@Override
-public Map> assign(Map partitionsPerTopic,
-Map subscriptions) {
+public Map> assignPartitions(Map> partitionsPerTopic,
+ Map subscriptions) {
Map> consumersPerTopic = consumersPerTopic(subscriptions);
+Map topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+.collect(Collectors.toMap(Map.Entry::getKey, e -> new TopicAssignmentState(e.getValue(), consumersPerTopic.get(e.getKey();
Map> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList<>());
-for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) {
+for (Map.Entry topicEntry : topicAssignmentStates.entrySet()) {
Review Comment:
I have a general question here. One of the key promises of the range assignor
is that it co-partitions topics, i.e. partitions with the same number in
different topics go to the same consumer. If you have two topics (foo and bar)
with three partitions each and two consumers, the first consumer gets foo-0,
bar-0, foo-1, bar-1 and the second one gets foo-2, bar-2. I am trying to
understand whether we still maintain this property with rack-aware assignment.
Do we?
I suppose that we do when all partitions have replicas on the same set of
racks. I am not sure about the case where, for instance, foo has no rack
information or has replicas on only a subset of the racks (e.g. because some
brokers are offline).
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -268,6 +304,47 @@ private Map> constrainedAssign(Map
Collections.sort(unfilledMembersWithUnderMinQuotaPartitions);
Collections.sort(unfilledMembersWithExactlyMinQuotaPartitions);
+unassignedPartitions = rackInfo.sortPartitionsByRackConsumers(unassignedPartitions);
+
+// Round-Robin filling within racks for remaining members up to the expected numbers of maxQuota,
+// otherwise, to minQuota
+int nextUnfilledConsumerIndex = 0;
+Iterator unassignedIter = unassignedPartitions.iterator();
+while (!rackInfo.consumerRacks.isEmpty() && unassignedIter.hasNext()) {
+TopicPartition unassignedPartition = unassignedIter.next();
+String consumer = null;
+int nextIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithUnderMinQuotaPartitions, nextUnfilledConsumerIndex);
+if (nextIndex >= 0) {
+consumer = unfilledMembersWithUnderMinQuotaPartitions.get(nextIndex);
+int assignmentCount = assignment.get(consumer).size() + 1;
+if (assignmentCount >= minQuota) {
+unfilledMembersWithUnderMinQuotaPartitions.remove(consumer);
+if (assignmentCount < maxQuota)
+unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
+} else
+nextIndex++;
Review Comm