adixitconfluent commented on code in PR #17870:
URL: https://github.com/apache/kafka/pull/17870#discussion_r1912187645
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -390,18 +407,27 @@ private void handleFetchException(
}
// Visible for testing.
- LinkedHashMap<TopicIdPartition, LogReadResult>
combineLogReadResponse(LinkedHashMap<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData,
+ LinkedHashMap<TopicIdPartition, LogReadResult>
combineLogReadResponse(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
- LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
missingLogReadTopicPartitions = new LinkedHashMap<>();
- topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+ LinkedHashMap<TopicIdPartition, Long> missingLogReadTopicPartitions =
new LinkedHashMap<>();
+ topicPartitionData.forEach((topicIdPartition, fetchOffset) -> {
if (!existingFetchedData.containsKey(topicIdPartition)) {
- missingLogReadTopicPartitions.put(topicIdPartition,
partitionData);
+ missingLogReadTopicPartitions.put(topicIdPartition,
fetchOffset);
}
});
if (missingLogReadTopicPartitions.isEmpty()) {
return existingFetchedData;
}
- LinkedHashMap<TopicIdPartition, LogReadResult>
missingTopicPartitionsLogReadResponse =
readFromLog(missingLogReadTopicPartitions);
+
+ // Computing the total bytes that has already been fetched for the
existing fetched data.
+ int totalPartitionMaxBytesUsed = 0;
+ for (LogReadResult logReadResult : existingFetchedData.values()) {
+ totalPartitionMaxBytesUsed +=
logReadResult.info().records.sizeInBytes();
+ }
+
Review Comment:
Yes, this is a slightly better version of the UNIFORM strategy. The other
strategies might already depend on produce volume, so this may already be
handled there and this code might not be useful. Okay, I'll change it to a basic
version where partitionMaxBytes for the leftover partitions will depend solely
on the acquired topic partitions' size.
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -390,18 +407,27 @@ private void handleFetchException(
}
// Visible for testing.
- LinkedHashMap<TopicIdPartition, LogReadResult>
combineLogReadResponse(LinkedHashMap<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData,
+ LinkedHashMap<TopicIdPartition, LogReadResult>
combineLogReadResponse(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
- LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
missingLogReadTopicPartitions = new LinkedHashMap<>();
- topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+ LinkedHashMap<TopicIdPartition, Long> missingLogReadTopicPartitions =
new LinkedHashMap<>();
+ topicPartitionData.forEach((topicIdPartition, fetchOffset) -> {
if (!existingFetchedData.containsKey(topicIdPartition)) {
- missingLogReadTopicPartitions.put(topicIdPartition,
partitionData);
+ missingLogReadTopicPartitions.put(topicIdPartition,
fetchOffset);
}
});
if (missingLogReadTopicPartitions.isEmpty()) {
return existingFetchedData;
}
- LinkedHashMap<TopicIdPartition, LogReadResult>
missingTopicPartitionsLogReadResponse =
readFromLog(missingLogReadTopicPartitions);
+
+ // Computing the total bytes that has already been fetched for the
existing fetched data.
+ int totalPartitionMaxBytesUsed = 0;
+ for (LogReadResult logReadResult : existingFetchedData.values()) {
+ totalPartitionMaxBytesUsed +=
logReadResult.info().records.sizeInBytes();
+ }
+
Review Comment:
Yes, this is a slightly better version of the UNIFORM strategy. The other
strategies might already depend on produce volume, so this may already be
handled there and this code might not be useful. Okay, I'll change it to a basic
version where `partitionMaxBytes` for the leftover partitions will depend
solely on the acquired topic partitions' size.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]