adixitconfluent commented on code in PR #20746:
URL: https://github.com/apache/kafka/pull/20746#discussion_r2464635004


##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -58,7 +64,27 @@ static PartitionMaxBytesStrategy type(StrategyType type) {
     private static LinkedHashMap<TopicIdPartition, Integer> 
uniformPartitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, 
int acquiredPartitionsSize) {
         checkValidArguments(requestMaxBytes, partitions, 
acquiredPartitionsSize);
         LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new 
LinkedHashMap<>();
-        partitions.forEach(partition -> partitionMaxBytes.put(partition, 
requestMaxBytes / acquiredPartitionsSize));
+        if (requestMaxBytes >= acquiredPartitionsSize) {
+            // Case 1: requestMaxBytes can be evenly distributed within 
partitions.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, 
requestMaxBytes / acquiredPartitionsSize));
+        } else if (requestMaxBytes >= partitions.size()) {
+            // Case 2: we will be distributing requestMaxBytes greedily in 
this scenario to prevent any starvation.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, 
requestMaxBytes / partitions.size()));
+        } else {
+            // Case 3: we will distribute requestMaxBytes to as many 
partitions possible randomly to avoid starvation.
+            List<TopicIdPartition> partitionsList = new 
ArrayList<>(partitions);
+            Collections.shuffle(partitionsList);

Review Comment:
   >Case 3 will be rare but what guarantees do you get by shuffling i.e. the 
acquired partitions can be different between 2 iterations which means the 
partitions can be random already, correct?
   
   That is correct, but we have to take some random factor in case the 
partitions are not random already
   
   >Copying an array and shuffling seems a waste of operations and memory to 
me, ofcourse you can argue. If we just want to prevent startvation then pick 
any random number between (0 and partitionsList.size()) and start filling the 1 
byte to each partition till you are done with requestMaxBytes. It can be a 
simple implementation and should work, it doesn't require additional overhead 
of copying and shuffling.
   
   Agree, shuffling induces extra operations and memory but I don't think its 
going to be that huge because its ultimately set of topic partitions that we 
are dealing with. Anyways, I have incorporated your suggestion to pick any 
random number between (0 and partitionsList.size()) and start filling the 1 
byte to each partition till you are done with requestMaxBytes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to