showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615229166



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -387,58 +398,121 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
         TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new 
SubscriptionComparator(currentAssignment));
         sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
+        int totalPartitionCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
         balance(currentAssignment, prevAssignment, sortedPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
-            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired);
+            consumer2AllPotentialTopics, topic2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired,
+            partitionsPerTopic, totalPartitionCount);
+        
         return currentAssignment;
     }
 
+    /**
+     * Get the unassigned partitions by computing the set difference of
+     * {@code sortedPartitions} (all partitions) and {@code toBeRemovedPartitions},
+     * using a two-pointer walk:
+     *
+     * We iterate over {@code sortedPartitions} and compare each element with the
+     * current element of {@code toBeRemovedPartitions} (starting at index 0):
+     *   - if they are not equal, the partition is added to the result
+     *   - if they are equal, the partition is skipped and the pointer advances to
+     *     the next element of {@code toBeRemovedPartitions}
+     *
+     * Once every element of {@code toBeRemovedPartitions} has been matched (or if it
+     * is empty), all remaining partitions are added directly.
+     *
+     * @param sortedPartitions      all partitions, sorted
+     * @param toBeRemovedPartitions the partitions to exclude; must be sorted in the same order
+     *                              as {@code sortedPartitions}, and every element must be
+     *                              contained in it, otherwise later matches are missed
+     * @return a new list of the partitions that are not assigned to any current consumer,
+     *         in the order of {@code sortedPartitions}
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedPartitions,
+                                                         List<TopicPartition> toBeRemovedPartitions) {
+        List<TopicPartition> unassignedPartitions = new ArrayList<>();
+
+        // Index of the next partition in toBeRemovedPartitions that is still waiting to be matched.
+        // The index is checked against the size before each get(), instead of calling get(0)
+        // eagerly, so an empty toBeRemovedPartitions no longer throws IndexOutOfBoundsException.
+        int removeIndex = 0;
+        int sizeToBeRemovedPartitions = toBeRemovedPartitions.size();
+        for (TopicPartition topicPartition : sortedPartitions) {
+            if (removeIndex < sizeToBeRemovedPartitions
+                    && toBeRemovedPartitions.get(removeIndex).equals(topicPartition)) {
+                // equal case: don't add to unassignedPartitions, just advance to the next partition to remove
+                removeIndex++;
+            } else {
+                // either no match, or all toBeRemovedPartitions are consumed: add directly
+                unassignedPartitions.add(topicPartition);
+            }
+        }
+        return unassignedPartitions;
+    }
+
+    /**
+     * Record the given partitions in {@code prevAssignment} as owned by {@code consumer}
+     * at {@code generation}. A partition without an entry always gets one. An existing
+     * entry is replaced only when {@code generation} is strictly greater than the stored
+     * generation, so the entry with the highest generation seen so far is kept.
+     *
+     * @param prevAssignment the assignment that contains the assignment with the
+     *                       2nd largest generation; updated in place
+     * @param partitions     the partitions whose previous owner should be recorded
+     * @param consumer       the consumer id
+     * @param generation     the generation of this assignment (partitions)
+     */
+    private void updatePrevAssignment(Map<TopicPartition, ConsumerGenerationPair> prevAssignment,
+                                      List<TopicPartition> partitions,
+                                      String consumer,
+                                      int generation) {
+        for (TopicPartition partition : partitions) {
+            ConsumerGenerationPair existing = prevAssignment.get(partition);
+            // fill a missing entry; otherwise only a strictly newer generation overwrites it
+            boolean shouldReplace = existing == null || generation > existing.generation;
+            if (shouldReplace) {
+                prevAssignment.put(partition, new ConsumerGenerationPair(consumer, generation));
+            }
+        }
+    }
+
+    /**
+     * filling in the currentAssignment and prevAssignment from the 
subscriptions.
+     *
+     * @param subscriptions: Map from the member id to their respective topic 
subscription
+     * @param currentAssignment: The assignment contains the assignments with 
the largest generation
+     * @param prevAssignment: The assignment contains the assignment with the 
2nd largest generation
+     */
     private void prepopulateCurrentAssignments(Map<String, Subscription> 
subscriptions,
                                                Map<String, 
List<TopicPartition>> currentAssignment,
                                                Map<TopicPartition, 
ConsumerGenerationPair> prevAssignment) {
         // we need to process subscriptions' user data with each consumer's 
reported generation in mind
         // higher generations overwrite lower generations in case of a conflict
         // note that a conflict could exists only if user data is for 
different generations
 
-        // for each partition we create a sorted map of its consumers by 
generation
-        Map<TopicPartition, TreeMap<Integer, String>> 
sortedPartitionConsumersByGeneration = new HashMap<>();

Review comment:
       refactor 3: 
   We used to have a `sortedPartitionConsumersByGeneration` map that stored every 
partition together with all of its generations/consumers. That map was then used to 
compute `currentAssignment` and `prevAssignment`, which consumed a lot of memory and 
slowed down the calculation. This change improves it by computing `currentAssignment` 
and `prevAssignment` directly while looping over the subscriptions list (following the 
approach of the `allSubscriptionsEqual` method :)).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to