ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r627012042
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
* This constrainedAssign optimizes the assignment algorithm when all
consumers were subscribed to same set of topics.
* The method includes the following steps:
*
- * 1. Reassign as many previously owned partitions as possible, up to the
maxQuota
- * 2. Fill remaining members up to minQuota
- * 3. If we ran out of unassigned partitions before filling all consumers,
we need to start stealing partitions
- * from the over-full consumers at max capacity
- * 4. Otherwise we may have run out of unfilled consumers before assigning
all partitions, in which case we
- * should just distribute one partition each to all consumers at min
capacity
+ * 1. Reassign previously owned partitions:
+ * a. if owned less than minQuota partitions, just assign all owned
partitions, and put the member into unfilled member list
+ * b. if owned maxQuota or more, and we're still under the number of
expected max capacity members, assign maxQuota partitions
+ * c. if owned at least "minQuota" of partitions, assign minQuota
partitions, and put the member into unfilled member list if
+ * we're still under the number of expected max capacity members
+ * 2. Fill remaining members up to the expected numbers of maxQuota
partitions, otherwise, to minQuota partitions
*
* @param partitionsPerTopic The number of partitions for each
subscribed topic
* @param consumerToOwnedPartitions Each consumer's previously owned and
still-subscribed partitions
*
- * @return Map from each member to the list of partitions assigned to them.
+ * @return Map from each member to the list of
partitions assigned to them.
*/
private Map<String, List<TopicPartition>> constrainedAssign(Map<String,
Integer> partitionsPerTopic,
Map<String,
List<TopicPartition>> consumerToOwnedPartitions) {
- SortedSet<TopicPartition> unassignedPartitions =
getTopicPartitions(partitionsPerTopic);
+ if (log.isDebugEnabled()) {
+ log.debug("performing constrained assign. partitionsPerTopic: {},
consumerToOwnedPartitions: {}",
+ partitionsPerTopic, consumerToOwnedPartitions);
+ }
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
- // Each consumer should end up in exactly one of the below
- // the consumers not yet at capacity
+ // the consumers not yet at expected capacity
List<String> unfilledMembers = new LinkedList<>();
- // the members with exactly maxQuota partitions assigned
- Queue<String> maxCapacityMembers = new LinkedList<>();
- // the members with exactly minQuota partitions assigned
- Queue<String> minCapacityMembers = new LinkedList<>();
int numberOfConsumers = consumerToOwnedPartitions.size();
- int minQuota = (int) Math.floor(((double) unassignedPartitions.size())
/ numberOfConsumers);
- int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size())
/ numberOfConsumers);
+ int totalPartitionsCount =
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+ int minQuota = (int) Math.floor(((double) totalPartitionsCount) /
numberOfConsumers);
+ int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) /
numberOfConsumers);
+ // the expected number of members with maxQuota assignment
+ int expectedNumMembersHavingMorePartitions = totalPartitionsCount %
numberOfConsumers;
Review comment:
Just a nit -- and to clarify up front, if you agree with this let's
still hold off on doing it here so this PR can finally be merged, as I figure
any nits can be addressed in your general assign PR:
It's still a bit unclear what this value will be sued for when you first see
it, maybe we can work in the word `minQuota` somewhere in the name? Eg
`expectedNumMembersWithMoreThanMinQuotaPartitions`, or for a slightly shorter
example `numConsumersAssignedOverMinQuota`, or something between or similar to
those
FYI I'm also ok with it as-is if you prefer the current name -- just wanted
to throw out some other suggestions. I'll trust you to pick whatever name feels
right 🙂
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]