apoorvmittal10 commented on code in PR #20246:
URL: https://github.com/apache/kafka/pull/20246#discussion_r2475452012
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1607,16 +1624,54 @@ private ShareAcquiredRecords acquireNewBatchRecords(
if (lastAcquiredOffset > endOffset) {
endOffset = lastAcquiredOffset;
}
+ // Update lastAcquireOffset to the last offset of acquired records in record_limit mode
+ // Since this is required to calculate the actual number of offsets acquired.
+ if (isRecordLimitMode) {
+ lastAcquiredOffset = acquiredRecords.get(0).lastOffset();
+ }
maybeUpdatePersisterGapWindowStartOffset(lastAcquiredOffset + 1);
return new ShareAcquiredRecords(acquiredRecords, (int) (lastAcquiredOffset - firstAcquiredOffset + 1));
} finally {
lock.writeLock().unlock();
}
}
+ private AcquiredRecords filterShareAcquiredRecordsInRecordLimitMode(int maxFetchRecords, InFlightBatch inFlightBatch) {
Review Comment:
I think we can get rid of this method if we change our approach.
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -846,25 +851,35 @@ public ShareAcquiredRecords acquire(
inFlightBatch, groupId, topicIdPartition);
continue;
}
- // Schedule acquisition lock timeout for the batch.
- AcquisitionLockTimerTask acquisitionLockTimeoutTask = scheduleAcquisitionLockTimeout(memberId, inFlightBatch.firstOffset(), inFlightBatch.lastOffset());
- // Set the acquisition lock timeout task for the batch.
- inFlightBatch.updateAcquisitionLockTimeout(acquisitionLockTimeoutTask);
+ long numRecordsAcquired = inFlightBatch.lastOffset() - inFlightBatch.firstOffset() + 1;
+ int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
+ if (isRecordLimitMode && (numRecordsAcquired > numRecordsRemaining)) {
Review Comment:
We are in the part of the code where the whole batch should be acquired, but
we end up acquiring only part of the batch. That happens in two phases: first
the complete batch is acquired, then it is filtered.
We can change the approach. This part of the code should not be executed at
all, because a partial batch, not a complete one, is being acquired. The
situation is no different from the case above where we check
`(!fullMatch || inFlightBatch.offsetState() != null)`. So we need to extend
that check to `(!fullMatch || inFlightBatch.offsetState() != null ||
(isRecordLimitMode && (numRecordsAcquired > numRecordsRemaining)))` and let
`acquireSubsetBatchRecords` do the processing.
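To make the suggested condition concrete, here is a minimal standalone sketch
of the extended check. It is not the actual `SharePartition` code: the class
`SubsetAcquireCheck`, the method `needsSubsetAcquire`, and the boolean
`hasOffsetState` (standing in for `inFlightBatch.offsetState() != null`) are
hypothetical names used only for illustration.

```java
// Hypothetical helper, not part of Kafka: it only mirrors the boolean
// condition proposed in the review comment above.
final class SubsetAcquireCheck {

    private SubsetAcquireCheck() { }

    /**
     * Returns true when the batch should go through the subset path
     * (acquireSubsetBatchRecords in the PR) instead of being acquired whole.
     *
     * hasOffsetState stands in for inFlightBatch.offsetState() != null.
     */
    static boolean needsSubsetAcquire(boolean fullMatch,
                                      boolean hasOffsetState,
                                      boolean isRecordLimitMode,
                                      long numRecordsAcquired,
                                      int numRecordsRemaining) {
        // Existing part of the check: partial overlap or per-offset state.
        boolean existingCheck = !fullMatch || hasOffsetState;
        // Proposed addition: in record_limit mode, a batch larger than the
        // remaining budget can only be acquired partially.
        boolean exceedsRecordLimit =
            isRecordLimitMode && (numRecordsAcquired > numRecordsRemaining);
        return existingCheck || exceedsRecordLimit;
    }

    public static void main(String[] args) {
        // A fully matched 10-record batch with only 4 records left to acquire.
        System.out.println(needsSubsetAcquire(true, false, true, 10L, 4));
        // The same batch outside record_limit mode.
        System.out.println(needsSubsetAcquire(true, false, false, 10L, 4));
    }
}
```

With the check placed before the whole-batch branch, the complete batch never
needs to be acquired first and filtered afterwards.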