adixitconfluent commented on code in PR #20746:
URL: https://github.com/apache/kafka/pull/20746#discussion_r2464635004
##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -58,7 +64,27 @@ static PartitionMaxBytesStrategy type(StrategyType type) {
     private static LinkedHashMap<TopicIdPartition, Integer> uniformPartitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, int acquiredPartitionsSize) {
         checkValidArguments(requestMaxBytes, partitions, acquiredPartitionsSize);
         LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new LinkedHashMap<>();
-        partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / acquiredPartitionsSize));
+        if (requestMaxBytes >= acquiredPartitionsSize) {
+            // Case 1: requestMaxBytes can be evenly distributed within partitions.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / acquiredPartitionsSize));
+        } else if (requestMaxBytes >= partitions.size()) {
+            // Case 2: we will be distributing requestMaxBytes greedily in this scenario to prevent any starvation.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / partitions.size()));
+        } else {
+            // Case 3: we will distribute requestMaxBytes to as many partitions as possible, randomly, to avoid starvation.
+            List<TopicIdPartition> partitionsList = new ArrayList<>(partitions);
+            Collections.shuffle(partitionsList);
Review Comment:
> Case 3 will be rare, but what guarantees do you get by shuffling? i.e. the acquired partitions can be different between 2 iterations, which means the partitions can be random already, correct?

That is correct, but we still need to introduce some randomness in case the partitions are not already random.
> Copying an array and shuffling seems a waste of operations and memory to me, of course you can argue. If we just want to prevent starvation, then pick any random number between (0 and partitionsList.size()) and start filling 1 byte to each partition till you are done with requestMaxBytes. It can be a simple implementation and should work, and it doesn't require the additional overhead of copying and shuffling.

Agreed, shuffling adds extra operations and memory, but I don't think the overhead is significant because we are ultimately dealing with a set of topic partitions. Anyway, I have incorporated your suggestion: pick a random index in (0, partitionsList.size()) and fill 1 byte per partition until requestMaxBytes is exhausted.
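For illustration, the suggested random-offset approach could be sketched roughly as below. This is a minimal standalone sketch, not the actual `PartitionMaxBytesStrategy` code: the `distribute` method, the `String` partition keys, and the sizes are hypothetical stand-ins for `TopicIdPartition` and the real method signature.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class MaxBytesSketch {

    // Sketch of Case 3 (requestMaxBytes < partitions.size()): instead of
    // copying and shuffling the partition list, start at a random index and
    // hand out 1 byte per partition (wrapping around) until requestMaxBytes
    // is exhausted. The random start prevents always starving the same
    // partitions at the tail of the list.
    static LinkedHashMap<String, Integer> distribute(int requestMaxBytes, List<String> partitions) {
        LinkedHashMap<String, Integer> partitionMaxBytes = new LinkedHashMap<>();
        partitions.forEach(p -> partitionMaxBytes.put(p, 0));
        int start = ThreadLocalRandom.current().nextInt(partitions.size());
        for (int i = 0; i < requestMaxBytes; i++) {
            String p = partitions.get((start + i) % partitions.size());
            partitionMaxBytes.put(p, partitionMaxBytes.get(p) + 1);
        }
        return partitionMaxBytes;
    }

    public static void main(String[] args) {
        // 3 bytes across 5 partitions: exactly 3 partitions get 1 byte each,
        // and which 3 depends on the random start index.
        LinkedHashMap<String, Integer> alloc =
            distribute(3, List.of("tp0", "tp1", "tp2", "tp3", "tp4"));
        System.out.println(alloc);
    }
}
```

Compared with copying and shuffling, this allocates nothing beyond the result map and does a single O(requestMaxBytes) pass, at the cost of the chosen partitions being consecutive (mod list size) rather than uniformly random subsets.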
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]