showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message offset, find its corresponding offset metadata in the
log.
- * If the message offset is out of range, throw an OffsetOutOfRangeException
+ * 1. If the message offset is lesser than the log-start-offset, then throw
an OffsetOutOfRangeException
+ * 2. If the message offset is lesser than the local-log-start-offset, then
it returns the message-only metadata
+ * 3. If the message offset is greater than the log-end-offset, then it
returns the message-only metadata
*/
- private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata
= {
+ private[log] def convertToOffsetMetadataOrThrow(offset: Long):
LogOffsetMetadata = {
Review Comment:
> However, if nextOffsetMetadata doesn't change and somehow startOffset >
maxOffsetMetadata.messageOffset, then we could loop forever.
I still don't know why the infinite loop will happen. In `Locallog#read`,
we'll check if `startOffset > nextOffsetMetadata.messageOffset`, if so,
`OffsetOutOfRangeException` will be thrown.
```
def read(startOffset: Long,
maxLength: Int,
minOneMessage: Boolean,
maxOffsetMetadata: LogOffsetMetadata,
includeAbortedTxns: Boolean): FetchDataInfo = {
....
// [Luke] We will check if startOffset >
nextOffsetMetadata.messageOffset,
// so if the provided `maxOffsetMetadata == nextOffsetMetadata`,
// the `OffsetOutOfRangeException` should be thrown before entering
another `convertToOffsetMetadataOrThrow` loop
val endOffsetMetadata = nextOffsetMetadata
val endOffset = endOffsetMetadata.messageOffset
var segmentOpt = segments.floorSegment(startOffset)
// return error on attempt to read beyond the log end offset
if (startOffset > endOffset || !segmentOpt.isPresent)
throw new OffsetOutOfRangeException(s"Received request for offset
$startOffset for partition $topicPartition, but we only have log segments upto
$endOffset.")
if (startOffset == maxOffsetMetadata.messageOffset)
emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
// [Luke] So for the first time, the provided `maxOffsetMetadata` might
be highWaterMark or LastStableOffset,
// it could enter this `else if` condition because HWM/LSO should is
always less than LEO. But the 2nd time won't enter here.
else if (startOffset > maxOffsetMetadata.messageOffset)
emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset),
includeAbortedTxns)
```
So there should be no chance that we do `Locallog#read` with these
`maxOffsetMetadata = nextOffsetMetadata` provided entering another
`convertToOffsetMetadataOrThrow` loop.
If this did happen, it means the `logEndOffset < highWaterMarkOffset` or
`logEndOffset < lastStableOffseet`, which I don't think this would happen.
Does that make sense?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]