apoorvmittal10 commented on code in PR #20253:
URL: https://github.com/apache/kafka/pull/20253#discussion_r2237892481


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1449,6 +1461,46 @@ private void maybeUpdateReadGapFetchOffset(long offset) {
         }
     }
 
+    private LastOffsetAndMaxRecords lastOffsetAndMaxRecordsToAcquire(long fetchOffset, int maxFetchRecords, long lastOffset) {
+        // There can always be records fetched exceeding the max in-flight messages limit. Hence,
+        // we need to check if the share partition has reached the max in-flight messages limit
+        // and only acquire limited records.
+        int maxRecordsToAcquire;
+        long lastOffsetToAcquire = lastOffset;
+        lock.readLock().lock();
+        try {
+            int inFlightRecordsCount = numInFlightRecords();
+            // Take minimum of maxFetchRecords and remaining capacity to fill max in-flight messages limit.
+            maxRecordsToAcquire = Math.min(maxFetchRecords, maxInFlightMessages - inFlightRecordsCount);

Review Comment:
   Noted. I will send a separate patch for this, since it requires changing
many comments and amounts to a large refactor. I am leaving it out of this PR
so that the main change stays easy to identify.
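
   For readers following the hunk above, here is a minimal standalone sketch of
the capacity calculation it performs. It is an illustration only, not the code
in SharePartition.java: the class name `AcquireLimitSketch` and the static
helper are hypothetical, and the clamp to zero is an assumption added here,
since the quoted hunk ends before showing how a negative remaining capacity is
handled.

```java
public class AcquireLimitSketch {

    /**
     * Returns how many records may be acquired, given the fetch limit and the
     * remaining in-flight capacity. Hypothetical helper mirroring the
     * Math.min(...) expression in the quoted diff.
     */
    static int maxRecordsToAcquire(int maxFetchRecords, int maxInFlightMessages, int inFlightRecordsCount) {
        // Assumption: clamp to zero so that an over-full partition never
        // yields a negative count. The quoted hunk does not show this step.
        int remainingCapacity = Math.max(0, maxInFlightMessages - inFlightRecordsCount);
        // Take the smaller of the fetch limit and the remaining capacity.
        return Math.min(maxFetchRecords, remainingCapacity);
    }

    public static void main(String[] args) {
        // Capacity is the tighter bound: only 50 slots remain.
        System.out.println(maxRecordsToAcquire(500, 200, 150));
        // Fetch limit is the tighter bound.
        System.out.println(maxRecordsToAcquire(10, 200, 150));
        // In-flight records already exceed the limit, so nothing is acquired.
        System.out.println(maxRecordsToAcquire(500, 200, 250));
    }
}
```

   Without the clamp, the third call would return -50, so whatever consumes the
value would need to treat a non-positive result as "acquire nothing".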



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
