adixitconfluent commented on code in PR #17870:
URL: https://github.com/apache/kafka/pull/17870#discussion_r1912173308
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -390,18 +407,27 @@ private void handleFetchException(
}
// Visible for testing.
-    LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
+    LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
                                                                            LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
-        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>();
-        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+        LinkedHashMap<TopicIdPartition, Long> missingLogReadTopicPartitions = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, fetchOffset) -> {
             if (!existingFetchedData.containsKey(topicIdPartition)) {
-                missingLogReadTopicPartitions.put(topicIdPartition, partitionData);
+                missingLogReadTopicPartitions.put(topicIdPartition, fetchOffset);
             }
         });
         if (missingLogReadTopicPartitions.isEmpty()) {
             return existingFetchedData;
         }
-        LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);
+
+        // Computing the total bytes that has already been fetched for the existing fetched data.
+        int totalPartitionMaxBytesUsed = 0;
+        for (LogReadResult logReadResult : existingFetchedData.values()) {
+            totalPartitionMaxBytesUsed += logReadResult.info().records.sizeInBytes();
+        }
+
Review Comment:
@apoorvmittal10, yes, we could do that as well. However, I believe this dynamic
approach of computing `partitionMaxBytes` for the leftover partitions, given the
bytes already fetched for the earlier partitions, is better than blindly
dividing the budget equally. Even if we set `partitionMaxBytes` for some
partitions to 2 MB, they may not have enough data produced in them, so we might
only fetch, say, 0.5 MB from those partitions. With the dynamic approach, we
give ourselves the opportunity to fetch more data from the leftover partitions,
which is beneficial when they have heavier produce traffic than the earlier
partitions.
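
To make the idea concrete, here is a rough sketch of the dynamic sizing
described above. All names (`leftoverPartitionMaxBytes`, `requestMaxBytes`,
etc.) are illustrative only and do not mirror the actual PR code:

```java
// Hypothetical sketch of the dynamic partitionMaxBytes idea: spread the
// unused request-level byte budget over the partitions still to be read,
// instead of a fixed equal split across all partitions up front.
public class DynamicPartitionMaxBytesSketch {

    // requestMaxBytes: total byte budget for the share-fetch request.
    // totalBytesAlreadyFetched: bytes consumed by the partitions already read
    // (the totalPartitionMaxBytesUsed loop in the diff computes this).
    // remainingPartitionCount: partitions that still need a log read.
    static int leftoverPartitionMaxBytes(int requestMaxBytes,
                                         int totalBytesAlreadyFetched,
                                         int remainingPartitionCount) {
        int leftoverBytes = Math.max(0, requestMaxBytes - totalBytesAlreadyFetched);
        return leftoverBytes / remainingPartitionCount;
    }

    public static void main(String[] args) {
        // Example from the comment: a 10 MB budget, two partitions that only
        // returned 0.5 MB each (they had little data), three partitions left.
        int perPartition = leftoverPartitionMaxBytes(10 * 1024 * 1024,
                2 * 512 * 1024, 3);
        // Each leftover partition may now read up to 3 MB, versus the 2 MB a
        // static equal split across five partitions would have allowed.
        System.out.println(perPartition);
    }
}
```

With a static split the unused 1.5 MB from the light partitions would simply go
to waste; the dynamic computation hands it to the heavier leftover partitions.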
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]