vahidhashemian commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r637664106



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -80,9 +80,7 @@ public MemberData(List<TopicPartition> partitions, 
Optional<Integer> generation)
             log.debug("Detected that all not consumers were subscribed to same 
set of topics, falling back to the "
                           + "general case assignment algorithm");

Review comment:
       Can you please fix the wording here too? ("all not consumers" should read "not all consumers".)

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -469,73 +426,190 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
         TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new 
SubscriptionComparator(currentAssignment));
         sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
-        balance(currentAssignment, prevAssignment, sortedPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
-            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired);
+        balance(currentAssignment, prevAssignment, sortedAllPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
+            consumer2AllPotentialTopics, topic2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired,
+            partitionsPerTopic, totalPartitionsCount);
+
+        if (log.isDebugEnabled()) {
+            log.debug("final assignment: {}", currentAssignment);
+        }
+
         return currentAssignment;
     }
 
+    /**
+     * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+     * and sortedAssignedPartitions. If no assigned partitions, we'll just 
return all sorted topic partitions.
+     * This is used in generalAssign method
+     *
+     * We loop the sortedPartition, and compare the ith element in 
sortedAssignedPartitions(i start from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the the ith element, get next element from 
sortedAssignedPartitions
+     *
+     * @param sortedAllPartitions:          sorted all partitions
+     * @param sortedAssignedPartitions:     sorted partitions, all are 
included in the sortedPartitions
+     * @param topic2AllPotentialConsumers:  topics mapped to all consumers 
that subscribed to it
+     * @return                              the partitions don't assign to any 
current consumers

Review comment:
       nit wording: `partitions that aren't assigned to any current consumer`
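
       For reference, the loop described in the Javadoc above amounts to a two-pointer
       difference over two sorted lists. The following is only a minimal sketch of that
       idea, not the PR's `getUnassignedPartitions`: the class and method names are
       hypothetical, plain strings stand in for `TopicPartition`, and it assumes both
       lists share the same sort order and that the assigned list is a subset of the
       full list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SortedDifferenceSketch {

    // Hypothetical helper (not from the PR): returns the elements of sortedAll that do
    // not appear in sortedAssigned. Assumes both lists are sorted in the same order and
    // that every element of sortedAssigned also occurs in sortedAll.
    static <T extends Comparable<T>> List<T> difference(List<T> sortedAll, List<T> sortedAssigned) {
        List<T> unassigned = new ArrayList<>();
        int next = 0; // index of the next assigned element we expect to meet
        for (T candidate : sortedAll) {
            if (next < sortedAssigned.size() && candidate.compareTo(sortedAssigned.get(next)) == 0) {
                // equal to the current assigned element: skip it and advance the pointer
                next++;
            } else {
                // not equal: this element is unassigned
                unassigned.add(candidate);
            }
        }
        return unassigned;
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("t0-0", "t0-1", "t1-0", "t1-1");
        List<String> assigned = Arrays.asList("t0-1", "t1-0");
        System.out.println(difference(all, assigned)); // prints [t0-0, t1-1]
    }
}
```

       Each list is walked once, so the cost is linear in the number of partitions,
       which is presumably why the Javadoc requires both inputs to be sorted.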

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -384,37 +326,39 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      *
      * @param partitionsPerTopic         The number of partitions for each 
subscribed topic.
      * @param subscriptions              Map from the member id to their 
respective topic subscription
+     * @param currentAssignment          Each consumer's previously owned and 
still-subscribed partitions
      *
      * @return                           Map from each member to the list of 
partitions assigned to them.
      */
     private Map<String, List<TopicPartition>> generalAssign(Map<String, 
Integer> partitionsPerTopic,
-                                                            Map<String, 
Subscription> subscriptions) {
-        Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
+                                                            Map<String, 
Subscription> subscriptions,
+                                                            Map<String, 
List<TopicPartition>> currentAssignment) {
+        if (log.isDebugEnabled()) {
+            log.debug("performing general assign. partitionsPerTopic: {}, 
subscriptions: {}, currentAssignment: {}",
+                partitionsPerTopic, subscriptions, currentAssignment);
+        }
+
         Map<TopicPartition, ConsumerGenerationPair> prevAssignment = new 
HashMap<>();
         partitionMovements = new PartitionMovements();
 
-        prepopulateCurrentAssignments(subscriptions, currentAssignment, 
prevAssignment);
+        prepopulateCurrentAssignments(subscriptions, prevAssignment);
 
-        // a mapping of all topic partitions to all consumers that can be 
assigned to them
-        final Map<TopicPartition, List<String>> 
partition2AllPotentialConsumers = new HashMap<>();
-        // a mapping of all consumers to all potential topic partitions that 
can be assigned to them
-        final Map<String, List<TopicPartition>> 
consumer2AllPotentialPartitions = new HashMap<>();
+        // a mapping of all topics to all consumers that can be assigned to 
them
+        final Map<String, List<String>> topic2AllPotentialConsumers = new 
HashMap<>(partitionsPerTopic.keySet().size());
+        // a mapping of all consumers to all potential topics that can be 
assigned to them
+        final Map<String, List<String>> consumer2AllPotentialTopics = new 
HashMap<>(subscriptions.keySet().size());
 
-        // initialize partition2AllPotentialConsumers and 
consumer2AllPotentialPartitions in the following two for loops
-        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
-            for (int i = 0; i < entry.getValue(); ++i)
-                partition2AllPotentialConsumers.put(new 
TopicPartition(entry.getKey(), i), new ArrayList<>());
-        }
+        // initialize topic2AllPotentialConsumers and 
consumer2AllPotentialTopics
+        partitionsPerTopic.keySet().stream().forEach(
+            topicName -> topic2AllPotentialConsumers.put(topicName, new 
ArrayList<>()));
 
         for (Entry<String, Subscription> entry: subscriptions.entrySet()) {
             String consumerId = entry.getKey();
-            consumer2AllPotentialPartitions.put(consumerId, new ArrayList<>());
+            List<String> subscribedTopics = new 
ArrayList<>(entry.getValue().topics().size());
+            consumer2AllPotentialTopics.put(consumerId, subscribedTopics);
             entry.getValue().topics().stream().filter(topic -> 
partitionsPerTopic.get(topic) != null).forEach(topic -> {
-                for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
-                    TopicPartition topicPartition = new TopicPartition(topic, 
i);
-                    
consumer2AllPotentialPartitions.get(consumerId).add(topicPartition);
-                    
partition2AllPotentialConsumers.get(topicPartition).add(consumerId);
-                }
+                subscribedTopics.add(topic);

Review comment:
       Isn't `topic` already added in line 357 above?

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
##########
@@ -598,6 +555,43 @@ public void 
testLargeAssignmentAndGroupWithUniformSubscription() {
         assignor.assign(partitionsPerTopic, subscriptions);
     }
 
+    @Timeout(40)
+    @Test
+    public void testLargeAssignmentAndGroupWithNonEqualSubscription() {
+        // 1 million partitions!
+        int topicCount = 500;
+        int partitionCount = 2_000;
+        int consumerCount = 2_000;
+
+        List<String> topics = new ArrayList<>();
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        for (int i = 0; i < topicCount; i++) {
+            String topicName = getTopicName(i, topicCount);
+            topics.add(topicName);
+            partitionsPerTopic.put(topicName, partitionCount);
+        }
+        for (int i = 0; i < consumerCount; i++) {
+            if (i == consumerCount - 1) {
+                subscriptions.put(getConsumerName(i, consumerCount), new 
Subscription(topics.subList(0, 1)));
+            } else {
+                subscriptions.put(getConsumerName(i, consumerCount), new 
Subscription(topics));
+            }
+        }
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+

Review comment:
       Would it be worthwhile to add some verification after the `assign` calls in this test?
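
       To illustrate the kind of check I have in mind, here is a rough, self-contained
       sketch. It is not code from this PR or from the existing test class: the class
       and method names are hypothetical, and partitions are encoded as "topic:index"
       strings so the example runs without Kafka on the classpath. It verifies that
       every partition of a subscribed topic is assigned exactly once and only to a
       consumer subscribed to that topic.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class AssignmentCheckSketch {

    // Hypothetical validity check (not part of the PR). Partitions are "topic:index" strings.
    static boolean isValid(Map<String, Integer> partitionsPerTopic,
                           Map<String, List<String>> subscribedTopics,
                           Map<String, List<String>> assignment) {
        Set<String> seen = new HashSet<>();
        for (Map.Entry<String, List<String>> entry : assignment.entrySet()) {
            List<String> topics = subscribedTopics.get(entry.getKey());
            if (topics == null) {
                return false; // assignment for a consumer that is not in the group
            }
            Set<String> topicSet = new HashSet<>(topics);
            for (String partition : entry.getValue()) {
                String topic = partition.substring(0, partition.lastIndexOf(':'));
                if (!topicSet.contains(topic)) {
                    return false; // consumer is not subscribed to this topic
                }
                if (!seen.add(partition)) {
                    return false; // partition assigned more than once
                }
            }
        }
        // Build the set of partitions that must be assigned: all partitions of every
        // topic that has at least one subscriber.
        Set<String> subscribedAnywhere = new HashSet<>();
        for (List<String> topics : subscribedTopics.values()) {
            subscribedAnywhere.addAll(topics);
        }
        Set<String> expected = new HashSet<>();
        for (Map.Entry<String, Integer> entry : partitionsPerTopic.entrySet()) {
            if (subscribedAnywhere.contains(entry.getKey())) {
                for (int i = 0; i < entry.getValue(); i++) {
                    expected.add(entry.getKey() + ":" + i);
                }
            }
        }
        return seen.equals(expected);
    }

    public static void main(String[] args) {
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put("t0", 2);
        partitionsPerTopic.put("t1", 1);

        Map<String, List<String>> subscribedTopics = new HashMap<>();
        subscribedTopics.put("c0", Arrays.asList("t0", "t1"));
        subscribedTopics.put("c1", Arrays.asList("t0"));

        Map<String, List<String>> assignment = new HashMap<>();
        assignment.put("c0", Arrays.asList("t0:0", "t1:0"));
        assignment.put("c1", Arrays.asList("t0:1"));
        System.out.println(isValid(partitionsPerTopic, subscribedTopics, assignment)); // prints true

        // c1 is not subscribed to t1, so this assignment is rejected
        assignment.put("c0", Arrays.asList("t0:0", "t0:1"));
        assignment.put("c1", Arrays.asList("t1:0"));
        System.out.println(isValid(partitionsPerTopic, subscribedTopics, assignment)); // prints false
    }
}
```

       If the test class already has a helper that performs this kind of validation,
       reusing it would be preferable to writing a new one.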

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -444,23 +392,32 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
                 // otherwise (the consumer still exists)
                 for (Iterator<TopicPartition> partitionIter = 
entry.getValue().iterator(); partitionIter.hasNext();) {
                     TopicPartition partition = partitionIter.next();
-                    if 
(!partition2AllPotentialConsumers.containsKey(partition)) {
-                        // if this topic partition of this consumer no longer 
exists remove it from currentAssignment of the consumer
+                    if 
(!topic2AllPotentialConsumers.containsKey(partition.topic())) {
+                        // if this topic partition of this consumer no longer 
exists, remove it from currentAssignment of the consumer
                         partitionIter.remove();
                         currentPartitionConsumer.remove(partition);
-                    } else if 
(!subscriptions.get(entry.getKey()).topics().contains(partition.topic())) {
-                        // if this partition cannot remain assigned to its 
current consumer because the consumer
-                        // is no longer subscribed to its topic remove it from 
currentAssignment of the consumer
+                    } else if 
(!consumerSubscription.topics().contains(partition.topic())) {
+                        // because the consumer is no longer subscribed to its 
topic, remove it from currentAssignment of the consumer
                         partitionIter.remove();
                         revocationRequired = true;
-                    } else
+                    } else {
                         // otherwise, remove the topic partition from those 
that need to be assigned only if
                         // its current consumer is still subscribed to its 
topic (because it is already assigned
                         // and we would want to preserve that assignment as 
much as possible)
-                        unassignedPartitions.remove(partition);
+                        assignedPartitions.add(partition);
+                    }
                 }
             }
         }
+
+        // all partitions that needed to be assigned
+        List<TopicPartition> unassignedPartitions = 
getUnassignedPartitions(sortedAllPartitions, assignedPartitions, 
topic2AllPotentialConsumers);
+        assignedPartitions = null;

Review comment:
       Is this `null` assignment needed? I don't see the variable used after this.



