apoorvmittal10 commented on code in PR #20746:
URL: https://github.com/apache/kafka/pull/20746#discussion_r2461015155
##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -58,7 +64,27 @@ static PartitionMaxBytesStrategy type(StrategyType type) {
    private static LinkedHashMap<TopicIdPartition, Integer> uniformPartitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions,
                                                                                     int acquiredPartitionsSize) {
        checkValidArguments(requestMaxBytes, partitions, acquiredPartitionsSize);
        LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new LinkedHashMap<>();
-        partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / acquiredPartitionsSize));
+        if (requestMaxBytes >= acquiredPartitionsSize) {
+            // Case 1: requestMaxBytes can be evenly distributed within partitions.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / acquiredPartitionsSize));
+        } else if (requestMaxBytes >= partitions.size()) {
+            // Case 2: we will be distributing requestMaxBytes greedily in this scenario to prevent any starvation.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / partitions.size()));
Review Comment:
My concern relates to multiple reads from the log or remote storage in the same DelayedShareFetch. As far as I remember, we introduced `acquiredPartitionsSize` to this method because it can be greater than the size of `partitions`, since multiple independent reads can occur. So simply dividing by `partitions.size()` and distributing the bytes seems incorrect.
##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -38,6 +44,8 @@ public String toString() {
     /**
      * Returns the partition max bytes for a given partition based on the strategy type.
+      * The partitions passed for maxBytes calculation are a subset of total acquired partitions for the share fetch request.
Review Comment:
Not necessarily always, but it `can be a subset`, right?
##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -38,6 +44,8 @@ public String toString() {
     /**
      * Returns the partition max bytes for a given partition based on the strategy type.
+      * The partitions passed for maxBytes calculation are a subset of total acquired partitions for the share fetch request.
+      * Thus, partitions for which we want to compute the max bytes <= acquired partitions.
Review Comment:
Can you please explain what we mean here?
##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -58,10 +66,47 @@ static PartitionMaxBytesStrategy type(StrategyType type) {
    private static LinkedHashMap<TopicIdPartition, Integer> uniformPartitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions,
                                                                                     int acquiredPartitionsSize) {
        checkValidArguments(requestMaxBytes, partitions, acquiredPartitionsSize);
        LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new LinkedHashMap<>();
-        partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / acquiredPartitionsSize));
+        if (requestMaxBytes >= acquiredPartitionsSize) {
+            // Case 1: requestMaxBytes can be evenly distributed within partitions. If there is extra bytes left post
+            // dividing it uniformly, assign it randomly to any one of the partitions.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / acquiredPartitionsSize));
+            if (requestMaxBytes % acquiredPartitionsSize != 0) {
+                TopicIdPartition randomPartition = selectPartitionRandomly(partitionMaxBytes);
+                partitionMaxBytes.put(randomPartition,
+                    (requestMaxBytes / acquiredPartitionsSize) + (requestMaxBytes % acquiredPartitionsSize));
Review Comment:
nit: Though the calculation is trivial, it does not need to be repeated, i.e. `requestMaxBytes / acquiredPartitionsSize`; rather, fetch the existing value and add to it. If you don't like it, ignore the suggestion.
```suggestion
            int remainingBytes = requestMaxBytes % acquiredPartitionsSize;
            if (remainingBytes != 0) {
                TopicIdPartition randomPartition = selectPartitionRandomly(partitionMaxBytes);
                partitionMaxBytes.put(randomPartition, partitionMaxBytes.get(randomPartition) + remainingBytes);
```
##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -58,7 +64,27 @@ static PartitionMaxBytesStrategy type(StrategyType type) {
    private static LinkedHashMap<TopicIdPartition, Integer> uniformPartitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions,
                                                                                     int acquiredPartitionsSize) {
        checkValidArguments(requestMaxBytes, partitions, acquiredPartitionsSize);
        LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new LinkedHashMap<>();
-        partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / acquiredPartitionsSize));
+        if (requestMaxBytes >= acquiredPartitionsSize) {
+            // Case 1: requestMaxBytes can be evenly distributed within partitions.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / acquiredPartitionsSize));
+        } else if (requestMaxBytes >= partitions.size()) {
+            // Case 2: we will be distributing requestMaxBytes greedily in this scenario to prevent any starvation.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / partitions.size()));
+        } else {
+            // Case 3: we will distribute requestMaxBytes to as many partitions possible randomly to avoid starvation.
+            List<TopicIdPartition> partitionsList = new ArrayList<>(partitions);
+            Collections.shuffle(partitionsList);
Review Comment:
Ok.
1. Case 3 will be rare, but what guarantee does shuffling actually buy you? The acquired partitions can differ between two iterations, which means the partitions can already be effectively random, correct? However, if there is a single client with multiple partitions, then this code makes sense.
2. Copying an array and shuffling seems a waste of operations and memory to me; of course you can argue otherwise. If we just want to prevent starvation, pick a random number between 0 and partitionsList.size() and start assigning 1 byte to each partition until requestMaxBytes is exhausted. It can be a simple implementation and should work, without the additional overhead of copying and shuffling.
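The random-start idea could look roughly like the sketch below. This is illustrative only, not the PR's code: generic keys stand in for `TopicIdPartition`, and the class and method names are made up.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class RandomStartDistribution {

    // Start at a random index and hand out one byte per partition, wrapping
    // around, until requestMaxBytes is exhausted. No copy, no shuffle; the
    // random offset alone prevents a fixed prefix of the list from always winning.
    static <T> LinkedHashMap<T, Integer> distributeBytesRandomStart(int requestMaxBytes, List<T> partitions) {
        LinkedHashMap<T, Integer> maxBytes = new LinkedHashMap<>();
        partitions.forEach(p -> maxBytes.put(p, 0));
        int start = ThreadLocalRandom.current().nextInt(partitions.size());
        for (int i = 0; i < requestMaxBytes; i++) {
            T p = partitions.get((start + i) % partitions.size());
            maxBytes.put(p, maxBytes.get(p) + 1);
        }
        return maxBytes;
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("tp0", "tp1", "tp2", "tp3", "tp4");
        // 3 bytes over 5 partitions: three consecutive partitions (from a
        // random start) get 1 byte each, the rest get 0.
        System.out.println(distributeBytesRandomStart(3, partitions));
    }
}
```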
##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -58,10 +66,47 @@ static PartitionMaxBytesStrategy type(StrategyType type) {
    private static LinkedHashMap<TopicIdPartition, Integer> uniformPartitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions,
                                                                                     int acquiredPartitionsSize) {
        checkValidArguments(requestMaxBytes, partitions, acquiredPartitionsSize);
        LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new LinkedHashMap<>();
-        partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / acquiredPartitionsSize));
+        if (requestMaxBytes >= acquiredPartitionsSize) {
+            // Case 1: requestMaxBytes can be evenly distributed within partitions. If there is extra bytes left post
+            // dividing it uniformly, assign it randomly to any one of the partitions.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / acquiredPartitionsSize));
+            if (requestMaxBytes % acquiredPartitionsSize != 0) {
+                TopicIdPartition randomPartition = selectPartitionRandomly(partitionMaxBytes);
+                partitionMaxBytes.put(randomPartition,
+                    (requestMaxBytes / acquiredPartitionsSize) + (requestMaxBytes % acquiredPartitionsSize));
+            }
+        } else if (requestMaxBytes >= partitions.size()) {
+            // Case 2: we will be distributing requestMaxBytes greedily in this scenario to prevent any starvation. If
+            // there is extra bytes left post dividing it uniformly, assign it randomly to any one of the partitions.
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / partitions.size()));
+            if (requestMaxBytes % partitions.size() != 0) {
+                TopicIdPartition randomPartition = selectPartitionRandomly(partitionMaxBytes);
+                partitionMaxBytes.put(randomPartition,
+                    (requestMaxBytes / partitions.size()) + (requestMaxBytes % partitions.size()));
+            }
Review Comment:
The code is repeated in cases 1 and 2; the only difference is which value you divide by. I think it can be simplified.
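One possible shape for that simplification, as a sketch rather than a concrete proposal: generic keys stand in for `TopicIdPartition`, and where the real code calls `selectPartitionRandomly`, the first key is used here to keep the sketch deterministic.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class UniformDistribution {

    // Cases 1 and 2 collapse into one path: divide by the chosen divisor, then
    // top up a single partition with the remainder.
    static <T> LinkedHashMap<T, Integer> divideUniformly(int requestMaxBytes, Set<T> partitions, int divisor) {
        LinkedHashMap<T, Integer> maxBytes = new LinkedHashMap<>();
        partitions.forEach(p -> maxBytes.put(p, requestMaxBytes / divisor));
        int remainingBytes = requestMaxBytes % divisor;
        if (remainingBytes != 0) {
            // Real code would pick a random partition; first key keeps this deterministic.
            T chosen = maxBytes.keySet().iterator().next();
            maxBytes.put(chosen, maxBytes.get(chosen) + remainingBytes);
        }
        return maxBytes;
    }

    public static void main(String[] args) {
        Set<String> partitions = new LinkedHashSet<>(List.of("tp0", "tp1", "tp2"));
        // Case 1 would pass divisor = acquiredPartitionsSize, case 2 would pass
        // divisor = partitions.size(); the branch reduces to choosing the divisor.
        System.out.println(divideUniformly(10, partitions, 3));
    }
}
```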