apoorvmittal10 commented on code in PR #20246:
URL: https://github.com/apache/kafka/pull/20246#discussion_r2463851546


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -846,25 +851,35 @@ public ShareAcquiredRecords acquire(
                         inFlightBatch, groupId, topicIdPartition);
                     continue;
                 }
-                // Schedule acquisition lock timeout for the batch.
-                AcquisitionLockTimerTask acquisitionLockTimeoutTask = scheduleAcquisitionLockTimeout(memberId, inFlightBatch.firstOffset(), inFlightBatch.lastOffset());
-                // Set the acquisition lock timeout task for the batch.
-                inFlightBatch.updateAcquisitionLockTimeout(acquisitionLockTimeoutTask);
+                long numRecordsAcquired = inFlightBatch.lastOffset() - inFlightBatch.firstOffset() + 1;
+                int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
+                if (isRecordLimitMode && (numRecordsAcquired > numRecordsRemaining)) {

Review Comment:
   Seems like a full batch acquisition cannot happen here, so shouldn't we handle
this situation the same way the partial batch acknowledgement case is handled
above in this method, i.e. should `if (!fullMatch || inFlightBatch.offsetState() != null)`
have an additional condition?
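   For illustration, roughly something like the sketch below. Only the names
already visible in this diff are real; the exact placement and the
`exceedsRemainingLimit` local are assumptions on my side:

```java
// Sketch: compute whether the remaining record budget forces a partial
// acquisition, and fold that into the existing partial-acquisition check
// instead of a separate branch. `exceedsRemainingLimit` is hypothetical.
long numRecordsAcquired = inFlightBatch.lastOffset() - inFlightBatch.firstOffset() + 1;
boolean exceedsRemainingLimit = isRecordLimitMode
    && numRecordsAcquired > maxRecordsToAcquire - acquiredCount;
if (!fullMatch || inFlightBatch.offsetState() != null || exceedsRemainingLimit) {
    // Handle as a partial batch acquisition, same as the existing path above.
}
```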



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1656,10 +1712,54 @@ private List<AcquiredRecords> createBatches(
                 .setLastOffset(lastAcquiredOffset)
                 .setDeliveryCount((short) 1));
 
-            result.forEach(acquiredRecords -> {
-                // Schedule acquisition lock timeout for the batch.
-                AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, acquiredRecords.firstOffset(), acquiredRecords.lastOffset());
-                // Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch.
+            if (isRecordLimitMode) {
+                AcquiredRecords acquiredRecords = result.get(0);
+                if (acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1 > maxFetchRecords) {
+                    // Initialize the timeout task, as the offset-level acquisition lock timeout task needs to be set up via
+                    // batch-level acquisition lock timeout task
+                    AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(
+                        memberId, acquiredRecords.firstOffset(), acquiredRecords.lastOffset());
+                    InFlightBatch inFlightBatch = new InFlightBatch(
+                        timer,
+                        time,
+                        memberId,
+                        acquiredRecords.firstOffset(),
+                        acquiredRecords.lastOffset(),
+                        RecordState.ACQUIRED,
+                        1,
+                        timerTask,
+                        timeoutHandler,
+                        sharePartitionMetrics);
+                    cachedState.put(acquiredRecords.firstOffset(), inFlightBatch);
+                    acquiredRecords = filterShareAcquiredRecordsInRecordLimitMode(maxFetchRecords, inFlightBatch);
+                    sharePartitionMetrics.recordInFlightBatchMessageCount(
+                        acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1);
+                    return List.of(acquiredRecords);

Review Comment:
   First creating an `AcquisitionLockTimerTask` for the complete batch, then
cancelling it and creating new tasks per offset, seems unnecessary. Since we know
this is the first time the batch is being created, why not attach the individual
offset locks right there, i.e. create the in-flight batch with offset-based
tracking by default rather than filtering later?
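   Roughly like the sketch below. `scheduleAcquisitionLockTimeout` is the method
already used in this diff, but calling it per offset and the way the tasks get
attached to the batch's offset state are assumptions, not the existing API:

```java
// Sketch: attach per-offset acquisition locks when the batch is first
// created, instead of a batch-level task that is cancelled right away.
long lastAcquirableOffset = Math.min(
    acquiredRecords.lastOffset(),
    acquiredRecords.firstOffset() + maxFetchRecords - 1);
for (long offset = acquiredRecords.firstOffset(); offset <= lastAcquirableOffset; offset++) {
    AcquisitionLockTimerTask offsetTask = scheduleAcquisitionLockTimeout(memberId, offset, offset);
    // Store offsetTask in the batch's offset-level state here (hypothetical
    // step; whatever owns offsetState() would need to take ownership of it).
}
```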



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
