apoorvmittal10 commented on code in PR #20246:
URL: https://github.com/apache/kafka/pull/20246#discussion_r2463851546
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -846,25 +851,35 @@ public ShareAcquiredRecords acquire(
inFlightBatch, groupId, topicIdPartition);
continue;
}
- // Schedule acquisition lock timeout for the batch.
- AcquisitionLockTimerTask acquisitionLockTimeoutTask = scheduleAcquisitionLockTimeout(memberId, inFlightBatch.firstOffset(), inFlightBatch.lastOffset());
- // Set the acquisition lock timeout task for the batch.
- inFlightBatch.updateAcquisitionLockTimeout(acquisitionLockTimeoutTask);
+ long numRecordsAcquired = inFlightBatch.lastOffset() - inFlightBatch.firstOffset() + 1;
+ int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
+ if (isRecordLimitMode && (numRecordsAcquired > numRecordsRemaining)) {
Review Comment:
It seems a full batch acquisition cannot happen here. Should we not handle
this situation the same way as the partial batch acknowledgement handled above
in this method, i.e. should `if (!fullMatch || inFlightBatch.offsetState() != null)`
have an additional condition?
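
A minimal sketch of what such a combined condition might look like, using only the values visible in the diff. The class `AcquireModeSketch` and the method `needsOffsetLevelAcquire` are hypothetical names for illustration and are not part of `SharePartition`; the actual change may be structured differently.

```java
// Hypothetical helper, not part of SharePartition: decides whether a cached
// batch has to go through the offset-level (partial) acquisition path.
final class AcquireModeSketch {

    static boolean needsOffsetLevelAcquire(
            boolean fullMatch,          // fetched range covers the whole in-flight batch
            boolean hasOffsetState,     // stands in for inFlightBatch.offsetState() != null
            boolean isRecordLimitMode,
            long firstOffset,
            long lastOffset,
            int maxRecordsToAcquire,
            int acquiredCount) {
        long numRecordsInBatch = lastOffset - firstOffset + 1;
        int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
        // Additional condition: in record limit mode, a batch larger than the
        // remaining budget can never be acquired as a full batch.
        boolean exceedsRecordLimit = isRecordLimitMode && numRecordsInBatch > numRecordsRemaining;
        return !fullMatch || hasOffsetState || exceedsRecordLimit;
    }
}
```

With this shape, a fully matched 10-record batch and a remaining budget of 5 records would take the offset-level path, while the same batch with a budget of 10 would still be acquired as a whole.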
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1656,10 +1712,54 @@ private List<AcquiredRecords> createBatches(
.setLastOffset(lastAcquiredOffset)
.setDeliveryCount((short) 1));
- result.forEach(acquiredRecords -> {
- // Schedule acquisition lock timeout for the batch.
- AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, acquiredRecords.firstOffset(), acquiredRecords.lastOffset());
- // Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch.
+ if (isRecordLimitMode) {
+ AcquiredRecords acquiredRecords = result.get(0);
+ if (acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1 > maxFetchRecords) {
+ // Initialize the timeout task, as the offset-level acquisition lock timeout task needs to be set up via
+ // batch-level acquisition lock timeout task
+ AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(
+ memberId, acquiredRecords.firstOffset(), acquiredRecords.lastOffset());
+ InFlightBatch inFlightBatch = new InFlightBatch(
+ timer,
+ time,
+ memberId,
+ acquiredRecords.firstOffset(),
+ acquiredRecords.lastOffset(),
+ RecordState.ACQUIRED,
+ 1,
+ timerTask,
+ timeoutHandler,
+ sharePartitionMetrics);
+ cachedState.put(acquiredRecords.firstOffset(), inFlightBatch);
+ acquiredRecords = filterShareAcquiredRecordsInRecordLimitMode(maxFetchRecords, inFlightBatch);
+ sharePartitionMetrics.recordInFlightBatchMessageCount(
+ acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1);
+ return List.of(acquiredRecords);
Review Comment:
Creating an `AcquisitionLockTimerTask` for the complete batch first, then
cancelling it and creating new per-offset tasks, seems unnecessary. When we
know the batch is being created for the first time, why not attach the
individual offset locks right there, i.e. create the in-flight batch with
offset-based tracking by default rather than filtering later?
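
A rough sketch of that idea as a simplified, standalone model. `OffsetTrackingSketch`, its `State` enum, and `scheduledTimers` are hypothetical stand-ins rather than Kafka classes, and it assumes that offsets beyond `maxFetchRecords` remain available for a later acquire.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Simplified model, not the real InFlightBatch: the batch starts with
// per-offset state, so no batch-level timer is created and later cancelled.
final class OffsetTrackingSketch {

    enum State { AVAILABLE, ACQUIRED }

    // Offsets for which a (hypothetical) per-offset lock timer was scheduled.
    final List<Long> scheduledTimers = new ArrayList<>();

    NavigableMap<Long, State> createWithOffsetTracking(
            long firstOffset, long lastOffset, int maxFetchRecords) {
        NavigableMap<Long, State> offsetState = new TreeMap<>();
        // Last offset that still fits within the record limit.
        long lastAcquired = Math.min(lastOffset, firstOffset + maxFetchRecords - 1);
        for (long offset = firstOffset; offset <= lastOffset; offset++) {
            if (offset <= lastAcquired) {
                offsetState.put(offset, State.ACQUIRED);
                scheduledTimers.add(offset); // one timer per acquired offset
            } else {
                // Assumption: the remainder stays AVAILABLE with no timer.
                offsetState.put(offset, State.AVAILABLE);
            }
        }
        return offsetState;
    }
}
```

In this model a batch covering offsets 10 to 14 with `maxFetchRecords = 3` ends up with timers only for offsets 10, 11 and 12, and nothing has to be cancelled afterwards.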