showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
```
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Given a message offset, find its corresponding offset metadata in the log.
-   * If the message offset is out of range, throw an OffsetOutOfRangeException
+   * 1. If the message offset is less than the log-start-offset, then throw an OffsetOutOfRangeException
+   * 2. If the message offset is less than the local-log-start-offset, then it returns the message-only metadata
+   * 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata
    */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
```

Review Comment:
> However, if nextOffsetMetadata doesn't change and somehow startOffset > maxOffsetMetadata.messageOffset, then we could loop forever.

I still don't see why the infinite loop would happen. In `LocalLog#read`, we check whether `startOffset > nextOffsetMetadata.messageOffset`; if so, an `OffsetOutOfRangeException` is thrown:

```
def read(startOffset: Long,
         maxLength: Int,
         minOneMessage: Boolean,
         maxOffsetMetadata: LogOffsetMetadata,
         includeAbortedTxns: Boolean): FetchDataInfo = {
  ....
  // [Luke] We will check if startOffset > nextOffsetMetadata.messageOffset, so if the provided
  // `maxOffsetMetadata == nextOffsetMetadata`, the `OffsetOutOfRangeException` should be thrown
  // before entering another `convertToOffsetMetadataOrThrow` loop
  val endOffsetMetadata = nextOffsetMetadata
  val endOffset = endOffsetMetadata.messageOffset
  var segmentOpt = segments.floorSegment(startOffset)

  // return error on attempt to read beyond the log end offset
  if (startOffset > endOffset || !segmentOpt.isPresent)
    throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, but we only have log segments upto $endOffset.")

  if (startOffset == maxOffsetMetadata.messageOffset)
    emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
  // [Luke] So for the first time, the provided `maxOffsetMetadata` might be the highWatermark or the
  // lastStableOffset, and it could enter this `else if` condition because the HWM/LSO is always less
  // than or equal to the LEO. But the 2nd time won't enter here.
  else if (startOffset > maxOffsetMetadata.messageOffset)
    emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
```

So there should be no way that a `LocalLog#read` call with `maxOffsetMetadata = nextOffsetMetadata` enters another `convertToOffsetMetadataOrThrow` loop. If that did happen, it would mean `logEndOffset < highWaterMark` or `logEndOffset < lastStableOffset`, which I don't think can happen. Does that make sense?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
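[Editorial note] The no-infinite-loop argument above hinges on the ordering of the guards: the out-of-range check against the log end offset runs before the `startOffset > maxOffsetMetadata` branch, so a retry that passes `maxOffsetMetadata = nextOffsetMetadata` can never re-enter the conversion branch. The following Java sketch is a hypothetical, heavily simplified model of that guard ordering (plain `long` offsets, no segments or metadata objects); it is not the real `LocalLog` code.

```java
// Hypothetical simplified model of the guard ordering in LocalLog#read.
// Offsets are plain longs; the "message-only metadata" conversion is elided.
final class ReadGuardSketch {
    final long logEndOffset = 100L; // stands in for nextOffsetMetadata.messageOffset

    // Returns a description of which branch a read(startOffset, maxOffset) takes.
    String read(long startOffset, long maxOffset) {
        // Guard 1: reading beyond the log end offset always throws,
        // no matter what maxOffset the caller supplied.
        if (startOffset > logEndOffset)
            throw new IllegalStateException("OffsetOutOfRangeException: " + startOffset);
        if (startOffset == maxOffset)
            return "empty fetch at maxOffset";
        // The branch discussed above: reachable only while startOffset <= logEndOffset,
        // so a retry with maxOffset = logEndOffset cannot fire it again unless
        // logEndOffset < the original HWM/LSO bound, which should not happen.
        if (startOffset > maxOffset)
            return "convert + retry with maxOffset = logEndOffset";
        return "normal read";
    }

    public static void main(String[] args) {
        ReadGuardSketch log = new ReadGuardSketch();
        long hwm = 50L; // first call is bounded by the HWM/LSO
        System.out.println(log.read(70L, hwm));              // enters the conversion branch once
        System.out.println(log.read(70L, log.logEndOffset)); // retry: takes the normal path
    }
}
```

Running the sketch shows the conversion branch fires on the first call (`startOffset > HWM`) and cannot fire on the retry, because by then `maxOffset` equals the log end offset and guard 1 has already excluded `startOffset > logEndOffset`.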