JimmyWang6 commented on code in PR #20246:
URL: https://github.com/apache/kafka/pull/20246#discussion_r2476291413


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1607,16 +1624,54 @@ private ShareAcquiredRecords acquireNewBatchRecords(
             if (lastAcquiredOffset > endOffset) {
                 endOffset = lastAcquiredOffset;
             }
+            // Update lastAcquireOffset to the last offset of acquired records 
in record_limit mode
+            // Since this is required to calculate the actual number of 
offsets acquired.
+            if (isRecordLimitMode) {
+                lastAcquiredOffset = acquiredRecords.get(0).lastOffset();
+            }
             maybeUpdatePersisterGapWindowStartOffset(lastAcquiredOffset + 1);
             return new ShareAcquiredRecords(acquiredRecords, (int) 
(lastAcquiredOffset - firstAcquiredOffset + 1));
         } finally {
             lock.writeLock().unlock();
         }
     }
 
+    private AcquiredRecords filterShareAcquiredRecordsInRecordLimitMode(int 
maxFetchRecords, InFlightBatch inFlightBatch) {

Review Comment:
   @apoorvmittal10 I think you're right. I've just made the changes as you 
suggested, please take another look.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to