vahidhashemian commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r638437845
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -637,19 +709,27 @@ private void assignPartition(TopicPartition partition,
}
}
- private boolean canParticipateInReassignment(TopicPartition partition,
- Map<TopicPartition,
List<String>> partition2AllPotentialConsumers) {
+ private boolean canParticipateInReassignment(String topic,
+ Map<String, List<String>>
topic2AllPotentialConsumers) {
// if a partition has two or more potential consumers it is subject to
reassignment.
Review comment:
This comment needs an update: the method now checks whether a topic (not a partition) has two or more potential consumers.
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -637,19 +709,27 @@ private void assignPartition(TopicPartition partition,
}
}
- private boolean canParticipateInReassignment(TopicPartition partition,
- Map<TopicPartition,
List<String>> partition2AllPotentialConsumers) {
+ private boolean canParticipateInReassignment(String topic,
+ Map<String, List<String>>
topic2AllPotentialConsumers) {
// if a partition has two or more potential consumers it is subject to
reassignment.
- return partition2AllPotentialConsumers.get(partition).size() >= 2;
+ return topic2AllPotentialConsumers.get(topic).size() >= 2;
}
private boolean canParticipateInReassignment(String consumer,
Map<String,
List<TopicPartition>> currentAssignment,
- Map<String,
List<TopicPartition>> consumer2AllPotentialPartitions,
- Map<TopicPartition,
List<String>> partition2AllPotentialConsumers) {
+ Map<String, List<String>>
consumer2AllPotentialTopics,
+ Map<String, List<String>>
topic2AllPotentialConsumers,
+ Map<String, Integer>
partitionsPerTopic,
+ int totalPartitionCount) {
List<TopicPartition> currentPartitions =
currentAssignment.get(consumer);
int currentAssignmentSize = currentPartitions.size();
- int maxAssignmentSize =
consumer2AllPotentialPartitions.get(consumer).size();
+ List<String> allSubscribedTopics =
consumer2AllPotentialTopics.get(consumer);
+ int maxAssignmentSize;
+ if (allSubscribedTopics.size() == partitionsPerTopic.size()) {
+ maxAssignmentSize = totalPartitionCount;
+ } else {
+ maxAssignmentSize = allSubscribedTopics.stream().map(topic ->
partitionsPerTopic.get(topic)).reduce(0, Integer::sum);
+ }
Review comment:
The same code block (computing `maxAssignmentSize` from the consumer's subscribed topics) also appears in lines 638-644. Is it possible to factor it out into a shared helper?
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -469,73 +426,190 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new
SubscriptionComparator(currentAssignment));
sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
- balance(currentAssignment, prevAssignment, sortedPartitions,
unassignedPartitions, sortedCurrentSubscriptions,
- consumer2AllPotentialPartitions, partition2AllPotentialConsumers,
currentPartitionConsumer, revocationRequired);
+ balance(currentAssignment, prevAssignment, sortedAllPartitions,
unassignedPartitions, sortedCurrentSubscriptions,
+ consumer2AllPotentialTopics, topic2AllPotentialConsumers,
currentPartitionConsumer, revocationRequired,
+ partitionsPerTopic, totalPartitionsCount);
+
+ if (log.isDebugEnabled()) {
+ log.debug("final assignment: {}", currentAssignment);
+ }
+
return currentAssignment;
}
+ /**
+ * get the unassigned partition list by computing the difference set of
the sortedPartitions(all partitions)
+ * and sortedAssignedPartitions. If no assigned partitions, we'll just
return all sorted topic partitions.
+ * This is used in generalAssign method
+ *
+ * We loop the sortedPartition, and compare the ith element in
sortedAssignedPartitions(i start from 0):
+ * - if not equal to the ith element, add to unassignedPartitions
+ * - if equal to the the ith element, get next element from
sortedAssignedPartitions
+ *
+ * @param sortedAllPartitions: sorted all partitions
+ * @param sortedAssignedPartitions: sorted partitions, all are
included in the sortedPartitions
+ * @param topic2AllPotentialConsumers: topics mapped to all consumers
that subscribed to it
+ * @return partitions that aren't assigned to
any current consumer
+ */
+ private List<TopicPartition> getUnassignedPartitions(List<TopicPartition>
sortedAllPartitions,
+ List<TopicPartition>
sortedAssignedPartitions,
+ Map<String,
List<String>> topic2AllPotentialConsumers) {
+ if (sortedAssignedPartitions.isEmpty()) {
+ return sortedAllPartitions;
+ }
+
+ List<TopicPartition> unassignedPartitions = new ArrayList<>();
+
+ Collections.sort(sortedAssignedPartitions, new
PartitionComparator(topic2AllPotentialConsumers));
+
+ boolean shouldAddDirectly = false;
+ Iterator<TopicPartition> sortedAssignedPartitionsIter =
sortedAssignedPartitions.iterator();
+ TopicPartition nextAssignedPartition =
sortedAssignedPartitionsIter.next();
+
+ for (TopicPartition topicPartition : sortedAllPartitions) {
+ if (shouldAddDirectly ||
!nextAssignedPartition.equals(topicPartition)) {
+ unassignedPartitions.add(topicPartition);
+ } else {
+ // this partition is in assignedPartitions, don't add to
unassignedPartitions, just get next assigned partition
+ if (sortedAssignedPartitionsIter.hasNext()) {
+ nextAssignedPartition =
sortedAssignedPartitionsIter.next();
+ } else {
+ // add the remaining directly since there is no more
sortedAssignedPartitions
+ shouldAddDirectly = true;
+ }
+ }
+ }
+ return unassignedPartitions;
+ }
+
+ /**
+ * get the unassigned partition list by computing the difference set of
all sorted partitions
+ * and sortedAssignedPartitions. If no assigned partitions, we'll just
return all sorted topic partitions.
+ * This is used in constrainedAssign method
+ *
+ * To compute the difference set, we use two pointers technique here:
+ *
+ * We loop through the all sorted topics, and then iterate all partitions
the topic has,
+ * compared with the ith element in sortedAssignedPartitions(i starts from
0):
+ * - if not equal to the ith element, add to unassignedPartitions
+ * - if equal to the the ith element, get next element from
sortedAssignedPartitions
+ *
+ * @param totalPartitionsCount all partitions counts in this
assignment
+ * @param partitionsPerTopic the number of partitions for each
subscribed topic.
+ * @param sortedAssignedPartitions sorted partitions, all are included in
the sortedPartitions
+ * @return the partitions not yet assigned to any
consumers
+ */
+ private List<TopicPartition> getUnassignedPartitions(int
totalPartitionsCount,
+ Map<String, Integer>
partitionsPerTopic,
+ List<TopicPartition>
sortedAssignedPartitions) {
+ List<String> sortedAllTopics = new
ArrayList<>(partitionsPerTopic.keySet());
+ // sort all topics first, then we can have sorted all topic partitions
by adding partitions starting from 0
+ Collections.sort(sortedAllTopics);
+
+ if (sortedAssignedPartitions.isEmpty()) {
+ // no assigned partitions means all partitions are unassigned
partitions
+ return getAllTopicPartitions(partitionsPerTopic, sortedAllTopics,
totalPartitionsCount);
+ }
+
+ List<TopicPartition> unassignedPartitions = new
ArrayList<>(totalPartitionsCount - sortedAssignedPartitions.size());
+
+ Collections.sort(sortedAssignedPartitions,
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+
+ boolean shouldAddDirectly = false;
+ Iterator<TopicPartition> sortedAssignedPartitionsIter =
sortedAssignedPartitions.iterator();
+ TopicPartition nextAssignedPartition =
sortedAssignedPartitionsIter.next();
+
+ for (String topic : sortedAllTopics) {
+ int partitionCount = partitionsPerTopic.get(topic);
+ for (int i = 0; i < partitionCount; i++) {
+ if (shouldAddDirectly ||
!(nextAssignedPartition.topic().equals(topic) &&
nextAssignedPartition.partition() == i)) {
+ unassignedPartitions.add(new TopicPartition(topic, i));
+ } else {
+ // this partition is in assignedPartitions, don't add to
unassignedPartitions, just get next assigned partition
+ if (sortedAssignedPartitionsIter.hasNext()) {
+ nextAssignedPartition =
sortedAssignedPartitionsIter.next();
+ } else {
+ // add the remaining directly since there is no more
sortedAssignedPartitions
+ shouldAddDirectly = true;
+ }
+ }
+ }
+ }
+
+ return unassignedPartitions;
+ }
+
+ /**
+ * update the prevAssignment with the partitions, consumer and generation
in parameters
+ *
+ * @param partitions: The partitions to be updated the
prevAssignement
+ * @param consumer: The consumer Id
+ * @param prevAssignment: The assignment contains the assignment with
the 2nd largest generation
+ * @param generation: The generation of this assignment (partitions)
+ */
+ private void updatePrevAssignment(Map<TopicPartition,
ConsumerGenerationPair> prevAssignment,
+ List<TopicPartition> partitions,
+ String consumer,
+ int generation) {
+ for (TopicPartition partition: partitions) {
+ if (prevAssignment.containsKey(partition)) {
+ // only keep the latest previous assignment
+ if (generation > prevAssignment.get(partition).generation) {
+ prevAssignment.put(partition, new
ConsumerGenerationPair(consumer, generation));
+ }
+ } else {
+ prevAssignment.put(partition, new
ConsumerGenerationPair(consumer, generation));
+ }
+ }
+ }
+
+ /**
+ * filling in the prevAssignment from the subscriptions.
+ *
+ * @param subscriptions: Map from the member id to their respective
topic subscription
+ * @param prevAssignment: The assignment contains the assignment
with the 2nd largest generation
+ */
private void prepopulateCurrentAssignments(Map<String, Subscription>
subscriptions,
- Map<String,
List<TopicPartition>> currentAssignment,
Map<TopicPartition,
ConsumerGenerationPair> prevAssignment) {
// we need to process subscriptions' user data with each consumer's
reported generation in mind
// higher generations overwrite lower generations in case of a conflict
// note that a conflict could exists only if user data is for
different generations
- // for each partition we create a sorted map of its consumers by
generation
- Map<TopicPartition, TreeMap<Integer, String>>
sortedPartitionConsumersByGeneration = new HashMap<>();
for (Map.Entry<String, Subscription> subscriptionEntry:
subscriptions.entrySet()) {
String consumer = subscriptionEntry.getKey();
- MemberData memberData = memberData(subscriptionEntry.getValue());
-
- for (TopicPartition partition: memberData.partitions) {
- if
(sortedPartitionConsumersByGeneration.containsKey(partition)) {
- Map<Integer, String> consumers =
sortedPartitionConsumersByGeneration.get(partition);
- if (memberData.generation.isPresent() &&
consumers.containsKey(memberData.generation.get())) {
- // same partition is assigned to two consumers during
the same rebalance.
- // log a warning and skip this record
- log.warn("Partition '{}' is assigned to multiple
consumers following sticky assignment generation {}.",
- partition, memberData.generation);
- } else
-
consumers.put(memberData.generation.orElse(DEFAULT_GENERATION), consumer);
- } else {
- TreeMap<Integer, String> sortedConsumers = new TreeMap<>();
-
sortedConsumers.put(memberData.generation.orElse(DEFAULT_GENERATION), consumer);
- sortedPartitionConsumersByGeneration.put(partition,
sortedConsumers);
- }
+ Subscription subscription = subscriptionEntry.getValue();
+ if (subscription.userData() != null) {
+ // since this is our 2nd time to deserialize memberData,
rewind userData is necessary
+ subscription.userData().rewind();
}
Review comment:
This block didn't exist before; why is it needed now?
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -384,37 +326,39 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
*
* @param partitionsPerTopic The number of partitions for each
subscribed topic.
* @param subscriptions Map from the member id to their
respective topic subscription
+ * @param currentAssignment Each consumer's previously owned and
still-subscribed partitions
*
* @return Map from each member to the list of
partitions assigned to them.
*/
private Map<String, List<TopicPartition>> generalAssign(Map<String,
Integer> partitionsPerTopic,
- Map<String,
Subscription> subscriptions) {
- Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
+ Map<String,
Subscription> subscriptions,
+ Map<String,
List<TopicPartition>> currentAssignment) {
+ if (log.isDebugEnabled()) {
+ log.debug("performing general assign. partitionsPerTopic: {},
subscriptions: {}, currentAssignment: {}",
+ partitionsPerTopic, subscriptions, currentAssignment);
+ }
+
Map<TopicPartition, ConsumerGenerationPair> prevAssignment = new
HashMap<>();
partitionMovements = new PartitionMovements();
- prepopulateCurrentAssignments(subscriptions, currentAssignment,
prevAssignment);
+ prepopulateCurrentAssignments(subscriptions, prevAssignment);
- // a mapping of all topic partitions to all consumers that can be
assigned to them
- final Map<TopicPartition, List<String>>
partition2AllPotentialConsumers = new HashMap<>();
- // a mapping of all consumers to all potential topic partitions that
can be assigned to them
- final Map<String, List<TopicPartition>>
consumer2AllPotentialPartitions = new HashMap<>();
+ // a mapping of all topics to all consumers that can be assigned to
them
+ final Map<String, List<String>> topic2AllPotentialConsumers = new
HashMap<>(partitionsPerTopic.keySet().size());
+ // a mapping of all consumers to all potential topics that can be
assigned to them
+ final Map<String, List<String>> consumer2AllPotentialTopics = new
HashMap<>(subscriptions.keySet().size());
- // initialize partition2AllPotentialConsumers and
consumer2AllPotentialPartitions in the following two for loops
- for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
- for (int i = 0; i < entry.getValue(); ++i)
- partition2AllPotentialConsumers.put(new
TopicPartition(entry.getKey(), i), new ArrayList<>());
- }
+ // initialize topic2AllPotentialConsumers and
consumer2AllPotentialTopics
+ partitionsPerTopic.keySet().stream().forEach(
+ topicName -> topic2AllPotentialConsumers.put(topicName, new
ArrayList<>()));
for (Entry<String, Subscription> entry: subscriptions.entrySet()) {
String consumerId = entry.getKey();
- consumer2AllPotentialPartitions.put(consumerId, new ArrayList<>());
+ List<String> subscribedTopics = new
ArrayList<>(entry.getValue().topics().size());
+ consumer2AllPotentialTopics.put(consumerId, subscribedTopics);
entry.getValue().topics().stream().filter(topic ->
partitionsPerTopic.get(topic) != null).forEach(topic -> {
- for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
- TopicPartition topicPartition = new TopicPartition(topic,
i);
-
consumer2AllPotentialPartitions.get(consumerId).add(topicPartition);
-
partition2AllPotentialConsumers.get(topicPartition).add(consumerId);
- }
+ subscribedTopics.add(topic);
Review comment:
Right, sorry, I misread that line.
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -444,23 +392,32 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
// otherwise (the consumer still exists)
for (Iterator<TopicPartition> partitionIter =
entry.getValue().iterator(); partitionIter.hasNext();) {
TopicPartition partition = partitionIter.next();
- if
(!partition2AllPotentialConsumers.containsKey(partition)) {
- // if this topic partition of this consumer no longer
exists remove it from currentAssignment of the consumer
+ if
(!topic2AllPotentialConsumers.containsKey(partition.topic())) {
+ // if this topic partition of this consumer no longer
exists, remove it from currentAssignment of the consumer
partitionIter.remove();
currentPartitionConsumer.remove(partition);
- } else if
(!subscriptions.get(entry.getKey()).topics().contains(partition.topic())) {
- // if this partition cannot remain assigned to its
current consumer because the consumer
- // is no longer subscribed to its topic remove it from
currentAssignment of the consumer
+ } else if
(!consumerSubscription.topics().contains(partition.topic())) {
+ // because the consumer is no longer subscribed to its
topic, remove it from currentAssignment of the consumer
partitionIter.remove();
revocationRequired = true;
- } else
+ } else {
// otherwise, remove the topic partition from those
that need to be assigned only if
// its current consumer is still subscribed to its
topic (because it is already assigned
// and we would want to preserve that assignment as
much as possible)
- unassignedPartitions.remove(partition);
+ assignedPartitions.add(partition);
+ }
}
}
}
+
+ // all partitions that needed to be assigned
+ List<TopicPartition> unassignedPartitions =
getUnassignedPartitions(sortedAllPartitions, assignedPartitions,
topic2AllPotentialConsumers);
+ assignedPartitions = null;
Review comment:
So this assumes the following `balance()` call could still be running when the next GC happens, and that `assignedPartitions` should be collectable by then?
In that case, IMHO, `assignedPartitions.clear()` would read better (with almost the same impact).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]