AndrewJSchofield commented on code in PR #17322:
URL: https://github.com/apache/kafka/pull/17322#discussion_r1812547139
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -84,24 +88,31 @@ static Map<TopicIdPartition,
ShareFetchResponseData.PartitionData> processFetchR
partitionData.setErrorMessage(Errors.NONE.message());
}
} else {
- List<AcquiredRecords> acquiredRecords =
sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData);
+ ShareAcquiredRecords shareAcquiredRecords =
sharePartition.acquire(shareFetchData.memberId(),
shareFetchData.maxFetchRecords() - acquiredRecordsCount, fetchPartitionData);
log.trace("Acquired records for topicIdPartition: {} with
share fetch data: {}, records: {}",
- topicIdPartition, shareFetchData, acquiredRecords);
+ topicIdPartition, shareFetchData, shareAcquiredRecords);
// Maybe, in the future, check if no records are acquired, and
we want to retry
// replica manager fetch. Depends on the share partition
manager implementation,
// if we want parallel requests for the same share partition
or not.
- if (acquiredRecords.isEmpty()) {
+ if (shareAcquiredRecords.records().isEmpty()) {
partitionData
.setRecords(null)
.setAcquiredRecords(Collections.emptyList());
} else {
partitionData
+ // We set the records to the fetchPartitionData
records. We do not alter the records
+ // fetched from the replica manager as they follow
zero copy buffer. The acquired records
Review Comment:
@chia7712 The aim is to drive this from the consumer. For some workloads, it
will be critical not to over-fetch records because the processing time is
significant and we just risk lots of acquisition lock timeouts and wasted
fetched records. For others, we want pre-fetching to allow overlapping of
fetching and processing. This PR lets us start to experiment with this area.
I'll make a partner PR for the consumer soon (but not yet changing the protocol
because that will be the final step when we have decided how to do it nicely).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]