JimmyWang6 commented on code in PR #20246:
URL: https://github.com/apache/kafka/pull/20246#discussion_r2476291413
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1607,16 +1624,54 @@ private ShareAcquiredRecords acquireNewBatchRecords(
if (lastAcquiredOffset > endOffset) {
endOffset = lastAcquiredOffset;
}
+ // Update lastAcquireOffset to the last offset of acquired records
in record_limit mode
+ // Since this is required to calculate the actual number of
offsets acquired.
+ if (isRecordLimitMode) {
+ lastAcquiredOffset = acquiredRecords.get(0).lastOffset();
+ }
maybeUpdatePersisterGapWindowStartOffset(lastAcquiredOffset + 1);
return new ShareAcquiredRecords(acquiredRecords, (int)
(lastAcquiredOffset - firstAcquiredOffset + 1));
} finally {
lock.writeLock().unlock();
}
}
+ private AcquiredRecords filterShareAcquiredRecordsInRecordLimitMode(int
maxFetchRecords, InFlightBatch inFlightBatch) {
Review Comment:
@apoorvmittal10 I think you're right. I've just made the changes as you
suggested, please take another look.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]