[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-03-01 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1121733442


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
-  def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
-    consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
-    consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RackAwareAssignor].getName)
-    val consumer = createConsumer()
-    consumer.subscribe(Set(topic).asJava)
-    awaitAssignment(consumer, Set(tp, tp2))
-  }
-}
+  def testRackAwareRangeAssignor(): Unit = {

Review Comment:
   Ah, I missed that one. I think it is one of the new tests that were added when fixing FFF (fetch-from-follower) issues. I moved this test to that class. It uses 2 brokers instead of 3, but that seems OK for this test. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-03-01 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1121729089


##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -74,45 +105,194 @@ public String name() {
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
 
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());

Review Comment:
   Good idea, updated.
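The Javadoc above describes a two-pass flow. Rack-matched partitions are assigned first, and plain range assignment handles the rest. That flow can be illustrated with a deliberately simplified sketch for a single topic. It is a sketch under stated assumptions, not the PR's implementation:

* Rack ids are plain `String`s and partitions are integer indices.
* Each consumer's quota is fixed by its sorted position, as in classic range assignment.
* Co-partitioning is ignored.
* `TwoPassRangeSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeSet;
import java.util.function.IntPredicate;

// Hypothetical, simplified sketch of two-pass rack-aware range assignment for ONE topic.
// Not Kafka's RangeAssignor: quotas are fixed by sorted position and co-partitioning is ignored.
class TwoPassRangeSketch {

    /**
     * @param consumerRacks  consumer id -> rack id (null value means no rack); iterated in sorted id order
     * @param partitionRacks partition index -> racks hosting any replica of that partition
     * @return consumer id -> sorted list of assigned partition indices
     */
    static Map<String, List<Integer>> assign(SortedMap<String, String> consumerRacks,
                                             List<Set<String>> partitionRacks) {
        List<String> consumers = new ArrayList<>(consumerRacks.keySet());
        int numPartitions = partitionRacks.size();
        int perConsumer = numPartitions / consumers.size();
        int withExtra = numPartitions % consumers.size();

        // Same quotas as classic range assignment: the first `withExtra` consumers get one extra partition.
        Map<String, Integer> quota = new HashMap<>();
        for (int i = 0; i < consumers.size(); i++)
            quota.put(consumers.get(i), perConsumer + (i < withExtra ? 1 : 0));

        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        consumers.forEach(c -> assignment.put(c, new ArrayList<>()));
        TreeSet<Integer> unassigned = new TreeSet<>();
        for (int p = 0; p < numPartitions; p++)
            unassigned.add(p);

        // Pass 1: each consumer with a rack takes only partitions that have a replica on that rack.
        for (String c : consumers) {
            String rack = consumerRacks.get(c);
            if (rack != null)
                fill(assignment.get(c), quota.get(c), unassigned, p -> partitionRacks.get(p).contains(rack));
        }
        // Pass 2: remaining quota is filled with any unassigned partitions, lowest index first.
        for (String c : consumers)
            fill(assignment.get(c), quota.get(c), unassigned, p -> true);

        assignment.values().forEach(list -> Collections.sort(list));
        return assignment;
    }

    // Moves partitions accepted by mayAssign from `unassigned` into `target` until `target` reaches its quota.
    private static void fill(List<Integer> target, int quota, TreeSet<Integer> unassigned, IntPredicate mayAssign) {
        Iterator<Integer> it = unassigned.iterator();
        while (it.hasNext() && target.size() < quota) {
            int p = it.next();
            if (mayAssign.test(p)) {
                target.add(p);
                it.remove();
            }
        }
    }
}
```

Consider three consumers on racks `r0`, `r1` and `r2`, and three partitions whose only replicas sit on `r2`, `r1` and `r0` respectively. The sketch assigns the partitions in reverse order. When every consumer has a rack and every partition has replicas on all racks, it gives the same result as classic range assignment.

Keeping the quotas identical to plain range assignment is what preserves per-topic balance here. Rack matching only changes which partitions fill each quota, never how many.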






[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-03-01 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1121728688


##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -74,45 +105,194 @@ public String name() {
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
 
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
+

[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-23 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1115773500


##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -74,45 +105,194 @@ public String name() {
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
 
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);

Review Comment:
   @dajac Thanks for the review. This `sort` is still done; I just moved it to `TopicAssignmentState`, where the members are processed. All the tests in `RangeAssignorTest` are also run in rack-aware mode, with every partition having replicas on all racks, to verify that the new algorithm gives the same result as the old one. `RangeAssignorTest.testStaticMemberRangeAssignmentPersistentAfterMemberIdChanges` tests the scenario
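The following standalone sketch of classic per-topic range assignment shows why the `sort` matters. Consumers are sorted, and each gets `numPartitions / numConsumers` partitions. The first `numPartitions % numConsumers` consumers get one extra partition.

The class and method names are hypothetical. Member ids are sorted here as plain strings, whereas the assignor sorts `MemberInfo` objects. Because of the sort, the result does not depend on the order in which subscriptions arrive.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical standalone sketch of classic (non-rack-aware) range assignment for one topic.
class ClassicRangeSketch {

    static Map<String, List<Integer>> assign(List<String> memberIds, int numPartitions) {
        // Sort first so that the result is independent of subscription order.
        List<String> consumers = new ArrayList<>(memberIds);
        Collections.sort(consumers);

        int numPartitionsPerConsumer = numPartitions / consumers.size();
        int consumersWithExtraPartition = numPartitions % consumers.size();

        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (int i = 0; i < consumers.size(); i++) {
            // Each earlier consumer that received an extra partition shifts this range's start by one.
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            int length = numPartitionsPerConsumer + (i < consumersWithExtraPartition ? 1 : 0);
            List<Integer> range = new ArrayList<>();
            for (int p = start; p < start + length; p++)
                range.add(p);
            assignment.put(consumers.get(i), range);
        }
        return assignment;
    }
}
```

For 7 partitions and consumers `c2`, `c0` and `c1`, supplied in any order, the sketch yields `c0 -> [0, 1, 2]`, `c1 -> [3, 4]` and `c2 -> [5, 6]`.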

[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112029042


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
-  def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
-    consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
-    consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RackAwareAssignor].getName)
-    val consumer = createConsumer()
-    consumer.subscribe(Set(topic).asJava)
-    awaitAssignment(consumer, Set(tp, tp2))
-  }
-}
+  def testRackAwareRangeAssignor(): Unit = {

Review Comment:
   I wanted to put this test into a class that had FFF enabled, but I couldn't find any integration tests for FFF, so I included it here.






[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112027236


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
-  def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
-    consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
-    consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RackAwareAssignor].getName)
-    val consumer = createConsumer()
-    consumer.subscribe(Set(topic).asJava)
-    awaitAssignment(consumer, Set(tp, tp2))
-  }
-}
+  def testRackAwareRangeAssignor(): Unit = {
+    val partitionList = servers.indices.toList
+
+    val topicWithAllPartitionsOnAllRacks = "topicWithAllPartitionsOnAllRacks"
+    createTopic(topicWithAllPartitionsOnAllRacks, servers.size, servers.size)
+
+    // Racks are in order of broker ids, assign leaders in reverse order
+    val topicWithSingleRackPartitions = "topicWithSingleRackPartitions"
+    createTopicWithAssignment(topicWithSingleRackPartitions, partitionList.map(i => (i, Seq(servers.size - i - 1))).toMap)
+
+    // Create consumers with instance ids in ascending order, with racks in the same order.
+    consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RangeAssignor].getName)
+    val consumers = servers.map { server =>
+      consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, server.config.rack.orNull)
+      consumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, s"instance-${server.config.brokerId}")
+      createConsumer()
+    }
+
+    val executor = Executors.newFixedThreadPool(consumers.size)
+    def waitForAssignments(assignments: List[Set[TopicPartition]]): Unit = {
+      val futures = consumers.zipWithIndex.map { case (consumer, i) =>
+        executor.submit(() => awaitAssignment(consumer, assignments(i)), 0)
+      }
+      futures.foreach(future => assertEquals(0, future.get(20, TimeUnit.SECONDS)))
+    }
 
-class RackAwareAssignor extends RoundRobinAssignor {
-  override def assign(partitionsPerTopic: util.Map[String, Integer], subscriptions: util.Map[String, ConsumerPartitionAssignor.Subscription]): util.Map[String, util.List[TopicPartition]] = {
-    assertEquals(1, subscriptions.size())
-    assertEquals(Optional.of("rack-a"), subscriptions.values.asScala.head.rackId)
-    super.assign(partitionsPerTopic, subscriptions)
+    try {
+      // Rack-based assignment results in partitions assigned in reverse order since partition racks are in the reverse order.
+      consumers.foreach(_.subscribe(Collections.singleton(topicWithSingleRackPartitions)))
+      waitForAssignments(partitionList.reverse.map(p => Set(new TopicPartition(topicWithSingleRackPartitions, p))))
+
+      // Non-rack-aware assignment results in ordered partitions.
+      consumers.foreach(_.subscribe(Collections.singleton(topicWithAllPartitionsOnAllRacks)))
+      waitForAssignments(partitionList.map(p => Set(new TopicPartition(topicWithAllPartitionsOnAllRacks, p))))
+
+      // Rack-aware assignment with co-partitioning results in reverse assignment for both topics.
+      consumers.foreach(_.subscribe(Set(topicWithSingleRackPartitions, topicWithAllPartitionsOnAllRacks).asJava))
+      waitForAssignments(partitionList.reverse.map(p => Set(new TopicPartition(topicWithAllPartitionsOnAllRacks, p), new TopicPartition(topicWithSingleRackPartitions, p))))

Review Comment:
   No, we don't do any leader-specific assignment; we assume all replicas are equal.
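A minimal sketch of rack matching in which leadership plays no role. A consumer is rack-aligned with a partition if any replica, leader or follower, is on the consumer's rack. The sketch assumes broker racks are available as a `Map<Integer, String>` and a partition's replicas as a list of broker ids. `RackMatchSketch.racksMatch` is a hypothetical helper, not the PR's `TopicAssignmentState.racksMatch`.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch: rack matching that treats every replica the same, leader or not.
class RackMatchSketch {

    /**
     * @param consumerRack rack of the consumer (null if client.rack is not set)
     * @param replicas     broker ids hosting the partition; the leader's position in the list is ignored
     * @param brokerRacks  broker id -> rack id
     */
    static boolean racksMatch(String consumerRack, List<Integer> replicas, Map<Integer, String> brokerRacks) {
        if (consumerRack == null)
            return false; // a consumer without a rack cannot be rack-aligned
        for (int brokerId : replicas) {
            // Any replica on the consumer's rack is enough; there is no special case for the leader.
            if (consumerRack.equals(brokerRacks.get(brokerId)))
                return true;
        }
        return false;
    }
}
```

A partition led by a broker on `r1` with a follower on `r2` therefore matches consumers on both `r1` and `r2`, but not a consumer on `r0`.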






[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112026015


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -29,24 +35,21 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
-
-import scala.jdk.CollectionConverters._
-import scala.collection.mutable.Buffer
-import kafka.server.QuotaType
-import kafka.server.KafkaServer
-import org.apache.kafka.clients.admin.NewPartitions
-import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.common.config.TopicConfig
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.locks.ReentrantLock
 import scala.collection.mutable
+import scala.collection.mutable.Buffer
+import scala.jdk.CollectionConverters._
 
 /* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */
 class PlaintextConsumerTest extends BaseConsumerTest {
 
+  override def modifyConfigs(props: collection.Seq[Properties]): Unit = {
+    super.modifyConfigs(props)
+    props.zipWithIndex.foreach{ case (p, i) => p.setProperty(KafkaConfig.RackProp, i.toString) }

Review Comment:
   Done.






[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112025710


##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey())))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
+
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
+
+    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
+                                        Map<String, List<TopicPartition>> assignment) {
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+        assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
+            states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+                if (coPartitionedStates.size() > 1)
+                    assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment);
+                else {
+                    TopicAssignmentState state = coPartitionedStates.get(0);
+                    if (state.needsRackAwareAssignment)
+                        assignRanges(state, state::racksMatch, assignment);
+                }
+            });
+        });
+    }
+
+    private void assignCoPartitionedWithRackMatching(List<String> consumers,
+                                                     int
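The nested `groupingBy` in `assignWithRackMatching` above groups topics first by their consumers and then by partition count. Topics that share both are the co-partitioned candidates.

A minimal sketch of just that grouping step follows. It uses a hypothetical `TopicState` holding a topic name, a sorted consumer list and a partition count. None of these names come from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of detecting co-partitioned topics with a two-level grouping.
class CoPartitionGroupingSketch {

    // Hypothetical per-topic view: name, sorted ids of subscribed consumers, and partition count.
    static final class TopicState {
        final String topic;
        final List<String> consumers;
        final int numPartitions;

        TopicState(String topic, List<String> consumers, int numPartitions) {
            this.topic = topic;
            this.consumers = consumers;
            this.numPartitions = numPartitions;
        }
    }

    // Groups topics by consumer list, then by partition count, and keeps only inner groups
    // containing more than one topic: those topics are co-partitioned.
    static List<List<String>> coPartitionedGroups(List<TopicState> states) {
        Map<List<String>, Map<Integer, List<String>>> grouped = states.stream().collect(
                Collectors.groupingBy((TopicState t) -> t.consumers,
                        Collectors.groupingBy((TopicState t) -> t.numPartitions,
                                Collectors.mapping((TopicState t) -> t.topic, Collectors.toList()))));

        List<List<String>> result = new ArrayList<>();
        for (Map<Integer, List<String>> byPartitionCount : grouped.values())
            for (List<String> topics : byPartitionCount.values())
                if (topics.size() > 1) // a lone topic would be handled on its own
                    result.add(topics);
        return result;
    }
}
```

In the quoted snapshot, the co-partitioned branch passes the outer `states` list to `assignCoPartitionedWithRackMatching`, not `coPartitionedStates`. As a result, topics with the same consumers but a different partition count are included in that call. The sketch returns only the inner groups.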

[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112016639



[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112015848



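The new `assignPartitions` runs `assignRanges` twice: once with `state::racksMatch` as the filter (inside `assignWithRackMatching`) and once with `(c, tp) -> true`, so rack alignment is attempted without giving up the per-consumer range quota. Below is a minimal single-topic sketch of that two-pass idea. `TopicState`, its fields, and the quota bookkeeping (`remainingExtra`) are hypothetical simplifications, not Kafka's `TopicAssignmentState`; partitions are plain ints rather than `TopicPartition`s, and at least one consumer is assumed.

```java
import java.util.*;
import java.util.function.BiPredicate;

// Hypothetical, simplified sketch of two-pass rack-aware range assignment for ONE topic.
// Not Kafka's RangeAssignor code; partitions are ints and racks are strings.
class RackAwareRangeSketch {

    static final class TopicState {
        final List<String> consumers;            // consumer ids, sorted as in range assignment
        final Map<String, String> consumerRack;  // consumer id -> rack (absent if unknown)
        final List<Set<String>> partitionRacks;  // partition index -> racks holding a replica
        final TreeSet<Integer> unassigned = new TreeSet<>();
        final Map<String, Integer> assignedCount = new HashMap<>();
        final int basePerConsumer;
        int remainingExtra;                      // consumers that may still take one extra partition

        TopicState(List<String> consumers, Map<String, String> consumerRack, List<Set<String>> partitionRacks) {
            this.consumers = new ArrayList<>(consumers);
            Collections.sort(this.consumers);
            this.consumerRack = consumerRack;
            this.partitionRacks = partitionRacks;
            for (int p = 0; p < partitionRacks.size(); p++) unassigned.add(p);
            this.basePerConsumer = partitionRacks.size() / consumers.size();
            this.remainingExtra = partitionRacks.size() % consumers.size();
        }

        // Remaining quota for this consumer (assumed bookkeeping): balance wins over rack alignment.
        int maxAssignable(String consumer) {
            int quota = basePerConsumer + (remainingExtra > 0 ? 1 : 0);
            return Math.max(0, quota - assignedCount.getOrDefault(consumer, 0));
        }

        boolean racksMatch(String consumer, int partition) {
            String rack = consumerRack.get(consumer);
            return rack != null && partitionRacks.get(partition).contains(rack);
        }
    }

    // Give each consumer, in order, the lowest unassigned partitions it may take, up to its quota.
    static void assignRanges(TopicState state, BiPredicate<String, Integer> mayAssign,
                             Map<String, List<Integer>> assignment) {
        for (String consumer : state.consumers) {
            if (state.unassigned.isEmpty()) break;
            List<Integer> assignable = new ArrayList<>();
            for (int p : state.unassigned)
                if (mayAssign.test(consumer, p)) assignable.add(p);
            int max = Math.min(state.maxAssignable(consumer), assignable.size());
            if (max <= 0) continue;
            List<Integer> chosen = assignable.subList(0, max);
            state.unassigned.removeAll(chosen);
            assignment.get(consumer).addAll(chosen);
            int count = state.assignedCount.merge(consumer, max, Integer::sum);
            if (count == state.basePerConsumer + 1) state.remainingExtra--; // extra slot consumed
        }
    }

    static Map<String, List<Integer>> assign(TopicState state) {
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (String c : state.consumers) assignment.put(c, new ArrayList<>());
        assignRanges(state, state::racksMatch, assignment); // pass 1: rack-matched partitions only
        assignRanges(state, (c, p) -> true, assignment);    // pass 2: fill the remaining quota
        assignment.values().forEach(Collections::sort);
        return assignment;
    }

    public static void main(String[] args) {
        TopicState state = new TopicState(
                List.of("c0", "c1"),
                Map.of("c0", "a", "c1", "b"),
                List.of(Set.of("b"), Set.of("b"), Set.of("a"), Set.of("a")));
        System.out.println(assign(state)); // {c0=[2, 3], c1=[0, 1]}
    }
}
```

With `c0` on rack `a`, `c1` on rack `b`, and partitions 0-1 replicated only on `b` and 2-3 only on `a`, plain range order would give `c0` partitions 0 and 1. The rack pass instead gives `c0` [2, 3] and `c1` [0, 1], and each consumer still ends up with two partitions. If racks cannot be matched without unbalancing (for example, three partitions all on rack `a`), the second pass still hands the leftover partition to `c1`.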
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112014546


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112013804


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112011199


##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -63,9 +76,19 @@
  * I0: [t0p0, t0p1, t1p0, t1p1]
  * I1: [t0p2, t1p2]
  * 
+ * 
+ * Rack-aware assignment is used if both consumer and partition replica racks are available and
+ * some partitions have replicas only on a subset of racks. We attempt to match consumer racks with
+ * partition replica racks on a best-effort basis, prioritizing balanced assignment over rack alignment.
+ * Topics with equal partition count and same set of subscribers prioritize co-partitioning guarantee
+ * over rack alignment. In this case, aligning partition replicas of these topics on the same racks
+ * will improve locality for consumers. For example, if partitions 0 of all topics have a replica on
+ * rack 'a', partition 1 on rack 'b' etc., partition 0 of all topics can be assigned to a consumer
+ * on rack 'a', partition 1 to a consumer on rack 'b' and so on.

Review Comment:
   Yes, that is correct. 
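The Javadoc says co-partitioned topics (same partition count, same subscribers) keep partition i of every topic on one consumer and only then consider racks. One way to sketch that is to run the range quota over partition indices and expand each index back to one partition per topic. This assumes a consumer matches index i only if its rack hosts a replica of partition i in every topic. `CoPartitionedRackSketch` and the `"topic-i"` string format are hypothetical; this is not the PR's `assignCoPartitionedWithRackMatching`.

```java
import java.util.*;

// Hypothetical sketch of co-partitioned, rack-aware range assignment; not Kafka's implementation.
class CoPartitionedRackSketch {

    static Map<String, List<String>> assign(List<String> consumers,
                                            Map<String, String> consumerRack,
                                            Map<String, List<Set<String>>> topicPartitionRacks) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        // Assumes a non-empty topic map where every topic has the same partition count.
        int n = topicPartitionRacks.values().iterator().next().size();

        // Assumption: index i is rack-local for a rack only if that rack hosts partition i of EVERY topic.
        List<Set<String>> indexRacks = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Set<String> common = null;
            for (List<Set<String>> racks : topicPartitionRacks.values()) {
                if (common == null) common = new HashSet<>(racks.get(i));
                else common.retainAll(racks.get(i));
            }
            indexRacks.add(common);
        }

        int base = n / sorted.size();
        int remainingExtra = n % sorted.size();
        Map<String, Integer> count = new HashMap<>();
        Map<String, List<Integer>> indices = new HashMap<>();
        for (String c : sorted) indices.put(c, new ArrayList<>());
        TreeSet<Integer> unassigned = new TreeSet<>();
        for (int i = 0; i < n; i++) unassigned.add(i);

        // Pass 0 takes only rack-matched indices; pass 1 fills whatever quota is left.
        for (int pass = 0; pass < 2; pass++) {
            for (String c : sorted) {
                String rack = consumerRack.get(c);
                int quota = base + (remainingExtra > 0 ? 1 : 0) - count.getOrDefault(c, 0);
                Iterator<Integer> it = unassigned.iterator();
                while (quota > 0 && it.hasNext()) {
                    int i = it.next();
                    if (pass == 0 && (rack == null || !indexRacks.get(i).contains(rack))) continue;
                    it.remove();
                    indices.get(c).add(i);
                    quota--;
                    if (count.merge(c, 1, Integer::sum) == base + 1) remainingExtra--;
                }
            }
        }

        // Expand each index to the same partition of every topic, so co-partitioning is preserved.
        List<String> topics = new ArrayList<>(new TreeSet<>(topicPartitionRacks.keySet()));
        Map<String, List<String>> result = new TreeMap<>();
        for (String c : sorted) {
            List<Integer> idx = indices.get(c);
            Collections.sort(idx);
            List<String> tps = new ArrayList<>();
            for (String topic : topics)
                for (int i : idx) tps.add(topic + "-" + i);
            result.put(c, tps);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Set<String>>> racks = Map.of(
                "t0", List.of(Set.of("b"), Set.of("a")),
                "t1", List.of(Set.of("b"), Set.of("a")));
        System.out.println(assign(List.of("c0", "c1"), Map.of("c0", "a", "c1", "b"), racks));
        // {c0=[t0-1, t1-1], c1=[t0-0, t1-0]}
    }
}
```

Here partition 0 of both topics lives on rack `b` and partition 1 on rack `a`, so `c0` (rack `a`) receives partition 1 of both topics and `c1` (rack `b`) receives partition 0 of both. Because whole indices are assigned, co-partitioning holds even when no rack matches. In that case the second pass simply falls back to range order.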



##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -76,43 +99,185 @@ private Map> consumersPerTopic(Map> topicToConsumers = new HashMap<>();
         for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) {

Review Comment:
   Done.


