ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r621502594
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
*/
private Map<String, List<TopicPartition>> constrainedAssign(Map<String,
Integer> partitionsPerTopic,
Map<String,
List<TopicPartition>> consumerToOwnedPartitions) {
- SortedSet<TopicPartition> unassignedPartitions =
getTopicPartitions(partitionsPerTopic);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("performing constrained assign.
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+ partitionsPerTopic, consumerToOwnedPartitions));
+ }
+
+ List<TopicPartition> sortedAllPartitions =
getTopicPartitions(partitionsPerTopic);
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
- // Each consumer should end up in exactly one of the below
- // the consumers not yet at capacity
+ // the consumers not yet at expected capacity
List<String> unfilledMembers = new LinkedList<>();
- // the members with exactly maxQuota partitions assigned
- Queue<String> maxCapacityMembers = new LinkedList<>();
- // the members with exactly minQuota partitions assigned
- Queue<String> minCapacityMembers = new LinkedList<>();
int numberOfConsumers = consumerToOwnedPartitions.size();
- int minQuota = (int) Math.floor(((double) unassignedPartitions.size())
/ numberOfConsumers);
- int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size())
/ numberOfConsumers);
+ int totalPartitionsCount = sortedAllPartitions.size();
+
+ int minQuota = (int) Math.floor(((double) totalPartitionsCount) /
numberOfConsumers);
+ int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) /
numberOfConsumers);
+ // the expected number of members with maxQuota assignment
+ int numExpectedMaxCapacityMembers = totalPartitionsCount %
numberOfConsumers;
+ // the number of members with exactly maxQuota partitions assigned
+ int numMaxCapacityMembers = 0;
- // initialize the assignment map with an empty array of size minQuota
for all members
+ // initialize the assignment map with an empty array of size maxQuota
for all members
Map<String, List<TopicPartition>> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(minQuota))));
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(maxQuota))));
+ List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
// Reassign as many previously owned partitions as possible
for (Map.Entry<String, List<TopicPartition>> consumerEntry :
consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
List<TopicPartition> consumerAssignment = assignment.get(consumer);
- int i = 0;
- // assign the first N partitions up to the max quota, and mark the
remaining as being revoked
- for (TopicPartition tp : ownedPartitions) {
- if (i < maxQuota) {
- consumerAssignment.add(tp);
- unassignedPartitions.remove(tp);
- } else {
- allRevokedPartitions.add(tp);
- }
- ++i;
- }
if (ownedPartitions.size() < minQuota) {
+ // the expected assignment size is more than consumer have
now, so keep all the owned partitions
+ // and put this member into unfilled member list
+ if (ownedPartitions.size() > 0) {
+ consumerAssignment.addAll(ownedPartitions);
+ toBeRemovedPartitions.addAll(ownedPartitions);
+ }
unfilledMembers.add(consumer);
+ } else if (ownedPartitions.size() >= maxQuota &&
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
Review comment:
nit: don't use `++` inline like this; it makes the code harder to understand, and we'll end up incrementing it past `numExpectedMaxCapacityMembers`, so its value won't represent the actual number of members at max capacity in the end. This might be confusing, e.g. if we want to add logging that includes this value later on. Let's increment it in the body of this `else if`.
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
*/
private Map<String, List<TopicPartition>> constrainedAssign(Map<String,
Integer> partitionsPerTopic,
Map<String,
List<TopicPartition>> consumerToOwnedPartitions) {
- SortedSet<TopicPartition> unassignedPartitions =
getTopicPartitions(partitionsPerTopic);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("performing constrained assign.
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+ partitionsPerTopic, consumerToOwnedPartitions));
+ }
+
+ List<TopicPartition> sortedAllPartitions =
getTopicPartitions(partitionsPerTopic);
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
- // Each consumer should end up in exactly one of the below
- // the consumers not yet at capacity
+ // the consumers not yet at expected capacity
List<String> unfilledMembers = new LinkedList<>();
- // the members with exactly maxQuota partitions assigned
- Queue<String> maxCapacityMembers = new LinkedList<>();
- // the members with exactly minQuota partitions assigned
- Queue<String> minCapacityMembers = new LinkedList<>();
int numberOfConsumers = consumerToOwnedPartitions.size();
- int minQuota = (int) Math.floor(((double) unassignedPartitions.size())
/ numberOfConsumers);
- int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size())
/ numberOfConsumers);
+ int totalPartitionsCount = sortedAllPartitions.size();
+
+ int minQuota = (int) Math.floor(((double) totalPartitionsCount) /
numberOfConsumers);
+ int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) /
numberOfConsumers);
+ // the expected number of members with maxQuota assignment
+ int numExpectedMaxCapacityMembers = totalPartitionsCount %
numberOfConsumers;
+ // the number of members with exactly maxQuota partitions assigned
+ int numMaxCapacityMembers = 0;
- // initialize the assignment map with an empty array of size minQuota
for all members
+ // initialize the assignment map with an empty array of size maxQuota
for all members
Map<String, List<TopicPartition>> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(minQuota))));
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(maxQuota))));
+ List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
// Reassign as many previously owned partitions as possible
for (Map.Entry<String, List<TopicPartition>> consumerEntry :
consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
List<TopicPartition> consumerAssignment = assignment.get(consumer);
- int i = 0;
- // assign the first N partitions up to the max quota, and mark the
remaining as being revoked
- for (TopicPartition tp : ownedPartitions) {
- if (i < maxQuota) {
- consumerAssignment.add(tp);
- unassignedPartitions.remove(tp);
- } else {
- allRevokedPartitions.add(tp);
- }
- ++i;
- }
if (ownedPartitions.size() < minQuota) {
+ // the expected assignment size is more than consumer have
now, so keep all the owned partitions
+ // and put this member into unfilled member list
+ if (ownedPartitions.size() > 0) {
+ consumerAssignment.addAll(ownedPartitions);
+ toBeRemovedPartitions.addAll(ownedPartitions);
+ }
unfilledMembers.add(consumer);
+ } else if (ownedPartitions.size() >= maxQuota &&
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+ // consumer owned the "maxQuota" of partitions or more, and we
still under the number of expected max capacity members
Review comment:
```suggestion
                // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members
```
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
*/
private Map<String, List<TopicPartition>> constrainedAssign(Map<String,
Integer> partitionsPerTopic,
Map<String,
List<TopicPartition>> consumerToOwnedPartitions) {
- SortedSet<TopicPartition> unassignedPartitions =
getTopicPartitions(partitionsPerTopic);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("performing constrained assign.
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+ partitionsPerTopic, consumerToOwnedPartitions));
+ }
+
+ List<TopicPartition> sortedAllPartitions =
getTopicPartitions(partitionsPerTopic);
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
- // Each consumer should end up in exactly one of the below
- // the consumers not yet at capacity
+ // the consumers not yet at expected capacity
List<String> unfilledMembers = new LinkedList<>();
- // the members with exactly maxQuota partitions assigned
- Queue<String> maxCapacityMembers = new LinkedList<>();
- // the members with exactly minQuota partitions assigned
- Queue<String> minCapacityMembers = new LinkedList<>();
int numberOfConsumers = consumerToOwnedPartitions.size();
- int minQuota = (int) Math.floor(((double) unassignedPartitions.size())
/ numberOfConsumers);
- int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size())
/ numberOfConsumers);
+ int totalPartitionsCount = sortedAllPartitions.size();
+
+ int minQuota = (int) Math.floor(((double) totalPartitionsCount) /
numberOfConsumers);
+ int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) /
numberOfConsumers);
+ // the expected number of members with maxQuota assignment
+ int numExpectedMaxCapacityMembers = totalPartitionsCount %
numberOfConsumers;
+ // the number of members with exactly maxQuota partitions assigned
+ int numMaxCapacityMembers = 0;
- // initialize the assignment map with an empty array of size minQuota
for all members
+ // initialize the assignment map with an empty array of size maxQuota
for all members
Map<String, List<TopicPartition>> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(minQuota))));
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(maxQuota))));
+ List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
// Reassign as many previously owned partitions as possible
for (Map.Entry<String, List<TopicPartition>> consumerEntry :
consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
List<TopicPartition> consumerAssignment = assignment.get(consumer);
- int i = 0;
- // assign the first N partitions up to the max quota, and mark the
remaining as being revoked
- for (TopicPartition tp : ownedPartitions) {
- if (i < maxQuota) {
- consumerAssignment.add(tp);
- unassignedPartitions.remove(tp);
- } else {
- allRevokedPartitions.add(tp);
- }
- ++i;
- }
if (ownedPartitions.size() < minQuota) {
+ // the expected assignment size is more than consumer have
now, so keep all the owned partitions
+ // and put this member into unfilled member list
+ if (ownedPartitions.size() > 0) {
+ consumerAssignment.addAll(ownedPartitions);
+ toBeRemovedPartitions.addAll(ownedPartitions);
+ }
unfilledMembers.add(consumer);
+ } else if (ownedPartitions.size() >= maxQuota &&
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+ // consumer owned the "maxQuota" of partitions or more, and we
still under the number of expected max capacity members
+ // so keep "maxQuota" of the owned partitions, and revoke the
rest of the partitions
+ consumerAssignment.addAll(ownedPartitions.subList(0,
maxQuota));
+ toBeRemovedPartitions.addAll(ownedPartitions.subList(0,
maxQuota));
+ allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota,
ownedPartitions.size()));
} else {
- // It's possible for a consumer to be at both min and max
capacity if minQuota == maxQuota
- if (consumerAssignment.size() == minQuota)
- minCapacityMembers.add(consumer);
- if (consumerAssignment.size() == maxQuota)
- maxCapacityMembers.add(consumer);
+ // consumer owned the "minQuota" of partitions or more
Review comment:
```suggestion
                // consumer owned at least "minQuota" of partitions but we're already at the allowed number of max capacity members
```
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
*/
private Map<String, List<TopicPartition>> constrainedAssign(Map<String,
Integer> partitionsPerTopic,
Map<String,
List<TopicPartition>> consumerToOwnedPartitions) {
- SortedSet<TopicPartition> unassignedPartitions =
getTopicPartitions(partitionsPerTopic);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("performing constrained assign.
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+ partitionsPerTopic, consumerToOwnedPartitions));
+ }
+
+ List<TopicPartition> sortedAllPartitions =
getTopicPartitions(partitionsPerTopic);
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
- // Each consumer should end up in exactly one of the below
- // the consumers not yet at capacity
+ // the consumers not yet at expected capacity
List<String> unfilledMembers = new LinkedList<>();
- // the members with exactly maxQuota partitions assigned
- Queue<String> maxCapacityMembers = new LinkedList<>();
- // the members with exactly minQuota partitions assigned
- Queue<String> minCapacityMembers = new LinkedList<>();
int numberOfConsumers = consumerToOwnedPartitions.size();
- int minQuota = (int) Math.floor(((double) unassignedPartitions.size())
/ numberOfConsumers);
- int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size())
/ numberOfConsumers);
+ int totalPartitionsCount = sortedAllPartitions.size();
+
+ int minQuota = (int) Math.floor(((double) totalPartitionsCount) /
numberOfConsumers);
+ int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) /
numberOfConsumers);
+ // the expected number of members with maxQuota assignment
+ int numExpectedMaxCapacityMembers = totalPartitionsCount %
numberOfConsumers;
+ // the number of members with exactly maxQuota partitions assigned
+ int numMaxCapacityMembers = 0;
- // initialize the assignment map with an empty array of size minQuota
for all members
+ // initialize the assignment map with an empty array of size maxQuota
for all members
Map<String, List<TopicPartition>> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(minQuota))));
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(maxQuota))));
+ List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
// Reassign as many previously owned partitions as possible
for (Map.Entry<String, List<TopicPartition>> consumerEntry :
consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
List<TopicPartition> consumerAssignment = assignment.get(consumer);
- int i = 0;
- // assign the first N partitions up to the max quota, and mark the
remaining as being revoked
- for (TopicPartition tp : ownedPartitions) {
- if (i < maxQuota) {
- consumerAssignment.add(tp);
- unassignedPartitions.remove(tp);
- } else {
- allRevokedPartitions.add(tp);
- }
- ++i;
- }
if (ownedPartitions.size() < minQuota) {
+ // the expected assignment size is more than consumer have
now, so keep all the owned partitions
+ // and put this member into unfilled member list
+ if (ownedPartitions.size() > 0) {
+ consumerAssignment.addAll(ownedPartitions);
+ toBeRemovedPartitions.addAll(ownedPartitions);
+ }
unfilledMembers.add(consumer);
+ } else if (ownedPartitions.size() >= maxQuota &&
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+ // consumer owned the "maxQuota" of partitions or more, and we
still under the number of expected max capacity members
+ // so keep "maxQuota" of the owned partitions, and revoke the
rest of the partitions
+ consumerAssignment.addAll(ownedPartitions.subList(0,
maxQuota));
+ toBeRemovedPartitions.addAll(ownedPartitions.subList(0,
maxQuota));
+ allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota,
ownedPartitions.size()));
} else {
- // It's possible for a consumer to be at both min and max
capacity if minQuota == maxQuota
- if (consumerAssignment.size() == minQuota)
- minCapacityMembers.add(consumer);
- if (consumerAssignment.size() == maxQuota)
- maxCapacityMembers.add(consumer);
+ // consumer owned the "minQuota" of partitions or more
+ // so keep "minQuota" of the owned partitions, and revoke the
rest of the partitions
+ consumerAssignment.addAll(ownedPartitions.subList(0,
minQuota));
+ toBeRemovedPartitions.addAll(ownedPartitions.subList(0,
minQuota));
+ allRevokedPartitions.addAll(ownedPartitions.subList(minQuota,
ownedPartitions.size()));
}
}
+ List<TopicPartition> unassignedPartitions;
+ if (!toBeRemovedPartitions.isEmpty()) {
+ Collections.sort(toBeRemovedPartitions,
+
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+ unassignedPartitions =
getUnassignedPartitions(sortedAllPartitions, toBeRemovedPartitions);
+ sortedAllPartitions = null;
+ } else {
+ unassignedPartitions = sortedAllPartitions;
+ }
+ toBeRemovedPartitions = null;
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format(
+ "After reassigning previously owned partitions, unfilled
members: %s, unassigned partitions: %s, " +
+ "current assignment: %s", unfilledMembers,
unassignedPartitions, assignment));
+ }
+
Collections.sort(unfilledMembers);
Iterator<TopicPartition> unassignedPartitionsIter =
unassignedPartitions.iterator();
- // Fill remaining members up to minQuota
+ // fill remaining members up to the expected numbers of maxQuota,
otherwise, to minQuota
while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
while (unfilledConsumerIter.hasNext()) {
String consumer = unfilledConsumerIter.next();
List<TopicPartition> consumerAssignment =
assignment.get(consumer);
+ int expectedAssignedCount = numMaxCapacityMembers <
numExpectedMaxCapacityMembers ? maxQuota : minQuota;
+ int currentAssignedCount = consumerAssignment.size();
if (unassignedPartitionsIter.hasNext()) {
TopicPartition tp = unassignedPartitionsIter.next();
consumerAssignment.add(tp);
- unassignedPartitionsIter.remove();
+ currentAssignedCount++;
// We already assigned all possible ownedPartitions, so we
know this must be newly to this consumer
if (allRevokedPartitions.contains(tp))
partitionsTransferringOwnership.put(tp, consumer);
} else {
+ // This will only happen when current consumer has
minQuota of partitions, and in previous round,
+ // the expectedAssignedCount is maxQuota, so, still in
unfilledMembers list.
+ // But now, expectedAssignedCount is minQuota, we can
remove it.
+ if (currentAssignedCount != minQuota) {
+ // Should not enter here since we have calculated the
exact number to assign to each consumer
+ log.warn(String.format(
+ "No more partitions to be assigned. consumer: [%s]
with current size: %d, but expected size is %d",
+ consumer, currentAssignedCount,
expectedAssignedCount));
+ }
+ unfilledConsumerIter.remove();
break;
}
- if (consumerAssignment.size() == minQuota) {
- minCapacityMembers.add(consumer);
+ if (currentAssignedCount == expectedAssignedCount) {
+ if (currentAssignedCount == maxQuota) {
+ numMaxCapacityMembers++;
+ }
unfilledConsumerIter.remove();
}
}
}
- // If we ran out of unassigned partitions before filling all
consumers, we need to start stealing partitions
- // from the over-full consumers at max capacity
- for (String consumer : unfilledMembers) {
- List<TopicPartition> consumerAssignment = assignment.get(consumer);
- int remainingCapacity = minQuota - consumerAssignment.size();
- while (remainingCapacity > 0) {
- String overloadedConsumer = maxCapacityMembers.poll();
- if (overloadedConsumer == null) {
- throw new IllegalStateException("Some consumers are under
capacity but all partitions have been assigned");
+ if (log.isDebugEnabled()) {
+ log.debug("final assignment: " + assignment);
Review comment:
```suggestion
            log.debug("Final assignment of partitions to consumers: \n{}", assignment);
```
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
*/
private Map<String, List<TopicPartition>> constrainedAssign(Map<String,
Integer> partitionsPerTopic,
Map<String,
List<TopicPartition>> consumerToOwnedPartitions) {
- SortedSet<TopicPartition> unassignedPartitions =
getTopicPartitions(partitionsPerTopic);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("performing constrained assign.
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+ partitionsPerTopic, consumerToOwnedPartitions));
+ }
+
+ List<TopicPartition> sortedAllPartitions =
getTopicPartitions(partitionsPerTopic);
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
- // Each consumer should end up in exactly one of the below
- // the consumers not yet at capacity
+ // the consumers not yet at expected capacity
List<String> unfilledMembers = new LinkedList<>();
- // the members with exactly maxQuota partitions assigned
- Queue<String> maxCapacityMembers = new LinkedList<>();
- // the members with exactly minQuota partitions assigned
- Queue<String> minCapacityMembers = new LinkedList<>();
int numberOfConsumers = consumerToOwnedPartitions.size();
- int minQuota = (int) Math.floor(((double) unassignedPartitions.size())
/ numberOfConsumers);
- int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size())
/ numberOfConsumers);
+ int totalPartitionsCount = sortedAllPartitions.size();
+
+ int minQuota = (int) Math.floor(((double) totalPartitionsCount) /
numberOfConsumers);
+ int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) /
numberOfConsumers);
+ // the expected number of members with maxQuota assignment
+ int numExpectedMaxCapacityMembers = totalPartitionsCount %
numberOfConsumers;
+ // the number of members with exactly maxQuota partitions assigned
+ int numMaxCapacityMembers = 0;
- // initialize the assignment map with an empty array of size minQuota
for all members
+ // initialize the assignment map with an empty array of size maxQuota
for all members
Map<String, List<TopicPartition>> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(minQuota))));
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(maxQuota))));
+ List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
Review comment:
Can we pick a better name for this? I had trouble understanding what this list was for, but after reading all the code it sounds like it's actually just the partitions which have been assigned, and thus are "to be removed" from the list of partitions to be assigned -- is that where the name comes from? I do see the logic behind that, but it wasn't clear until I'd carefully read everything. Maybe just `assignedPartitions` would be clearer? 🤷‍♀️
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
if (log.isDebugEnabled()) {
log.debug("final assignment: " + assignment);
}
-
+
return assignment;
}
- private SortedSet<TopicPartition> getTopicPartitions(Map<String, Integer>
partitionsPerTopic) {
- SortedSet<TopicPartition> allPartitions =
- new
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
- for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
- String topic = entry.getKey();
- for (int i = 0; i < entry.getValue(); ++i) {
+ /**
+ * get the unassigned partition list by computing the difference set of
the sortedPartitions(all partitions)
+ * and sortedToBeRemovedPartitions. We use two pointers technique here:
+ *
+ * We loop the sortedPartition, and compare the ith element in sorted
toBeRemovedPartitions(i start from 0):
+ * - if not equal to the ith element, add to unassignedPartitions
+ * - if equal to the the ith element, get next element from
sortedToBeRemovedPartitions
+ *
+ * @param sortedPartitions: sorted all partitions
+ * @param sortedToBeRemovedPartitions: sorted partitions, all are included
in the sortedPartitions
+ * @return the partitions don't assign to any current consumers
+ */
+ private List<TopicPartition> getUnassignedPartitions(List<TopicPartition>
sortedPartitions,
+ List<TopicPartition>
sortedToBeRemovedPartitions) {
+ List<TopicPartition> unassignedPartitions = new ArrayList<>(
+ sortedPartitions.size() - sortedToBeRemovedPartitions.size());
+
+ int index = 0;
+ boolean shouldAddDirectly = false;
+ int sizeToBeRemovedPartitions = sortedToBeRemovedPartitions.size();
+ TopicPartition nextPartition = sortedToBeRemovedPartitions.get(index);
+ for (TopicPartition topicPartition : sortedPartitions) {
+ if (shouldAddDirectly || !nextPartition.equals(topicPartition)) {
+ unassignedPartitions.add(topicPartition);
+ } else {
+ // equal case, don't add to unassignedPartitions, just get
next partition
+ if (index < sizeToBeRemovedPartitions - 1) {
+ nextPartition = sortedToBeRemovedPartitions.get(++index);
+ } else {
+ // add the remaining directly since there is no more
toBeRemovedPartitions
+ shouldAddDirectly = true;
+ }
+ }
+ }
+ return unassignedPartitions;
+ }
+
+
+ private List<TopicPartition> getTopicPartitions(Map<String, Integer>
partitionsPerTopic) {
+ List<TopicPartition> allPartitions = new ArrayList<>(
+ partitionsPerTopic.values().stream().reduce(0, Integer::sum));
+
+ List<String> allTopics = new ArrayList<>(partitionsPerTopic.keySet());
+ // sort all topics first, then we can have sorted all topic partitions
by adding partitions starting from 0
+ Collections.sort(allTopics);
Review comment:
I did the math and it seems to come down to roughly O(N*logN) vs O(2*N), which for N = 1 million is roughly a 10x improvement. Not bad; of course it is a tradeoff and there are other factors as mentioned above. But still very nice 👍
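(For reference, the rough arithmetic behind that 10x, assuming log base 2: with N = 10^6, logN ≈ 20, so N*logN ≈ 2*10^7 versus 2*N = 2*10^6.)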
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
if (log.isDebugEnabled()) {
log.debug("final assignment: " + assignment);
}
-
+
return assignment;
}
- private SortedSet<TopicPartition> getTopicPartitions(Map<String, Integer>
partitionsPerTopic) {
- SortedSet<TopicPartition> allPartitions =
- new
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
- for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
- String topic = entry.getKey();
- for (int i = 0; i < entry.getValue(); ++i) {
+ /**
+ * get the unassigned partition list by computing the difference set of
the sortedPartitions(all partitions)
+ * and sortedToBeRemovedPartitions. We use two pointers technique here:
+ *
+ * We loop the sortedPartition, and compare the ith element in sorted
toBeRemovedPartitions(i start from 0):
+ * - if not equal to the ith element, add to unassignedPartitions
+ * - if equal to the the ith element, get next element from
sortedToBeRemovedPartitions
+ *
+ * @param sortedPartitions: sorted all partitions
+ * @param sortedToBeRemovedPartitions: sorted partitions, all are included
in the sortedPartitions
+ * @return the partitions don't assign to any current consumers
+ */
+ private List<TopicPartition> getUnassignedPartitions(List<TopicPartition>
sortedPartitions,
Review comment:
Just to be clear, a `removeAll(M partitions)` operation on a TreeSet should still be only O(M*logN), since each individual remove is only logN. Even for N = 1 million, logN is under 20. So it scales more with how many partitions are being removed.

I tried to do the math here and found the time complexity of the original to be (slightly) better on paper, but maybe I missed something, or the reality is just different for certain input parameters (big-O time is not an absolute law after all 🙂). Maybe you can check my work and then run some tests with and without this specific change (but with all other improvements included).

Let's say the number of consumers is C, the number of partitions assigned to each consumer is M, and the total number of partitions is N.

Before: loop through all consumers and call (TreeSet) `unassignedPartitions.removeAll(assignedPartitions)`. This is `C * M * logN` (where N will actually decrease down to ~0 by the end of the loop, since as you pointed out, most partitions should be reassigned in the _sticky_ assignor).

After: loop through all consumers and call (ArrayList) `toBeRemovedPartitions.addAll(assignedPartitions)`. Since addAll has to copy all M elements, this is C * M. After that we call `sort(toBeRemovedPartitions)`, where toBeRemovedPartitions has C * M elements, so this is C * M * log(C * M). And then finally we call `getUnassignedPartitions`, which is O(N). So putting it all together we have C * M + C * M * log(C*M) + N.

It's a little hard to compare these directly, but in general we'll have C * M ≈ N (since most partitions are reassigned), in which case the before vs after runtimes can be reduced to O(N^2*log(N)) vs O(2N + N^2*log(N)). The before case is actually better, although they're roughly the same for large N. If this analysis holds up in your experiments then we should just go with whichever one uses less memory and/or has the simplest code. In my (admittedly biased) opinion the original method was the easiest to understand in the code, and objectively speaking it also used less memory since we only need the one `unassignedPartitions` data structure. I.e. we can just rename `sortedAllPartitions` back to `unassignedPartitions` (or maybe `partitionsToBeAssigned` is better?) and then we can delete both `toBeRemovedPartitions` and the second `unassignedPartitions`. Thoughts?
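For illustration only, a minimal sketch of the single-structure shape described above (the class and method names here are hypothetical and the capacity handling is elided; `TopicPartition` and the comparator are as in the existing code):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch (not the actual patch): keep a single sorted set of partitions
// still to be assigned, and shrink it in place as previously owned partitions are kept.
final class UnassignedPartitionsSketch {

    static SortedSet<TopicPartition> newSortedPartitionSet() {
        return new TreeSet<>(Comparator.comparing(TopicPartition::topic)
                                       .thenComparing(TopicPartition::partition));
    }

    static SortedSet<TopicPartition> remainingAfterReassignment(SortedSet<TopicPartition> unassignedPartitions,
                                                                Map<String, List<TopicPartition>> consumerToOwnedPartitions,
                                                                int maxQuota) {
        for (List<TopicPartition> owned : consumerToOwnedPartitions.values()) {
            // keep at most maxQuota of the owned partitions per consumer (quota details elided)
            List<TopicPartition> kept = owned.subList(0, Math.min(owned.size(), maxQuota));
            for (TopicPartition tp : kept) {
                unassignedPartitions.remove(tp);   // O(logN) per removal on a TreeSet
            }
        }
        return unassignedPartitions;
    }
}
```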
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
if (log.isDebugEnabled()) {
log.debug("final assignment: " + assignment);
}
-
+
return assignment;
}
- private SortedSet<TopicPartition> getTopicPartitions(Map<String, Integer>
partitionsPerTopic) {
- SortedSet<TopicPartition> allPartitions =
- new
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
- for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
- String topic = entry.getKey();
- for (int i = 0; i < entry.getValue(); ++i) {
+ /**
+ * get the unassigned partition list by computing the difference set of
the sortedPartitions(all partitions)
+ * and sortedToBeRemovedPartitions. We use two pointers technique here:
+ *
+ * We loop the sortedPartition, and compare the ith element in sorted
toBeRemovedPartitions(i start from 0):
+ * - if not equal to the ith element, add to unassignedPartitions
+ * - if equal to the the ith element, get next element from
sortedToBeRemovedPartitions
+ *
+ * @param sortedPartitions: sorted all partitions
+ * @param sortedToBeRemovedPartitions: sorted partitions, all are included
in the sortedPartitions
+ * @return the partitions don't assign to any current consumers
+ */
+ private List<TopicPartition> getUnassignedPartitions(List<TopicPartition>
sortedPartitions,
Review comment:
Of course we also need to take into account that going back to the TreeSet will mean we lose the improvement in `getTopicPartitions` as well. I did the math there and found it came down to O(NlogN) vs O(2N), and with logN ≈ 20 for 1 million partitions that is about a 10x improvement.

Adding this all up, the new algorithm does come out _slightly_ on top now: before we had O(NlogN + N^2*log(N)) and now we have O(4N + NlogN + N^2*log(N)). Taking logN ≈ 20 means a 5x improvement for the part that scales as N -- which is great, obviously, but the time for `getTopicPartitions` is negligible compared to the time complexity of this loop we discussed above, i.e. the N^2 term is going to dominate the N term. So we have a tradeoff here between an unknown but possibly small performance improvement that uses a lot more memory, and a somewhat worse algorithm with only the one data structure and slightly cleaner code.

That's a hard tradeoff to comment on without more data. If you could re-run your experiments with all other improvements implemented but without these two things (i.e. the use of `toBeRemovedPartitions` instead of dynamically removing from `unassignedPartitions`, and the move away from TreeSet in `getTopicPartitions`), then we can see what the actual performance characteristics are and also get a sense of how much extra memory we're talking about. I'd recommend first addressing all my other comments here and re-running with/without this stuff in case those other comments have an impact, so we can be fair to both sides.

Anyways, that's my analysis here, but you've been looking at this more carefully and more recently, so it's possible I'm missing something. Interested to hear your take on this.
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -108,6 +107,8 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
subscribedTopics.addAll(subscription.topics());
} else if (!(subscription.topics().size() ==
subscribedTopics.size()
&& subscribedTopics.containsAll(subscription.topics()))) {
+ // we don't need consumerToOwnedPartitions in general assign
case
+ consumerToOwnedPartitions = null;
Review comment:
nit: can we set it to null up in `#assign`, before invoking `generalAssign`? I feel that's slightly more future-proof, as it's easy to miss that this gets cleared when it occurs deep in this boolean check method, in case someone decides they want to use this map in `generalAssign`.

Which could happen, since it does build up basically this exact same information in a later loop -- as an alternative to nullifying this map, we could just pass it in to `generalAssign` to replace the `currentAssignment` map that gets filled in via `prepopulateCurrentAssignments`. That won't save us from looping through all the assignments entirely, since we also need to populate `prevAssignments`, but it will still save some time by cutting out the filling in of `currentAssignment`.
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
*/
private Map<String, List<TopicPartition>> constrainedAssign(Map<String,
Integer> partitionsPerTopic,
Map<String,
List<TopicPartition>> consumerToOwnedPartitions) {
- SortedSet<TopicPartition> unassignedPartitions =
getTopicPartitions(partitionsPerTopic);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("performing constrained assign.
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+ partitionsPerTopic, consumerToOwnedPartitions));
+ }
+
+ List<TopicPartition> sortedAllPartitions =
getTopicPartitions(partitionsPerTopic);
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
- // Each consumer should end up in exactly one of the below
- // the consumers not yet at capacity
+ // the consumers not yet at expected capacity
List<String> unfilledMembers = new LinkedList<>();
- // the members with exactly maxQuota partitions assigned
- Queue<String> maxCapacityMembers = new LinkedList<>();
- // the members with exactly minQuota partitions assigned
- Queue<String> minCapacityMembers = new LinkedList<>();
int numberOfConsumers = consumerToOwnedPartitions.size();
- int minQuota = (int) Math.floor(((double) unassignedPartitions.size())
/ numberOfConsumers);
- int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size())
/ numberOfConsumers);
+ int totalPartitionsCount = sortedAllPartitions.size();
+
+ int minQuota = (int) Math.floor(((double) totalPartitionsCount) /
numberOfConsumers);
+ int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) /
numberOfConsumers);
+ // the expected number of members with maxQuota assignment
+ int numExpectedMaxCapacityMembers = totalPartitionsCount %
numberOfConsumers;
+ // the number of members with exactly maxQuota partitions assigned
+ int numMaxCapacityMembers = 0;
- // initialize the assignment map with an empty array of size minQuota
for all members
+ // initialize the assignment map with an empty array of size maxQuota
for all members
Map<String, List<TopicPartition>> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(minQuota))));
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(maxQuota))));
+ List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
// Reassign as many previously owned partitions as possible
for (Map.Entry<String, List<TopicPartition>> consumerEntry :
consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
List<TopicPartition> consumerAssignment = assignment.get(consumer);
- int i = 0;
- // assign the first N partitions up to the max quota, and mark the
remaining as being revoked
- for (TopicPartition tp : ownedPartitions) {
- if (i < maxQuota) {
- consumerAssignment.add(tp);
- unassignedPartitions.remove(tp);
- } else {
- allRevokedPartitions.add(tp);
- }
- ++i;
- }
if (ownedPartitions.size() < minQuota) {
+ // the expected assignment size is more than consumer have
now, so keep all the owned partitions
+ // and put this member into unfilled member list
+ if (ownedPartitions.size() > 0) {
+ consumerAssignment.addAll(ownedPartitions);
+ toBeRemovedPartitions.addAll(ownedPartitions);
+ }
unfilledMembers.add(consumer);
+ } else if (ownedPartitions.size() >= maxQuota &&
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+ // consumer owned the "maxQuota" of partitions or more, and we
still under the number of expected max capacity members
+ // so keep "maxQuota" of the owned partitions, and revoke the
rest of the partitions
+ consumerAssignment.addAll(ownedPartitions.subList(0,
maxQuota));
+ toBeRemovedPartitions.addAll(ownedPartitions.subList(0,
maxQuota));
+ allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota,
ownedPartitions.size()));
} else {
- // It's possible for a consumer to be at both min and max
capacity if minQuota == maxQuota
- if (consumerAssignment.size() == minQuota)
- minCapacityMembers.add(consumer);
- if (consumerAssignment.size() == maxQuota)
- maxCapacityMembers.add(consumer);
+ // consumer owned the "minQuota" of partitions or more
+ // so keep "minQuota" of the owned partitions, and revoke the
rest of the partitions
+ consumerAssignment.addAll(ownedPartitions.subList(0,
minQuota));
+ toBeRemovedPartitions.addAll(ownedPartitions.subList(0,
minQuota));
Review comment:
Same as above, let's assign `ownedPartitions.subList(0, minQuota)` to a
variable
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
*/
private Map<String, List<TopicPartition>> constrainedAssign(Map<String,
Integer> partitionsPerTopic,
Map<String,
List<TopicPartition>> consumerToOwnedPartitions) {
- SortedSet<TopicPartition> unassignedPartitions =
getTopicPartitions(partitionsPerTopic);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("performing constrained assign.
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+ partitionsPerTopic, consumerToOwnedPartitions));
+ }
+
+ List<TopicPartition> sortedAllPartitions =
getTopicPartitions(partitionsPerTopic);
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
- // Each consumer should end up in exactly one of the below
- // the consumers not yet at capacity
+ // the consumers not yet at expected capacity
List<String> unfilledMembers = new LinkedList<>();
- // the members with exactly maxQuota partitions assigned
- Queue<String> maxCapacityMembers = new LinkedList<>();
- // the members with exactly minQuota partitions assigned
- Queue<String> minCapacityMembers = new LinkedList<>();
int numberOfConsumers = consumerToOwnedPartitions.size();
- int minQuota = (int) Math.floor(((double) unassignedPartitions.size())
/ numberOfConsumers);
- int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size())
/ numberOfConsumers);
+ int totalPartitionsCount = sortedAllPartitions.size();
+
+ int minQuota = (int) Math.floor(((double) totalPartitionsCount) /
numberOfConsumers);
+ int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) /
numberOfConsumers);
+ // the expected number of members with maxQuota assignment
+ int numExpectedMaxCapacityMembers = totalPartitionsCount %
numberOfConsumers;
+ // the number of members with exactly maxQuota partitions assigned
+ int numMaxCapacityMembers = 0;
- // initialize the assignment map with an empty array of size minQuota
for all members
+ // initialize the assignment map with an empty array of size maxQuota
for all members
Map<String, List<TopicPartition>> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(minQuota))));
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(maxQuota))));
+ List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
// Reassign as many previously owned partitions as possible
for (Map.Entry<String, List<TopicPartition>> consumerEntry :
consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
List<TopicPartition> consumerAssignment = assignment.get(consumer);
- int i = 0;
- // assign the first N partitions up to the max quota, and mark the
remaining as being revoked
- for (TopicPartition tp : ownedPartitions) {
- if (i < maxQuota) {
- consumerAssignment.add(tp);
- unassignedPartitions.remove(tp);
- } else {
- allRevokedPartitions.add(tp);
- }
- ++i;
- }
if (ownedPartitions.size() < minQuota) {
+ // the expected assignment size is more than consumer have
now, so keep all the owned partitions
+ // and put this member into unfilled member list
+ if (ownedPartitions.size() > 0) {
+ consumerAssignment.addAll(ownedPartitions);
+ toBeRemovedPartitions.addAll(ownedPartitions);
+ }
unfilledMembers.add(consumer);
+ } else if (ownedPartitions.size() >= maxQuota &&
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
Review comment:
Also, should it be `numMaxCapacityMembers < numExpectedMaxCapacityMembers`? If `numMaxCapacityMembers` is already equal to the expected number, then that means we can't give this member a `maxQuota` number of partitions. I'm guessing this is a typo since you yourself say "we're still _under_ the number of expected max capacity members.." in the comment below. Or maybe you made it `<=` because of the `++`, but that's a post-increment, which means the value is only incremented _after_ the variable is used in the comparison, so you'd only need `<=` to be correct if it was `++numMaxCapacityMembers` instead. This post-/pre-increment stuff is confusing, which is exactly why I suggested not doing this inline but just incrementing it in the body 🙂
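To make the post-increment point concrete, a tiny self-contained example (the variable names mirror the diff, but this is just an illustration, not the assignor code):

```java
public class PostIncrementDemo {
    public static void main(String[] args) {
        int numExpectedMaxCapacityMembers = 0;  // no members should get maxQuota
        int numMaxCapacityMembers = 0;

        // Post-increment compares the OLD value (0), so 0 <= 0 is true and the
        // "max capacity" branch would still be taken even though none is expected.
        boolean takenWithPostIncrement = numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers;
        System.out.println(takenWithPostIncrement);   // true
        System.out.println(numMaxCapacityMembers);    // 1

        // Reset and use a plain "<" with the increment moved into the body
        // (as suggested): the branch is correctly not taken here.
        numMaxCapacityMembers = 0;
        boolean takenWithPlainComparison = numMaxCapacityMembers < numExpectedMaxCapacityMembers;
        System.out.println(takenWithPlainComparison); // false
    }
}
```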
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -234,23 +245,28 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
List<TopicPartition> consumerAssignment =
assignment.get(consumer);
int expectedAssignedCount = numMaxCapacityMembers <
numExpectedMaxCapacityMembers ? maxQuota : minQuota;
+ int currentAssignedCount = consumerAssignment.size();
if (unassignedPartitionsIter.hasNext()) {
TopicPartition tp = unassignedPartitionsIter.next();
consumerAssignment.add(tp);
- unassignedPartitionsIter.remove();
Review comment:
Well, `unassignedPartitions` used to be a TreeSet, so removal should still be relatively fast (`O(logN)`). But I see you turned this into an ArrayList, which certainly would be slow. That's a good observation though: since we have computed the exact capacity, we know the remaining number of unfilled members exactly and thus we don't need to remove from this list. I think we can then simplify things even further, see my comment above.
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
*/
private Map<String, List<TopicPartition>> constrainedAssign(Map<String,
Integer> partitionsPerTopic,
Map<String,
List<TopicPartition>> consumerToOwnedPartitions) {
- SortedSet<TopicPartition> unassignedPartitions =
getTopicPartitions(partitionsPerTopic);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("performing constrained assign.
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+ partitionsPerTopic, consumerToOwnedPartitions));
+ }
+
+ List<TopicPartition> sortedAllPartitions =
getTopicPartitions(partitionsPerTopic);
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
- // Each consumer should end up in exactly one of the below
- // the consumers not yet at capacity
+ // the consumers not yet at expected capacity
List<String> unfilledMembers = new LinkedList<>();
- // the members with exactly maxQuota partitions assigned
- Queue<String> maxCapacityMembers = new LinkedList<>();
- // the members with exactly minQuota partitions assigned
- Queue<String> minCapacityMembers = new LinkedList<>();
int numberOfConsumers = consumerToOwnedPartitions.size();
- int minQuota = (int) Math.floor(((double) unassignedPartitions.size())
/ numberOfConsumers);
- int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size())
/ numberOfConsumers);
+ int totalPartitionsCount = sortedAllPartitions.size();
+
+ int minQuota = (int) Math.floor(((double) totalPartitionsCount) /
numberOfConsumers);
+ int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) /
numberOfConsumers);
+ // the expected number of members with maxQuota assignment
+ int numExpectedMaxCapacityMembers = totalPartitionsCount %
numberOfConsumers;
+ // the number of members with exactly maxQuota partitions assigned
+ int numMaxCapacityMembers = 0;
- // initialize the assignment map with an empty array of size minQuota
for all members
+ // initialize the assignment map with an empty array of size maxQuota
for all members
Map<String, List<TopicPartition>> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(minQuota))));
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(maxQuota))));
+ List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
// Reassign as many previously owned partitions as possible
for (Map.Entry<String, List<TopicPartition>> consumerEntry :
consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
List<TopicPartition> consumerAssignment = assignment.get(consumer);
- int i = 0;
- // assign the first N partitions up to the max quota, and mark the
remaining as being revoked
- for (TopicPartition tp : ownedPartitions) {
- if (i < maxQuota) {
- consumerAssignment.add(tp);
- unassignedPartitions.remove(tp);
- } else {
- allRevokedPartitions.add(tp);
- }
- ++i;
- }
if (ownedPartitions.size() < minQuota) {
+ // the expected assignment size is more than consumer have
now, so keep all the owned partitions
+ // and put this member into unfilled member list
+ if (ownedPartitions.size() > 0) {
+ consumerAssignment.addAll(ownedPartitions);
+ toBeRemovedPartitions.addAll(ownedPartitions);
+ }
unfilledMembers.add(consumer);
+ } else if (ownedPartitions.size() >= maxQuota &&
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+ // consumer owned the "maxQuota" of partitions or more, and we
still under the number of expected max capacity members
+ // so keep "maxQuota" of the owned partitions, and revoke the
rest of the partitions
+ consumerAssignment.addAll(ownedPartitions.subList(0,
maxQuota));
+ toBeRemovedPartitions.addAll(ownedPartitions.subList(0,
maxQuota));
+ allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota,
ownedPartitions.size()));
} else {
- // It's possible for a consumer to be at both min and max
capacity if minQuota == maxQuota
- if (consumerAssignment.size() == minQuota)
- minCapacityMembers.add(consumer);
- if (consumerAssignment.size() == maxQuota)
- maxCapacityMembers.add(consumer);
+ // consumer owned the "minQuota" of partitions or more
+ // so keep "minQuota" of the owned partitions, and revoke the
rest of the partitions
+ consumerAssignment.addAll(ownedPartitions.subList(0,
minQuota));
+ toBeRemovedPartitions.addAll(ownedPartitions.subList(0,
minQuota));
+ allRevokedPartitions.addAll(ownedPartitions.subList(minQuota,
ownedPartitions.size()));
}
}
+ List<TopicPartition> unassignedPartitions;
+ if (!toBeRemovedPartitions.isEmpty()) {
+ Collections.sort(toBeRemovedPartitions,
+
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+ unassignedPartitions =
getUnassignedPartitions(sortedAllPartitions, toBeRemovedPartitions);
+ sortedAllPartitions = null;
+ } else {
+ unassignedPartitions = sortedAllPartitions;
+ }
+ toBeRemovedPartitions = null;
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format(
+ "After reassigning previously owned partitions, unfilled
members: %s, unassigned partitions: %s, " +
+ "current assignment: %s", unfilledMembers,
unassignedPartitions, assignment));
+ }
+
Collections.sort(unfilledMembers);
Iterator<TopicPartition> unassignedPartitionsIter =
unassignedPartitions.iterator();
- // Fill remaining members up to minQuota
+ // fill remaining members up to the expected numbers of maxQuota,
otherwise, to minQuota
while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
Review comment:
Now that we're not removing from `unassignedPartitions`, there's no reason to include it here. But I'd actually go a step further and change this to `for (TopicPartition unassignedPartition : unassignedPartitions)` -- that way we don't have to worry about breaking out, since we should only need to loop over these partitions once to assign each of them, and we have one less Iterator to deal with.

Then we can check and log an error (or even throw an exception) if there are still consumers in `unfilledMembers` at the end, or of course if we run out of `unfilledMembers` before exiting the for loop.
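A rough sketch of that loop shape (simplified: every remaining member is filled only up to `minQuota` here and both failure cases throw, whereas the real code would use the expected maxQuota/minQuota counts from above and might just log):

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch of the for-each version of the fill loop.
final class FillLoopSketch {

    static void fillUnfilledMembers(List<TopicPartition> unassignedPartitions,
                                    List<String> unfilledMembers,
                                    Map<String, List<TopicPartition>> assignment,
                                    int minQuota) {
        Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
        String consumer = unfilledConsumerIter.hasNext() ? unfilledConsumerIter.next() : null;

        for (TopicPartition tp : unassignedPartitions) {
            if (consumer == null) {
                // ran out of unfilled members before assigning every partition
                throw new IllegalStateException("No unfilled member left for partition " + tp);
            }
            List<TopicPartition> consumerAssignment = assignment.get(consumer);
            consumerAssignment.add(tp);
            if (consumerAssignment.size() == minQuota) {
                // this member is now full; move on to the next unfilled member
                unfilledConsumerIter.remove();
                consumer = unfilledConsumerIter.hasNext() ? unfilledConsumerIter.next() : null;
            }
        }

        if (consumer != null) {
            // members were still unfilled after all partitions were handed out
            throw new IllegalStateException("Members left unfilled after assigning all partitions: " + unfilledMembers);
        }
    }
}
```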
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
*/
private Map<String, List<TopicPartition>> constrainedAssign(Map<String,
Integer> partitionsPerTopic,
Map<String,
List<TopicPartition>> consumerToOwnedPartitions) {
- SortedSet<TopicPartition> unassignedPartitions =
getTopicPartitions(partitionsPerTopic);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("performing constrained assign.
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+ partitionsPerTopic, consumerToOwnedPartitions));
+ }
+
+ List<TopicPartition> sortedAllPartitions =
getTopicPartitions(partitionsPerTopic);
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
- // Each consumer should end up in exactly one of the below
- // the consumers not yet at capacity
+ // the consumers not yet at expected capacity
List<String> unfilledMembers = new LinkedList<>();
- // the members with exactly maxQuota partitions assigned
- Queue<String> maxCapacityMembers = new LinkedList<>();
- // the members with exactly minQuota partitions assigned
- Queue<String> minCapacityMembers = new LinkedList<>();
int numberOfConsumers = consumerToOwnedPartitions.size();
- int minQuota = (int) Math.floor(((double) unassignedPartitions.size())
/ numberOfConsumers);
- int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size())
/ numberOfConsumers);
+ int totalPartitionsCount = sortedAllPartitions.size();
+
+ int minQuota = (int) Math.floor(((double) totalPartitionsCount) /
numberOfConsumers);
+ int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) /
numberOfConsumers);
+ // the expected number of members with maxQuota assignment
+ int numExpectedMaxCapacityMembers = totalPartitionsCount %
numberOfConsumers;
+ // the number of members with exactly maxQuota partitions assigned
+ int numMaxCapacityMembers = 0;
- // initialize the assignment map with an empty array of size minQuota
for all members
+ // initialize the assignment map with an empty array of size maxQuota
for all members
Map<String, List<TopicPartition>> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(minQuota))));
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(maxQuota))));
+ List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
// Reassign as many previously owned partitions as possible
for (Map.Entry<String, List<TopicPartition>> consumerEntry :
consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
List<TopicPartition> consumerAssignment = assignment.get(consumer);
- int i = 0;
- // assign the first N partitions up to the max quota, and mark the
remaining as being revoked
- for (TopicPartition tp : ownedPartitions) {
- if (i < maxQuota) {
- consumerAssignment.add(tp);
- unassignedPartitions.remove(tp);
- } else {
- allRevokedPartitions.add(tp);
- }
- ++i;
- }
if (ownedPartitions.size() < minQuota) {
+ // the expected assignment size is more than consumer have
now, so keep all the owned partitions
+ // and put this member into unfilled member list
+ if (ownedPartitions.size() > 0) {
+ consumerAssignment.addAll(ownedPartitions);
+ toBeRemovedPartitions.addAll(ownedPartitions);
+ }
unfilledMembers.add(consumer);
+ } else if (ownedPartitions.size() >= maxQuota &&
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+ // consumer owned the "maxQuota" of partitions or more, and we
still under the number of expected max capacity members
+ // so keep "maxQuota" of the owned partitions, and revoke the
rest of the partitions
+ consumerAssignment.addAll(ownedPartitions.subList(0,
maxQuota));
+ toBeRemovedPartitions.addAll(ownedPartitions.subList(0,
maxQuota));
Review comment:
I'm not sure how much of a difference this will make, but since we're
using the `ownedPartitions.subList(0, maxQuota)` twice maybe we should assign
it to a variable rather than compute it twice
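Something like the following, recombining the lines quoted above (the variable name is just a suggestion):

```java
List<TopicPartition> maxQuotaPartitions = ownedPartitions.subList(0, maxQuota);
consumerAssignment.addAll(maxQuotaPartitions);
toBeRemovedPartitions.addAll(maxQuotaPartitions);
allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
```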
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
*/
private Map<String, List<TopicPartition>> constrainedAssign(Map<String,
Integer> partitionsPerTopic,
Map<String,
List<TopicPartition>> consumerToOwnedPartitions) {
- SortedSet<TopicPartition> unassignedPartitions =
getTopicPartitions(partitionsPerTopic);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("performing constrained assign.
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+ partitionsPerTopic, consumerToOwnedPartitions));
+ }
+
+ List<TopicPartition> sortedAllPartitions =
getTopicPartitions(partitionsPerTopic);
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
- // Each consumer should end up in exactly one of the below
- // the consumers not yet at capacity
+ // the consumers not yet at expected capacity
List<String> unfilledMembers = new LinkedList<>();
- // the members with exactly maxQuota partitions assigned
- Queue<String> maxCapacityMembers = new LinkedList<>();
- // the members with exactly minQuota partitions assigned
- Queue<String> minCapacityMembers = new LinkedList<>();
int numberOfConsumers = consumerToOwnedPartitions.size();
- int minQuota = (int) Math.floor(((double) unassignedPartitions.size())
/ numberOfConsumers);
- int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size())
/ numberOfConsumers);
+ int totalPartitionsCount = sortedAllPartitions.size();
+
+ int minQuota = (int) Math.floor(((double) totalPartitionsCount) /
numberOfConsumers);
+ int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) /
numberOfConsumers);
+ // the expected number of members with maxQuota assignment
+ int numExpectedMaxCapacityMembers = totalPartitionsCount %
numberOfConsumers;
+ // the number of members with exactly maxQuota partitions assigned
+ int numMaxCapacityMembers = 0;
- // initialize the assignment map with an empty array of size minQuota
for all members
+ // initialize the assignment map with an empty array of size maxQuota
for all members
Map<String, List<TopicPartition>> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(minQuota))));
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(maxQuota))));
+ List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
// Reassign as many previously owned partitions as possible
for (Map.Entry<String, List<TopicPartition>> consumerEntry :
consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
List<TopicPartition> consumerAssignment = assignment.get(consumer);
- int i = 0;
- // assign the first N partitions up to the max quota, and mark the
remaining as being revoked
- for (TopicPartition tp : ownedPartitions) {
- if (i < maxQuota) {
- consumerAssignment.add(tp);
- unassignedPartitions.remove(tp);
- } else {
- allRevokedPartitions.add(tp);
- }
- ++i;
- }
if (ownedPartitions.size() < minQuota) {
+ // the expected assignment size is more than consumer have
now, so keep all the owned partitions
+ // and put this member into unfilled member list
+ if (ownedPartitions.size() > 0) {
+ consumerAssignment.addAll(ownedPartitions);
+ toBeRemovedPartitions.addAll(ownedPartitions);
+ }
unfilledMembers.add(consumer);
+ } else if (ownedPartitions.size() >= maxQuota &&
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+ // consumer owned the "maxQuota" of partitions or more, and we
still under the number of expected max capacity members
+ // so keep "maxQuota" of the owned partitions, and revoke the
rest of the partitions
+ consumerAssignment.addAll(ownedPartitions.subList(0,
maxQuota));
+ toBeRemovedPartitions.addAll(ownedPartitions.subList(0,
maxQuota));
+ allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota,
ownedPartitions.size()));
} else {
- // It's possible for a consumer to be at both min and max
capacity if minQuota == maxQuota
- if (consumerAssignment.size() == minQuota)
- minCapacityMembers.add(consumer);
- if (consumerAssignment.size() == maxQuota)
- maxCapacityMembers.add(consumer);
+ // consumer owned the "minQuota" of partitions or more
+ // so keep "minQuota" of the owned partitions, and revoke the
rest of the partitions
+ consumerAssignment.addAll(ownedPartitions.subList(0,
minQuota));
+ toBeRemovedPartitions.addAll(ownedPartitions.subList(0,
minQuota));
+ allRevokedPartitions.addAll(ownedPartitions.subList(minQuota,
ownedPartitions.size()));
}
}
+ List<TopicPartition> unassignedPartitions;
+ if (!toBeRemovedPartitions.isEmpty()) {
+ Collections.sort(toBeRemovedPartitions,
+
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+ unassignedPartitions =
getUnassignedPartitions(sortedAllPartitions, toBeRemovedPartitions);
+ sortedAllPartitions = null;
Review comment:
`sortedAllPartitions` can be cleared in both cases, I think, like `toBeRemovedPartitions`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]