[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1121733442

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
   @Test
-  def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
-    consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
-    consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RackAwareAssignor].getName)
-    val consumer = createConsumer()
-    consumer.subscribe(Set(topic).asJava)
-    awaitAssignment(consumer, Set(tp, tp2))
-  }
-}
+  def testRackAwareRangeAssignor(): Unit = {

Review Comment:
Ah, I missed that one, I think it is one of the new tests that was added when fixing FFF issues. I moved this test to that class. It uses 2 brokers instead of 3, but that seems ok for this test. Thanks.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1121729089

## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ##
@@ -74,45 +105,194 @@ public String name() {
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());

Review Comment:
Good idea, updated.
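The Javadoc quoted in the diff above describes two passes: a rack-aligned pass first, then standard range assignment for whatever is left. A minimal single-topic sketch of that idea follows. It is not the code from the PR: the class name `TwoPassRangeSketch`, the integer partition ids, and the plain maps used for racks are all assumptions made for illustration, and co-partitioning across topics is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiPredicate;

// Illustrative only: these names are hypothetical and are not Kafka's RangeAssignor internals.
class TwoPassRangeSketch {

    // One pass over the sorted consumers. Each consumer takes, in partition order,
    // the unassigned partitions accepted by mayAssign, up to its remaining quota.
    static void assignRanges(List<String> consumers,
                             Map<String, Integer> quota,
                             List<Integer> unassigned,
                             BiPredicate<String, Integer> mayAssign,
                             Map<String, List<Integer>> assignment) {
        for (String consumer : consumers) {
            if (unassigned.isEmpty())
                break;
            int remaining = quota.get(consumer) - assignment.get(consumer).size();
            List<Integer> taken = new ArrayList<>();
            for (Integer partition : unassigned) {
                if (taken.size() >= remaining)
                    break;
                if (mayAssign.test(consumer, partition))
                    taken.add(partition);
            }
            assignment.get(consumer).addAll(taken);
            unassigned.removeAll(taken);
        }
    }

    // Single-topic assignment. consumerRacks maps member id to rack;
    // partitionRacks.get(p) is the set of racks hosting a replica of partition p.
    static Map<String, List<Integer>> assign(Map<String, String> consumerRacks,
                                             List<Set<String>> partitionRacks) {
        List<String> consumers = new ArrayList<>(consumerRacks.keySet());
        Collections.sort(consumers);
        int numPartitions = partitionRacks.size();

        // Classic range quota: the first (numPartitions % n) consumers get one extra partition.
        Map<String, Integer> quota = new HashMap<>();
        for (int i = 0; i < consumers.size(); i++) {
            int extra = i < numPartitions % consumers.size() ? 1 : 0;
            quota.put(consumers.get(i), numPartitions / consumers.size() + extra);
        }

        List<Integer> unassigned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++)
            unassigned.add(p);
        Map<String, List<Integer>> assignment = new TreeMap<>();
        consumers.forEach(c -> assignment.put(c, new ArrayList<>()));

        // Pass 1: only hand out partitions that have a replica on the consumer's rack.
        assignRanges(consumers, quota, unassigned, (c, p) -> {
            String rack = consumerRacks.get(c);
            return rack != null && partitionRacks.get(p).contains(rack);
        }, assignment);
        // Pass 2: hand out the rest with no rack constraint; quotas keep the topic balanced.
        assignRanges(consumers, quota, unassigned, (c, p) -> true, assignment);

        assignment.values().forEach(Collections::sort);
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, String> consumerRacks = new HashMap<>();
        consumerRacks.put("c0", "rack0");
        consumerRacks.put("c1", "rack1");
        consumerRacks.put("c2", "rack2");
        // Partition p lives only on rack (2 - p), i.e. racks are in reverse order.
        List<Set<String>> partitionRacks = Arrays.asList(
                Collections.singleton("rack2"),
                Collections.singleton("rack1"),
                Collections.singleton("rack0"));
        System.out.println(assign(consumerRacks, partitionRacks)); // {c0=[2], c1=[1], c2=[0]}
    }
}
```

Because both passes share the same per-consumer quota, the rack-aligned pass cannot unbalance the topic. It only changes which partitions each consumer receives, not how many.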
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1121728688 ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -74,45 +105,194 @@ public String name() { private Map> consumersPerTopic(Map consumerMetadata) { Map> topicToConsumers = new HashMap<>(); -for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) { -String consumerId = subscriptionEntry.getKey(); -MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId()); -for (String topic : subscriptionEntry.getValue().topics()) { -put(topicToConsumers, topic, memberInfo); -} -} +consumerMetadata.forEach((consumerId, subscription) -> { +MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId()); +subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo)); +}); return topicToConsumers; } +/** + * Performs range assignment of the specified partitions for the consumers with the provided subscriptions. + * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign + * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and + * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned + * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks. 
+ */ @Override -public Map> assign(Map partitionsPerTopic, -Map subscriptions) { +public Map> assignPartitions(Map> partitionsPerTopic, + Map subscriptions) { Map> consumersPerTopic = consumersPerTopic(subscriptions); +Map consumerRacks = consumerRacks(subscriptions); +List topicAssignmentStates = partitionsPerTopic.entrySet().stream() +.filter(e -> !e.getValue().isEmpty()) +.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks)) +.collect(Collectors.toList()); Map> assignment = new HashMap<>(); -for (String memberId : subscriptions.keySet()) -assignment.put(memberId, new ArrayList<>()); +subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>())); + +boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment); +if (useRackAware) +assignWithRackMatching(topicAssignmentStates, assignment); + +topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment)); + +if (useRackAware) +assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR)); +return assignment; +} + +// This method is not used, but retained for compatibility with any custom assignors that extend this class. 
+@Override +public Map> assign(Map partitionsPerTopic, +Map subscriptions) { +return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions); +} -for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) { -String topic = topicEntry.getKey(); -List consumersForTopic = topicEntry.getValue(); +private void assignRanges(TopicAssignmentState assignmentState, + BiFunction mayAssign, + Map> assignment) { +for (String consumer : assignmentState.consumers.keySet()) { +if (assignmentState.unassignedPartitions.isEmpty()) +break; +List assignablePartitions = assignmentState.unassignedPartitions.stream() +.filter(tp -> mayAssign.apply(consumer, tp)) +.collect(Collectors.toList()); -Integer numPartitionsForTopic = partitionsPerTopic.get(topic); -if (numPartitionsForTopic == null) +int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size()); +if (maxAssignable <= 0) continue; -Collections.sort(consumersForTopic); +assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment); +} +} -int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); -int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); +private void assignWithRackMatching(Collection assignmentStates, +
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1115773500

## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ##
@@ -74,45 +105,194 @@ public String name() {
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
-            Collections.sort(consumersForTopic);

Review Comment:
@dajac Thanks for the review. This `sort` is still done; I just moved it to TopicAssignmentState, where the members are processed. All the tests in RangeAssignorTest are also run in rack-aware mode, with all partitions on all racks, to verify that the new algorithm gives the same result as the old algorithm. RangeAssignorTest.testStaticMemberRangeAssignmentPersistentAfterMemberIdChanges tests the scenario
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112029042

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
   @Test
-  def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
-    consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
-    consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RackAwareAssignor].getName)
-    val consumer = createConsumer()
-    consumer.subscribe(Set(topic).asJava)
-    awaitAssignment(consumer, Set(tp, tp2))
-  }
-}
+  def testRackAwareRangeAssignor(): Unit = {

Review Comment:
I wanted to put this test into a class that had FFF enabled, but I couldn't find any integration tests for FFF, so I included it here.
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112027236

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
   @Test
-  def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
-    consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
-    consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RackAwareAssignor].getName)
-    val consumer = createConsumer()
-    consumer.subscribe(Set(topic).asJava)
-    awaitAssignment(consumer, Set(tp, tp2))
-  }
-}
+  def testRackAwareRangeAssignor(): Unit = {
+    val partitionList = servers.indices.toList
+
+    val topicWithAllPartitionsOnAllRacks = "topicWithAllPartitionsOnAllRacks"
+    createTopic(topicWithAllPartitionsOnAllRacks, servers.size, servers.size)
+
+    // Racks are in order of broker ids, assign leaders in reverse order
+    val topicWithSingleRackPartitions = "topicWithSingleRackPartitions"
+    createTopicWithAssignment(topicWithSingleRackPartitions, partitionList.map(i => (i, Seq(servers.size - i - 1))).toMap)
+
+    // Create consumers with instance ids in ascending order, with racks in the same order.
+    consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RangeAssignor].getName)
+    val consumers = servers.map { server =>
+      consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, server.config.rack.orNull)
+      consumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, s"instance-${server.config.brokerId}")
+      createConsumer()
+    }
+
+    val executor = Executors.newFixedThreadPool(consumers.size)
+    def waitForAssignments(assignments: List[Set[TopicPartition]]): Unit = {
+      val futures = consumers.zipWithIndex.map { case (consumer, i) =>
+        executor.submit(() => awaitAssignment(consumer, assignments(i)), 0)
+      }
+      futures.foreach(future => assertEquals(0, future.get(20, TimeUnit.SECONDS)))
+    }
-class RackAwareAssignor extends RoundRobinAssignor {
-  override def assign(partitionsPerTopic: util.Map[String, Integer], subscriptions: util.Map[String, ConsumerPartitionAssignor.Subscription]): util.Map[String, util.List[TopicPartition]] = {
-    assertEquals(1, subscriptions.size())
-    assertEquals(Optional.of("rack-a"), subscriptions.values.asScala.head.rackId)
-    super.assign(partitionsPerTopic, subscriptions)
+    try {
+      // Rack-based assignment results in partitions assigned in reverse order since partition racks are in the reverse order.
+      consumers.foreach(_.subscribe(Collections.singleton(topicWithSingleRackPartitions)))
+      waitForAssignments(partitionList.reverse.map(p => Set(new TopicPartition(topicWithSingleRackPartitions, p))))
+
+      // Non-rack-aware assignment results in ordered partitions.
+      consumers.foreach(_.subscribe(Collections.singleton(topicWithAllPartitionsOnAllRacks)))
+      waitForAssignments(partitionList.map(p => Set(new TopicPartition(topicWithAllPartitionsOnAllRacks, p))))
+
+      // Rack-aware assignment with co-partitioning results in reverse assignment for both topics.
+      consumers.foreach(_.subscribe(Set(topicWithSingleRackPartitions, topicWithAllPartitionsOnAllRacks).asJava))
+      waitForAssignments(partitionList.reverse.map(p => Set(new TopicPartition(topicWithAllPartitionsOnAllRacks, p), new TopicPartition(topicWithSingleRackPartitions, p))))

Review Comment:
No, we don't do any leader-specific assignment, we are assuming all replicas are equal.
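The reply above says that rack matching treats all replicas as equal and does not single out the leader. A small sketch of what that could look like is given here. It is a hedged illustration, not the PR's implementation: `RackMatchSketch` and `Replica` are hypothetical stand-ins, and the `leader` flag is kept only to show that the match ignores it.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative only: Replica is a hypothetical stand-in, not a Kafka class.
class RackMatchSketch {

    static final class Replica {
        final int brokerId;
        final String rack;      // null when the broker has no rack configured
        final boolean leader;   // recorded, but deliberately ignored when matching

        Replica(int brokerId, String rack, boolean leader) {
            this.brokerId = brokerId;
            this.rack = rack;
            this.leader = leader;
        }
    }

    // Collects the racks of every replica, leader and followers alike.
    static Set<String> partitionRacks(List<Replica> replicas) {
        Set<String> racks = new HashSet<>();
        for (Replica replica : replicas) {
            if (replica.rack != null)
                racks.add(replica.rack);
        }
        return racks;
    }

    // A consumer matches a partition if any replica sits on the consumer's rack.
    static boolean racksMatch(String consumerRack, List<Replica> replicas) {
        return consumerRack != null && partitionRacks(replicas).contains(consumerRack);
    }

    public static void main(String[] args) {
        List<Replica> replicas = Arrays.asList(
                new Replica(0, "rack0", true),
                new Replica(1, "rack1", false));
        System.out.println(racksMatch("rack0", replicas)); // true: the leader's rack
        System.out.println(racksMatch("rack1", replicas)); // true: a follower's rack counts equally
        System.out.println(racksMatch("rack2", replicas)); // false: no replica on this rack
    }
}
```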
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112026015

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -29,24 +35,21 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
-
-import scala.jdk.CollectionConverters._
-import scala.collection.mutable.Buffer
-import kafka.server.QuotaType
-import kafka.server.KafkaServer
-import org.apache.kafka.clients.admin.NewPartitions
-import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.common.config.TopicConfig
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.locks.ReentrantLock
 import scala.collection.mutable
+import scala.collection.mutable.Buffer
+import scala.jdk.CollectionConverters._
 /* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */
 class PlaintextConsumerTest extends BaseConsumerTest {
+  override def modifyConfigs(props: collection.Seq[Properties]): Unit = {
+    super.modifyConfigs(props)
+    props.zipWithIndex.foreach{ case (p, i) => p.setProperty(KafkaConfig.RackProp, i.toString) }

Review Comment:
Done.
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1112025710 ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -76,43 +99,185 @@ private Map> consumersPerTopic(Map> topicToConsumers = new HashMap<>(); for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) { String consumerId = subscriptionEntry.getKey(); -MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId()); -for (String topic : subscriptionEntry.getValue().topics()) { +Subscription subscription = subscriptionEntry.getValue(); +MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId()); +for (String topic : subscription.topics()) { put(topicToConsumers, topic, memberInfo); } } return topicToConsumers; } @Override -public Map> assign(Map partitionsPerTopic, -Map subscriptions) { +public Map> assignPartitions(Map> partitionsPerTopic, + Map subscriptions) { Map> consumersPerTopic = consumersPerTopic(subscriptions); +List topicAssignmentStates = partitionsPerTopic.entrySet().stream() +.filter(e -> !e.getValue().isEmpty()) +.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey( +.collect(Collectors.toList()); Map> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<>()); -for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) { -String topic = topicEntry.getKey(); -List consumersForTopic = topicEntry.getValue(); +boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment); +if (useRackAware) +assignWithRackMatching(topicAssignmentStates, assignment); + +topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment)); + +if (useRackAware) +assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR)); +return assignment; +} + +// This method is not used, but retained 
for compatibility with any custom assignors that extend this class. +@Override +public Map> assign(Map partitionsPerTopic, +Map subscriptions) { +return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions); +} + +private void assignRanges(TopicAssignmentState assignmentState, + BiFunction mayAssign, + Map> assignment) { +for (String consumer : assignmentState.consumers) { +if (assignmentState.unassignedPartitions.isEmpty()) +break; +List assignablePartitions = assignmentState.unassignedPartitions.stream() +.filter(tp -> mayAssign.apply(consumer, tp)) +.collect(Collectors.toList()); -Integer numPartitionsForTopic = partitionsPerTopic.get(topic); -if (numPartitionsForTopic == null) +int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size()); +if (maxAssignable <= 0) continue; -Collections.sort(consumersForTopic); +assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment); +} +} + +private void assignWithRackMatching(Collection assignmentStates, +Map> assignment) { -int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); -int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); +assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> { +states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> { +if (coPartitionedStates.size() > 1) +assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment); +else { +TopicAssignmentState state = coPartitionedStates.get(0); +if (state.needsRackAwareAssignment) +assignRanges(state, state::racksMatch, assignment); +} +}); +}); +} + +private void assignCoPartitionedWithRackMatching(List consumers, + int
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1112016639 ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -76,43 +99,185 @@ private Map> consumersPerTopic(Map> topicToConsumers = new HashMap<>(); for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) { String consumerId = subscriptionEntry.getKey(); -MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId()); -for (String topic : subscriptionEntry.getValue().topics()) { +Subscription subscription = subscriptionEntry.getValue(); +MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId()); +for (String topic : subscription.topics()) { put(topicToConsumers, topic, memberInfo); } } return topicToConsumers; } @Override -public Map> assign(Map partitionsPerTopic, -Map subscriptions) { +public Map> assignPartitions(Map> partitionsPerTopic, + Map subscriptions) { Map> consumersPerTopic = consumersPerTopic(subscriptions); +List topicAssignmentStates = partitionsPerTopic.entrySet().stream() +.filter(e -> !e.getValue().isEmpty()) +.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey( +.collect(Collectors.toList()); Map> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<>()); -for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) { -String topic = topicEntry.getKey(); -List consumersForTopic = topicEntry.getValue(); +boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment); +if (useRackAware) +assignWithRackMatching(topicAssignmentStates, assignment); + +topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment)); + +if (useRackAware) +assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR)); +return assignment; +} + +// This method is not used, but retained 
for compatibility with any custom assignors that extend this class. +@Override +public Map> assign(Map partitionsPerTopic, +Map subscriptions) { +return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions); +} + +private void assignRanges(TopicAssignmentState assignmentState, + BiFunction mayAssign, + Map> assignment) { +for (String consumer : assignmentState.consumers) { +if (assignmentState.unassignedPartitions.isEmpty()) +break; +List assignablePartitions = assignmentState.unassignedPartitions.stream() +.filter(tp -> mayAssign.apply(consumer, tp)) +.collect(Collectors.toList()); -Integer numPartitionsForTopic = partitionsPerTopic.get(topic); -if (numPartitionsForTopic == null) +int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size()); +if (maxAssignable <= 0) continue; -Collections.sort(consumersForTopic); +assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment); +} +} + +private void assignWithRackMatching(Collection assignmentStates, +Map> assignment) { -int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); -int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); +assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> { +states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> { +if (coPartitionedStates.size() > 1) +assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment); +else { +TopicAssignmentState state = coPartitionedStates.get(0); +if (state.needsRackAwareAssignment) +assignRanges(state, state::racksMatch, assignment); +} +}); +}); +} + +private void assignCoPartitionedWithRackMatching(List consumers, + int
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1112015848 ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -76,43 +99,185 @@ private Map> consumersPerTopic(Map> topicToConsumers = new HashMap<>(); for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) { String consumerId = subscriptionEntry.getKey(); -MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId()); -for (String topic : subscriptionEntry.getValue().topics()) { +Subscription subscription = subscriptionEntry.getValue(); +MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId()); +for (String topic : subscription.topics()) { put(topicToConsumers, topic, memberInfo); } } return topicToConsumers; } @Override -public Map> assign(Map partitionsPerTopic, -Map subscriptions) { +public Map> assignPartitions(Map> partitionsPerTopic, + Map subscriptions) { Map> consumersPerTopic = consumersPerTopic(subscriptions); +List topicAssignmentStates = partitionsPerTopic.entrySet().stream() +.filter(e -> !e.getValue().isEmpty()) +.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey( +.collect(Collectors.toList()); Map> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<>()); -for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) { -String topic = topicEntry.getKey(); -List consumersForTopic = topicEntry.getValue(); +boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment); +if (useRackAware) +assignWithRackMatching(topicAssignmentStates, assignment); + +topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment)); + +if (useRackAware) +assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR)); +return assignment; +} + +// This method is not used, but retained 
for compatibility with any custom assignors that extend this class. +@Override +public Map> assign(Map partitionsPerTopic, +Map subscriptions) { +return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions); +} + +private void assignRanges(TopicAssignmentState assignmentState, + BiFunction mayAssign, + Map> assignment) { +for (String consumer : assignmentState.consumers) { +if (assignmentState.unassignedPartitions.isEmpty()) +break; +List assignablePartitions = assignmentState.unassignedPartitions.stream() +.filter(tp -> mayAssign.apply(consumer, tp)) +.collect(Collectors.toList()); -Integer numPartitionsForTopic = partitionsPerTopic.get(topic); -if (numPartitionsForTopic == null) +int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size()); +if (maxAssignable <= 0) continue; -Collections.sort(consumersForTopic); +assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment); +} +} + +private void assignWithRackMatching(Collection assignmentStates, +Map> assignment) { -int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); -int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); +assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> { +states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> { +if (coPartitionedStates.size() > 1) +assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment); +else { +TopicAssignmentState state = coPartitionedStates.get(0); +if (state.needsRackAwareAssignment) +assignRanges(state, state::racksMatch, assignment); +} +}); +}); +} + +private void assignCoPartitionedWithRackMatching(List consumers, + int
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112014546

## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ##
@@ -76,43 +99,185 @@
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112013804

## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ##
@@ -76,43 +99,185 @@
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112011199

## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ##
@@ -63,9 +76,19 @@
  * I0: [t0p0, t0p1, t1p0, t1p1]
  * I1: [t0p2, t1p2]
  *
+ *
+ * Rack-aware assignment is used if both consumer and partition replica racks are available and
+ * some partitions have replicas only on a subset of racks. We attempt to match consumer racks with
+ * partition replica racks on a best-effort basis, prioritizing balanced assignment over rack alignment.
+ * Topics with equal partition count and same set of subscribers prioritize co-partitioning guarantee
+ * over rack alignment. In this case, aligning partition replicas of these topics on the same racks
+ * will improve locality for consumers. For example, if partitions 0 of all topics have a replica on
+ * rack 'a', partition 1 on rack 'b' etc., partition 0 of all topics can be assigned to a consumer
+ * on rack 'a', partition 1 to a consumer on rack 'b' and so on.

Review Comment:
Yes, that is correct.

## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ##
@@ -76,43 +99,185 @@ private Map> consumersPerTopic(Map consumerMetadata) {
         Map> topicToConsumers = new HashMap<>();
         for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) {

Review Comment:
Done.
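
For readers following the thread, the Javadoc quoted in this comment describes a best-effort scheme: match consumer racks to partition replica racks first, but never at the cost of the per-topic balance that range assignment guarantees. A minimal single-topic sketch of that idea is below. It is not the code from the PR: the class name `RackAwareRangeSketch`, the `assign` signature, and the use of plain partition indexes are assumptions made for illustration, and co-partitioning across topics is left out. The per-consumer quota follows the classic range rule visible in the removed lines of the diff (partitions / consumers, with the remainder going to the first consumers).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiPredicate;

// Hypothetical illustration only; not the RangeAssignor implementation from the PR.
class RackAwareRangeSketch {

    // consumerRacks: consumer id -> rack id, iterated in sorted consumer order.
    // partitionRacks.get(p): racks that hold a replica of partition p of one topic.
    static Map<String, List<Integer>> assign(TreeMap<String, String> consumerRacks,
                                             List<Set<String>> partitionRacks) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        List<String> consumers = new ArrayList<>(consumerRacks.keySet());
        if (consumers.isEmpty())
            return assignment;

        // Range quota: every consumer gets n / c partitions, the first n % c get one more.
        int numPartitions = partitionRacks.size();
        Map<String, Integer> quota = new LinkedHashMap<>();
        for (int i = 0; i < consumers.size(); i++) {
            int extra = i < numPartitions % consumers.size() ? 1 : 0;
            quota.put(consumers.get(i), numPartitions / consumers.size() + extra);
            assignment.put(consumers.get(i), new ArrayList<>());
        }

        List<Integer> unassigned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++)
            unassigned.add(p);

        // Pass 1: only hand out partitions that have a replica on the consumer's rack.
        assignRanges(consumers, quota, unassigned, assignment,
            (c, p) -> partitionRacks.get(p).contains(consumerRacks.get(c)));
        // Pass 2: hand out whatever is left, ignoring racks, so the quotas are still met.
        assignRanges(consumers, quota, unassigned, assignment, (c, p) -> true);

        assignment.values().forEach(list -> Collections.sort(list));
        return assignment;
    }

    private static void assignRanges(List<String> consumers,
                                     Map<String, Integer> quota,
                                     List<Integer> unassigned,
                                     Map<String, List<Integer>> assignment,
                                     BiPredicate<String, Integer> mayAssign) {
        for (String consumer : consumers) {
            if (unassigned.isEmpty())
                break;
            // Never exceed the consumer's quota, whichever pass we are in.
            int remaining = quota.get(consumer) - assignment.get(consumer).size();
            List<Integer> picked = new ArrayList<>();
            for (Integer p : unassigned) {
                if (picked.size() >= remaining)
                    break;
                if (mayAssign.test(consumer, p))
                    picked.add(p);
            }
            assignment.get(consumer).addAll(picked);
            unassigned.removeAll(picked);
        }
    }

    public static void main(String[] args) {
        TreeMap<String, String> consumerRacks = new TreeMap<>();
        consumerRacks.put("c0", "b");
        consumerRacks.put("c1", "a");
        List<Set<String>> partitionRacks = Arrays.asList(
            Collections.singleton("a"),   // partition 0 has a replica only on rack a
            Collections.singleton("b"));  // partition 1 has a replica only on rack b
        System.out.println(assign(consumerRacks, partitionRacks));
    }
}
```

In the `main` example, consumer c0 sits on rack b and c1 on rack a, so the sketch gives partition 1 to c0 and partition 0 to c1, where plain range assignment would have given partition 0 to c0. If all replicas sat on one rack, the second pass would still fill the other consumer's quota, which is the "balance over rack alignment" priority the Javadoc states.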