apoorvmittal10 commented on code in PR #20246:
URL: https://github.com/apache/kafka/pull/20246#discussion_r2475452012


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1607,16 +1624,54 @@ private ShareAcquiredRecords acquireNewBatchRecords(
             if (lastAcquiredOffset > endOffset) {
                 endOffset = lastAcquiredOffset;
             }
+            // Update lastAcquireOffset to the last offset of acquired records 
in record_limit mode
+            // Since this is required to calculate the actual number of 
offsets acquired.
+            if (isRecordLimitMode) {
+                lastAcquiredOffset = acquiredRecords.get(0).lastOffset();
+            }
             maybeUpdatePersisterGapWindowStartOffset(lastAcquiredOffset + 1);
             return new ShareAcquiredRecords(acquiredRecords, (int) 
(lastAcquiredOffset - firstAcquiredOffset + 1));
         } finally {
             lock.writeLock().unlock();
         }
     }
 
+    private AcquiredRecords filterShareAcquiredRecordsInRecordLimitMode(int 
maxFetchRecords, InFlightBatch inFlightBatch) {

Review Comment:
   I think we can get rid of this method if we change out approach.



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -846,25 +851,35 @@ public ShareAcquiredRecords acquire(
                         inFlightBatch, groupId, topicIdPartition);
                     continue;
                 }
-                // Schedule acquisition lock timeout for the batch.
-                AcquisitionLockTimerTask acquisitionLockTimeoutTask = 
scheduleAcquisitionLockTimeout(memberId, inFlightBatch.firstOffset(), 
inFlightBatch.lastOffset());
-                // Set the acquisition lock timeout task for the batch.
-                
inFlightBatch.updateAcquisitionLockTimeout(acquisitionLockTimeoutTask);
+                long numRecordsAcquired = inFlightBatch.lastOffset() - 
inFlightBatch.firstOffset() + 1;
+                int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
+                if (isRecordLimitMode && (numRecordsAcquired > 
numRecordsRemaining)) {

Review Comment:
   So we are in the part of the code where whole batch should be acquired, but 
we eventually acquire partial bath. Which happens in phases, 1st the complete 
batch and then filtering.
   
   But we can change the approach, this part of code should not be executed as 
complete batch is not being acquired rather partital hence the situation is not 
other than above where we check `(!fullMatch || inFlightBatch.offsetState() != 
null)`, hence we need to add another check i.e. `(!fullMatch || 
inFlightBatch.offsetState() != null || (isRecordLimitMode && 
(numRecordsAcquired > numRecordsRemaining))` and let 
`acquireSubsetBatchRecords` do the processing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to