showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
     * Given a message offset, find its corresponding offset metadata in the log.
-    * If the message offset is out of range, throw an OffsetOutOfRangeException
+    * 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException
+    * 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata
+    * 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata
     */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
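
For context, here is a rough, self-contained sketch of the three cases the updated doc comment describes. It is illustrative only, not the actual patch; the types and names below are stand-ins for the real `LogOffsetMetadata` / `OffsetOutOfRangeException`.

```
object ConvertToOffsetMetadataSketch {
  // Stand-ins for the real Kafka types; "message-only" metadata carries just the
  // message offset, with no segment base offset or file position resolved.
  final case class OffsetMetadata(messageOffset: Long, resolvedInLocalLog: Boolean)
  final class OffsetOutOfRange(msg: String) extends RuntimeException(msg)

  def convert(offset: Long,
              logStartOffset: Long,
              localLogStartOffset: Long,
              logEndOffset: Long): OffsetMetadata = {
    if (offset < logStartOffset)
      // Case 1: below the log-start-offset -> out of range.
      throw new OffsetOutOfRange(s"offset $offset < logStartOffset $logStartOffset")
    else if (offset < localLogStartOffset || offset > logEndOffset)
      // Cases 2 and 3: the data only exists in remote storage, or beyond the log end,
      // so only message-only metadata can be returned.
      OffsetMetadata(offset, resolvedInLocalLog = false)
    else
      // Otherwise the local segments can resolve the full metadata (elided in this sketch).
      OffsetMetadata(offset, resolvedInLocalLog = true)
  }
}
```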

Review Comment:
   > However, if nextOffsetMetadata doesn't change and somehow startOffset > maxOffsetMetadata.messageOffset, then we could loop forever.
   
   I still don't see how the infinite loop could happen. In `LocalLog#read`, we check whether `startOffset > nextOffsetMetadata.messageOffset`, and if so, an `OffsetOutOfRangeException` is thrown.
   
   ```
     def read(startOffset: Long,
              maxLength: Int,
              minOneMessage: Boolean,
              maxOffsetMetadata: LogOffsetMetadata,
              includeAbortedTxns: Boolean): FetchDataInfo = {
   ....
   
         // [Luke] We check startOffset > nextOffsetMetadata.messageOffset below, so if the provided
         // `maxOffsetMetadata == nextOffsetMetadata`, the `OffsetOutOfRangeException` is thrown before
         // we can enter another `convertToOffsetMetadataOrThrow` loop.
   
         val endOffsetMetadata = nextOffsetMetadata
         val endOffset = endOffsetMetadata.messageOffset
         var segmentOpt = segments.floorSegment(startOffset)
   
         // return error on attempt to read beyond the log end offset
         if (startOffset > endOffset || !segmentOpt.isPresent)
           throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, but we only have log segments upto $endOffset.")
   
         if (startOffset == maxOffsetMetadata.messageOffset)
           emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
   
         // [Luke] So the first time, the provided `maxOffsetMetadata` might be the highWatermark or
         // lastStableOffset, and it can enter this `else if` branch because HWM/LSO is always at or
         // below the LEO. But the second time it won't enter here.
   
         else if (startOffset > maxOffsetMetadata.messageOffset)
           emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
   ```
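   To make the ordering argument concrete, here is a tiny self-contained sketch (not the real `LocalLog`; every name below is a placeholder) showing that when `maxOffsetMetadata` equals the log-end metadata, the out-of-range guard fires before the `convertToOffsetMetadataOrThrow` branch can be reached:
   
   ```
   object GuardOrderSketch {
     final case class OffsetMetadata(messageOffset: Long)
   
     def read(startOffset: Long, maxOffsetMetadata: OffsetMetadata, logEndOffset: Long): String = {
       // Guard 1 runs first, so startOffset can never exceed the LEO past this point.
       if (startOffset > logEndOffset)
         throw new IllegalStateException(s"offset $startOffset beyond log end $logEndOffset")
       else if (startOffset == maxOffsetMetadata.messageOffset)
         "empty fetch at maxOffsetMetadata"
       else if (startOffset > maxOffsetMetadata.messageOffset)
         // Reachable only when maxOffsetMetadata.messageOffset < logEndOffset (e.g. HWM/LSO below LEO).
         "convertToOffsetMetadataOrThrow path"
       else
         "normal segment read"
     }
   
     def main(args: Array[String]): Unit = {
       import scala.util.Try
       // maxOffsetMetadata == LEO (i.e. nextOffsetMetadata): the convert path is unreachable,
       // because any larger startOffset already fails guard 1.
       println(Try(read(startOffset = 101, maxOffsetMetadata = OffsetMetadata(100), logEndOffset = 100)))
       // maxOffsetMetadata below the LEO (the HWM/LSO case): the convert path can be taken once.
       println(read(startOffset = 150, maxOffsetMetadata = OffsetMetadata(100), logEndOffset = 200))
     }
   }
   ```
   (In the real code the first guard also checks `!segmentOpt.isPresent`, but that doesn't change the ordering argument.)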
   So there should be no way that a `LocalLog#read` call with `maxOffsetMetadata = nextOffsetMetadata` ends up in another `convertToOffsetMetadataOrThrow` loop.
   
   If that did happen, it would mean `logEndOffset < highWaterMarkOffset` or `logEndOffset < lastStableOffset`, which I don't think can happen.
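   As a toy illustration of the invariant being relied on here (not the actual `UnifiedLog` update logic): if watermark updates are always clamped to the current log end offset, then `logEndOffset < highWatermark` can never be observed.
   
   ```
   // Toy model only; the real UnifiedLog/Partition code is more involved.
   final class ToyLog {
     private var logEndOffset: Long = 0L
     private var highWatermark: Long = 0L
   
     def append(records: Int): Unit = logEndOffset += records
   
     def maybeIncrementHighWatermark(proposed: Long): Long = {
       // Clamp at the LEO, so highWatermark <= logEndOffset always holds.
       highWatermark = math.max(highWatermark, math.min(proposed, logEndOffset))
       highWatermark
     }
   }
   ```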
   
   Does that make sense?
   
   
   


