apoorvmittal10 commented on code in PR #20746:
URL: https://github.com/apache/kafka/pull/20746#discussion_r2461015155


##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -58,7 +64,27 @@ static PartitionMaxBytesStrategy type(StrategyType type) {
     private static LinkedHashMap<TopicIdPartition, Integer> 
uniformPartitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, 
int acquiredPartitionsSize) {
         checkValidArguments(requestMaxBytes, partitions, 
acquiredPartitionsSize);
         LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new 
LinkedHashMap<>();
-        partitions.forEach(partition -> partitionMaxBytes.put(partition, 
requestMaxBytes / acquiredPartitionsSize));
+        if (requestMaxBytes >= acquiredPartitionsSize) {
+            // Case 1: requestMaxBytes can be evenly distributed within 
partitions.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, 
requestMaxBytes / acquiredPartitionsSize));
+        } else if (requestMaxBytes >= partitions.size()) {
+            // Case 2: we will be distributing requestMaxBytes greedily in 
this scenario to prevent any starvation.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, 
requestMaxBytes / partitions.size()));

Review Comment:
   My concern was related to multiple reads from log or remote in same 
DelayedShareFetch. As far as I remember, we introduced `acquiredPartitionsSize` 
to this method as `acquiredPartitionsSize` can be greater than `partitions` 
size, in this method, because there could be multiple independent reads 
occurring. Now simply dividing with `partitions.size` and distributing the 
bytes seems incorrect. 



##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -38,6 +44,8 @@ public String toString() {
 
     /**
      * Returns the partition max bytes for a given partition based on the 
strategy type.
+     * The partitions passed for maxBytes calculation are a subset of total 
acquired partitions for the share fetch request.

Review Comment:
   Not neccessarily always but `can be a subset`, right?



##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -38,6 +44,8 @@ public String toString() {
 
     /**
      * Returns the partition max bytes for a given partition based on the 
strategy type.
+     * The partitions passed for maxBytes calculation are a subset of total 
acquired partitions for the share fetch request.
+     * Thus, partitions for which we want to compute the max bytes <= acquired 
partitions.

Review Comment:
   Can you please explain what do we mean here.



##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -58,10 +66,47 @@ static PartitionMaxBytesStrategy type(StrategyType type) {
     private static LinkedHashMap<TopicIdPartition, Integer> 
uniformPartitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, 
int acquiredPartitionsSize) {
         checkValidArguments(requestMaxBytes, partitions, 
acquiredPartitionsSize);
         LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new 
LinkedHashMap<>();
-        partitions.forEach(partition -> partitionMaxBytes.put(partition, 
requestMaxBytes / acquiredPartitionsSize));
+        if (requestMaxBytes >= acquiredPartitionsSize) {
+            // Case 1: requestMaxBytes can be evenly distributed within 
partitions. If there is extra bytes left post
+            // dividing it uniformly, assign it randomly to any one of the 
partitions.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, 
requestMaxBytes / acquiredPartitionsSize));
+            if (requestMaxBytes % acquiredPartitionsSize != 0) {
+                TopicIdPartition randomPartition = 
selectPartitionRandomly(partitionMaxBytes);
+                partitionMaxBytes.put(randomPartition,
+                    (requestMaxBytes / acquiredPartitionsSize) + 
(requestMaxBytes % acquiredPartitionsSize));

Review Comment:
   nit: Though the calculation is trivial but not required to be repeated i.e. 
`requestMaxBytes / acquiredPartitionsSize` rather fetch the existing and add.
   
   If you don't like it then ignore the suggestion.
   
   ```suggestion
               int remainingBytes = requestMaxBytes % acquiredPartitionsSize;
               if (remainingBytes != 0) {
                   TopicIdPartition randomPartition = 
selectPartitionRandomly(partitionMaxBytes);
                   partitionMaxBytes.put(randomPartition,
                       partitionMaxBytes.get(randomPartition) + remainingBytes);
   ```



##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -58,7 +64,27 @@ static PartitionMaxBytesStrategy type(StrategyType type) {
     private static LinkedHashMap<TopicIdPartition, Integer> 
uniformPartitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, 
int acquiredPartitionsSize) {
         checkValidArguments(requestMaxBytes, partitions, 
acquiredPartitionsSize);
         LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new 
LinkedHashMap<>();
-        partitions.forEach(partition -> partitionMaxBytes.put(partition, 
requestMaxBytes / acquiredPartitionsSize));
+        if (requestMaxBytes >= acquiredPartitionsSize) {
+            // Case 1: requestMaxBytes can be evenly distributed within 
partitions.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, 
requestMaxBytes / acquiredPartitionsSize));
+        } else if (requestMaxBytes >= partitions.size()) {
+            // Case 2: we will be distributing requestMaxBytes greedily in 
this scenario to prevent any starvation.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, 
requestMaxBytes / partitions.size()));
+        } else {
+            // Case 3: we will distribute requestMaxBytes to as many 
partitions possible randomly to avoid starvation.
+            List<TopicIdPartition> partitionsList = new 
ArrayList<>(partitions);
+            Collections.shuffle(partitionsList);

Review Comment:
   Ok.
   
   1. Case 3 will be rare but what guarantees do you get by shuffling i.e. the 
acquired partitions can be different between 2 iterations which means the 
partitions can be random already, correct? However if there is a single client 
with multiple partitions then this code makes sense.
   
   2. Copying an array and shuffling seems a waste of operations and memory to 
me, ofcourse you can argue. If we just want to prevent startvation then pick 
any random number between (0 and partitionsList.size()) and start filling the 1 
byte to each partition till you are done with requestMaxBytes. It can be a 
simple implementation and should work, it doesn't require additional overhead 
of copying and shuffling.



##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -58,10 +66,47 @@ static PartitionMaxBytesStrategy type(StrategyType type) {
     private static LinkedHashMap<TopicIdPartition, Integer> 
uniformPartitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, 
int acquiredPartitionsSize) {
         checkValidArguments(requestMaxBytes, partitions, 
acquiredPartitionsSize);
         LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new 
LinkedHashMap<>();
-        partitions.forEach(partition -> partitionMaxBytes.put(partition, 
requestMaxBytes / acquiredPartitionsSize));
+        if (requestMaxBytes >= acquiredPartitionsSize) {
+            // Case 1: requestMaxBytes can be evenly distributed within 
partitions. If there is extra bytes left post
+            // dividing it uniformly, assign it randomly to any one of the 
partitions.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, 
requestMaxBytes / acquiredPartitionsSize));
+            if (requestMaxBytes % acquiredPartitionsSize != 0) {
+                TopicIdPartition randomPartition = 
selectPartitionRandomly(partitionMaxBytes);
+                partitionMaxBytes.put(randomPartition,
+                    (requestMaxBytes / acquiredPartitionsSize) + 
(requestMaxBytes % acquiredPartitionsSize));
+            }
+        } else if (requestMaxBytes >= partitions.size()) {
+            // Case 2: we will be distributing requestMaxBytes greedily in 
this scenario to prevent any starvation. If
+            // there is extra bytes left post dividing it uniformly, assign it 
randomly to any one of the partitions.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, 
requestMaxBytes / partitions.size()));
+            if (requestMaxBytes % partitions.size() != 0) {
+                TopicIdPartition randomPartition = 
selectPartitionRandomly(partitionMaxBytes);
+                partitionMaxBytes.put(randomPartition,
+                    (requestMaxBytes / partitions.size()) + (requestMaxBytes % 
partitions.size()));
+            }

Review Comment:
   The code seems repeated in case 1 and 2, where you only need to determine 
which value to divide. I think it can be simplified.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to