junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1596055964


##########
core/src/main/scala/kafka/server/DelayedFetch.scala:
##########
@@ -92,7 +92,10 @@ class DelayedFetch(
             // has just rolled, then the high watermark offset will remain the same but be on the old segment,
             // which would incorrectly be seen as an instance of Case F.
             if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
+              if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
+                // This case is to handle the stale high-watermark on the leader until it gets updated with the correct value

Review Comment:
   Perhaps change it to something like the following:
   
   "If we don't know the position of the offset on log segments, just 
pessimistically assume that we only gained 1 byte. 
    This can happen when the high watermark is stale, but should be rare."  
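
   A minimal sketch of the behavior the suggested comment describes. 
`OffsetMeta`, `FetchSketch`, and `bytesGained` are hypothetical stand-ins, not 
Kafka's real `LogOffsetMetadata` or the actual `DelayedFetch` logic, and the 
sketch assumes both offsets sit on the same segment (the segment-roll cases are 
left out).

   ```scala
   object FetchSketch {
     // Hypothetical stand-in for LogOffsetMetadata: `position` is None when
     // only the message offset is known (the "message-only" case).
     final case class OffsetMeta(messageOffset: Long, position: Option[Int] = None) {
       def messageOffsetOnly: Boolean = position.isEmpty
     }

     // Estimate the bytes available between fetchOffset and endOffset.
     def bytesGained(endOffset: OffsetMeta, fetchOffset: OffsetMeta): Long =
       (endOffset.position, fetchOffset.position) match {
         // No new messages: nothing was gained.
         case _ if endOffset.messageOffset <= fetchOffset.messageOffset => 0L
         // Both positions known: use the real byte difference.
         case (Some(end), Some(fetch)) => (end - fetch).toLong
         // Position unknown on either side: pessimistically assume 1 byte.
         case _ => 1L
       }
   }
   ```

   With a message-only `endOffset`, the estimate stays at 1 byte, so the fetch 
keeps waiting instead of completing on a guess.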



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
     * Given a message offset, find its corresponding offset metadata in the log.
-    * If the message offset is out of range, throw an OffsetOutOfRangeException
+    * 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException
+    * 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata
+    * 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata
     */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {

Review Comment:
   `LocalLog.read()` also calls `convertToOffsetMetadataOrThrow`.
   
   ```
         else if (startOffset > maxOffsetMetadata.messageOffset)
           emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
   ```
   It seems this could lead to infinite recursion. To avoid that, we could 
change the above code so that it no longer calls 
`convertToOffsetMetadataOrThrow` and instead passes a message-only 
`LogOffsetMetadata` to `emptyFetchDataInfo`.
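
   A rough sketch of the idea, assuming the metadata lookup is implemented on 
top of `read`. `ReadSketch`, `OffsetMeta`, `FetchInfo`, and the made-up 
position value are hypothetical and only illustrate the control flow, not the 
real `LocalLog` code.

   ```scala
   object ReadSketch {
     // Hypothetical stand-ins, not Kafka's real classes.
     final case class OffsetMeta(messageOffset: Long, position: Option[Int] = None)
     final case class FetchInfo(offsetMetadata: OffsetMeta, recordCount: Int)

     // Simplified read: past maxOffset, build the message-only metadata
     // directly instead of calling convertToOffsetMetadata, which would
     // call read again and never terminate.
     def read(startOffset: Long, maxOffset: OffsetMeta): FetchInfo =
       if (startOffset > maxOffset.messageOffset)
         FetchInfo(OffsetMeta(startOffset), recordCount = 0)
       else
         // Pretend position, for illustration only.
         FetchInfo(OffsetMeta(startOffset, Some((startOffset * 10).toInt)), recordCount = 1)

     // Metadata lookup that delegates to read (the assumed source of the cycle).
     def convertToOffsetMetadata(offset: Long, maxOffset: OffsetMeta): OffsetMeta =
       read(offset, maxOffset).offsetMetadata
   }
   ```

   Because the out-of-range branch no longer goes back through the lookup, 
`convertToOffsetMetadata` terminates and simply yields message-only metadata.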



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
     * Given a message offset, find its corresponding offset metadata in the log.
-    * If the message offset is out of range, throw an OffsetOutOfRangeException
+    * 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException

Review Comment:
   Good point. Since we changed the logic so that it's OK not to have the 
metadata for an offset, we could just skip the call to `checkLogStartOffset`.



##########
core/src/main/scala/kafka/server/DelayedFetch.scala:
##########
@@ -92,7 +92,10 @@ class DelayedFetch(
             // has just rolled, then the high watermark offset will remain the same but be on the old segment,
             // which would incorrectly be seen as an instance of Case F.
             if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
+              if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {

Review Comment:
   `fetchOffset` typically shouldn't be message-only, but it doesn't hurt to 
have the check.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
     * Given a message offset, find its corresponding offset metadata in the log.
-    * If the message offset is out of range, throw an OffsetOutOfRangeException
+    * 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException
+    * 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata

Review Comment:
   lesser => less



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
