Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac merged PR #16088: URL: https://github.com/apache/kafka/pull/16088 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1622324676
## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java: ##
@@ -158,12 +159,11 @@ public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() {
 Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
 expectedAssignment.put(memberA, mkAssignment(
-mkTopicAssignment(topic1Uuid, 0, 2),
-mkTopicAssignment(topic3Uuid, 1)
+mkTopicAssignment(topic1Uuid, 0),
+mkTopicAssignment(topic3Uuid, 0, 1)
 ));
 expectedAssignment.put(memberB, mkAssignment(
-mkTopicAssignment(topic1Uuid, 1),
-mkTopicAssignment(topic3Uuid, 0)
+mkTopicAssignment(topic1Uuid, 1, 2)
Review Comment: The changes that I made changed the output of the assignor. The output is still correct though.
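To illustrate why both the old and the new expected assignments in this test can be considered correct, here is a minimal sketch of a balance check. It assumes, based only on the partitions visible in the diff, that topic1 has partitions 0-2 and topic3 has partitions 0-1. The class `BalanceCheckSketch`, its constants, and the string encoding of partitions are hypothetical and are not part of the Kafka code base.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class BalanceCheckSketch {
    // Assumption inferred from the diff: topic1 has partitions 0-2, topic3 has partitions 0-1.
    static final Set<String> ALL = Set.of(
        "topic1-0", "topic1-1", "topic1-2", "topic3-0", "topic3-1");

    // Expected assignment before the PR (the removed lines in the diff).
    static final Map<String, Set<String>> OLD_EXPECTED = Map.of(
        "memberA", Set.of("topic1-0", "topic1-2", "topic3-1"),
        "memberB", Set.of("topic1-1", "topic3-0"));

    // Expected assignment after the PR (the added lines in the diff).
    static final Map<String, Set<String>> NEW_EXPECTED = Map.of(
        "memberA", Set.of("topic1-0", "topic3-0", "topic3-1"),
        "memberB", Set.of("topic1-1", "topic1-2"));

    // True if every partition is owned exactly once and member sizes differ by at most one.
    static boolean isBalancedAndComplete(Map<String, Set<String>> assignment, Set<String> all) {
        Set<String> seen = new HashSet<>();
        int min = Integer.MAX_VALUE;
        int max = 0;
        for (Set<String> owned : assignment.values()) {
            for (String partition : owned) {
                if (!seen.add(partition)) {
                    return false; // the partition is assigned to two members
                }
            }
            min = Math.min(min, owned.size());
            max = Math.max(max, owned.size());
        }
        return seen.equals(all) && max - min <= 1;
    }

    public static void main(String[] args) {
        System.out.println(isBalancedAndComplete(OLD_EXPECTED, ALL)); // true
        System.out.println(isBalancedAndComplete(NEW_EXPECTED, ALL)); // true
    }
}
```

Both maps give memberA three partitions and memberB two, so a test that pins the exact output has to change even though the balance property is preserved.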
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac commented on PR #16088: URL: https://github.com/apache/kafka/pull/16088#issuecomment-2141914425 @jeffkbkim @jolshan @rreddy-22 Thanks for your comments. I addressed them.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1622323588 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment = a
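The comments in the new `buildAssignment()` describe three steps: compute the per-member quota, revoke what a member may not keep, then assign the remaining partitions to members with space. The following is a simplified sketch of that flow, not the code from the PR. The class `TwoPhaseAssignSketch`, the string-encoded partitions, and the rule that the first members in iteration order receive the extra partition are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TwoPhaseAssignSketch {
    // current: member id -> partitions it owns today (assumed insertion-ordered, non-empty map).
    // allPartitions: every partition of the subscribed topics.
    static Map<String, List<String>> assign(Map<String, List<String>> current,
                                            List<String> allPartitions) {
        int minQuota = allPartitions.size() / current.size();
        int extras = allPartitions.size() % current.size();

        Set<String> unassigned = new LinkedHashSet<>(allPartitions);
        Map<String, List<String>> target = new LinkedHashMap<>();
        Map<String, Integer> remaining = new LinkedHashMap<>();

        // Phase 1: keep owned partitions up to the quota; anything unknown,
        // already kept by another member, or above the quota is dropped.
        for (Map.Entry<String, List<String>> entry : current.entrySet()) {
            int quota = minQuota;
            if (extras > 0) { // assumption: the first members get the extra partition
                quota++;
                extras--;
            }
            List<String> kept = new ArrayList<>();
            for (String partition : entry.getValue()) {
                if (kept.size() < quota && unassigned.remove(partition)) {
                    kept.add(partition);
                }
            }
            target.put(entry.getKey(), kept);
            remaining.put(entry.getKey(), quota - kept.size());
        }

        // Phase 2: hand the unassigned partitions to members that still have space.
        // The quotas sum to the partition count, so the iterator cannot run dry.
        Iterator<String> it = unassigned.iterator();
        for (Map.Entry<String, Integer> entry : remaining.entrySet()) {
            for (int i = 0; i < entry.getValue(); i++) {
                target.get(entry.getKey()).add(it.next());
            }
        }
        return target;
    }

    public static void main(String[] args) {
        Map<String, List<String>> current = new LinkedHashMap<>();
        current.put("A", List.of("t-0", "t-1", "t-2", "t-3"));
        current.put("B", List.of());
        List<String> all = List.of("t-0", "t-1", "t-2", "t-3", "t-4");
        System.out.println(assign(current, all)); // {A=[t-0, t-1, t-2], B=[t-3, t-4]}
    }
}
```

In this sketch member A keeps three of its four partitions (stickiness), the fourth is revoked because it is above A's quota, and B receives the revoked partition plus the one that was never assigned.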
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1622321434 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment = a
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1622321107 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment = a
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1622320299 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment = a
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1622319160 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment = a
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1622314880
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -71,63 +69,54 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
 */
 private final Set subscribedTopicIds;
-/**
- * The number of members to receive an extra partition beyond the minimum quota.
- * Minimum Quota = Total Partitions / Total Members
- * Example: If there are 11 partitions to be distributed among 3 members,
- * each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition.
- */
-private int remainingMembersToGetAnExtraPartition;
-
 /**
 * Members mapped to the remaining number of partitions needed to meet the minimum quota.
- * Minimum quota = total partitions / total members.
 */
-private Map potentiallyUnfilledMembers;
+private final List potentiallyUnfilledMembers;
Review Comment: I think that this is not true in the current implementation because I adjust the quota before adding to unfilled members. Let me rename it.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1622313268
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -53,8 +42,17 @@
 * The assignment builder prioritizes the properties in the following order:
 * Balance > Stickiness.
 */
-public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder {
-private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+public class OptimizedUniformAssignmentBuilder {
Review Comment: We don't need it at the moment and I don't think that we will need it in the future. As this code is performance sensitive, I also think that we should keep it as straightforward and self-contained as possible.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1622312275
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java: ##
@@ -66,21 +66,19 @@ public GroupAssignment assign(
 GroupSpec groupSpec,
 SubscribedTopicDescriber subscribedTopicDescriber
 ) throws PartitionAssignorException {
-AbstractUniformAssignmentBuilder assignmentBuilder;
-
 if (groupSpec.members().isEmpty())
 return new GroupAssignment(Collections.emptyMap());
 if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
 LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the "
 + "optimized assignment algorithm");
-assignmentBuilder = new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
+return new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber)
+.build();
Review Comment: I thought that `assignment` was redundant as this is an assignment builder. I will also rename the general one but I will do it in a different PR.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1621843719
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -71,63 +69,54 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
 */
 private final Set subscribedTopicIds;
-/**
- * The number of members to receive an extra partition beyond the minimum quota.
- * Minimum Quota = Total Partitions / Total Members
- * Example: If there are 11 partitions to be distributed among 3 members,
- * each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition.
- */
-private int remainingMembersToGetAnExtraPartition;
-
 /**
 * Members mapped to the remaining number of partitions needed to meet the minimum quota.
- * Minimum quota = total partitions / total members.
 */
-private Map potentiallyUnfilledMembers;
+private final List potentiallyUnfilledMembers;
Review Comment: There is a set number of extra partitions that we can give out once every member has received an equal number of partitions. For example, if there are 5 partitions and 3 members, each member gets 1 partition and then we have 2 extras. Let's say A, B, and C have all received 1 partition; they are all potentially unfilled. Once A and B get their extra partitions, C is no longer unfilled and doesn't receive any more partitions.
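The quota arithmetic discussed in this thread (5 partitions over 3 members, and the 11-over-3 example from the removed Javadoc) can be sketched as follows. The class `QuotaSketch` and the helper `targetCounts` are hypothetical names, and handing the extras to the first members in list order is an assumption for illustration; the actual assignor decides which members receive an extra partition in its own way.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class QuotaSketch {
    // Returns the target number of partitions per member.
    // minQuota = total / members; (total % members) members get one extra.
    static Map<String, Integer> targetCounts(List<String> members, int totalPartitions) {
        int minQuota = totalPartitions / members.size();
        int extras = totalPartitions % members.size();
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String member : members) {
            int quota = minQuota;
            if (extras > 0) { // assumption: extras go to the first members in list order
                quota++;
                extras--;
            }
            counts.put(member, quota);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> members = List.of("A", "B", "C");
        System.out.println(targetCounts(members, 5));  // {A=2, B=2, C=1}
        System.out.println(targetCounts(members, 11)); // {A=4, B=4, C=3}
    }
}
```

With 5 partitions, A and B end up with 2 each and C stays at the minimum quota of 1, which matches the scenario described in the comment above.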
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jolshan commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1621445974 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment =
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jolshan commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621442335

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java: ##
@@ -66,21 +66,19 @@ public GroupAssignment assign(
         GroupSpec groupSpec,
         SubscribedTopicDescriber subscribedTopicDescriber
     ) throws PartitionAssignorException {
-        AbstractUniformAssignmentBuilder assignmentBuilder;
-
         if (groupSpec.members().isEmpty())
             return new GroupAssignment(Collections.emptyMap());

         if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
             LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the "
                 + "optimized assignment algorithm");
-            assignmentBuilder = new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
+            return new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber)
+                .build();

Review Comment:
   Any reason why we changed the name so that it no longer matches the general assignor? Or was this also changed in the original PR that renamed the files?
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jolshan commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621424229

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -71,63 +69,54 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
      */
     private final Set subscribedTopicIds;

-    /**
-     * The number of members to receive an extra partition beyond the minimum quota.
-     * Minimum Quota = Total Partitions / Total Members
-     * Example: If there are 11 partitions to be distributed among 3 members,
-     * each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition.
-     */
-    private int remainingMembersToGetAnExtraPartition;
-
     /**
      * Members mapped to the remaining number of partitions needed to meet the minimum quota.
-     * Minimum quota = total partitions / total members.
      */
-    private Map potentiallyUnfilledMembers;
+    private final List potentiallyUnfilledMembers;

Review Comment:
   Why do we call this potentiallyUnfilledMembers rather than unfilledMembers?
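For readers following the quota comment that the diff above removes, the arithmetic can be sketched in a few lines of Java. This is only an illustration of the integer division and remainder described in that Javadoc, not the assignor's actual code. The class `QuotaExample` and the method `computeQuotas` are hypothetical names; `totalPartitionsCount` and `minimumMemberQuota` are borrowed from the diffs in this thread.

```java
public class QuotaExample {

    /**
     * Returns a two-element array: [minimum quota per member,
     * number of members that receive one extra partition].
     * Hypothetical helper, written only to illustrate the arithmetic.
     */
    static int[] computeQuotas(int totalPartitionsCount, int numberOfMembers) {
        if (numberOfMembers <= 0) {
            throw new IllegalArgumentException("numberOfMembers must be positive");
        }
        // Every member gets at least this many partitions.
        int minimumMemberQuota = totalPartitionsCount / numberOfMembers;
        // This many members get one partition on top of the minimum quota.
        int membersWithExtraPartition = totalPartitionsCount % numberOfMembers;
        return new int[] {minimumMemberQuota, membersWithExtraPartition};
    }

    public static void main(String[] args) {
        // The example from the removed Javadoc: 11 partitions, 3 members.
        int[] quotas = computeQuotas(11, 3);
        System.out.println("minimum quota = " + quotas[0]);
        System.out.println("members with an extra partition = " + quotas[1]);
    }
}
```

With 11 partitions and 3 members this gives a minimum quota of 3 and 2 members with an extra partition, matching the example in the removed comment.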
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jolshan commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1621422471 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment =
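The new `buildAssignment()` in the diff above describes two phases: revoke partitions that are above a member's quota (or no longer subscribed), then hand the unassigned partitions to members with space. The sketch below is a simplified model of that idea under loud assumptions: a single topic, partitions identified by plain integers, and a hypothetical class `TwoPhaseAssignmentSketch` with a hypothetical `assign` method. It is not the code from the PR, which works on topic IDs and `AssignmentMemberSpec`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TwoPhaseAssignmentSketch {

    /**
     * Hypothetical simplification: one topic whose partitions are 0..totalPartitions-1.
     * current maps each member id to the partitions it owns today.
     */
    static Map<String, List<Integer>> assign(Map<String, List<Integer>> current, int totalPartitions) {
        int minQuota = totalPartitions / current.size();
        int extra = totalPartitions % current.size(); // members allowed minQuota + 1

        Map<String, List<Integer>> target = new LinkedHashMap<>();
        boolean[] taken = new boolean[totalPartitions];

        // Phase 1: keep owned partitions up to the quota; anything else is revoked.
        for (Map.Entry<String, List<Integer>> entry : current.entrySet()) {
            List<Integer> kept = new ArrayList<>();
            for (int partition : entry.getValue()) {
                if (partition < 0 || partition >= totalPartitions || taken[partition]) {
                    continue; // unknown or already claimed partition: drop it
                }
                if (kept.size() < minQuota) {
                    kept.add(partition);
                    taken[partition] = true;
                } else if (kept.size() == minQuota && extra > 0) {
                    kept.add(partition); // this member keeps one of the extra slots
                    taken[partition] = true;
                    extra--;
                }
            }
            target.put(entry.getKey(), kept);
        }

        // Phase 2: give the unassigned partitions to members with space.
        Deque<Integer> unassigned = new ArrayDeque<>();
        for (int partition = 0; partition < totalPartitions; partition++) {
            if (!taken[partition]) {
                unassigned.add(partition);
            }
        }
        for (List<Integer> owned : target.values()) {
            while (owned.size() < minQuota) {
                owned.add(unassigned.poll());
            }
            if (owned.size() == minQuota && extra > 0) {
                owned.add(unassigned.poll());
                extra--;
            }
        }
        return target;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> current = new LinkedHashMap<>();
        current.put("A", Arrays.asList(0, 1, 2, 3, 4, 5));
        current.put("B", Arrays.asList(6, 7));
        current.put("C", new ArrayList<>());
        System.out.println(assign(current, 11));
    }
}
```

In this model, members keep as much of their existing assignment as the quota allows, so only the partitions above quota move. Because the quotas sum to the total partition count, the second phase always has exactly enough unassigned partitions to fill every member.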
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jolshan commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1621421194 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment =
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619961360

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -18,29 +18,18 @@
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;

 /**
- * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target assignment for a consumer group with

Review Comment:
   That was for the subscription type, right, which is just one word? "Homogeneous uniform assignment builder" is kind of a mouthful. I used the wrong term: by "user" I meant anyone looking at the code.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1619937439 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1619924951 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. -if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { Review Comment: nit: can we add java docs for this method -- This is an automated message from the Apache Git Service. 
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1619917677 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1619915938 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1619907267 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1619898066 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } -// The minimum required quota that each member needs to meet for a balanced assignment. -// This is the same for all members. -final int numberOfMembers = groupSpec.members().size(); -final int minQuota = totalPartitionsCount / numberOfMembers; +// Compute the minimum required quota per member and the number of members +// who should receive an extra partition. +int numberOfMembers = groupSpec.members().size(); +minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; -groupSpec.members().keySet().forEach(memberId -> -targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) -)); - -potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - -unassignedPartitionsRoundRobinAssignment(); +// Revoke the partitions which are either not part of the subscriptions or above +// the maximum quota. +maybeRevokePartitions(); -if (!unassignedPartitions.isEmpty()) { -throw new PartitionAssignorException("Partitions were left unassigned"); -} +// Assign the unassigned partitions to the members with space. +assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } -/** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * For each member: - * - * Find the valid current assignment considering topic subscriptions and metadata - * If the current assignment exists, retain partitions up to the minimum quota. 
- * If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well. - * Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota. - * - * - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ -private Map assignStickyPartitions(int minQuota) { -Map potentiallyUnfilledMembers = new HashMap<>(); - -groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { -List validCurrentMemberAssignment = validCurrentMemberAssignment( -assignmentMemberSpec.assignedPartitions() -); - -int currentAssignmentSize = validCurrentMemberAssignment.size(); -// Number of partitions required to meet the minimum quota. -int remaining = minQuota - currentAssignmentSize; - -if (currentAssignmentSize > 0) { -int retainedPartitionsCount = min(currentAssignmentSize, minQuota); -IntStream.range(0, retainedPartitionsCount).forEach(i -> { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -}); - -if (remaining < 0) { -// The extra partition is located at the last index from the previous step. 
-if (remainingMembersToGetAnExtraPartition > 0) { -TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); -addPartitionToAssignment( -targetAssignment, -memberId, -topicIdPartition.topicId(), -topicIdPartition.partitionId() -); -remainingMembersToGetAnExtraPartition--; +private void maybeRevokePartitions() { +for (Map.Entry entry : groupSpec.members().entrySet()) { +String memberId = entry.getKey(); +AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); +Map> oldAssignment
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619894784

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException {
 }
 }
-// The minimum required quota that each member needs to meet for a balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers;
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the subscriptions or above

Review Comment:
nit:
// Revoke the partitions that either:
// 1. Are not part of the member's current subscription.
// 2. Exceed the maximum quota assigned to each member.
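The two revocation conditions in the suggested comment can be illustrated with a small sketch. It is not the PR's `maybeRevokePartitions()`: the class `RevocationSketch`, the method `retain`, the use of topic names as `String` keys, and the `maxQuota` parameter are all hypothetical and only illustrate the rule that a partition is dropped when its topic is no longer subscribed or when the member already holds its maximum quota.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; names and types are assumptions, not Kafka code.
final class RevocationSketch {
    /**
     * Returns the partitions the member keeps. Every other partition is
     * appended to {@code revoked} as a "topic-partition" string.
     */
    static Map<String, List<Integer>> retain(
        Map<String, List<Integer>> oldAssignment,
        Set<String> subscribedTopics,
        int maxQuota,
        List<String> revoked
    ) {
        Map<String, List<Integer>> kept = new LinkedHashMap<>();
        int keptCount = 0;
        for (Map.Entry<String, List<Integer>> entry : oldAssignment.entrySet()) {
            String topic = entry.getKey();
            boolean subscribed = subscribedTopics.contains(topic);
            for (int partition : entry.getValue()) {
                if (subscribed && keptCount < maxQuota) {
                    // Condition 1 and 2 both pass: the partition stays with the member.
                    kept.computeIfAbsent(topic, t -> new ArrayList<>()).add(partition);
                    keptCount++;
                } else {
                    // Either the topic is not subscribed or the quota is exhausted.
                    revoked.add(topic + "-" + partition);
                }
            }
        }
        return kept;
    }
}
```

With an old assignment of `a -> [0, 1]`, `b -> [0]`, `c -> [0]`, a subscription to `a` and `c`, and a maximum quota of 2, this sketch keeps `a-0` and `a-1`, revokes `b-0` because the topic is not subscribed, and revokes `c-0` because the quota is already reached.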
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619892264

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException {
 }
 }
-// The minimum required quota that each member needs to meet for a balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of members
+// who should receive an extra partition.

Review Comment:
nit: members that* should
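The quota arithmetic in the hunk above is integer division plus remainder. The sketch below restates it in isolation. The class `QuotaSketch` and its method names are hypothetical and are not part of the Kafka code base.

```java
// Hypothetical helper that only mirrors the arithmetic shown in the diff.
final class QuotaSketch {
    /** Minimum number of partitions that every member receives. */
    static int minimumMemberQuota(int totalPartitionsCount, int numberOfMembers) {
        return totalPartitionsCount / numberOfMembers;
    }

    /** Number of members that receive one partition on top of the minimum quota. */
    static int membersWithExtraPartition(int totalPartitionsCount, int numberOfMembers) {
        return totalPartitionsCount % numberOfMembers;
    }

    public static void main(String[] args) {
        int totalPartitionsCount = 10;
        int numberOfMembers = 3;
        // 10 partitions over 3 members: quota 3, and 1 member gets a 4th partition.
        System.out.println(minimumMemberQuota(totalPartitionsCount, numberOfMembers));
        System.out.println(membersWithExtraPartition(totalPartitionsCount, numberOfMembers));
    }
}
```

For 10 partitions and 3 members the minimum quota is 3 and one member receives an extra partition, so the members end up with 4, 3 and 3 partitions.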
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619888416

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -53,8 +42,17 @@
 * The assignment builder prioritizes the properties in the following order:
 * Balance > Stickiness.
 */
-public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder {
-private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+public class OptimizedUniformAssignmentBuilder {
+private static final Class UNMODIFIALBE_MAP_CLASS = Collections.unmodifiableMap(new HashMap<>()).getClass();
+private static final Class EMPTY_MAP_CLASS = Collections.emptyMap().getClass();
+
+/**
+ * @return True if the provided map is an UnmodifiableMap or EmptyMap. Those classes are not
+ * public hence we cannot use the `instanceof` operator.
+ */
+private static boolean isImmutableMap(Map map) {
+return UNMODIFIALBE_MAP_CLASS.isInstance(map) || EMPTY_MAP_CLASS.isInstance(map);

Review Comment:
nit: UNMODIFIABLE *
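The class-capture trick in this hunk can be tried in isolation. The sketch below is a standalone restatement, assuming the type parameters `Class<?>` and `Map<?, ?>` (the archived diff shows raw types) and using the spelling suggested in the review. The class name `ImmutableMapCheckSketch` is hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Standalone restatement of the check in the diff; type parameters are assumed.
final class ImmutableMapCheckSketch {
    // The wrapper classes are private nested classes of java.util.Collections,
    // so they are captured at runtime instead of being named in source.
    private static final Class<?> UNMODIFIABLE_MAP_CLASS =
        Collections.unmodifiableMap(new HashMap<>()).getClass();
    private static final Class<?> EMPTY_MAP_CLASS = Collections.emptyMap().getClass();

    /** True if the map is a Collections.unmodifiableMap wrapper or Collections.emptyMap(). */
    static boolean isImmutableMap(Map<?, ?> map) {
        return UNMODIFIABLE_MAP_CLASS.isInstance(map) || EMPTY_MAP_CLASS.isInstance(map);
    }
}
```

One consequence of this design is that the check recognises only these two `Collections` wrappers. A map created with `Map.of(...)` is also immutable but is implemented by a different JDK class, so this sketch would report it as mutable.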
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619884277

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -18,29 +18,18 @@
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;
 /**
- * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target assignment for a consumer group with

Review Comment:
Interesting… So you’re fine with adding those names to the public interface (what the end user really sees) but not with following the same naming scheme for the internal classes (which the end user doesn’t see at all). In my opinion, it makes sense to align them, as it will make things more coherent.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619875174

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -18,29 +18,18 @@
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;
 /**
- * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target assignment for a consumer group with

Review Comment:
I feel like homogeneous and heterogeneous are harder names from a user's point of view. I think we should keep it as is.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619511354

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -53,8 +42,17 @@
 * The assignment builder prioritizes the properties in the following order:
 * Balance > Stickiness.
 */
-public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder {
-private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+public class OptimizedUniformAssignmentBuilder {

Review Comment:
I had the same question. We might need it in the rack-aware case for all the common methods.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jeffkbkim commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619407866

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -53,8 +42,17 @@
 * The assignment builder prioritizes the properties in the following order:
 * Balance > Stickiness.
 */
-public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder {
-private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+public class OptimizedUniformAssignmentBuilder {

Review Comment:
are we going to remove AbstractUniformAssignmentBuilder?

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java: ##
@@ -158,12 +159,11 @@ public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() {
 Map>> expectedAssignment = new HashMap<>();
 expectedAssignment.put(memberA, mkAssignment(
-mkTopicAssignment(topic1Uuid, 0, 2),
-mkTopicAssignment(topic3Uuid, 1)
+mkTopicAssignment(topic1Uuid, 0),
+mkTopicAssignment(topic3Uuid, 0, 1)
 ));
 expectedAssignment.put(memberB, mkAssignment(
-mkTopicAssignment(topic1Uuid, 1),
-mkTopicAssignment(topic3Uuid, 0)
+mkTopicAssignment(topic1Uuid, 1, 2)

Review Comment:
I see some test cases where we have changed the expected assignment. why did it change?

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException {
 }
 }
-// The minimum required quota that each member needs to meet for a balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers;
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 return new GroupAssignment(targetAssignment);
 }
-/**
- * Retains a set of partitions from the existing assignment and includes them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and subscriptions are considered.
- *
- * For each member:
- *
- * Find the valid current assignment considering topic subscriptions and metadata
- * If the current assignment exists, retain partitions up to the minimum quota.
- * If the current assignment size is greater than the minimum quota and
- * there are members that could get an extra partition, assign the next partition as well.
- * Finally, if the member's current assignment size is less than the minimum quota,
- * add them to the potentially unfilled members map and track the number of remaining
- * partitions required to meet the quota.
- *
- *
- *
- * @return Members mapped to the remaining number of partitions needed to meet the minimum quota,
- * including members that are eligible to receive an extra partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int re
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619198458

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -18,29 +18,18 @@
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;
 /**
- * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target assignment for a consumer group with

Review Comment:
I would like to. I initially renamed the homogeneous one in this PR but @jeffkbkim asked to do it as a follow-up in order to simplify the diff.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac commented on PR #16088:
URL: https://github.com/apache/kafka/pull/16088#issuecomment-2137850455

@rreddy-22 I just triggered a new build.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619195556

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -18,29 +18,18 @@
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;
 /**
- * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target assignment for a consumer group with

Review Comment:
Are we trying to rename the assignor to homogeneous and heterogeneous instead of optimized and general?
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
rreddy-22 commented on PR #16088:
URL: https://github.com/apache/kafka/pull/16088#issuecomment-2137846798

Thanks for the PR! The build seems to be failing. Can we look into it?
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac commented on PR #16088:
URL: https://github.com/apache/kafka/pull/16088#issuecomment-2136604335

@jeffkbkim Your understanding is correct. The biggest gain comes from the rewrite of the "assign sticky partitions" methods. I did two things there: a) avoid the intermediate data structure (the topicpartition list); and b) avoid copying unchanged assignments. Regarding your point 3), the sorting does not have a real impact on the incremental case but it does in the full assignment case. As you saw, we don't need it. Note that we don't use a Set anymore for the unassigned partitions.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jeffkbkim commented on PR #16088:
URL: https://github.com/apache/kafka/pull/16088#issuecomment-2136254190

@dajac For me, this seems like a non-trivial change. It seems we are changing a core part of the assignment logic. I have some general questions about the improved performance (numbers look great!), specifically about what contributed to the large gain:

1. It seems a lot of the gain comes from the fact that assigning a partition was expensive and we can now bypass a lot of that cost, not just in terms of object creation but also in bookkeeping and updating all of the different data structures.
2. Usually we will not revoke many partitions at once, which I think is the case for the benchmark. So we also gain from `num_partitions_to_revoke <<< num_partitions_to_assign`.
3. We no longer sort the unassigned (remaining) partitions. We don't need this as we iterate the `Set unassignedPartitions` via the partition index.

Let me know if my understanding is correct. Thanks!
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac commented on PR #16088:
URL: https://github.com/apache/kafka/pull/16088#issuecomment-2135700577

@jeffkbkim Done. However, I am not sure if it will really help because the changes are minor.
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
jeffkbkim commented on PR #16088:
URL: https://github.com/apache/kafka/pull/16088#issuecomment-2135625791

It's a bit difficult to see the changes, which are supposedly minor, since the file name has changed. Can we do the rename separately, so that we can see the diff?
[PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac opened a new pull request, #16088: URL: https://github.com/apache/kafka/pull/16088

This patch optimizes the uniform (homogeneous) assignor by avoiding creating a copy of all the assignments. Instead, the assignor creates a copy only if the assignment is updated. It is a sort of copy-on-write. This change reduces the overhead of the TargetAssignmentBuilder when run with the uniform (homogeneous) assignor.

Trunk:

```
Benchmark                               (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt   Score   Error  Units
TargetAssignmentBuilderBenchmark.build              1                         10           100  avgt    5  24.535 ± 1.583  ms/op
TargetAssignmentBuilderBenchmark.build              1                         10          1000  avgt    5  24.094 ± 0.223  ms/op
JMH benchmarks done
```

```
Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt   Score   Error  Units
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL         UNIFORM          false              1                         10         HOMOGENEOUS           100  avgt    5  14.697 ± 0.133  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL         UNIFORM          false              1                         10         HOMOGENEOUS          1000  avgt    5  15.073 ± 0.135  ms/op
JMH benchmarks done
```

Patch:

```
Benchmark                               (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt   Score   Error  Units
TargetAssignmentBuilderBenchmark.build              1                         10           100  avgt    5   3.376 ± 0.577  ms/op
TargetAssignmentBuilderBenchmark.build              1                         10          1000  avgt    5   3.731 ± 0.359  ms/op
JMH benchmarks done
```

```
Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt   Score   Error  Units
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL         UNIFORM          false              1                         10         HOMOGENEOUS           100  avgt    5   1.975 ± 0.086  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL         UNIFORM          false              1                         10         HOMOGENEOUS          1000  avgt    5   2.026 ± 0.190  ms/op
JMH benchmarks done
```

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
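The copy-on-write idea in the PR description can be illustrated with a minimal sketch. This is not the code from the patch: the class `CopyOnWriteAssignments`, its methods, and the use of plain `String` member ids and `Integer` partition ids are all hypothetical, chosen only to show how a member's assignment might be copied on its first modification instead of up front.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; these are not the Kafka assignor's classes.
final class CopyOnWriteAssignments {
    // Existing assignment per member (member id -> partitions); never mutated here.
    private final Map<String, Set<Integer>> current;
    // Holds a private copy only for members whose assignment was modified.
    private final Map<String, Set<Integer>> updated = new HashMap<>();

    CopyOnWriteAssignments(Map<String, Set<Integer>> current) {
        this.current = current;
    }

    // Read path: return the modified copy if one exists, else the shared original.
    Set<Integer> get(String memberId) {
        Set<Integer> copy = updated.get(memberId);
        if (copy != null) {
            return copy;
        }
        return current.getOrDefault(memberId, Collections.<Integer>emptySet());
    }

    // Write path: copy the member's partitions the first time they are changed.
    private Set<Integer> mutable(String memberId) {
        return updated.computeIfAbsent(memberId,
            id -> new HashSet<>(current.getOrDefault(id, Collections.<Integer>emptySet())));
    }

    void assign(String memberId, int partition) {
        mutable(memberId).add(partition);
    }

    void revoke(String memberId, int partition) {
        mutable(memberId).remove(partition);
    }

    // Number of members whose assignment had to be copied.
    int copiesMade() {
        return updated.size();
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> current = new HashMap<>();
        current.put("memberA", new HashSet<>(Set.of(0, 1)));
        current.put("memberB", new HashSet<>(Set.of(2)));

        CopyOnWriteAssignments assignments = new CopyOnWriteAssignments(current);
        assignments.assign("memberB", 3); // only memberB's set is copied
        System.out.println(assignments.get("memberA") + " " + assignments.get("memberB")
            + " copies=" + assignments.copiesMade());
    }
}
```

In this sketch, members whose assignment does not change keep sharing the original set, so the copying cost scales with the number of modified members rather than with the group size.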