apoorvmittal10 commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1638135999


##########
core/src/main/java/kafka/server/SharePartition.java:
##########
@@ -238,8 +245,77 @@ public static RecordState forId(byte id) {
      * @return The next fetch offset that should be fetched from the leader.
      */
     public long nextFetchOffset() {
-        // TODO: Implement the logic to compute the next fetch offset.
-        return 0;
+        /*
+        The logic for determining the next offset to fetch data from a Share Partition hinges on a
+        flag called findNextFetchOffset. If this flag is set to true, then the next fetch offset
+        should be re-computed, otherwise the next fetch offset is Share Partition End Offset + 1.
+        The flag is set to true in the following cases:
+        1. When some previously acquired records are acknowledged with type RELEASE.
+        2. When the record lock duration expires for some acquired records.
+        3. When some records are released on share session close.
+        The re-computation of next fetch offset is done by iterating over the cachedState and finding
+        the first available record. If no available record is found, then the next fetch offset is
+        set to Share Partition End Offset + 1 and findNextFetchOffset flag is set to false.
+        */
+        lock.writeLock().lock();
+        try {
+            // When none of the records in the cachedState are in the AVAILABLE state, findNextFetchOffset will be false
+            if (!findNextFetchOffset.get()) {
+                if (cachedState.isEmpty() || startOffset > cachedState.lastEntry().getValue().lastOffset()) {
+                    // 1. When cachedState is empty, endOffset is set to the next offset of the last offset removed from
+                    // batch, which is the next offset to be fetched.
+                    // 2. When startOffset has moved beyond the in-flight records, startOffset and endOffset point to the LSO,
+                    // which is the next offset to be fetched.
+                    return endOffset;
+                } else {
+                    return endOffset + 1;
+                }
+            }
+
+            // If this piece of code is reached, it means that findNextFetchOffset is true

Review Comment:
   I have added some tests that verify some scenarios with `findNextFetchOffset` set to true. For more tests I require the `acknowledge` functionality, and I will add those in the next PR.
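
For readers following the thread, the behaviour described in the code comment above can be sketched as a small standalone model. This is an illustration under stated assumptions, not the code in `SharePartition`: the class `FetchOffsetSketch`, its `Batch` and `State` types, and the `add` helper are hypothetical. State is tracked per batch rather than per offset, and locking is omitted. The recomputation branch follows the prose in the comment (first available record, else end offset + 1 and reset the flag), since that part of the diff is not shown here.

```java
import java.util.TreeMap;

public class FetchOffsetSketch {
    // Hypothetical, reduced set of record states for this sketch.
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

    // Hypothetical stand-in for an in-flight batch; all offsets in
    // [firstOffset, lastOffset] share one state in this simplified model.
    static final class Batch {
        final long firstOffset;
        final long lastOffset;
        State state;

        Batch(long firstOffset, long lastOffset, State state) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
            this.state = state;
        }
    }

    final TreeMap<Long, Batch> cachedState = new TreeMap<>();
    long startOffset;
    long endOffset;
    boolean findNextFetchOffset;

    // Hypothetical helper: registers a batch and advances endOffset.
    void add(long first, long last, State state) {
        cachedState.put(first, new Batch(first, last, state));
        endOffset = Math.max(endOffset, last);
    }

    long nextFetchOffset() {
        if (!findNextFetchOffset) {
            // Mirrors the branch shown in the diff.
            if (cachedState.isEmpty()
                    || startOffset > cachedState.lastEntry().getValue().lastOffset) {
                return endOffset;
            }
            return endOffset + 1;
        }
        // Flag is true: scan in offset order for the first available batch.
        for (Batch batch : cachedState.values()) {
            if (batch.state == State.AVAILABLE) {
                return batch.firstOffset;
            }
        }
        // Nothing available: reset the flag and fall back to end offset + 1.
        findNextFetchOffset = false;
        return endOffset + 1;
    }

    public static void main(String[] args) {
        FetchOffsetSketch partition = new FetchOffsetSketch();
        partition.add(0, 9, State.ACQUIRED);
        partition.add(10, 19, State.ACQUIRED);
        System.out.println(partition.nextFetchOffset()); // prints 20

        // Simulate a RELEASE acknowledgement of the second batch.
        partition.cachedState.get(10L).state = State.AVAILABLE;
        partition.findNextFetchOffset = true;
        System.out.println(partition.nextFetchOffset()); // prints 10
    }
}
```

The second call returns 10 because the released batch is the first one in the AVAILABLE state when the map is walked in offset order.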



