apoorvmittal10 commented on code in PR #19010:
URL: https://github.com/apache/kafka/pull/19010#discussion_r1966868720


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1061,33 +1098,103 @@ void updateCacheAndOffsets(long logStartOffset) {
         }
     }
 
+    /**
+     * The method archives the available records in the cached state that are between the fetch offset
+     * and the base offset of the first fetched batch. This method is required to handle the compacted
+     * topics where the already fetched batch which is marked re-available, might not result in subsequent
+     * fetch response from log. Hence, the batches need to be archived to allow the SPSO and next fetch
+     * offset to progress.
+     *
+     * @param fetchOffset The fetch offset.
+     * @param baseOffset  The base offset of the first fetched batch.
+     */
+    private void maybeArchiveStaleBatches(long fetchOffset, long baseOffset) {
+        lock.writeLock().lock();
+        try {
+            // If the fetch happens from within a batch then fetchOffset can be ahead of base offset else
+            // should be same as baseOffset of the first fetched batch. Otherwise, we might need to archive
+            // some stale batches.
+            if (cachedState.isEmpty() || fetchOffset >= baseOffset) {
+                // No stale batches to archive.
+                return;
+            }
+
+            // The fetch offset can exist in the middle of the batch. Hence, find the floor offset
+            // for the fetch offset and then find the sub-map from the floor offset to the base offset.
+            long mapFetchOffset = fetchOffset;
+            Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(fetchOffset);
+            if (floorOffset != null && floorOffset.getValue().lastOffset() >= fetchOffset) {
+                mapFetchOffset = floorOffset.getKey();
+            }
+
+            NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(mapFetchOffset, true, baseOffset, false);
+            if (subMap.isEmpty()) {
+                // No stale batches to archive.
+                return;
+            }
+            // Though such batches can be removed from the cache, but it is better to archive them so
+            // that they are never acquired again.
+            boolean anyRecordArchived = archiveAvailableRecords(fetchOffset, baseOffset, subMap);
+            // If we have transitioned the state of any batch/offset from AVAILABLE to ARCHIVED,
+            // then there is a chance that the next fetch offset can change.
+            if (anyRecordArchived) {
+                findNextFetchOffset.set(true);
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * The method archives the available records in the cached state that are before the log start offset.
+     *
+     * @param logStartOffset The log start offset.
+     * @return A boolean which indicates whether any record is archived or not.
+     */
     private boolean archiveAvailableRecordsOnLsoMovement(long logStartOffset) {
+        lock.writeLock().lock();
+        try {
+            return archiveAvailableRecords(startOffset, logStartOffset, cachedState);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * The method archive the available records in the given map that are before the end offset.
+     *
+     * @param startOffset The offset from which the available records should be archived.
+     * @param endOffset The offset before which the available records should be archived.
+     * @param map The map containing the in-flight records.
+     * @return A boolean which indicates whether any record is archived or not.
+     */
+    private boolean archiveAvailableRecords(long startOffset, long endOffset, NavigableMap<Long, InFlightBatch> map) {
         lock.writeLock().lock();

Review Comment:
   Yes, but as a good practice I have added the lock, so that if the method is
used independently in the future it will not cause a problem. The lock is
re-entrant, so acquiring it again while the caller already holds it is safe.
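
   To illustrate the re-entrancy point, here is a minimal sketch, assuming the
lock is a `java.util.concurrent.locks.ReentrantReadWriteLock`. The class and
method names (`ReentrantWriteLockSketch`, `outer`, `inner`) are hypothetical
and are not taken from the PR; they only mirror the shape of a locked caller
invoking a helper that takes the same write lock.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical illustration only; not code from SharePartition.
public class ReentrantWriteLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Caller that already holds the write lock when it invokes the helper.
    public int outer() {
        lock.writeLock().lock();
        try {
            return inner();
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Helper that takes the same write lock again, so it stays safe when
    // called on its own. Returns the current thread's write hold count.
    public int inner() {
        lock.writeLock().lock();
        try {
            return lock.getWriteHoldCount();
        } finally {
            lock.writeLock().unlock();
        }
    }

    public boolean isWriteLocked() {
        return lock.isWriteLocked();
    }

    public static void main(String[] args) {
        ReentrantWriteLockSketch sketch = new ReentrantWriteLockSketch();
        System.out.println("hold count via outer(): " + sketch.outer());
        System.out.println("hold count via inner(): " + sketch.inner());
        System.out.println("still locked afterwards: " + sketch.isWriteLocked());
    }
}
```

   The nested acquisition does not block because the same thread already owns
the write lock. Each `lock()` still needs its own `unlock()`, which is why both
methods release in a `finally` block.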


