showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615228600
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -387,58 +398,121 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new
SubscriptionComparator(currentAssignment));
sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
+ int totalPartitionCount =
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
balance(currentAssignment, prevAssignment, sortedPartitions,
unassignedPartitions, sortedCurrentSubscriptions,
- consumer2AllPotentialPartitions, partition2AllPotentialConsumers,
currentPartitionConsumer, revocationRequired);
+ consumer2AllPotentialTopics, topic2AllPotentialConsumers,
currentPartitionConsumer, revocationRequired,
+ partitionsPerTopic, totalPartitionCount);
+
return currentAssignment;
}
+ /**
+ * get the unassigned partition list by computing the difference set of
the sortedPartitions(all partitions)
+ * and toBeRemovedPartitions. We use two pointers technique here:
+ *
+ * We loop the sortedPartition, and compare the ith element in sorted
toBeRemovedPartitions(i start from 0):
+ * - if not equal to the ith element, add to unassignedPartitions
+ * - if equal to the the ith element, get next element from
toBeRemovedPartitions
+ *
+ * @param sortedPartitions: sorted all partitions
+ * @param toBeRemovedPartitions: sorted partitions, all are included in
the sortedPartitions
+ * @return the partitions don't assign to any current consumers
+ */
+ private List<TopicPartition> getUnassignedPartitions(List<TopicPartition>
sortedPartitions,
Review comment:
Refactor 2:
Previously, `unassignedPartitions` was an ArrayList initialized with all sorted
partitions (e.g. 1 million), and we looped through the current assignment to
remove the partitions that were already assigned (e.g. 999,000 of them),
leaving only 1,000. However, removing an element from a huge ArrayList is slow:
each removal first has to find the element with a linear scan, and then does an
array copy to shift the elements after it (up to originalSize - 1 of them).
This case should be common, since a typical rebalance involves only a small set
of changes (e.g. 1 consumer dropped), so this is an important improvement.
To refactor it, I used the two-pointer technique to walk through the 2 sorted
lists, `sortedPartitions` and `sortedToBeRemovedPartitions`, and add only their
difference set to the result. Iterating over an ArrayList and appending to it
are both very fast, so this improves performance a lot.
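For illustration, the two-pointer difference can be sketched roughly as below.
This is a minimal standalone sketch, not the code in this PR: the class name
`UnassignedPartitionsSketch`, the method name `difference`, and the Integer
stand-ins for `TopicPartition` are hypothetical. It assumes both lists are
sorted in the same order and that every element to remove also appears in the
full list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch class; Integers stand in for TopicPartition so that it
// runs without Kafka on the classpath.
class UnassignedPartitionsSketch {

    /**
     * Returns the elements of sortedAll that do not appear in sortedToRemove.
     * Assumption: both lists are sorted in the same order, and every element
     * of sortedToRemove also occurs in sortedAll.
     */
    static <T> List<T> difference(List<T> sortedAll, List<T> sortedToRemove) {
        List<T> result = new ArrayList<>(
            Math.max(0, sortedAll.size() - sortedToRemove.size()));
        int removeIdx = 0; // second pointer, into sortedToRemove
        for (T candidate : sortedAll) { // first pointer, into sortedAll
            if (removeIdx < sortedToRemove.size()
                    && candidate.equals(sortedToRemove.get(removeIdx))) {
                // Match: skip this element and advance the second pointer.
                removeIdx++;
            } else {
                // No match: the element is not being removed, so keep it.
                result.add(candidate);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> all = Arrays.asList(0, 1, 2, 3, 4, 5);
        List<Integer> toRemove = Arrays.asList(1, 2, 4);
        System.out.println(difference(all, toRemove)); // prints [0, 3, 5]
    }
}
```

Each list is traversed once and the result is only ever appended to, so the
cost is linear in the size of the full list, with no per-removal search or
array shifting.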