Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-31 Thread via GitHub


dajac merged PR #16088:
URL: https://github.com/apache/kafka/pull/16088


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-31 Thread via GitHub


dajac commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1622324676


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java:
##
@@ -158,12 +159,11 @@ public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() {
 
 Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
 expectedAssignment.put(memberA, mkAssignment(
-mkTopicAssignment(topic1Uuid, 0, 2),
-mkTopicAssignment(topic3Uuid, 1)
+mkTopicAssignment(topic1Uuid, 0),
+mkTopicAssignment(topic3Uuid, 0, 1)
 ));
 expectedAssignment.put(memberB, mkAssignment(
-mkTopicAssignment(topic1Uuid, 1),
-mkTopicAssignment(topic3Uuid, 0)
+mkTopicAssignment(topic1Uuid, 1, 2)

Review Comment:
   The changes that I made changed the output of the assignor. The output is 
still correct though.
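A minimal sketch of what "still correct" can mean here, assuming topic1 has 3 partitions and topic3 has 2 (inferred from the partition indexes in the hunk) and assuming memberB's new expectation is exactly topic1 partitions 1 and 2, which is where the quoted hunk ends. `AssignmentCheck` and its methods are hypothetical helpers, not part of the PR, and topic names stand in for the Uuids:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class AssignmentCheck {
    // Hypothetical check: every partition is assigned exactly once, no unknown
    // partition appears, and member sizes differ by at most one.
    static boolean isBalancedAndComplete(
        Map<String, Map<String, Set<Integer>>> assignment,
        Map<String, Integer> partitionCounts
    ) {
        Set<String> seen = new HashSet<>();
        int min = Integer.MAX_VALUE;
        int max = 0;
        for (Map<String, Set<Integer>> topics : assignment.values()) {
            int size = 0;
            for (Map.Entry<String, Set<Integer>> topic : topics.entrySet()) {
                Integer count = partitionCounts.get(topic.getKey());
                for (int partition : topic.getValue()) {
                    boolean known = count != null && partition >= 0 && partition < count;
                    // Reject unknown partitions and partitions given to two members.
                    if (!known || !seen.add(topic.getKey() + "-" + partition)) return false;
                    size++;
                }
            }
            min = Math.min(min, size);
            max = Math.max(max, size);
        }
        int total = 0;
        for (int count : partitionCounts.values()) total += count;
        return seen.size() == total && max - min <= 1;
    }

    // Assumed partition counts, inferred from the hunk.
    static Map<String, Integer> partitionCounts() {
        return Map.of("topic1", 3, "topic3", 2);
    }

    // Expectation removed by the hunk.
    static Map<String, Map<String, Set<Integer>>> oldExpected() {
        return Map.of(
            "memberA", Map.of("topic1", Set.of(0, 2), "topic3", Set.of(1)),
            "memberB", Map.of("topic1", Set.of(1), "topic3", Set.of(0)));
    }

    // Expectation added by the hunk (memberB as far as the hunk shows it).
    static Map<String, Map<String, Set<Integer>>> newExpected() {
        return Map.of(
            "memberA", Map.of("topic1", Set.of(0), "topic3", Set.of(0, 1)),
            "memberB", Map.of("topic1", Set.of(1, 2)));
    }
}
```

Under these assumptions both the old and the new expectation pass the check: each gives one member 3 partitions and the other 2, so only the placement differs.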






Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-31 Thread via GitHub


dajac commented on PR #16088:
URL: https://github.com/apache/kafka/pull/16088#issuecomment-2141914425

   @jeffkbkim @jolshan @rreddy-22 Thanks for your comments. I addressed them.





Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-31 Thread via GitHub


dajac commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1622323588


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the minimum quota.
- *  If the current assignment size is greater than the minimum quota and
- *  there are members that could get an extra partition, assign the next partition as well.
- *  Finally, if the member's current assignment size is less than the minimum quota,
- *  add them to the potentially unfilled members map and track the number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to meet the minimum quota,
- *  including members that are eligible to receive an extra partition.
- */
-private Map<String, Integer> assignStickyPartitions(int minQuota) {
-Map<String, Integer> potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List<TopicIdPartition> validCurrentMemberAssignment = validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry<String, AssignmentMemberSpec> entry : groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map<Uuid, Set<Integer>> oldAssignment = 
a
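
The hunk above is cut off by the archive, so the bodies of `maybeRevokePartitions()` and `assignRemainingPartitions()` are not visible. The following is only a sketch of the two-phase idea described by the added comments (revoke what is invalid or above quota, then hand out what is unassigned), not the PR's code. It assumes a single topic whose partitions are numbered 0..partitionCount-1, and it assumes the first `partitionCount % members` members in iteration order get the extra partition; `UniformSketch` and `assign` are hypothetical names:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UniformSketch {
    // current: member id -> partitions it owns today (may contain stale ids).
    // Requires at least one member.
    static Map<String, List<Integer>> assign(Map<String, List<Integer>> current, int partitionCount) {
        int minQuota = partitionCount / current.size();
        int extras = partitionCount % current.size();

        Map<String, List<Integer>> target = new LinkedHashMap<>();
        Map<String, Integer> remainingQuota = new LinkedHashMap<>();
        boolean[] taken = new boolean[partitionCount];

        // Phase 1: keep valid partitions up to the member's quota, drop the rest.
        for (Map.Entry<String, List<Integer>> entry : current.entrySet()) {
            int quota = minQuota;
            if (extras > 0) { // assumption: first members seen get the extra slot
                quota++;
                extras--;
            }
            List<Integer> kept = new ArrayList<>();
            for (int partition : entry.getValue()) {
                if (kept.size() == quota) break; // above quota: revoked
                boolean valid = partition >= 0 && partition < partitionCount;
                if (valid && !taken[partition]) {
                    taken[partition] = true;
                    kept.add(partition);
                }
            }
            target.put(entry.getKey(), kept);
            if (kept.size() < quota) remainingQuota.put(entry.getKey(), quota - kept.size());
        }

        // Phase 2: give the unassigned partitions to the members with space.
        Deque<Integer> unassigned = new ArrayDeque<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            if (!taken[partition]) unassigned.add(partition);
        }
        for (Map.Entry<String, Integer> entry : remainingQuota.entrySet()) {
            for (int i = 0; i < entry.getValue(); i++) {
                // Quotas sum to partitionCount, so the queue cannot run dry here.
                target.get(entry.getKey()).add(unassigned.remove());
            }
        }
        return target;
    }
}
```

Because the quotas sum to the partition count, nothing is left unassigned after phase 2, which is consistent with the hunk dropping the "Partitions were left unassigned" check. Fixing each member's quota up front keeps the loop simple, but in this sketch a late member that already holds `minQuota + 1` partitions can lose one; whether the PR handles that case differently is not visible in the quoted hunk.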

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-31 Thread via GitHub


dajac commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1622321434


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-31 Thread via GitHub


dajac commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1622321107


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-31 Thread via GitHub


dajac commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1622320299


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-31 Thread via GitHub


dajac commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1622319160


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-31 Thread via GitHub


dajac commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1622314880


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -71,63 +69,54 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
  */
 private final Set<Uuid> subscribedTopicIds;
 
-/**
- * The number of members to receive an extra partition beyond the minimum quota.
- * Minimum Quota = Total Partitions / Total Members
- * Example: If there are 11 partitions to be distributed among 3 members,
- *  each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition.
- */
-private int remainingMembersToGetAnExtraPartition;
-
 /**
  * Members mapped to the remaining number of partitions needed to meet the minimum quota.
- * Minimum quota = total partitions / total members.
  */
-private Map<String, Integer> potentiallyUnfilledMembers;
+private final List potentiallyUnfilledMembers;

Review Comment:
   I think that this is not true in the current implementation because I adjust 
the quota before adding to unfilled members. Let me rename it.






Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-31 Thread via GitHub


dajac commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1622313268


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -53,8 +42,17 @@
  * The assignment builder prioritizes the properties in the following order:
  *  Balance > Stickiness.
  */
-public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder {
-private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+public class OptimizedUniformAssignmentBuilder {

Review Comment:
   We don't need it at the moment and I don't think that we will need it in the 
future. As this code is performance sensitive, I also think that we should keep 
it as straightforward and self-contained as possible.






Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-31 Thread via GitHub


dajac commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1622312275


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java:
##
@@ -66,21 +66,19 @@ public GroupAssignment assign(
 GroupSpec groupSpec,
 SubscribedTopicDescriber subscribedTopicDescriber
 ) throws PartitionAssignorException {
-AbstractUniformAssignmentBuilder assignmentBuilder;
-
 if (groupSpec.members().isEmpty())
 return new GroupAssignment(Collections.emptyMap());
 
 if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
 LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the "
 + "optimized assignment algorithm");
-assignmentBuilder = new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
+return new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber)
+.build();

Review Comment:
   I thought that `assignment` was redundant as this is an assignment builder. 
I will also rename the general one but I will do it in a different PR.






Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-31 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621843719


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -71,63 +69,54 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
  */
 private final Set<Uuid> subscribedTopicIds;
 
-/**
- * The number of members to receive an extra partition beyond the minimum quota.
- * Minimum Quota = Total Partitions / Total Members
- * Example: If there are 11 partitions to be distributed among 3 members,
- *  each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition.
- */
-private int remainingMembersToGetAnExtraPartition;
-
 /**
  * Members mapped to the remaining number of partitions needed to meet the minimum quota.
- * Minimum quota = total partitions / total members.
  */
-private Map<String, Integer> potentiallyUnfilledMembers;
+private final List potentiallyUnfilledMembers;

Review Comment:
   There is a fixed number of extra partitions that we can give out once every 
member has received an equal number of partitions. For example, with 5 
partitions and 3 members, each member gets 1 partition and 2 extras remain. 
Let's say A, B, and C have each received 1 partition: they are all potentially 
unfilled. Once A and B get the extra partitions, C is no longer unfilled and 
doesn't receive any more partitions.
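
The arithmetic in this comment can be sketched as follows, using the same integer division and remainder that appear in the diff (`totalPartitionsCount / numberOfMembers` and `totalPartitionsCount % numberOfMembers`). `QuotaExample` and its method names are hypothetical and only serve the illustration:

```java
final class QuotaExample {
    // Partitions every member is guaranteed to receive.
    static int minimumQuota(int partitions, int members) {
        return partitions / members;
    }

    // Number of members that receive one partition on top of the minimum.
    static int membersWithExtraPartition(int partitions, int members) {
        return partitions % members;
    }

    public static void main(String[] args) {
        // 5 partitions, 3 members: quota 1, and 2 members get an extra.
        System.out.println(minimumQuota(5, 3) + " " + membersWithExtraPartition(5, 3));
        // 11 partitions, 3 members (the example from the removed Javadoc):
        // quota 3, and 2 members get an extra.
        System.out.println(minimumQuota(11, 3) + " " + membersWithExtraPartition(11, 3));
    }
}
```

With 5 partitions and 3 members this gives a quota of 1 and 2 extras, so A and B end with 2 partitions each and C stays at 1, as described above.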






Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621445974


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621442335


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java:
##
@@ -66,21 +66,19 @@ public GroupAssignment assign(
 GroupSpec groupSpec,
 SubscribedTopicDescriber subscribedTopicDescriber
 ) throws PartitionAssignorException {
-AbstractUniformAssignmentBuilder assignmentBuilder;
-
 if (groupSpec.members().isEmpty())
 return new GroupAssignment(Collections.emptyMap());
 
 if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
 LOG.debug("Detected that all members are subscribed to the same 
set of topics, invoking the "
 + "optimized assignment algorithm");
-assignmentBuilder = new 
OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
+return new OptimizedUniformAssignmentBuilder(groupSpec, 
subscribedTopicDescriber)
+.build();

Review Comment:
   Any reason why we changed the name so that it no longer matches the general 
assignor? Or was this also changed in the original that renamed the files?






Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621424229


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -71,63 +69,54 @@ public class OptimizedUniformAssignmentBuilder extends 
AbstractUniformAssignment
  */
 private final Set subscribedTopicIds;
 
-/**
- * The number of members to receive an extra partition beyond the minimum 
quota.
- * Minimum Quota = Total Partitions / Total Members
- * Example: If there are 11 partitions to be distributed among 3 members,
- *  each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 
3) members get an extra partition.
- */
-private int remainingMembersToGetAnExtraPartition;
-
 /**
  * Members mapped to the remaining number of partitions needed to meet the 
minimum quota.
- * Minimum quota = total partitions / total members.
  */
-private Map potentiallyUnfilledMembers;
+private final List potentiallyUnfilledMembers;

Review Comment:
   Why do we call this `potentiallyUnfilledMembers` rather than `unfilledMembers`?
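
For readers following the thread, the quota arithmetic quoted in the removed Javadoc above (11 partitions distributed among 3 members) can be sketched as follows. This is only an illustration under stated assumptions: the class `QuotaSketch` and its method names are hypothetical and do not appear in the PR.

```java
// Hypothetical helper, not part of the PR: it only illustrates the
// minimum-quota arithmetic described in the quoted Javadoc.
final class QuotaSketch {

    // Minimum number of partitions every member should own.
    static int minimumQuota(int totalPartitions, int numberOfMembers) {
        return totalPartitions / numberOfMembers;
    }

    // Number of members that receive one partition beyond the minimum quota.
    static int membersWithExtraPartition(int totalPartitions, int numberOfMembers) {
        return totalPartitions % numberOfMembers;
    }

    public static void main(String[] args) {
        int totalPartitions = 11;
        int numberOfMembers = 3;
        // prints "minQuota=3, membersWithExtra=2"
        System.out.println(
            "minQuota=" + minimumQuota(totalPartitions, numberOfMembers)
                + ", membersWithExtra=" + membersWithExtraPartition(totalPartitions, numberOfMembers)
        );
    }
}
```

A member that already holds at least the minimum quota needs nothing more unless it is one of the members chosen for an extra partition, which may be one reason the list is described as "potentially" unfilled; the thread itself does not confirm this.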






Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621422471


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry entry : 
groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map> oldAssignment = 

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621421194


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry entry : 
groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map> oldAssignment = 

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619961360


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -18,29 +18,18 @@
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;
 
 /**
- * The optimized uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target 
assignment for a consumer group with

Review Comment:
   That was for the subscription type, right, which is just one word? "Homogeneous 
uniform assignment builder" is kind of a mouthful.
   I used the wrong term; by "user" I meant anyone looking at the code.






Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619937439


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry entry : 
groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map> oldAssignment 

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619924951


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {

Review Comment:
   nit: Can we add Javadoc for this method?
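
As background for the two steps named in the quoted hunk (revoke what a member may not keep, then assign what is left to members with space), here is a minimal sketch. It is not the PR's implementation: it assumes a single topic whose partitions are plain integers `0..partitionCount-1`, and the class `RevokeThenAssignSketch`, the method `assign`, and its parameters are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical, simplified model of a revoke-then-assign pass.
// Assumption: one topic, partitions identified by 0..partitionCount-1.
final class RevokeThenAssignSketch {

    static Map<String, List<Integer>> assign(Map<String, List<Integer>> current, int partitionCount) {
        Map<String, List<Integer>> target = new LinkedHashMap<>();
        if (current.isEmpty()) {
            return target; // nothing to assign without members
        }

        int minQuota = partitionCount / current.size();
        int extra = partitionCount % current.size(); // members allowed minQuota + 1

        // Every valid partition starts out unassigned.
        TreeSet<Integer> unassigned = new TreeSet<>();
        for (int p = 0; p < partitionCount; p++) {
            unassigned.add(p);
        }

        // Step 1: keep owned partitions that are valid and within quota;
        // everything else is implicitly revoked (it stays in 'unassigned').
        for (Map.Entry<String, List<Integer>> entry : current.entrySet()) {
            List<Integer> kept = new ArrayList<>();
            for (Integer partition : entry.getValue()) {
                int quota = minQuota + (extra > 0 ? 1 : 0);
                if (kept.size() < quota && unassigned.remove(partition)) {
                    kept.add(partition);
                }
            }
            if (kept.size() > minQuota) {
                extra--; // this member consumed one of the extra slots
            }
            target.put(entry.getKey(), kept);
        }

        // Step 2: hand the remaining partitions to members with space.
        Iterator<Integer> remaining = unassigned.iterator();
        for (List<Integer> owned : target.values()) {
            if (owned.size() > minQuota) {
                continue; // already holds an extra partition
            }
            int quota = minQuota;
            if (extra > 0) {
                quota++;
                extra--;
            }
            while (owned.size() < quota) {
                owned.add(remaining.next());
            }
        }
        return target;
    }
}
```

For example, with members A owning `[0, 1, 2, 3]` and B owning nothing, and 5 partitions in total, the minimum quota is 2 and one member may hold 3. A keeps `[0, 1, 2]`, partition 3 is revoked, and B receives `[3, 4]`.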




Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619917677


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry entry : 
groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map> oldAssignment 

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619915938


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry entry : 
groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map> oldAssignment 

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619907267


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry entry : 
groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map> oldAssignment 

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619898066


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry entry : 
groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map> oldAssignment 

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619894784


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above

Review Comment:
   nit: // Revoke the partitions that either:
   // 1. Are not part of the member's current subscription.
   // 2. Exceed the maximum quota assigned to each member.
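
To make the two revocation rules in the comment above concrete, here is a minimal sketch. It is not the PR's `maybeRevokePartitions()`: the class `QuotaSketch`, the method `revoke`, and the use of plain integer partition ids (with out-of-range ids standing in for "no longer subscribed") are all assumptions made for illustration. Only the quota arithmetic (`total / members` and `total % members`) comes from the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; none of these names exist in Kafka.
final class QuotaSketch {
    /**
     * Keeps, for each member, only the partitions that are still valid and fit
     * within the member's quota. Assumes at least one member.
     */
    static Map<String, List<Integer>> revoke(Map<String, List<Integer>> current, int totalPartitions) {
        int members = current.size();
        int minQuota = totalPartitions / members;
        // Number of members that may keep minQuota + 1 partitions.
        int extras = totalPartitions % members;

        Map<String, List<Integer>> target = new HashMap<>();
        for (Map.Entry<String, List<Integer>> entry : current.entrySet()) {
            List<Integer> kept = new ArrayList<>();
            int quota = minQuota + (extras > 0 ? 1 : 0);
            for (int partition : entry.getValue()) {
                // Rule 1: drop partitions that are no longer valid for the member.
                if (partition < 0 || partition >= totalPartitions) continue;
                // Rule 2: drop partitions above the member's quota.
                if (kept.size() < quota) kept.add(partition);
            }
            // The member consumed one of the "extra" slots only if it went above minQuota.
            if (kept.size() > minQuota) extras--;
            target.put(entry.getKey(), kept);
        }
        return target;
    }
}
```

With 5 partitions and 2 members, the minimum quota is 2 and one member may hold 3. Any partition dropped here would then be handed out by a later "assign remaining partitions" step, which this sketch leaves out.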






Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619892264


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.

Review Comment:
   nit: members that* should






Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619888416


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -53,8 +42,17 @@
  * The assignment builder prioritizes the properties in the following order:
  *  Balance > Stickiness.
  */
-public class OptimizedUniformAssignmentBuilder extends 
AbstractUniformAssignmentBuilder {
-private static final Logger LOG = 
LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+public class OptimizedUniformAssignmentBuilder {
+private static final Class UNMODIFIALBE_MAP_CLASS = 
Collections.unmodifiableMap(new HashMap<>()).getClass();
+private static final Class EMPTY_MAP_CLASS = 
Collections.emptyMap().getClass();
+
+/**
+ * @return True if the provided map is an UnmodifiableMap or EmptyMap. 
Those classes are not
+ * public hence we cannot use the `instanceof` operator.
+ */
+private static boolean isImmutableMap(Map map) {
+return UNMODIFIALBE_MAP_CLASS.isInstance(map) || 
EMPTY_MAP_CLASS.isInstance(map);

Review Comment:
   nit: UNMODIFIABLE *
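
For reference, the helper from the hunk above can be sketched as a standalone class with the constant name spelled as the nit suggests. The wrapper class `ImmutableMapCheck` is hypothetical, and the generic types are assumptions (the quoted diff does not show them); the detection technique itself is the one described in the Javadoc of the diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical wrapper class; only the technique mirrors the quoted diff.
final class ImmutableMapCheck {
    // The JDK wrapper classes are not public, so capture them from sample instances.
    private static final Class<?> UNMODIFIABLE_MAP_CLASS =
        Collections.unmodifiableMap(new HashMap<>()).getClass();
    private static final Class<?> EMPTY_MAP_CLASS =
        Collections.emptyMap().getClass();

    /** Returns true if the map is a Collections.unmodifiableMap or Collections.emptyMap instance. */
    static boolean isImmutableMap(Map<?, ?> map) {
        return UNMODIFIABLE_MAP_CLASS.isInstance(map) || EMPTY_MAP_CLASS.isInstance(map);
    }
}
```

Note that this check only recognizes those two wrapper types. Other immutable maps, for example ones created with `Map.of` on Java 9+, would be reported as mutable by this sketch.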






Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


dajac commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619884277


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -18,29 +18,18 @@
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;
 
 /**
- * The optimized uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target 
assignment for a consumer group with

Review Comment:
   Interesting… So you’re fine with adding those names to the public interface 
(what the end user really sees) but not with following the same naming scheme for the 
internal classes (which the end user doesn’t see at all). In my opinion, it makes 
sense to align them, as that would make the naming more coherent.






Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619875174


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -18,29 +18,18 @@
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;
 
 /**
- * The optimized uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target 
assignment for a consumer group with

Review Comment:
   I feel like homogeneous and heterogeneous are harder names from a user's 
point of view; I think we should keep it as is.






Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619511354


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -53,8 +42,17 @@
  * The assignment builder prioritizes the properties in the following order:
  *  Balance > Stickiness.
  */
-public class OptimizedUniformAssignmentBuilder extends 
AbstractUniformAssignmentBuilder {
-private static final Logger LOG = 
LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+public class OptimizedUniformAssignmentBuilder {

Review Comment:
   I had the same question. We might need it in the rack-aware case for all the 
common methods.






Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


jeffkbkim commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619407866


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -53,8 +42,17 @@
  * The assignment builder prioritizes the properties in the following order:
  *  Balance > Stickiness.
  */
-public class OptimizedUniformAssignmentBuilder extends 
AbstractUniformAssignmentBuilder {
-private static final Logger LOG = 
LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+public class OptimizedUniformAssignmentBuilder {

Review Comment:
   Are we going to remove AbstractUniformAssignmentBuilder?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java:
##
@@ -158,12 +159,11 @@ public void 
testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() {
 
 Map>> expectedAssignment = new 
HashMap<>();
 expectedAssignment.put(memberA, mkAssignment(
-mkTopicAssignment(topic1Uuid, 0, 2),
-mkTopicAssignment(topic3Uuid, 1)
+mkTopicAssignment(topic1Uuid, 0),
+mkTopicAssignment(topic3Uuid, 0, 1)
 ));
 expectedAssignment.put(memberB, mkAssignment(
-mkTopicAssignment(topic1Uuid, 1),
-mkTopicAssignment(topic3Uuid, 0)
+mkTopicAssignment(topic1Uuid, 1, 2)

Review Comment:
   I see some test cases where we have changed the expected assignment. Why did 
it change?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int re

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


dajac commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619198458


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -18,29 +18,18 @@
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;
 
 /**
- * The optimized uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target 
assignment for a consumer group with

Review Comment:
   I would like to. I initially renamed the homogeneous one in this PR but 
@jeffkbkim asked to do it as a follow-up in order to simplify the diff.






Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


dajac commented on PR #16088:
URL: https://github.com/apache/kafka/pull/16088#issuecomment-2137850455

   @rreddy-22 I just triggered a new build.





Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619195556


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -18,29 +18,18 @@
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;
 
 /**
- * The optimized uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target 
assignment for a consumer group with

Review Comment:
   Are we trying to rename the assignor to homogeneous and heterogeneous 
instead of optimized and general?






Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on PR #16088:
URL: https://github.com/apache/kafka/pull/16088#issuecomment-2137846798

   Thanks for the PR! 
   The build seems to be failing, can we look into it?





Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-28 Thread via GitHub


dajac commented on PR #16088:
URL: https://github.com/apache/kafka/pull/16088#issuecomment-2136604335

   @jeffkbkim Your understanding is correct. The biggest gain comes from the 
rewrite of the "assign sticky partitions" methods. I did two things there: a) 
avoid the intermediate data structure (the topicpartition list); and b) avoid 
copying unchanged assignments. Regarding your point 3), the sorting does not 
have a real impact in the incremental case but it does have one in the full 
assignment case. As you saw, we don't need it. Note that we no longer use a 
Set for the unassigned partitions.
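
To illustrate why the sort can be dropped, here is a minimal sketch, not the code from this PR. It assumes the unassigned partitions are collected by walking each topic's partition indexes in ascending order and appending to a list. The class name, method name, and the string encoding of a partition (`topic-index`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these names come from the Kafka code base.
final class UnassignedPartitionsSketch {

    /**
     * Collects the partitions that no member currently owns.
     *
     * @param partitionCounts topic name -> number of partitions in the topic
     * @param assigned        topic name -> partition indexes that are already owned
     * @return unassigned partitions, ascending by index within each topic
     */
    static List<String> unassigned(
        Map<String, Integer> partitionCounts,
        Map<String, Set<Integer>> assigned
    ) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : partitionCounts.entrySet()) {
            String topic = entry.getKey();
            Set<Integer> taken = assigned.getOrDefault(topic, Set.of());
            // Walking the indexes in order yields an ordered list by construction,
            // so no Set and no separate sorting pass are needed.
            for (int partition = 0; partition < entry.getValue(); partition++) {
                if (!taken.contains(partition)) {
                    result.add(topic + "-" + partition);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = Map.of("topic1", 3);
        Map<String, Set<Integer>> assigned = Map.of("topic1", Set.of(1));
        System.out.println(unassigned(counts, assigned));
    }
}
```

The order across topics follows the iteration order of the `partitionCounts` map, so a caller that cares about it would pass an ordered map.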





Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-28 Thread via GitHub


jeffkbkim commented on PR #16088:
URL: https://github.com/apache/kafka/pull/16088#issuecomment-2136254190

   @dajac for me, this seems like a non-trivial change. it seems we are 
changing a core part of the assignment logic.
   
   some general questions wrt the improved performance (numbers look great!) I 
have, specifically on what contributed to the large gain:
   1. It seems a lot of the gain comes from the fact that assigning a 
partition was expensive and we can now bypass much of that cost. Not just in 
terms of object creation but also bookkeeping/updating all of the different 
data structures.
   2. Usually we will not revoke many partitions at once, which I think is the 
case for the benchmark. So we also gain from 
`num_partitions_to_revoke <<< num_partitions_to_assign`.
   3. We no longer sort the unassigned (remaining) partitions. We don't need 
this as we iterate the `Set unassignedPartitions` via the partition 
index. 
   
   Let me know if my understanding is correct. Thanks!





Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-28 Thread via GitHub


dajac commented on PR #16088:
URL: https://github.com/apache/kafka/pull/16088#issuecomment-2135700577

   @jeffkbkim Done. However, I am not sure if it will really help because the 
changes are minor.





Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-28 Thread via GitHub


jeffkbkim commented on PR #16088:
URL: https://github.com/apache/kafka/pull/16088#issuecomment-2135625791

   it's a bit difficult to see the changes which are supposedly minor since the 
file name has changed. Can we do that separately, so that we can see the diff?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-27 Thread via GitHub


dajac opened a new pull request, #16088:
URL: https://github.com/apache/kafka/pull/16088

   This patch optimizes the uniform (homogenous) assignor by avoiding creating 
a copy of all the assignments. Instead, the assignor creates a copy only if the 
assignment is updated. It is a sort of copy-on-write. This change reduces the 
overhead of the TargetAssignmentBuilder when run with the uniform (homogenous) 
assignor.
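
The copy-on-write idea can be sketched as follows. This is a minimal illustration under stated assumptions, not the implementation in this PR: the class `CopyOnWriteAssignmentSketch`, its methods, and the use of plain `String` member ids and topic names are all hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these names come from the Kafka code base.
final class CopyOnWriteAssignmentSketch {
    // Current assignment: member id -> (topic name -> partition indexes). Never mutated here.
    private final Map<String, Map<String, Set<Integer>>> current;
    // Target assignment. It starts out sharing every member's map with `current`.
    private final Map<String, Map<String, Set<Integer>>> target;
    // Members whose assignment has already been copied and may be mutated in place.
    private final Set<String> copied = new HashSet<>();

    CopyOnWriteAssignmentSketch(Map<String, Map<String, Set<Integer>>> current) {
        this.current = current;
        this.target = new HashMap<>(current); // shallow copy: the inner maps are shared
    }

    // Returns a mutable assignment for the member, deep-copying it on the first write only.
    private Map<String, Set<Integer>> writable(String memberId) {
        if (copied.add(memberId)) {
            Map<String, Set<Integer>> copy = new HashMap<>();
            Map<String, Set<Integer>> old = current.getOrDefault(memberId, Map.of());
            old.forEach((topic, partitions) -> copy.put(topic, new HashSet<>(partitions)));
            target.put(memberId, copy);
        }
        return target.get(memberId);
    }

    // Adds a partition to the member's target assignment.
    void assign(String memberId, String topic, int partition) {
        writable(memberId).computeIfAbsent(topic, t -> new HashSet<>()).add(partition);
    }

    Map<String, Map<String, Set<Integer>>> target() {
        return target;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> a = new HashMap<>();
        a.put("topic1", Set.of(0));
        Map<String, Set<Integer>> b = new HashMap<>();
        b.put("topic1", Set.of(1));
        Map<String, Map<String, Set<Integer>>> current = new HashMap<>();
        current.put("memberA", a);
        current.put("memberB", b);

        CopyOnWriteAssignmentSketch sketch = new CopyOnWriteAssignmentSketch(current);
        sketch.assign("memberA", "topic1", 2);

        // memberB was never written to, so its map is still the shared instance.
        System.out.println(sketch.target().get("memberB") == b);
        System.out.println(sketch.target().get("memberA"));
    }
}
```

In this sketch, members whose assignment does not change cost nothing beyond the initial shallow copy, which matches the incremental case where only a few members are updated.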
   
   Trunk:
   
   ```
   Benchmark                               (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt   Score   Error  Units
   TargetAssignmentBuilderBenchmark.build              1                         10           100  avgt    5  24.535 ± 1.583  ms/op
   TargetAssignmentBuilderBenchmark.build              1                         10          1000  avgt    5  24.094 ± 0.223  ms/op
   JMH benchmarks done
   ```
   
   ```
   Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt   Score   Error  Units
   ServerSideAssignorBenchmark.doAssignment       INCREMENTAL         UNIFORM          false              1                         10         HOMOGENEOUS           100  avgt    5  14.697 ± 0.133  ms/op
   ServerSideAssignorBenchmark.doAssignment       INCREMENTAL         UNIFORM          false              1                         10         HOMOGENEOUS          1000  avgt    5  15.073 ± 0.135  ms/op
   JMH benchmarks done
   ```
   
   Patch:
   
   ```
   Benchmark                               (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt  Score   Error  Units
   TargetAssignmentBuilderBenchmark.build              1                         10           100  avgt    5  3.376 ± 0.577  ms/op
   TargetAssignmentBuilderBenchmark.build              1                         10          1000  avgt    5  3.731 ± 0.359  ms/op
   JMH benchmarks done
   ```
   
   ```
   Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt  Score   Error  Units
   ServerSideAssignorBenchmark.doAssignment       INCREMENTAL         UNIFORM          false              1                         10         HOMOGENEOUS           100  avgt    5  1.975 ± 0.086  ms/op
   ServerSideAssignorBenchmark.doAssignment       INCREMENTAL         UNIFORM          false              1                         10         HOMOGENEOUS          1000  avgt    5  2.026 ± 0.190  ms/op
   JMH benchmarks done
   ```
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

