showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r622040343



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         if (log.isDebugEnabled()) {
             log.debug("final assignment: " + assignment);
         }
-
+        
         return assignment;
     }
 
-    private SortedSet<TopicPartition> getTopicPartitions(Map<String, Integer> partitionsPerTopic) {
-        SortedSet<TopicPartition> allPartitions =
-            new TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
-        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
-            String topic = entry.getKey();
-            for (int i = 0; i < entry.getValue(); ++i) {
+    /**
+     * get the unassigned partition list by computing the difference set of the sortedPartitions(all partitions)
+     * and sortedToBeRemovedPartitions. We use two pointers technique here:
+     *
+     * We loop the sortedPartition, and compare the ith element in sorted toBeRemovedPartitions(i start from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the the ith element, get next element from sortedToBeRemovedPartitions
+     *
+     * @param sortedPartitions: sorted all partitions
+     * @param sortedToBeRemovedPartitions: sorted partitions, all are included in the sortedPartitions
+     * @return the partitions don't assign to any current consumers
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedPartitions,

Review comment:
       > In my (admittedly biased) opinion the original method was the easiest 
to understand in the code, and objectively speaking it also used less memory
   
   I like your honesty, haha! And yes, I agree with you.
   
   The timing for `getTopicPartitions` and `remove assigned partitions` is here:
   **before:**
   ```
   [2021-04-28 17:39:10,260] ERROR getTopicPartitions took:695 
   [2021-04-28 17:39:10,266] ERROR remove assigned took:1 
   [2021-04-28 17:39:11,594] ERROR getTopicPartitions took:607
   [2021-04-28 17:39:12,889] ERROR remove assigned took:1291
   ```
   **after:**
   ```
   [2021-04-28 17:36:02,701] ERROR getTopicPartitions took:41
   [2021-04-28 17:36:02,707] ERROR remove assigned took:1 
   [2021-04-28 17:36:03,290] ERROR getTopicPartitions took:15
   [2021-04-28 17:36:04,204] ERROR remove assigned took:910
   ```
   
   So, basically, your calculation is correct. The `getTopicPartitions` method is 10x (or even more) faster, and `remove assigned partitions` takes almost the same time, though **after** is still faster.
   
   I agree it is bad that **after** needs a lot more memory than **before**. The extra memory usage comes from the additional `toBeRemovedPartitions` list that is created. I can reduce the memory usage by not creating `sortedAllPartitions` before the assignor starts, since I only need it in `getUnassignedPartitions`. In `getUnassignedPartitions`, I was looping over 2 partition lists (`sortedAllPartitions` & `sortedToBeRemovedPartitions`). Now, without `sortedAllPartitions`, I can loop through `sortedAllTopics` instead, something like this:
   ```
   List<String> allTopics = new ArrayList<>(partitionsPerTopic.keySet());
   Collections.sort(allTopics);
   for (String topic: allTopics) {
       int partitionCount = partitionsPerTopic.get(topic);
       for (int i = 0; i < partitionCount; i++) {
           if (!(nextPartition.topic().equals(topic) && nextPartition.partition() == i)) {
               ...
           }
       }
   }
   ```
   This way, **before** and **after** will have the same memory usage, and the time spent on `remove assigned partitions` is basically unchanged:
   ```
   [2021-04-28 17:48:21,694] ERROR remove assigned took:46 
   [2021-04-28 17:48:23,405] ERROR remove assigned took:1035
   ```
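
   To make the idea concrete, here is a minimal, self-contained sketch of the topic-based loop. It is only an illustration under some assumptions, not the code in this PR: `TP` is a hypothetical stand-in for `TopicPartition` (so the sketch has no Kafka dependency), the class name and the method signature are made up, and `sortedToBeRemoved` is assumed to be sorted by topic and then partition and to contain only partitions that exist in `partitionsPerTopic`.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class UnassignedPartitionsSketch {

       // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
       static final class TP {
           final String topic;
           final int partition;

           TP(String topic, int partition) {
               this.topic = topic;
               this.partition = partition;
           }

           @Override
           public String toString() {
               return topic + "-" + partition;
           }
       }

       /**
        * Returns every partition described by partitionsPerTopic that is not in sortedToBeRemoved.
        * Assumes sortedToBeRemoved is sorted by topic, then partition, and is a subset of all partitions.
        */
       static List<TP> getUnassignedPartitions(Map<String, Integer> partitionsPerTopic,
                                               List<TP> sortedToBeRemoved) {
           // Only the topic names are sorted; the full partition list is never materialized.
           List<String> sortedTopics = new ArrayList<>(partitionsPerTopic.keySet());
           Collections.sort(sortedTopics);

           List<TP> unassigned = new ArrayList<>();
           int removeIdx = 0; // second pointer, into sortedToBeRemoved
           for (String topic : sortedTopics) {
               int partitionCount = partitionsPerTopic.get(topic);
               for (int i = 0; i < partitionCount; i++) {
                   if (removeIdx < sortedToBeRemoved.size()) {
                       TP next = sortedToBeRemoved.get(removeIdx);
                       if (next.topic.equals(topic) && next.partition == i) {
                           // Already assigned: skip it and advance the second pointer.
                           removeIdx++;
                           continue;
                       }
                   }
                   unassigned.add(new TP(topic, i));
               }
           }
           return unassigned;
       }

       public static void main(String[] args) {
           Map<String, Integer> partitionsPerTopic = new HashMap<>();
           partitionsPerTopic.put("t1", 3);
           partitionsPerTopic.put("t0", 2);

           List<TP> assigned = new ArrayList<>();
           assigned.add(new TP("t0", 1));
           assigned.add(new TP("t1", 0));
           assigned.add(new TP("t1", 2));

           // prints [t0-0, t1-1]
           System.out.println(getUnassignedPartitions(partitionsPerTopic, assigned));
       }
   }
   ```

   The sketch only sorts the topic names and creates `TP` objects for the partitions that end up unassigned, which is where the memory saving would come from.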





