apoorvmittal10 commented on code in PR #17870:
URL: https://github.com/apache/kafka/pull/17870#discussion_r1908437561
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,20 +212,21 @@ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions
         LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
         sharePartitions.forEach((topicIdPartition, sharePartition) -> {
-            int partitionMaxBytes = shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
             // Add the share partition to the list of partitions to be fetched only if we can
             // acquire the fetch lock on it.
             if (sharePartition.maybeAcquireFetchLock()) {
                 try {
                     // If the share partition is already at capacity, we should not attempt to fetch.
                     if (sharePartition.canAcquireRecords()) {
+                        // We do not know the total partitions that can be acquired at this stage, hence we set maxBytes
+                        // to 0 for now and will update it before doing the replica manager fetch.
                         topicPartitionData.put(
                             topicIdPartition,
                             new FetchRequest.PartitionData(
                                 topicIdPartition.topicId(),
                                 sharePartition.nextFetchOffset(),
                                 0,
-                                partitionMaxBytes,
+                                0,
Review Comment:
   Should we set it to 0 or to some default value? Just thinking about what the right approach should be.
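One option the comment raises is a default other than a bare 0. A minimal sketch of that idea, using a named sentinel so the "maxBytes not yet computed" state is explicit; the constant name and value here are hypothetical, not from the Kafka codebase:

```java
public class SentinelMaxBytesSketch {
    // Hypothetical sentinel: -1 is a common "unset" marker, whereas a bare 0
    // could be misread as "fetch nothing" rather than "not yet computed".
    static final int UNKNOWN_MAX_BYTES = -1;

    public static void main(String[] args) {
        // Placeholder until the acquired-partition count is known.
        int maxBytes = UNKNOWN_MAX_BYTES;
        System.out.println(maxBytes == UNKNOWN_MAX_BYTES
            ? "maxBytes pending"
            : "maxBytes=" + maxBytes);
    }
}
```

Either way, the value is only a placeholder that must be overwritten before the replica manager fetch runs.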
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -312,7 +323,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest
                 return true;
             } else if (fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
                 // we take the partition fetch size as upper bound when accumulating the bytes.
-                long bytesAvailable = Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), partitionData.maxBytes);
+                long bytesAvailable = Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), shareFetch.fetchParams().maxBytes / topicPartitionData.size());
Review Comment:
   This division hard-codes the uniform division strategy, so if we change our `partitionMaxBytesStrategy` from `UNIFORM` to something else, will this still be correct?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,20 +212,21 @@ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions
         LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
         sharePartitions.forEach((topicIdPartition, sharePartition) -> {
-            int partitionMaxBytes = shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
             // Add the share partition to the list of partitions to be fetched only if we can
             // acquire the fetch lock on it.
             if (sharePartition.maybeAcquireFetchLock()) {
                 try {
                     // If the share partition is already at capacity, we should not attempt to fetch.
                     if (sharePartition.canAcquireRecords()) {
+                        // We do not know the total partitions that can be acquired at this stage, hence we set maxBytes
+                        // to 0 for now and will update it before doing the replica manager fetch.
                         topicPartitionData.put(
                             topicIdPartition,
                             new FetchRequest.PartitionData(
                                 topicIdPartition.topicId(),
                                 sharePartition.nextFetchOffset(),
                                 0,
-                                partitionMaxBytes,
+                                0,
Review Comment:
   Do we need to create `FetchRequest.PartitionData` at this point, or can we delay its creation until later, when we know the maxBytes?
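The deferred-creation alternative the comment asks about can be sketched in two phases: record only the next fetch offsets while acquiring locks, then build the per-partition request data once the acquired-partition count (and hence maxBytes) is known. This is a simplified illustration; `PartitionData` here is a hypothetical stand-in for `FetchRequest.PartitionData`, which carries more fields (topic id, epochs, etc.):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DeferredPartitionDataSketch {

    // Hypothetical stand-in for FetchRequest.PartitionData.
    record PartitionData(long fetchOffset, int maxBytes) {}

    public static void main(String[] args) {
        // Phase 1: while holding the fetch locks, record only the acquirable
        // partitions and their next fetch offsets.
        Map<String, Long> nextOffsets = new LinkedHashMap<>();
        nextOffsets.put("t0-0", 42L);
        nextOffsets.put("t0-1", 7L);

        // Phase 2: the partition count is now known, so compute maxBytes and
        // build the PartitionData objects in a single pass.
        int requestMaxBytes = 1_000_000;
        int perPartitionMax = requestMaxBytes / nextOffsets.size();
        Map<String, PartitionData> topicPartitionData = new LinkedHashMap<>();
        nextOffsets.forEach((tp, offset) ->
            topicPartitionData.put(tp, new PartitionData(offset, perPartitionMax)));

        topicPartitionData.forEach((tp, pd) -> System.out.println(tp + " -> " + pd));
    }
}
```

This avoids constructing throwaway objects with a placeholder maxBytes, at the cost of a second map and a second pass.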
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]