[GitHub] [kafka] dajac commented on a diff in pull request #12914: KAFKA-14352: Rack-aware consumer partition assignment (KIP-881)

2022-12-05 Thread GitBox


dajac commented on code in PR #12914:
URL: https://github.com/apache/kafka/pull/12914#discussion_r1039767650


##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -76,43 +88,145 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new TopicAssignmentState(e.getValue(), consumersPerTopic.get(e.getKey()))));
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
+        for (Map.Entry<String, TopicAssignmentState> topicEntry : topicAssignmentStates.entrySet()) {

Review Comment:
   Yeah, this is what I thought. I think that we should prioritize ordering over 
locality, as co-partitioning is the fundamental contract of the range assignor.
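To make that contract concrete, here is a minimal sketch of a check a test could run against an assignment to confirm that co-partitioning survives rack-aware assignment. It assumes the compared topics have the same partition count and the same subscribers, and it represents partitions as plain "topic-partition" strings instead of Kafka's TopicPartition. CoPartitioningCheck and isCoPartitioned are hypothetical names, not Kafka APIs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical test helper (not a Kafka API): verifies that partition i of every
// listed topic is owned by the same consumer, i.e. the range assignor's
// co-partitioning contract.
public class CoPartitioningCheck {

    // assignment: consumer id -> owned partitions as "topic-partition" strings.
    // Assumes `topics` is non-empty and every topic has `numPartitions` partitions.
    public static boolean isCoPartitioned(Map<String, List<String>> assignment,
                                          List<String> topics,
                                          int numPartitions) {
        // Invert the assignment so we can look up the owner of each partition.
        Map<String, String> owner = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : assignment.entrySet()) {
            for (String tp : entry.getValue()) {
                owner.put(tp, entry.getKey());
            }
        }
        for (int p = 0; p < numPartitions; p++) {
            String expected = owner.get(topics.get(0) + "-" + p);
            for (String topic : topics) {
                String actual = owner.get(topic + "-" + p);
                // An unassigned partition or a different owner breaks co-partitioning.
                if (actual == null || !actual.equals(expected)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> topics = List.of("foo", "bar");
        Map<String, List<String>> coPartitioned = Map.of(
                "c1", List.of("foo-0", "bar-0", "foo-1", "bar-1"),
                "c2", List.of("foo-2", "bar-2"));
        Map<String, List<String>> notCoPartitioned = Map.of(
                "c1", List.of("foo-0", "foo-1", "bar-0", "bar-2"),
                "c2", List.of("foo-2", "bar-1"));
        System.out.println(isCoPartitioned(coPartitioned, topics, 3));    // prints true
        System.out.println(isCoPartitioned(notCoPartitioned, topics, 3)); // prints false
    }
}
```

The second assignment fails because partition 1 of foo and bar land on different consumers, which is the kind of split a locality-first placement could produce.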



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12914: KAFKA-14352: Rack-aware consumer partition assignment (KIP-881)

2022-12-02 Thread GitBox


dajac commented on code in PR #12914:
URL: https://github.com/apache/kafka/pull/12914#discussion_r1037881208


##
clients/src/main/resources/common/message/ConsumerProtocolAssignment.json:
##
@@ -23,6 +23,7 @@
   // that new versions cannot remove or reorder any of the existing fields.
   //
   // Version 2 is to support a new field "GenerationId" in ConsumerProtocolSubscription.
+  // Version 3 adds rack id to ConsumerProtocolSubscription.
   "validVersions": "0-2",

Review Comment:
   We need to bump the version here ("validVersions" should become "0-3").



##
clients/src/main/resources/common/message/ConsumerProtocolSubscription.json:
##
@@ -24,6 +24,7 @@
 
   // Version 1 added the "OwnedPartitions" field to allow assigner know what partitions each member owned
   // Version 2 added a new field "GenerationId" to indicate if the member has out-of-date ownedPartitions.
+  // Version 3 adds rack id to enable rack-aware assignment.
   "validVersions": "0-2",

Review Comment:
   We need to bump the version here as well ("validVersions" should become "0-3").



##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -76,43 +88,145 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new TopicAssignmentState(e.getValue(), consumersPerTopic.get(e.getKey()))));
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
+        for (Map.Entry<String, TopicAssignmentState> topicEntry : topicAssignmentStates.entrySet()) {

Review Comment:
   I have a general question here. One of the key promises of the range assignor 
is that it preserves co-partitioning. If you have two topics (foo and bar) with 
three partitions each and two consumers, the first consumer will get foo-0, 
bar-0, foo-1, bar-1 and the second one will get foo-2, bar-2. I am trying to 
understand whether we still maintain this property with the rack-aware 
assignment. Do we?
   
   I suppose that we do when all the partitions have replicas on the same set of 
racks. I am not sure about the case where, for instance, foo has no rack 
information or only a subset of the racks (e.g. because some brokers are offline).
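For reference, the property falls out of the range arithmetic: every topic is split into contiguous ranges over the same sorted consumer list, so partition i of equally sized topics always lands on the same consumer. Below is a simplified, rack-unaware sketch under stated assumptions (every consumer subscribes to every topic, members are ordered by id only, partitions are plain "topic-partition" strings). The start/length arithmetic follows the pre-KIP-881 RangeAssignor as I understand it, but RangeSketch itself is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified range assignment: no racks, no static membership,
// and every consumer is assumed to subscribe to every topic.
public class RangeSketch {

    // Assumes `consumers` is non-empty.
    public static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                                   List<String> consumers) {
        // Sort members so every topic is split over the same consumer order.
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String consumer : sorted) {
            assignment.put(consumer, new ArrayList<>());
        }
        // Visit topics alphabetically for deterministic output.
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            int numPartitions = topic.getValue();
            int perConsumer = numPartitions / sorted.size();
            int withExtra = numPartitions % sorted.size();
            for (int i = 0; i < sorted.size(); i++) {
                // Consumer i gets a contiguous range; the first `withExtra` consumers get one more.
                int start = perConsumer * i + Math.min(i, withExtra);
                int length = perConsumer + (i < withExtra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    assignment.get(sorted.get(i)).add(topic.getKey() + "-" + p);
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, List<String>> result = assign(Map.of("foo", 3, "bar", 3), List.of("c1", "c2"));
        System.out.println(result); // prints {c1=[bar-0, bar-1, foo-0, foo-1], c2=[bar-2, foo-2]}
    }
}
```

With foo and bar at three partitions each and consumers c1 and c2, this reproduces the split described in the comment; only the order within each list differs, since topics are visited alphabetically here.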



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -268,6 +304,47 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map
 
         Collections.sort(unfilledMembersWithUnderMinQuotaPartitions);
         Collections.sort(unfilledMembersWithExactlyMinQuotaPartitions);
+        unassignedPartitions = rackInfo.sortPartitionsByRackConsumers(unassignedPartitions);
+
+        // Round-Robin filling within racks for remaining members up to the expected numbers of maxQuota,
+        // otherwise, to minQuota
+        int nextUnfilledConsumerIndex = 0;
+        Iterator<TopicPartition> unassignedIter = unassignedPartitions.iterator();
+        while (!rackInfo.consumerRacks.isEmpty() && unassignedIter.hasNext()) {
+            TopicPartition unassignedPartition = unassignedIter.next();
+            String consumer = null;
+            int nextIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithUnderMinQuotaPartitions, nextUnfilledConsumerIndex);
+            if (nextIndex >= 0) {
+                consumer = unfilledMembersWithUnderMinQuotaPartitions.get(nextIndex);
+                int assignmentCount = assignment.get(consumer).size() + 1;
+                if (assignmentCount >= minQuota) {
+                    unfilledMembersWithUnderMinQuotaPartitions.remove(consumer);
+                    if (assignmentCount < maxQuota)
+                        unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
+                } else
+                    nextIndex++;
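The round-robin filling in this hunk can be approximated as follows. This is a simplified sketch only: it uses a single quota instead of the minQuota/maxQuota split, models racks as plain strings, and replaces the PR's rackInfo.nextRackConsumer with a hypothetical linear scan that starts at a rotating cursor and picks the first unfilled consumer whose rack hosts a replica of the partition. Partitions with no same-rack consumer are returned so that a later, rack-agnostic pass could place them. RackRoundRobinSketch and fillWithinRacks are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of rack-aware round-robin filling; not the PR's code.
// Uses a single quota instead of separate minQuota/maxQuota lists.
public class RackRoundRobinSketch {

    // partitionRacks: partition -> racks hosting its replicas.
    // consumerRack: consumer -> rack id (may be missing).
    // unfilled: mutable list of consumers still below `quota`; consumers are removed once full.
    // Returns the partitions that found no same-rack consumer.
    public static List<String> fillWithinRacks(List<String> unassigned,
                                               Map<String, Set<String>> partitionRacks,
                                               Map<String, String> consumerRack,
                                               List<String> unfilled,
                                               int quota,
                                               Map<String, List<String>> assignment) {
        List<String> leftover = new ArrayList<>();
        int cursor = 0; // round-robin position in `unfilled`
        for (String tp : unassigned) {
            Set<String> racks = partitionRacks.getOrDefault(tp, Set.of());
            int found = -1;
            // Scan at most once around the list, starting at the cursor.
            for (int step = 0; step < unfilled.size(); step++) {
                int idx = (cursor + step) % unfilled.size();
                String rack = consumerRack.get(unfilled.get(idx));
                if (rack != null && racks.contains(rack)) {
                    found = idx;
                    break;
                }
            }
            if (found < 0) {
                leftover.add(tp);
                continue;
            }
            String consumer = unfilled.get(found);
            List<String> owned = assignment.computeIfAbsent(consumer, key -> new ArrayList<>());
            owned.add(tp);
            if (owned.size() >= quota) {
                // Removing shifts later consumers left, so the same index is the next consumer.
                unfilled.remove(found);
                cursor = found;
            } else {
                cursor = found + 1;
            }
            cursor = unfilled.isEmpty() ? 0 : cursor % unfilled.size();
        }
        return leftover;
    }

    public static void main(String[] args) {
        Map<String, List<String>> assignment = new TreeMap<>();
        List<String> leftover = fillWithinRacks(
                List.of("t-0", "t-1", "t-2", "t-3"),
                Map.of("t-0", Set.of("a"), "t-1", Set.of("b"), "t-2", Set.of("a"), "t-3", Set.of("c")),
                Map.of("c1", "a", "c2", "b"),
                new ArrayList<>(List.of("c1", "c2")),
                2,
                assignment);
        System.out.println(assignment); // prints {c1=[t-0, t-2], c2=[t-1]}
        System.out.println(leftover);   // prints [t-3]
    }
}
```

Advancing the cursor past the chosen consumer spreads same-rack partitions across consumers instead of piling them onto the first match; when a consumer fills up and is removed, the cursor keeps the same index, which now points at the next consumer.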

Review Comm