showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615228600
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -387,58 +398,121 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
         sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
+        int totalPartitionCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
         balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions,
-            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer, revocationRequired);
+            consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, revocationRequired,
+            partitionsPerTopic, totalPartitionCount);
+
         return currentAssignment;
     }
 
+    /**
+     * get the unassigned partition list by computing the difference set of the sortedPartitions(all partitions)
+     * and toBeRemovedPartitions. We use two pointers technique here:
+     *
+     * We loop the sortedPartition, and compare the ith element in sorted toBeRemovedPartitions(i start from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the the ith element, get next element from toBeRemovedPartitions
+     *
+     * @param sortedPartitions: sorted all partitions
+     * @param toBeRemovedPartitions: sorted partitions, all are included in the sortedPartitions
+     * @return the partitions don't assign to any current consumers
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedPartitions,

Review comment:
refactor 2: We used to build `unassignedPartitions` as an ArrayList holding all sorted partitions (e.g. 1 million partitions), then loop through the current assignment and remove every partition that was already assigned (e.g. 999,000 of them), leaving only 1,000 partitions.
However, removing an element from a huge ArrayList is slow: it first has to find the element with a linear scan, and then it shifts every element after it with an array copy (up to originalSize - 1 elements). This situation should happen a lot, since each rebalance should involve only a small set of changes (e.g. 1 consumer dropped), so this is an important improvement. To refactor it, I used the two-pointer technique to loop through the 2 sorted lists, `sortedPartitions` and `sortedToBeRemovedPartitions`, and add only the difference set of the 2 lists. Looping over an ArrayList and appending to it are both very fast, so this improves performance a lot.
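For illustration, here is a minimal standalone sketch of the two-pointer difference described above. It is not the code in this PR: the class name `SortedDifferenceSketch`, the method `difference`, and the use of plain strings in place of `TopicPartition` are all hypothetical. It assumes both lists share the same ordering, that every element to remove also appears in the full list, and that neither list contains `null`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SortedDifferenceSketch {

    /**
     * Returns the elements of sortedAll that do not appear in sortedToRemove.
     * Assumption (hypothetical helper, not the PR's method): both lists are sorted
     * with the same ordering, sortedToRemove is contained in sortedAll, no nulls.
     */
    public static <T> List<T> difference(List<T> sortedAll, List<T> sortedToRemove) {
        List<T> result = new ArrayList<>(Math.max(0, sortedAll.size() - sortedToRemove.size()));
        Iterator<T> removeIter = sortedToRemove.iterator();
        // Second "pointer": the next element we expect to skip, or null once exhausted.
        T nextToRemove = removeIter.hasNext() ? removeIter.next() : null;

        for (T element : sortedAll) {
            if (nextToRemove != null && nextToRemove.equals(element)) {
                // Matched: skip this element and advance the second pointer.
                nextToRemove = removeIter.hasNext() ? removeIter.next() : null;
            } else {
                // No match: the element belongs to the difference set.
                // Appending to the end of an ArrayList needs no element shifting.
                result.add(element);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("t0-0", "t0-1", "t0-2", "t1-0", "t1-1");
        List<String> assigned = Arrays.asList("t0-1", "t1-0");
        System.out.println(difference(all, assigned)); // prints [t0-0, t0-2, t1-1]
    }
}
```

Each list is walked once, so the cost is linear in the total number of elements, whereas calling `remove(Object)` once per assigned partition pays for a scan and a shift on every call.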