Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
showuon merged PR #15825: URL: https://github.com/apache/kafka/pull/15825 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2130475760 @showuon : Any other comments from you?
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2129976827 Test failures are unrelated.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1612186225

## core/src/main/scala/kafka/server/DelayedFetch.scala: ##
@@ -103,7 +103,7 @@ class DelayedFetch(
             // We will not force complete the fetch request if a replica should be throttled.
             if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
               return forceComplete()
-          } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+          } else if (fetchOffset.onSameSegment(endOffset) && fetchOffset.messageOffset < endOffset.messageOffset) {

Review Comment:
Hmm, we assume that `fetchOffset.messageOffset > endOffset.messageOffset` is the truncated leader case. In that case, we should always call `forceComplete()` immediately, right? The current code only calls `forceComplete()` when the offset metadata is available, but it would be more consistent to do that regardless of the availability of the offset metadata. Should we do something like the following?

```
if (fetchOffset.messageOffset > endOffset.messageOffset) {
  // Case F, this can happen when the new fetch operation is on a truncated leader
  debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
  return forceComplete()
} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
  if (fetchOffset.onOlderSegment(endOffset)) {
    // Case F, this can happen when the fetch operation is falling behind the current segment
    // or the partition has just rolled a new segment
    debug(s"Satisfying fetch $this immediately since it is fetching older segments.")
    // We will not force complete the fetch request if a replica should be throttled.
    if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
      return forceComplete()
  } else if (fetchOffset.onSameSegment(endOffset)) {
    // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
    val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
    if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
      accumulatedSize += bytesAvailable
  }
}
```
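The branch structure proposed in the comment above can be sketched as a small, self-contained Java model. This is only an illustration under stated assumptions: `FetchDecisionSketch`, its `Decision` enum, and the `-1` "unknown segment" sentinel are hypothetical names, replica throttling is left out, and none of Kafka's classes are used. `NO_PROGRESS` means the partition neither forces completion nor adds bytes toward the minimum-bytes check.

```java
// Hypothetical, simplified model of the proposed DelayedFetch branches.
// It ignores replica throttling and does not use Kafka's own classes.
final class FetchDecisionSketch {
    enum Decision { FORCE_COMPLETE, ACCUMULATE_BYTES, NO_PROGRESS }

    // Assumed sentinel for "message offset only" metadata (no segment info).
    static final long UNKNOWN_SEGMENT = -1L;

    static Decision decide(long fetchOffset, long fetchSegmentBase,
                           long endOffset, long endSegmentBase) {
        boolean segmentsKnown =
            fetchSegmentBase != UNKNOWN_SEGMENT && endSegmentBase != UNKNOWN_SEGMENT;
        if (fetchOffset > endOffset) {
            // Truncated-leader case: complete regardless of segment metadata.
            return Decision.FORCE_COMPLETE;
        } else if (fetchOffset < endOffset) {
            if (segmentsKnown && fetchSegmentBase < endSegmentBase) {
                // Fetch is on an older segment than the end offset.
                return Decision.FORCE_COMPLETE;
            }
            if (segmentsKnown && fetchSegmentBase == endSegmentBase) {
                // Same segment: the bytes in between count toward the minimum.
                return Decision.ACCUMULATE_BYTES;
            }
        }
        // Equal offsets, or segment metadata missing on either side.
        return Decision.NO_PROGRESS;
    }

    public static void main(String[] args) {
        System.out.println(decide(450L, UNKNOWN_SEGMENT, 0L, UNKNOWN_SEGMENT));
        System.out.println(decide(450L, UNKNOWN_SEGMENT, 500L, UNKNOWN_SEGMENT));
        System.out.println(decide(450L, 400L, 500L, 400L));
    }
}
```

The first two calls in `main` mirror the `endOffset` values 0 and 500 used in the parameterized test discussed elsewhere in this thread, where the high watermark carries no segment metadata.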
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1612025298

## core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala: ##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
     assertTrue(delayedFetch.tryComplete())
     assertTrue(delayedFetch.isCompleted)
     assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = {
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val fetchOffset = 450L
+    val logStartOffset = 5L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      params = fetchParams,
+      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      responseCallback = callback
+    )
+
+    val partition: Partition = mock(classOf[Partition])
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+    // Note that the high-watermark does not contain the complete metadata
+    val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1)
+    when(partition.fetchOffsetSnapshot(
+      currentLeaderEpoch,
+      fetchOnlyFromLeader = true))
+      .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
+    when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+    expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE)
+
+    // 1. When `endOffset` is 0, it refers to the truncation case
+    // 2. When `endOffset` is 500, it refers to the normal case
+    val expected = endOffset == 0

Review Comment:
Got it. We won't complete when `endOffset` is 500 because it doesn't contain offset metadata. But this is not the normal case, so we want to adjust "it refers to the normal case" accordingly.

## core/src/main/scala/kafka/server/DelayedFetch.scala: ##
@@ -91,19 +91,23 @@ class DelayedFetch(
             // Go directly to the check for Case G if the message offsets are the same. If the log segment
             // has just rolled, then the high watermark offset will remain the same but be on the old segment,
             // which would incorrectly be seen as an instance of Case F.
-            if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
-                // Case F, this can happen when the new fetch operation is on a truncated leader
-                debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
-                return forceComplete()
+            if (fetchOffset.messageOffset > endOffset.messageOffset) {
+              // Case F, this can happen when the new fetch operation is on a truncated leader
+              debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
+              return forceComplete()
+            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+              if (fetchOffset.messageOffsetOnly() || endOffset.messageOffsetOnly()) {

Review Comment:
Yes. Since `onOlderSegment()` and `onSameSegment()` already do the `messageOffsetOnly()` check, we could just get rid of this condition.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1611591188

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java: ##
@@ -51,23 +50,24 @@ public LogOffsetMetadata(long messageOffset,

     // check if this offset is already on an older segment compared with the given offset
     public boolean onOlderSegment(LogOffsetMetadata that) {
-        if (messageOffsetOnly())
-            throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info");
-
+        if (messageOffsetOnly() || that.messageOffsetOnly())
+            return false;
         return this.segmentBaseOffset < that.segmentBaseOffset;
     }

     // check if this offset is on the same segment with the given offset
-    private boolean onSameSegment(LogOffsetMetadata that) {
+    public boolean onSameSegment(LogOffsetMetadata that) {
+        if (messageOffsetOnly() || that.messageOffsetOnly())
+            return false;
         return this.segmentBaseOffset == that.segmentBaseOffset;
     }

     // compute the number of bytes between this offset to the given offset
     // if they are on the same segment and this offset precedes the given offset
     public int positionDiff(LogOffsetMetadata that) {
-        if (messageOffsetOnly())
+        if (messageOffsetOnly() || that.messageOffsetOnly())
             throw new KafkaException(this + " cannot compare its segment position with " + that + " since it only has message offset info");
-        if (!onSameSegment(that))
+        if (this.segmentBaseOffset != that.segmentBaseOffset)

Review Comment:
Fair enough!
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1611570051

## core/src/main/scala/kafka/server/DelayedFetch.scala: ##
@@ -91,19 +91,23 @@ class DelayedFetch(
             // Go directly to the check for Case G if the message offsets are the same. If the log segment
             // has just rolled, then the high watermark offset will remain the same but be on the old segment,
             // which would incorrectly be seen as an instance of Case F.
-            if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
-                // Case F, this can happen when the new fetch operation is on a truncated leader
-                debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
-                return forceComplete()
+            if (fetchOffset.messageOffset > endOffset.messageOffset) {
+              // Case F, this can happen when the new fetch operation is on a truncated leader
+              debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
+              return forceComplete()
+            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+              if (fetchOffset.messageOffsetOnly() || endOffset.messageOffsetOnly()) {

Review Comment:
With the latest changes in LogOffsetMetadata, it seems we only have to add one check in [DelayedFetch.scala#106](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedFetch.scala#L106) compared to trunk; that would suffice:

```
else if (fetchOffset.onSameSegment(endOffset) && fetchOffset.messageOffset < endOffset.messageOffset)
```
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1611559594

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java: ##
@@ -51,23 +50,24 @@ public LogOffsetMetadata(long messageOffset,

     // check if this offset is already on an older segment compared with the given offset
     public boolean onOlderSegment(LogOffsetMetadata that) {
-        if (messageOffsetOnly())
-            throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info");
-
+        if (messageOffsetOnly() || that.messageOffsetOnly())
+            return false;
         return this.segmentBaseOffset < that.segmentBaseOffset;
     }

     // check if this offset is on the same segment with the given offset
-    private boolean onSameSegment(LogOffsetMetadata that) {
+    public boolean onSameSegment(LogOffsetMetadata that) {
+        if (messageOffsetOnly() || that.messageOffsetOnly())
+            return false;
         return this.segmentBaseOffset == that.segmentBaseOffset;
     }

     // compute the number of bytes between this offset to the given offset
     // if they are on the same segment and this offset precedes the given offset
     public int positionDiff(LogOffsetMetadata that) {
-        if (messageOffsetOnly())
+        if (messageOffsetOnly() || that.messageOffsetOnly())
             throw new KafkaException(this + " cannot compare its segment position with " + that + " since it only has message offset info");
-        if (!onSameSegment(that))
+        if (this.segmentBaseOffset != that.segmentBaseOffset)

Review Comment:
We have already performed the `messageOffsetOnly` check at that point, so I tried to avoid repeating it. (It was a micro-optimization; I reverted it since the cost is trivial.)
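The comparison semantics in the quoted `LogOffsetMetadata` hunk can be tried out with a minimal stand-in class. This is a sketch, not the Kafka class: `OffsetMetadataSketch` is a hypothetical name, `IllegalStateException` stands in for `KafkaException`, and the `-1` sentinels and the subtraction in `positionDiff` are assumptions based on the values used in the quoted tests and the method's comment.

```java
// Hypothetical stand-in for LogOffsetMetadata, reduced to the methods under review.
final class OffsetMetadataSketch {
    // Assumed "unknown" sentinels, matching new LogOffsetMetadata(offset, -1L, -1) in the tests.
    static final long UNKNOWN_SEGMENT_BASE = -1L;
    static final int UNKNOWN_POSITION = -1;

    final long messageOffset;
    final long segmentBaseOffset;
    final int relativePositionInSegment;

    OffsetMetadataSketch(long messageOffset) {
        this(messageOffset, UNKNOWN_SEGMENT_BASE, UNKNOWN_POSITION);
    }

    OffsetMetadataSketch(long messageOffset, long segmentBaseOffset, int relativePositionInSegment) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePositionInSegment = relativePositionInSegment;
    }

    boolean messageOffsetOnly() {
        return segmentBaseOffset == UNKNOWN_SEGMENT_BASE
            && relativePositionInSegment == UNKNOWN_POSITION;
    }

    // Returns false instead of throwing when either side lacks segment info.
    boolean onOlderSegment(OffsetMetadataSketch that) {
        if (messageOffsetOnly() || that.messageOffsetOnly())
            return false;
        return this.segmentBaseOffset < that.segmentBaseOffset;
    }

    // Same rule: missing segment info means "cannot say they share a segment".
    boolean onSameSegment(OffsetMetadataSketch that) {
        if (messageOffsetOnly() || that.messageOffsetOnly())
            return false;
        return this.segmentBaseOffset == that.segmentBaseOffset;
    }

    // Still throws: a byte distance is meaningless without positions on one segment.
    int positionDiff(OffsetMetadataSketch that) {
        if (messageOffsetOnly() || that.messageOffsetOnly())
            throw new IllegalStateException("only message offset info is available");
        if (!onSameSegment(that))
            throw new IllegalStateException("offsets are on different segments");
        return this.relativePositionInSegment - that.relativePositionInSegment;
    }

    public static void main(String[] args) {
        OffsetMetadataSketch full = new OffsetMetadataSketch(35L, 31L, 288);
        OffsetMetadataSketch offsetOnly = new OffsetMetadataSketch(29L);
        System.out.println(full.onSameSegment(offsetOnly));
        System.out.println(offsetOnly.onOlderSegment(full));
    }
}
```

Because `onSameSegment` repeats the message-offset-only check, calling it from `positionDiff` costs one redundant comparison, which is the trade-off discussed in the comment above.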
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1611046740

## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ##
@@ -144,6 +144,35 @@ class LogSegmentTest {
     checkEquals(ms2.records.iterator, read.records.records.iterator)
   }

+  @Test
+  def testReadWhenNoMaxPosition(): Unit = {
+    val maxPosition: Optional[java.lang.Long] = Optional.empty()
+    val maxSize = 1
+    val seg = createSegment(40)
+    val ms = records(50, "hello", "there")
+    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    for (minOneMessage <- Array(true, false)) {

Review Comment:
Could we use `@ParameterizedTest` to test it? Otherwise, if the test fails, it's not clear which case caused the failure.

```
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
```

## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ##
@@ -4211,6 +4211,46 @@ class UnifiedLogTest {
     assertEquals(31, log.localLogStartOffset())
   }

+  @Test
+  def testConvertToOffsetMetadataDoesNotThrowOffsetOutOfRangeError(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+    var offset = 0L
+    for(_ <- 0 until 50) {
+      val records = TestUtils.singletonRecords("test".getBytes())
+      val info = log.appendAsLeader(records, leaderEpoch = 0)
+      offset = info.lastOffset
+      if (offset != 0 && offset % 10 == 0)
+        log.roll()
+    }
+    assertEquals(5, log.logSegments.size)
+    log.updateHighWatermark(log.logEndOffset)
+    // simulate calls to upload 3 segments to remote storage
+    log.updateHighestOffsetInRemoteStorage(30)
+
+    log.deleteOldSegments()
+    assertEquals(2, log.logSegments.size())
+    assertEquals(0, log.logStartOffset)
+    assertEquals(31, log.localLogStartOffset())
+
+    log.updateLogStartOffsetFromRemoteTier(15)
+    assertEquals(15, log.logStartOffset)
+
+    // case-1: offset is higher than the local-log-start-offset.
+    // log-start-offset < local-log-start-offset < offset-to-be-converted < log-end-offset
+    assertEquals(new LogOffsetMetadata(35, 31, 288), log.maybeConvertToOffsetMetadata(35))
+    // case-2: offset is lesser than the local-log-start-offset

Review Comment:
less than

## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ##
@@ -4211,6 +4211,46 @@ class UnifiedLogTest {
     assertEquals(31, log.localLogStartOffset())
   }

+  @Test
+  def testConvertToOffsetMetadataDoesNotThrowOffsetOutOfRangeError(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+    var offset = 0L
+    for(_ <- 0 until 50) {
+      val records = TestUtils.singletonRecords("test".getBytes())
+      val info = log.appendAsLeader(records, leaderEpoch = 0)
+      offset = info.lastOffset
+      if (offset != 0 && offset % 10 == 0)
+        log.roll()
+    }
+    assertEquals(5, log.logSegments.size)
+    log.updateHighWatermark(log.logEndOffset)
+    // simulate calls to upload 3 segments to remote storage
+    log.updateHighestOffsetInRemoteStorage(30)
+
+    log.deleteOldSegments()
+    assertEquals(2, log.logSegments.size())
+    assertEquals(0, log.logStartOffset)
+    assertEquals(31, log.localLogStartOffset())
+
+    log.updateLogStartOffsetFromRemoteTier(15)
+    assertEquals(15, log.logStartOffset)
+
+    // case-1: offset is higher than the local-log-start-offset.
+    // log-start-offset < local-log-start-offset < offset-to-be-converted < log-end-offset
+    assertEquals(new LogOffsetMetadata(35, 31, 288), log.maybeConvertToOffsetMetadata(35))
+    // case-2: offset is lesser than the local-log-start-offset
+    // log-start-offset < offset-to-be-converted < local-log-start-offset < log-end-offset
+    assertEquals(new LogOffsetMetadata(29, -1L, -1), log.maybeConvertToOffsetMetadata(29))
+    // case-3: offset is higher than the log-end-offset
+    // log-start-offset < local-log-start-offset < log-end-offset < offset-to-be-converted
+    assertEquals(new LogOffsetMetadata(log.logEndOffset + 1, -1L, -1), log.maybeConvertToOffsetMetadata(log.logEndOffset + 1))
+    // case-4: offset is lesser than the log-start-offset

Review Comment:
nit: lesser than -> less than

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java: ##
@@ -51,23 +50,24 @@ public LogOffsetMetadata(long messageOffset,

     // check if this offset is already on an older segment compared with the given offset
     public boolean onOlderSegment(LogOffsetMetadata that) {
-        if (messageOffsetOnly())
-            throw new KafkaException(this + " cannot compare its segment info
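Cases 1 to 3 of the quoted `UnifiedLogTest` can be mirrored in a small Java model of "convert without throwing". Everything here is hypothetical and heavily simplified: `OffsetConversionSketch` is not Kafka code, the fixed 72 bytes per record is only inferred from the expected position 288 at four records past base offset 31, and the log end offset of 50 is assumed from the 50 appended records. Case 4 is cut off in the quoted diff, so the sketch makes no claim about it beyond treating any offset outside the local log as message-offset-only.

```java
import java.util.TreeSet;

// Hypothetical model: offsets outside the local log yield message-offset-only
// metadata instead of an exception. Not the UnifiedLog implementation.
final class OffsetConversionSketch {
    static final class Metadata {
        final long messageOffset;
        final long segmentBaseOffset; // -1 when unknown
        final int position;           // -1 when unknown

        Metadata(long messageOffset, long segmentBaseOffset, int position) {
            this.messageOffset = messageOffset;
            this.segmentBaseOffset = segmentBaseOffset;
            this.position = position;
        }

        @Override
        public String toString() {
            return "(" + messageOffset + ", " + segmentBaseOffset + ", " + position + ")";
        }
    }

    private final TreeSet<Long> localSegmentBases = new TreeSet<>();
    private final long logEndOffset;
    private final int bytesPerRecord; // assumption: every record has the same size

    OffsetConversionSketch(long[] segmentBases, long logEndOffset, int bytesPerRecord) {
        if (segmentBases.length == 0)
            throw new IllegalArgumentException("at least one local segment is required");
        for (long base : segmentBases) localSegmentBases.add(base);
        this.logEndOffset = logEndOffset;
        this.bytesPerRecord = bytesPerRecord;
    }

    Metadata convert(long offset) {
        long localLogStartOffset = localSegmentBases.first();
        if (offset < localLogStartOffset || offset > logEndOffset) {
            // Outside the local log: keep the offset, drop the segment info.
            return new Metadata(offset, -1L, -1);
        }
        long base = localSegmentBases.floor(offset);
        return new Metadata(offset, base, (int) ((offset - base) * bytesPerRecord));
    }

    public static void main(String[] args) {
        OffsetConversionSketch log = new OffsetConversionSketch(new long[] {31L, 41L}, 50L, 72);
        System.out.println(log.convert(35L)); // inside the local log
        System.out.println(log.convert(29L)); // below the local log start offset
        System.out.println(log.convert(51L)); // beyond the log end offset
    }
}
```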
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610893769

## core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala: ##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
     assertTrue(delayedFetch.tryComplete())
     assertTrue(delayedFetch.isCompleted)
     assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = {
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val fetchOffset = 450L
+    val logStartOffset = 5L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      params = fetchParams,
+      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      responseCallback = callback
+    )
+
+    val partition: Partition = mock(classOf[Partition])
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+    // Note that the high-watermark does not contain the complete metadata
+    val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1)
+    when(partition.fetchOffsetSnapshot(
+      currentLeaderEpoch,
+      fetchOnlyFromLeader = true))
+      .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
+    when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+    expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE)
+
+    // 1. When `endOffset` is 0, it refers to the truncation case
+    // 2. When `endOffset` is 500, it refers to the normal case
+    val expected = endOffset == 0

Review Comment:
When fetchOffset > endOffset, `forceComplete` gets called. In this case, 450 > 0, so it triggers force completion. This is the only behavior change after the refactor.

Previously, when fetchOffset > endOffset:
1. If the offsets lay on the same segment, we waited for the end offset to move or for the request to time out.
2. If the endOffset lay on an older segment than the fetch offset, we completed the request by calling `forceComplete`.

We can retain the same behavior if required.

```
if (fetchOffset.messageOffset > endOffset.messageOffset) {
  if (endOffset.onOlderSegment(fetchOffset)) {
    // Case F, this can happen when the new fetch operation is on a truncated leader
    debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
    return forceComplete()
  }
} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
...
```
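The behavior change described above, for the `fetchOffset > endOffset` branch only, can be put side by side in a short Java sketch. The class and method names are hypothetical, segments are represented only by their base offsets, and the "previous" variant assumes both offsets carry segment information.

```java
// Hypothetical comparison of the two behaviours when fetchOffset > endOffset.
// Segments are modelled by their base offsets; this is not Kafka code.
final class TruncatedLeaderSketch {
    // Previously: complete only if the end offset sat on an older segment than
    // the fetch offset; on the same segment the request kept waiting.
    static boolean completedPreviously(long fetchSegmentBase, long endSegmentBase) {
        return endSegmentBase < fetchSegmentBase;
    }

    // After the refactor: the message offsets alone decide.
    static boolean completedAfterRefactor(long fetchOffset, long endOffset) {
        return fetchOffset > endOffset;
    }

    public static void main(String[] args) {
        // Same segment (base 400), fetch offset 450 ahead of end offset 420.
        System.out.println(completedPreviously(400L, 400L));
        System.out.println(completedAfterRefactor(450L, 420L));
    }
}
```

The same-segment pair in `main` is the one situation where the two variants disagree, which matches the single behavior change called out in the comment.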
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610895466

## core/src/main/scala/kafka/server/DelayedFetch.scala: ##
@@ -91,19 +91,23 @@ class DelayedFetch(
             // Go directly to the check for Case G if the message offsets are the same. If the log segment
             // has just rolled, then the high watermark offset will remain the same but be on the old segment,
             // which would incorrectly be seen as an instance of Case F.
-            if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
-                // Case F, this can happen when the new fetch operation is on a truncated leader
-                debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
-                return forceComplete()
+            if (fetchOffset.messageOffset > endOffset.messageOffset) {
+              // Case F, this can happen when the new fetch operation is on a truncated leader
+              debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
+              return forceComplete()
+            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+              if (fetchOffset.messageOffsetOnly() || endOffset.messageOffsetOnly()) {

Review Comment:
I'm not clear on this one. The current `if` checks are easy to read; we can add a debug log to avoid the empty `if` block.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1610572095 ## core/src/main/scala/kafka/server/DelayedFetch.scala: ## @@ -91,19 +91,23 @@ class DelayedFetch( // Go directly to the check for Case G if the message offsets are the same. If the log segment // has just rolled, then the high watermark offset will remain the same but be on the old segment, // which would incorrectly be seen as an instance of Case F. -if (endOffset.messageOffset != fetchOffset.messageOffset) { - if (endOffset.onOlderSegment(fetchOffset)) { -// Case F, this can happen when the new fetch operation is on a truncated leader -debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") -return forceComplete() +if (fetchOffset.messageOffset > endOffset.messageOffset) { + // Case F, this can happen when the new fetch operation is on a truncated leader + debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") + return forceComplete() +} else if (fetchOffset.messageOffset < endOffset.messageOffset) { + if (fetchOffset.messageOffsetOnly() || endOffset.messageOffsetOnly()) { Review Comment: Could we fold this into the condition above? 
## core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala: ## @@ -164,18 +169,71 @@ class DelayedFetchTest { assertTrue(delayedFetch.tryComplete()) assertTrue(delayedFetch.isCompleted) assertTrue(fetchResultOpt.isDefined) + +val fetchResult = fetchResultOpt.get +assertEquals(Errors.NONE, fetchResult.error) + } + + @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark endOffset={0}") + @ValueSource(longs = Array(0, 500)) + def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = { +val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") +val fetchOffset = 450L +val logStartOffset = 5L +val currentLeaderEpoch = Optional.of[Integer](10) +val replicaId = 1 + +val fetchStatus = FetchPartitionStatus( + startOffsetMetadata = new LogOffsetMetadata(fetchOffset), + fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) +val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500) + +var fetchResultOpt: Option[FetchPartitionData] = None +def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { + fetchResultOpt = Some(responses.head._2) +} + +val delayedFetch = new DelayedFetch( + params = fetchParams, + fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), + replicaManager = replicaManager, + quota = replicaQuota, + responseCallback = callback +) + +val partition: Partition = mock(classOf[Partition]) + when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition) +// Note that the high-watermark does not contain the complete metadata +val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1) +when(partition.fetchOffsetSnapshot( + currentLeaderEpoch, + fetchOnlyFromLeader = true)) + .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) +when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false) 
+expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE) + +// 1. When `endOffset` is 0, it refers to the truncation case +// 2. When `endOffset` is 500, it refers to the normal case +val expected = endOffset == 0 Review Comment: Hmm, in both cases, we are not forcing the completion of the delayed fetch, right?
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1610452888 ## core/src/main/scala/kafka/server/DelayedFetch.scala: ## @@ -91,19 +91,24 @@ class DelayedFetch( // Go directly to the check for Case G if the message offsets are the same. If the log segment // has just rolled, then the high watermark offset will remain the same but be on the old segment, // which would incorrectly be seen as an instance of Case F. -if (endOffset.messageOffset != fetchOffset.messageOffset) { - if (endOffset.onOlderSegment(fetchOffset)) { -// Case F, this can happen when the new fetch operation is on a truncated leader -debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") -return forceComplete() +if (fetchOffset.messageOffset > endOffset.messageOffset) { + // Case F, this can happen when the new fetch operation is on a truncated leader + debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") + return forceComplete() +} else if (fetchOffset.messageOffset < endOffset.messageOffset) { + if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) { +// If we don't know the position of the offset on log segments, just pessimistically assume that we +// only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the +// high-watermark is stale, but should be rare. +accumulatedSize += 1 Review Comment: Yes, in that case, we just wait until endOffset moves to within the local log segments. If the timeout hits, we will just return empty.
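The decision flow discussed in this thread can be sketched roughly as follows. This is a simplified illustration in Java, not the actual `DelayedFetch.scala` code: `OffsetMeta`, `Action`, and `decide` are hypothetical names, `-1` is assumed to mark unknown segment information, and replica throttling is left out entirely.

```java
// Simplified sketch of the per-partition check; all names here are hypothetical.
final class DelayedFetchDecisionSketch {
    // Stand-in for LogOffsetMetadata; -1 is assumed to mean "unknown".
    record OffsetMeta(long messageOffset, long segmentBaseOffset, int relativePosition) {
        boolean messageOffsetOnly() {
            return segmentBaseOffset < 0 && relativePosition < 0;
        }
    }

    enum Action { FORCE_COMPLETE, ACCUMULATE_BYTES, KEEP_WAITING }

    static Action decide(OffsetMeta fetch, OffsetMeta end) {
        if (fetch.messageOffset() > end.messageOffset()) {
            // Truncated-leader case: complete regardless of metadata availability.
            return Action.FORCE_COMPLETE;
        }
        if (fetch.messageOffset() < end.messageOffset()) {
            if (fetch.messageOffsetOnly() || end.messageOffsetOnly()) {
                // Segment position unknown: count no bytes and wait for the timeout.
                return Action.KEEP_WAITING;
            }
            if (fetch.segmentBaseOffset() < end.segmentBaseOffset()) {
                // Fetching from an older segment: data is certainly available.
                return Action.FORCE_COMPLETE;
            }
            if (fetch.segmentBaseOffset() == end.segmentBaseOffset()) {
                // Same segment: the caller would add the byte distance toward minBytes.
                return Action.ACCUMULATE_BYTES;
            }
        }
        // Equal offsets, or an unexpected segment ordering: nothing to do yet.
        return Action.KEEP_WAITING;
    }
}
```

With the values used in `testDelayedFetchWithMessageOnlyHighWatermark` (fetch offset 450, message-only end offset of 0 or 500), this sketch yields force completion for 0 and waiting for 500.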
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2125471280 @junrao Thanks for the review! Addressed all your comments. PTAL.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1610404165 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ## @@ -445,7 +445,7 @@ public FetchDataInfo read(long startOffset, int maxSize, long maxPosition, boole adjustedMaxSize = Math.max(maxSize, startOffsetAndSize.size); // return a log segment but with zero size in the case below -if (adjustedMaxSize == 0) +if (adjustedMaxSize == 0 || maxPosition == -1) Review Comment: LogSegment is in Java, and using Optional as a method parameter in LogSegment#read shows warnings in IntelliJ and requires some refactoring (passing an `int` for a `long` parameter is converted implicitly, but not with Optional), so I went with the negative value approach: ``` 'Optional' used as type for parameter 'maxPosition' ```
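To illustrate the sentinel approach described in this comment, here is a minimal Java sketch. It is not the real `LogSegment#read`: `readableBytes` and `UNKNOWN_POSITION` are hypothetical names, and the `minOneMessage` handling is omitted. It only shows how a `-1` max position short-circuits to an empty result, and that an `int` argument widens to the `long` parameter without any wrapping.

```java
// Illustrative sketch only; names and simplifications are assumptions.
final class MaxPositionSentinelSketch {
    // Hypothetical constant name for the "no max position known" sentinel.
    static final long UNKNOWN_POSITION = -1L;

    // Returns how many bytes a read may return; 0 stands for an empty result.
    static int readableBytes(long startPosition, int maxSize, long maxPosition) {
        if (maxSize < 0) {
            throw new IllegalArgumentException("Invalid max size " + maxSize);
        }
        // Mirrors the reviewed condition: adjustedMaxSize == 0 || maxPosition == -1
        if (maxSize == 0 || maxPosition == UNKNOWN_POSITION) {
            return 0;
        }
        long available = Math.max(0L, maxPosition - startPosition);
        return (int) Math.min(available, (long) maxSize);
    }
}
```

A caller can pass a plain `int` such as `-1` for `maxPosition`, which is the implicit conversion the comment mentions; an `Optional` parameter would require explicit wrapping at every call site.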
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1610401875 ## core/src/main/scala/kafka/server/DelayedFetch.scala: ## @@ -91,19 +91,24 @@ class DelayedFetch( // Go directly to the check for Case G if the message offsets are the same. If the log segment // has just rolled, then the high watermark offset will remain the same but be on the old segment, // which would incorrectly be seen as an instance of Case F. -if (endOffset.messageOffset != fetchOffset.messageOffset) { - if (endOffset.onOlderSegment(fetchOffset)) { -// Case F, this can happen when the new fetch operation is on a truncated leader -debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") -return forceComplete() +if (fetchOffset.messageOffset > endOffset.messageOffset) { + // Case F, this can happen when the new fetch operation is on a truncated leader + debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") + return forceComplete() +} else if (fetchOffset.messageOffset < endOffset.messageOffset) { + if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) { +// If we don't know the position of the offset on log segments, just pessimistically assume that we +// only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the +// high-watermark is stale, but should be rare. +accumulatedSize += 1 Review Comment: To confirm, should we avoid accumulating the 1 byte when fetchOffset < endOffset? The FETCH request will then be parked in the purgatory for 500 ms; I don't see any issues with that.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1610399596 ## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ## @@ -143,6 +143,39 @@ class LogSegmentTest { checkEquals(ms2.records.iterator, read.records.records.iterator) } + @Test + def testReadWhenNoMaxPosition(): Unit = { +val maxPosition = -1 +val maxSize = 1 +val seg = createSegment(40) +val ms = records(50, "hello", "there") +seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms) +for (minOneMessage <- Array(true, false)) { + // read before first offset + var read = seg.read(48, maxSize, maxPosition, minOneMessage) Review Comment: Yes, when maxSize is negative (-1), the following error is thrown: ``` java.lang.IllegalArgumentException: Invalid max size -1 for log read from segment FileRecords(size=78, file=/var/folders/bq/w6tnkbq964q8sqpvj4fbmjr4gq/T/kafka-18165220027855018994/0040.log, start=0, end=2147483647) at org.apache.kafka.storage.internals.log.LogSegment.read(LogSegment.java:432) ```
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1610310799 ## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ## @@ -143,6 +143,39 @@ class LogSegmentTest { checkEquals(ms2.records.iterator, read.records.records.iterator) } + @Test + def testReadWhenNoMaxPosition(): Unit = { +val maxPosition = -1 +val maxSize = 1 +val seg = createSegment(40) +val ms = records(50, "hello", "there") +seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms) +for (minOneMessage <- Array(true, false)) { + // read before first offset + var read = seg.read(48, maxSize, maxPosition, minOneMessage) Review Comment: Hmm, maxSize=-1 is an invalid input, right? ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException +* 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata +* 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata */ - private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { + private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { Review Comment: Thanks for the explanation. Yes, it makes sense to me now. 
## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ## @@ -143,6 +143,39 @@ class LogSegmentTest { checkEquals(ms2.records.iterator, read.records.records.iterator) } + @Test + def testReadWhenNoMaxPosition(): Unit = { +val maxPosition = -1 +val maxSize = 1 +val seg = createSegment(40) +val ms = records(50, "hello", "there") +seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms) +for (minOneMessage <- Array(true, false)) { + // read before first offset + var read = seg.read(48, maxSize, maxPosition, minOneMessage) + assertEquals(new LogOffsetMetadata(48, 40, 0), read.fetchOffsetMetadata) + assertTrue(read.records.records().iterator().asScala.isEmpty) + + // read at first offset + read = seg.read(50, maxSize, maxPosition, minOneMessage) + assertEquals(new LogOffsetMetadata(50, 40, 0), read.fetchOffsetMetadata) + assertTrue(read.records.records().iterator().asScala.isEmpty) + + // read beyond first offset + read = seg.read(51, maxSize, maxPosition, minOneMessage) + assertEquals(new LogOffsetMetadata(51, 40, 39), read.fetchOffsetMetadata) + assertTrue(read.records.records().iterator().asScala.isEmpty) + + // read at last offset Review Comment: Hmm, 51 is the last offset, right? ## core/src/main/scala/kafka/server/DelayedFetch.scala: ## @@ -91,19 +91,24 @@ class DelayedFetch( // Go directly to the check for Case G if the message offsets are the same. If the log segment // has just rolled, then the high watermark offset will remain the same but be on the old segment, // which would incorrectly be seen as an instance of Case F. 
-if (endOffset.messageOffset != fetchOffset.messageOffset) { - if (endOffset.onOlderSegment(fetchOffset)) { -// Case F, this can happen when the new fetch operation is on a truncated leader -debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") -return forceComplete() +if (fetchOffset.messageOffset > endOffset.messageOffset) { + // Case F, this can happen when the new fetch operation is on a truncated leader + debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") + return forceComplete() +} else if (fetchOffset.messageOffset < endOffset.messageOffset) { + if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) { +// If we don't know the position of the offset on log segments, just pessimistically assume that we +// only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the +// high-watermark is stale, but should be rare. +accumulatedSize += 1 Review Comment: If endOffset or fetchOffset is message only, we return empty records when reading from the log. So, to be consistent, we want to avoid accumulating new bytes in this case. ## core/src/main/scala/kafka/log/LocalLog.scala: ## @@ -370,11 +370,11 @@ class LocalLog(@volatile private var _dir: File, throw new Offs
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2123824613 @junrao @showuon Thanks for the review! Addressed all the review comments. PTAL.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1609227442 ## core/src/main/scala/kafka/log/LocalLog.scala: ## @@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File, throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " + s"but we only have log segments upto $endOffset.") - if (startOffset == maxOffsetMetadata.messageOffset) + if (startOffset == maxOffsetMetadata.messageOffset) { emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns) - else if (startOffset > maxOffsetMetadata.messageOffset) -emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns) - else { + } else if (startOffset > maxOffsetMetadata.messageOffset || +maxOffsetMetadata.messageOffsetOnly() || +maxOffsetMetadata.segmentBaseOffset < segmentOpt.get().baseOffset()) { +// We need to be careful before reading the segment as `maxOffsetMetadata` may not be a complete metadata: +// 1. If maxOffsetMetadata is message-offset-only, then return empty fetchDataInfo since +// maxOffsetMetadata.offset is not on local log segments. +// 2. If maxOffsetMetadata.segmentBaseOffset is smaller than segment.baseOffset, then return empty fetchDataInfo. Review Comment: Ack, kept the same behavior.
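The guard in this hunk can be read as a single predicate over four numbers. The Java sketch below restates it under stated assumptions: `shouldReturnEmpty` and its parameter names are hypothetical, a negative segment base offset is assumed to stand for message-offset-only metadata, and the separate `startOffset == maxOffsetMetadata.messageOffset` branch is folded in with `>=` because it also returns an empty result.

```java
// Hypothetical restatement of the "return empty FetchDataInfo" condition.
final class EmptyFetchConditionSketch {
    static boolean shouldReturnEmpty(long startOffset,
                                     long maxMessageOffset,
                                     long maxSegmentBaseOffset,
                                     long floorSegmentBaseOffset) {
        // Assumption: a negative base offset marks message-offset-only metadata.
        boolean messageOffsetOnly = maxSegmentBaseOffset < 0;
        return startOffset >= maxMessageOffset           // nothing readable below the max offset
            || messageOffsetOnly                         // max offset is not on local segments
            || maxSegmentBaseOffset < floorSegmentBaseOffset; // max offset sits on an older segment
    }
}
```

Only when all three checks fail does the read proceed to the segment containing `startOffset`.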
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1609227180 ## core/src/main/scala/kafka/log/LocalLog.scala: ## @@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File, throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " + s"but we only have log segments upto $endOffset.") - if (startOffset == maxOffsetMetadata.messageOffset) + if (startOffset == maxOffsetMetadata.messageOffset) { emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns) - else if (startOffset > maxOffsetMetadata.messageOffset) + } else if (startOffset > maxOffsetMetadata.messageOffset || +maxOffsetMetadata.messageOffsetOnly() || +maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset) { Review Comment: Addressed with the latest commit 77f90f63408ebe5b3da5c3305ad2affb7901eff4
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
showuon commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException +* 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata +* 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata */ - private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { + private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { Review Comment:

> However, if nextOffsetMetadata doesn't change and somehow startOffset > maxOffsetMetadata.messageOffset, then we could loop forever.

I still don't see why the infinite loop would happen. In `LocalLog#read`, we check whether `startOffset > nextOffsetMetadata.messageOffset`; if so, an `OffsetOutOfRangeException` is thrown.

```
def read(startOffset: Long,
         maxLength: Int,
         minOneMessage: Boolean,
         maxOffsetMetadata: LogOffsetMetadata,
         includeAbortedTxns: Boolean): FetchDataInfo = {
  // [Luke] We will check if startOffset > nextOffsetMetadata.messageOffset (endOffset),
  // so if the provided `maxOffsetMetadata == nextOffsetMetadata`,
  // the `OffsetOutOfRangeException` should be thrown before entering another `convertToOffsetMetadataOrThrow` loop
  val endOffsetMetadata = nextOffsetMetadata
  val endOffset = endOffsetMetadata.messageOffset
  var segmentOpt = segments.floorSegment(startOffset)
  // return error on attempt to read beyond the log end offset
  if (startOffset > endOffset || !segmentOpt.isPresent)
    throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, but we only have log segments upto $endOffset.")

  if (startOffset == maxOffsetMetadata.messageOffset)
    emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
  // [Luke] So for the first time, the provided `maxOffsetMetadata` might be highWaterMark or LastStableOffset,
  // it could enter this `else if` condition because HWM/LSO should always be less than or equal to LEO. But the 2nd time won't enter here.
  else if (startOffset > maxOffsetMetadata.messageOffset)
    emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
```

So there should be no chance that a `LocalLog#read` call with `maxOffsetMetadata = nextOffsetMetadata` enters another `convertToOffsetMetadataOrThrow` loop, because this time `if (startOffset > maxOffsetMetadata.messageOffset)` is equivalent to `if (startOffset > endOffset)`. If it did happen, it would mean `logEndOffset < highWaterMarkOffset` or `logEndOffset < lastStableOffset`, which I don't think can happen. Does that make sense?
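The argument in this comment depends only on the order of the checks in the quoted `read` excerpt. The Java sketch below models that ordering with plain offsets; `classify` and `Branch` are hypothetical names, and the `!segmentOpt.isPresent` check is left out for brevity. It is a model of the reasoning, not the actual `LocalLog` code.

```java
// Hypothetical model of the branch order in the quoted LocalLog#read excerpt.
final class ReadBranchSketch {
    enum Branch { OUT_OF_RANGE, EMPTY_AT_MAX, EMPTY_BEYOND_MAX, READ_SEGMENT }

    static Branch classify(long startOffset, long logEndOffset, long maxOffset) {
        if (startOffset > logEndOffset) {
            return Branch.OUT_OF_RANGE;      // OffsetOutOfRangeException in the real code
        }
        if (startOffset == maxOffset) {
            return Branch.EMPTY_AT_MAX;      // emptyFetchDataInfo(maxOffsetMetadata, ...)
        }
        if (startOffset > maxOffset) {
            return Branch.EMPTY_BEYOND_MAX;  // the branch that calls convertToOffsetMetadataOrThrow
        }
        return Branch.READ_SEGMENT;
    }
}
```

When `maxOffset` equals `logEndOffset`, every `startOffset` above it is caught by the first check, so `EMPTY_BEYOND_MAX` cannot be reached. That branch is reachable only when `maxOffset` (for example the high watermark) is below the log end offset.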
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
showuon commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException +* 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata +* 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata */ - private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { + private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { Review Comment: > However, if nextOffsetMetadata doesn't change and somehow startOffset > maxOffsetMetadata.messageOffset, then we could loop forever. I still don't know why the infinite loop will happen. In `Locallog#read`, we'll check if `startOffset > nextOffsetMetadata.messageOffset`, if so, `OffsetOutOfRangeException` will be thrown. 
``` def read(startOffset: Long, maxLength: Int, minOneMessage: Boolean, maxOffsetMetadata: LogOffsetMetadata, includeAbortedTxns: Boolean): FetchDataInfo = { // [Luke] We will check if startOffset > nextOffsetMetadata.messageOffset (endOffset), // so if the provided `maxOffsetMetadata == nextOffsetMetadata`, // the `OffsetOutOfRangeException` should be thrown before entering another `convertToOffsetMetadataOrThrow` loop val endOffsetMetadata = nextOffsetMetadata val endOffset = endOffsetMetadata.messageOffset var segmentOpt = segments.floorSegment(startOffset) // return error on attempt to read beyond the log end offset if (startOffset > endOffset || !segmentOpt.isPresent) throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, but we only have log segments upto $endOffset.") if (startOffset == maxOffsetMetadata.messageOffset) emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns) // [Luke] So for the first time, the provided `maxOffsetMetadata` might be highWaterMark or LastStableOffset, // it could enter this `else if` condition because HWM/LSO should is always less than LEO. But the 2nd time won't enter here. else if (startOffset > maxOffsetMetadata.messageOffset) emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns) ``` So there should be no chance that we do `Locallog#read` with these `maxOffsetMetadata = nextOffsetMetadata` provided entering another `convertToOffsetMetadataOrThrow` loop, because `if (startOffset > maxOffsetMetadata.messageOffset)` == `if (startOffset > endOffset)` If this did happen, it means the `logEndOffset < highWaterMarkOffset` or `logEndOffset < lastStableOffseet`, which I don't think this would happen. Does that make sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
showuon commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException +* 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata +* 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata */ - private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { + private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { Review Comment: > However, if nextOffsetMetadata doesn't change and somehow startOffset > maxOffsetMetadata.messageOffset, then we could loop forever. I still don't know why the infinite loop will happen. In `Locallog#read`, we'll check if `startOffset > nextOffsetMetadata.messageOffset`, if so, `OffsetOutOfRangeException` will be thrown. 
``` def read(startOffset: Long, maxLength: Int, minOneMessage: Boolean, maxOffsetMetadata: LogOffsetMetadata, includeAbortedTxns: Boolean): FetchDataInfo = { // [Luke] We will check if startOffset > nextOffsetMetadata.messageOffset (endOffset), // so if the provided `maxOffsetMetadata == nextOffsetMetadata`, // the `OffsetOutOfRangeException` should be thrown before entering another `convertToOffsetMetadataOrThrow` loop val endOffsetMetadata = nextOffsetMetadata val endOffset = endOffsetMetadata.messageOffset var segmentOpt = segments.floorSegment(startOffset) // return error on attempt to read beyond the log end offset if (startOffset > endOffset || !segmentOpt.isPresent) throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, but we only have log segments upto $endOffset.") if (startOffset == maxOffsetMetadata.messageOffset) emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns) // [Luke] So for the first time, the provided `maxOffsetMetadata` might be highWaterMark or LastStableOffset, // it could enter this `else if` condition because HWM/LSO should is always less than LEO. But the 2nd time won't enter here. else if (startOffset > maxOffsetMetadata.messageOffset) emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns) ``` So there should be no chance that we do `Locallog#read` with these `maxOffsetMetadata = nextOffsetMetadata` provided entering another `convertToOffsetMetadataOrThrow` loop. If this did happen, it means the `logEndOffset < highWaterMarkOffset` or `logEndOffset < lastStableOffseet`, which I don't think this would happen. Does that make sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608986312

## core/src/main/scala/kafka/log/LocalLog.scala: ##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
       throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
         s"but we only have log segments upto $endOffset.")
-    if (startOffset == maxOffsetMetadata.messageOffset)
+    if (startOffset == maxOffsetMetadata.messageOffset) {
       emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-    else if (startOffset > maxOffsetMetadata.messageOffset)
-      emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
-    else {
+    } else if (startOffset > maxOffsetMetadata.messageOffset ||
+      maxOffsetMetadata.messageOffsetOnly() ||
+      maxOffsetMetadata.segmentBaseOffset < segmentOpt.get().baseOffset()) {
+      // We need to be careful before reading the segment as `maxOffsetMetadata` may not be a complete metadata:
+      // 1. If maxOffsetMetadata is message-offset-only, then return empty fetchDataInfo since
+      //    maxOffsetMetadata.offset is not on local log segments.
+      // 2. If maxOffsetMetadata.segmentBaseOffset is smaller than segment.baseOffset, then return empty fetchDataInfo.

Review Comment:
Looking at the code again, @showuon is correct. `convertToOffsetMetadataOrThrow(startOffset)` uses nextOffset, which always has the offset metadata. So, we can keep the `convertToOffsetMetadataOrThrow(startOffset)` call.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608984545

## core/src/main/scala/kafka/log/LocalLog.scala: ##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
       throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
         s"but we only have log segments upto $endOffset.")
-    if (startOffset == maxOffsetMetadata.messageOffset)
+    if (startOffset == maxOffsetMetadata.messageOffset) {
       emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-    else if (startOffset > maxOffsetMetadata.messageOffset)
+    } else if (startOffset > maxOffsetMetadata.messageOffset ||
+      maxOffsetMetadata.messageOffsetOnly() ||
+      maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset) {

Review Comment:
Returning the current offset is correct. I am wondering why the suggested approach returns nextOffset. `segment.read(startOffset, maxLength, maxPosition, minOneMessage)` should return a non-null fetchDataInfo since the startOffset exists in the first segment, right?
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608834195

## core/src/main/scala/kafka/log/LocalLog.scala: ##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
       throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
         s"but we only have log segments upto $endOffset.")
-    if (startOffset == maxOffsetMetadata.messageOffset)
+    if (startOffset == maxOffsetMetadata.messageOffset) {
       emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-    else if (startOffset > maxOffsetMetadata.messageOffset)
-      emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
-    else {
+    } else if (startOffset > maxOffsetMetadata.messageOffset ||
+      maxOffsetMetadata.messageOffsetOnly() ||
+      maxOffsetMetadata.segmentBaseOffset < segmentOpt.get().baseOffset()) {
+      // We need to be careful before reading the segment as `maxOffsetMetadata` may not be a complete metadata:
+      // 1. If maxOffsetMetadata is message-offset-only, then return empty fetchDataInfo since
+      //    maxOffsetMetadata.offset is not on local log segments.
+      // 2. If maxOffsetMetadata.segmentBaseOffset is smaller than segment.baseOffset, then return empty fetchDataInfo.

Review Comment:
This comment is pending. I will wait for @showuon's review before applying the change.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608831211

## core/src/main/scala/kafka/log/LocalLog.scala: ##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
       throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
         s"but we only have log segments upto $endOffset.")
-    if (startOffset == maxOffsetMetadata.messageOffset)
+    if (startOffset == maxOffsetMetadata.messageOffset) {
       emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-    else if (startOffset > maxOffsetMetadata.messageOffset)
+    } else if (startOffset > maxOffsetMetadata.messageOffset ||
+      maxOffsetMetadata.messageOffsetOnly() ||
+      maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset) {

Review Comment:
When applying this change, the newly added LocalLog#testWhenFetchOffsetHigherThanMaxOffset test fails for case-3 and case-4. The question is which offset to return in the FetchDataInfo when the conditions are not met:

1. In the current approach, the same fetch-offset is returned in the FetchDataInfo.
2. In the suggested approach, the next-offset/log-end-offset is returned in the FetchDataInfo.

I'm not sure which one is correct. Please suggest. Thanks!
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608823231

## storage/src/test/java/org/apache/kafka/tiered/storage/actions/EraseBrokerStorageAction.java: ##
@@ -19,24 +19,35 @@
 import org.apache.kafka.tiered.storage.TieredStorageTestAction;
 import org.apache.kafka.tiered.storage.TieredStorageTestContext;

+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.PrintStream;

 public final class EraseBrokerStorageAction implements TieredStorageTestAction {

     private final int brokerId;
+    private final FilenameFilter filenameFilter;
+    private final boolean isStopped;

     public EraseBrokerStorageAction(int brokerId) {
+        this(brokerId, (dir, name) -> true, false);
+    }
+
+    public EraseBrokerStorageAction(int brokerId,
+                                    FilenameFilter filenameFilter,
+                                    boolean isStopped) {
         this.brokerId = brokerId;
+        this.filenameFilter = filenameFilter;
+        this.isStopped = isStopped;
     }

     @Override
     public void doExecute(TieredStorageTestContext context) throws IOException {
-        context.eraseBrokerStorage(brokerId);
+        context.eraseBrokerStorage(brokerId, filenameFilter, isStopped);
     }

     @Override
     public void describe(PrintStream output) {
-        output.println("erase-broker-storage: " + brokerId);
+        output.println("erase-broker-storage: " + brokerId + ", isStopped: " + isStopped);

Review Comment:
`filenameFilter` is a lambda expression, so printing it would only show the object name in the log. I omitted it from the output for that reason.
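The point about the lambda can be illustrated with a small Scala sketch. `FilterDescribeSketch` and its members are hypothetical names for illustration, not part of the PR. A lambda converted to `java.io.FilenameFilter` does not override `toString`, so printing it typically shows a JVM-generated class name and identity hash rather than anything that describes the filter.

```scala
import java.io.{File, FilenameFilter}

// Hypothetical illustration only; none of these names come from the PR.
object FilterDescribeSketch {
  // Same shape as the `(dir, name) -> true` default in the diff above.
  val keepAll: FilenameFilter = (dir: File, name: String) => true

  // Mirrors the describe() output in the diff: the filter is left out
  // because its toString carries no useful information.
  def describe(brokerId: Int, isStopped: Boolean): String =
    s"erase-broker-storage: $brokerId, isStopped: $isStopped"

  def main(args: Array[String]): Unit = {
    println(keepAll)            // a synthetic lambda class name, not the predicate
    println(describe(1, false)) // erase-broker-storage: 1, isStopped: false
  }
}
```

If the filter ever needs to appear in the log, one option would be a small named class with an explicit `toString`; that is a design suggestion, not something the PR does.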
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1607381004

## core/src/main/scala/kafka/server/DelayedFetch.scala: ##
@@ -92,7 +92,14 @@ class DelayedFetch(
             // has just rolled, then the high watermark offset will remain the same but be on the old segment,
             // which would incorrectly be seen as an instance of Case F.
             if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
+              if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
+                // If we don't know the position of the offset on log segments, just pessimistically assume that we
+                // only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the
+                // high-watermark is stale, but should be rare.
+                if (fetchOffset.messageOffset < endOffset.messageOffset) {

Review Comment:
We can organize the code a bit more clearly. I am thinking of something like the following.

```
if (fetchOffset.messageOffset > endOffset.messageOffset) {
  // Case F, this can happen when the new fetch operation is on a truncated leader
  debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
  return forceComplete()
} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
  if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
    // If we don't know the position of the offset on log segments, just pessimistically assume that we
    // only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the
    // high-watermark is stale, but should be rare.
    accumulatedSize += 1
  } else if (fetchOffset.onOlderSegment(endOffset)) {
    // Case F, this can happen when the fetch operation is falling behind the current segment
    // or the partition has just rolled a new segment
    debug(s"Satisfying fetch $this immediately since it is fetching older segments.")
    // We will not force complete the fetch request if a replica should be throttled.
    if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
      return forceComplete()
  } else if (fetchOffset.onSameSegment(endOffset)) {
    // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
    val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
    if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
      accumulatedSize += bytesAvailable
  }
}
```

## core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala: ##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
     assertTrue(delayedFetch.tryComplete())
     assertTrue(delayedFetch.isCompleted)
     assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithInvalidHighWatermark(endOffset: Long): Unit = {
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val fetchOffset = 450L
+    val logStartOffset = 5L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      params = fetchParams,
+      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      responseCallback = callback
+    )
+
+    val partition: Partition = mock(classOf[Partition])
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+    // Note that the high-watermark does not contains the complete metadata

Review Comment:
does not contains => does not contain

#
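The branch structure suggested in this review can be modeled as a pure function, which makes the four outcomes easier to check in isolation. This is a rough sketch under stated assumptions: `Pos`, `Outcome`, `ForceComplete`, `Accumulate` and `DelayedFetchSketch.classify` are hypothetical names, a negative `segmentBaseOffset` stands in for message-offset-only metadata, and `throttled` collapses the `params.isFromFollower && replicaManager.shouldLeaderThrottle(...)` check into one flag.

```scala
// Hypothetical stand-in for LogOffsetMetadata; a negative segmentBaseOffset
// models message-offset-only metadata (an assumption of this sketch).
final case class Pos(messageOffset: Long, segmentBaseOffset: Long = -1L, position: Int = -1) {
  def messageOffsetOnly: Boolean = segmentBaseOffset < 0
  def onOlderSegment(that: Pos): Boolean = segmentBaseOffset < that.segmentBaseOffset
  def onSameSegment(that: Pos): Boolean = segmentBaseOffset == that.segmentBaseOffset
}

sealed trait Outcome
case object ForceComplete extends Outcome
final case class Accumulate(bytes: Long) extends Outcome

object DelayedFetchSketch {
  // Returns what a single partition contributes to the delayed fetch decision.
  def classify(fetch: Pos, end: Pos, maxBytes: Int, throttled: Boolean): Outcome =
    if (fetch.messageOffset > end.messageOffset) ForceComplete      // truncated leader case
    else if (fetch.messageOffset == end.messageOffset) Accumulate(0L)
    else if (end.messageOffsetOnly || fetch.messageOffsetOnly)
      Accumulate(1L)                                                // position unknown: assume 1 byte
    else if (throttled) Accumulate(0L)                              // throttled replica: neither complete nor count
    else if (fetch.onOlderSegment(end)) ForceComplete               // fetch is behind the current segment
    else if (fetch.onSameSegment(end))
      Accumulate(math.min(end.position - fetch.position, maxBytes).toLong)
    else Accumulate(0L)
}
```

With the values from `testDelayedFetchWithInvalidHighWatermark` (fetch offset 450, message-offset-only metadata), an end offset of 500 yields `Accumulate(1)` and an end offset of 0 yields `ForceComplete`.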
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608327384

## core/src/main/scala/kafka/log/LocalLog.scala: ##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
       throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
         s"but we only have log segments upto $endOffset.")
-    if (startOffset == maxOffsetMetadata.messageOffset)
+    if (startOffset == maxOffsetMetadata.messageOffset) {
       emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-    else if (startOffset > maxOffsetMetadata.messageOffset)
+    } else if (startOffset > maxOffsetMetadata.messageOffset ||
+      maxOffsetMetadata.messageOffsetOnly() ||
+      maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset) {

Review Comment:
I'm not sure when we need this condition (added per Jun's suggestion):

```
maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset
```
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2122625313

> @kamalcph , had another look, left some comments. Also, could we add an integration test for this case like you described in the PR description? `FetchFromFollowerIntegrationTest` should be a good place to do that. There are some examples to shutdown nodes and then restart it, and then verify producer/consumer still work well.

Added the `FetchFromLeaderWithCorruptedCheckpointTest` integration test and verified that the produce fails without the patch (ran the test against trunk and this branch).
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608010033

## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ##
@@ -391,8 +398,10 @@ class UnifiedLogTest {
         assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
       }

-      (log.highWatermark to log.logEndOffset).foreach { offset =>
-        assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
+      assertEmptyFetch(log, log.highWatermark, FetchIsolation.HIGH_WATERMARK)
+
+      (log.highWatermark + 1 to log.logEndOffset).foreach { offset =>

Review Comment:
With my [comment](https://github.com/apache/kafka/pull/15825#discussion_r1608007659), I think we don't need this change now, which makes this PR smaller and safer.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608007659

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Given a message offset, find its corresponding offset metadata in the log.
-   * If the message offset is out of range, throw an OffsetOutOfRangeException
+   * 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException
+   * 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata
+   * 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata
    */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {

Review Comment:
I don't think we need to change it, because it won't cause infinite recursion in `LocalLog.read()`:

```
else if (startOffset > maxOffsetMetadata.messageOffset)
  emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
```

The reason is that `LocalLog#convertToOffsetMetadataOrThrow` calls `read` with `maxOffsetMetadata` set to `nextOffsetMetadata`, which is the `logEndOffset`:

```
read(offset, maxLength = 1, minOneMessage = false, maxOffsetMetadata = nextOffsetMetadata, includeAbortedTxns = false)
```

So even if it enters `LocalLog.read()` again, this time it takes a different if/else branch. It won't enter the `else if (startOffset > maxOffsetMetadata.messageOffset)` block, because the earlier check will catch it:

```
if (startOffset > endOffset || !segmentOpt.isPresent)
  throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
    s"but we only have log segments upto $endOffset.")
```
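The termination argument above can be checked with a toy model. This is only a sketch, assuming a single-segment log: `ToyLog` and `OffsetOutOfRange` are hypothetical names, a plain `Long` stands in for `LogOffsetMetadata`, and `logEndOffset` plays the role of `nextOffsetMetadata.messageOffset`. The `segmentOpt.isPresent` check is left out.

```scala
// Hypothetical stand-in for Kafka's OffsetOutOfRangeException.
final class OffsetOutOfRange(msg: String) extends RuntimeException(msg)

// Toy model of the read / convertToOffsetMetadataOrThrow pair; not the real LocalLog.
final class ToyLog(val logEndOffset: Long) {
  var readCalls = 0 // counts how many times read() was entered

  // Like LocalLog#convertToOffsetMetadataOrThrow: re-enters read() with the
  // max offset pinned to the log end offset.
  def convertToOffsetMetadataOrThrow(offset: Long): Long =
    read(offset, maxOffset = logEndOffset)

  // Returns the offset whose metadata would be placed in the FetchDataInfo.
  def read(startOffset: Long, maxOffset: Long): Long = {
    readCalls += 1
    if (startOffset > logEndOffset)
      throw new OffsetOutOfRange(s"offset $startOffset is beyond log end offset $logEndOffset")
    if (startOffset == maxOffset) startOffset
    else if (startOffset > maxOffset) convertToOffsetMetadataOrThrow(startOffset)
    else startOffset // the real code would read from the segment here
  }
}
```

In the nested call `maxOffset == logEndOffset`, so `startOffset > maxOffset` would imply `startOffset > logEndOffset`, which has already thrown. For example, `new ToyLog(100L).read(80L, 50L)` enters `read` exactly twice and returns 80.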
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
showuon commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1608007659

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Given a message offset, find its corresponding offset metadata in the log.
-   * If the message offset is out of range, throw an OffsetOutOfRangeException
+   * 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException
+   * 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata
+   * 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata
    */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {

Review Comment:
I don't think we need to change it, because it won't cause infinite recursion in `LocalLog.read()`:
```
else if (startOffset > maxOffsetMetadata.messageOffset)
  emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
```
The reason is that `LocalLog#convertToOffsetMetadataOrThrow` sets `maxOffsetMetadata` to `nextOffsetMetadata`, which is the `logEndOffset`:
```
read(offset, maxLength = 1, minOneMessage = false, maxOffsetMetadata = nextOffsetMetadata, includeAbortedTxns = false)
```
So even if it enters `LocalLog.read()` again, it takes a different if/else branch this time. It won't enter the `else if (startOffset > maxOffsetMetadata.messageOffset)` block, because the earlier check catches it first:
```
if (startOffset > endOffset || !segmentOpt.isPresent)
  throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
    s"but we only have log segments upto $endOffset.")
```

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
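The termination argument in this comment can be illustrated with a toy model. The sketch below is not Kafka code: the class `ReadRecursionSketch`, the plain `long` offsets, the fixed `LOG_END_OFFSET`, and the `depth` counter are all assumptions made for illustration. It only mirrors the order of the checks quoted above, to show that the nested `read` issued by the conversion either throws or takes a branch other than `startOffset > maxOffset`.

```java
// Toy model of the check ordering in LocalLog.read(); all names here are hypothetical.
class ReadRecursionSketch {
    static final long LOG_END_OFFSET = 100L; // assumed log end offset
    static int depth = 0;                    // counts how many times read() is entered

    static String read(long startOffset, long maxOffset) {
        depth++;
        // Earlier check: anything beyond the log end offset is rejected first.
        if (startOffset > LOG_END_OFFSET) {
            throw new IllegalArgumentException("offset " + startOffset + " is out of range");
        }
        if (startOffset == maxOffset) {
            return "empty@" + maxOffset;
        } else if (startOffset > maxOffset) {
            // Pre-PR behaviour: resolve the offset through a nested read().
            return "empty@" + convertToOffsetMetadataOrThrow(startOffset);
        } else {
            return "data@" + startOffset;
        }
    }

    static long convertToOffsetMetadataOrThrow(long offset) {
        // The nested read uses the log end offset as its max offset, so the
        // "offset > maxOffset" case has already been rejected by the earlier check.
        read(offset, LOG_END_OFFSET);
        return offset;
    }

    public static void main(String[] args) {
        String result = read(60L, 50L);
        System.out.println(result + " after " + depth + " read() calls");
    }
}
```

In this model a fetch at offset 60 with a max offset of 50 enters `read` exactly twice, and a fetch beyond the log end offset is rejected before any nested call is made.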
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2121704898 Test failures are unrelated.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1606925582

## core/src/main/scala/kafka/log/LocalLog.scala: ##
@@ -370,11 +370,12 @@ class LocalLog(@volatile private var _dir: File,
         throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
           s"but we only have log segments upto $endOffset.")
-      if (startOffset == maxOffsetMetadata.messageOffset)
+      if (startOffset == maxOffsetMetadata.messageOffset) {
         emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-      else if (startOffset > maxOffsetMetadata.messageOffset)
-        emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
-      else {
+      } else if (startOffset > maxOffsetMetadata.messageOffset) {
+        // Instead of converting the `startOffset` to metadata, returning message-only metadata to avoid potential loop
+        emptyFetchDataInfo(new LogOffsetMetadata(startOffset), includeAbortedTxns)

Review Comment:
With the newly added checks in L376 and L377, an empty fetch-data-info is returned, and actual data is returned only once the `maxOffsetMetadata` resolves to complete metadata. Earlier, the condition below returned `segment.size` for a message-only maxOffsetMetadata:
```
if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
else segment.size
```
so we might have been returning data beyond the allowed maxOffset.
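The over-read risk described in this comment comes down to how the read limit is chosen. A minimal sketch, assuming that message-only metadata carries a sentinel segment base offset of -1; the constant `UNKNOWN` and the helper `maxPosition` are hypothetical names for illustration, not Kafka APIs:

```java
// Sketch of the read-limit selection quoted above; not the Kafka implementation.
class MaxPositionSketch {
    static final long UNKNOWN = -1L; // assumed sentinel used by message-only metadata

    // Returns the byte position up to which a segment may be read.
    static int maxPosition(long maxSegmentBaseOffset, int maxRelativePosition,
                           long segmentBaseOffset, int segmentSize) {
        // Same test as the quoted condition: the limit applies only when the
        // max offset is known to live on this segment.
        return maxSegmentBaseOffset == segmentBaseOffset ? maxRelativePosition : segmentSize;
    }

    public static void main(String[] args) {
        // Complete metadata on the same segment: reads stop at the max offset's position.
        System.out.println(maxPosition(0L, 300, 0L, 1000));
        // Message-only metadata: the sentinel never matches, so the whole segment is readable.
        System.out.println(maxPosition(UNKNOWN, -1, 0L, 1000));
    }
}
```

Under these assumptions a message-only max offset silently widens the limit to the full segment size, which is the situation the comment warns about.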
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1606711171

## core/src/main/scala/kafka/log/LocalLog.scala: ##
@@ -370,11 +370,12 @@ class LocalLog(@volatile private var _dir: File,
         throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
           s"but we only have log segments upto $endOffset.")
-      if (startOffset == maxOffsetMetadata.messageOffset)
+      if (startOffset == maxOffsetMetadata.messageOffset) {
         emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-      else if (startOffset > maxOffsetMetadata.messageOffset)
-        emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
-      else {
+      } else if (startOffset > maxOffsetMetadata.messageOffset) {
+        // Instead of converting the `startOffset` to metadata, returning message-only metadata to avoid potential loop
+        emptyFetchDataInfo(new LogOffsetMetadata(startOffset), includeAbortedTxns)

Review Comment:
Addressed the review comment in 6452a452c40001ca32ea4272e98e0691b69e9d28 and 51b6bb8d208754985196a401d13a27c282422c8c
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2120351527

> @kamalcph , do we have any update for this PR?

Sorry for the delay. Addressed all the review comments. PTAL.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
showuon commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2119875404 @kamalcph , do we have any update for this PR?
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1599161815

## core/src/main/scala/kafka/log/LocalLog.scala: ##
@@ -370,11 +370,12 @@ class LocalLog(@volatile private var _dir: File,
         throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
           s"but we only have log segments upto $endOffset.")
-      if (startOffset == maxOffsetMetadata.messageOffset)
+      if (startOffset == maxOffsetMetadata.messageOffset) {
         emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-      else if (startOffset > maxOffsetMetadata.messageOffset)
-        emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
-      else {
+      } else if (startOffset > maxOffsetMetadata.messageOffset) {
+        // Instead of converting the `startOffset` to metadata, returning message-only metadata to avoid potential loop
+        emptyFetchDataInfo(new LogOffsetMetadata(startOffset), includeAbortedTxns)

Review Comment:
Hmm, I thought the design of this PR is to allow maxOffsetMetadata to be message-only in some of the rare cases, right?

## core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala: ##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
     assertTrue(delayedFetch.tryComplete())
     assertTrue(delayedFetch.isCompleted)
     assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark minBytes={0}")
+  @ValueSource(ints = Array(1, 2))
+  def testDelayedFetchWithInvalidHighWatermark(minBytes: Int): Unit = {
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val fetchOffset = 450L
+    val logStartOffset = 5L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500, minBytes = minBytes)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      params = fetchParams,
+      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      responseCallback = callback
+    )
+
+    val partition: Partition = mock(classOf[Partition])
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+    // high-watermark is lesser than the log-start-offset
+    val endOffsetMetadata = new LogOffsetMetadata(0L, 0L, 0)
+    when(partition.fetchOffsetSnapshot(
+      currentLeaderEpoch,
+      fetchOnlyFromLeader = true))
+      .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
+    when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+    expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE)
+
+    val expected = minBytes == 1
+    assertEquals(expected, delayedFetch.tryComplete())
+    assertEquals(expected, delayedFetch.isCompleted)

Review Comment:
Yes, I understand that the test passes as it is. I am just saying that the logic in DelayedFetch is not consistent. If the offset metadata is available, we accumulate bytes only `if (fetchOffset.messageOffset < endOffset.messageOffset)`. To be consistent, we need to do the same `if` test if the offset metadata is not available.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1598838884

## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ##
@@ -391,8 +398,10 @@ class UnifiedLogTest {
       assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
     }
-    (log.highWatermark to log.logEndOffset).foreach { offset =>
-      assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
+    assertEmptyFetch(log, log.highWatermark, FetchIsolation.HIGH_WATERMARK)
+
+    (log.highWatermark + 1 to log.logEndOffset).foreach { offset =>

Review Comment:
Since `log.highWatermark` contains the full metadata, I split the check in two for reads at or beyond the high-watermark:
1) When the fetchOffset equals the high-watermark, we return empty records but with complete offset metadata.
2) When the fetchOffset is beyond the high-watermark/max-offset-allowed-to-read, we return empty records with message-only metadata.

We can reconsider whether to return message-only offset metadata or complete offset metadata when the fetch-offset is beyond the `max-offset-allowed-to-read`. ([comment](https://github.com/apache/kafka/pull/15825#discussion_r1598841362))
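The two cases in this comment can be sketched as a small decision function. This is an illustration under stated assumptions, not the Kafka implementation: `OffsetMeta` is a hypothetical stand-in for `LogOffsetMetadata`, `emptyFetchMetadata` is a made-up helper, and -1 is assumed to mark unknown segment information.

```java
// Sketch of which offset metadata accompanies an empty fetch at or beyond the max readable offset.
class EmptyFetchSketch {
    // Hypothetical stand-in for LogOffsetMetadata; -1 is assumed to mean "unknown".
    static final class OffsetMeta {
        final long messageOffset;
        final long segmentBaseOffset;
        final int relativePosition;

        OffsetMeta(long messageOffset, long segmentBaseOffset, int relativePosition) {
            this.messageOffset = messageOffset;
            this.segmentBaseOffset = segmentBaseOffset;
            this.relativePosition = relativePosition;
        }

        // Message-only metadata: the message offset is known, the segment position is not.
        OffsetMeta(long messageOffset) {
            this(messageOffset, -1L, -1);
        }

        boolean messageOffsetOnly() {
            return segmentBaseOffset < 0 && relativePosition < 0;
        }
    }

    static OffsetMeta emptyFetchMetadata(long fetchOffset, OffsetMeta maxOffset) {
        if (fetchOffset < maxOffset.messageOffset) {
            throw new IllegalArgumentException("fetch below the max offset is not an empty fetch");
        }
        // Case 1: at the max offset, reuse its complete metadata.
        // Case 2: beyond it, return message-only metadata instead of resolving the position.
        return fetchOffset == maxOffset.messageOffset ? maxOffset : new OffsetMeta(fetchOffset);
    }

    public static void main(String[] args) {
        OffsetMeta highWatermark = new OffsetMeta(10L, 0L, 512);
        System.out.println(emptyFetchMetadata(10L, highWatermark).messageOffsetOnly());
        System.out.println(emptyFetchMetadata(11L, highWatermark).messageOffsetOnly());
    }
}
```

The open question raised in the comment is whether case 2 should instead resolve to complete metadata; the sketch only shows the behaviour as described here.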
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1598848735

## core/src/main/scala/kafka/log/LocalLog.scala: ##
@@ -370,11 +370,12 @@ class LocalLog(@volatile private var _dir: File,
         throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
           s"but we only have log segments upto $endOffset.")
-      if (startOffset == maxOffsetMetadata.messageOffset)
+      if (startOffset == maxOffsetMetadata.messageOffset) {
         emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-      else if (startOffset > maxOffsetMetadata.messageOffset)
-        emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
-      else {
+      } else if (startOffset > maxOffsetMetadata.messageOffset) {
+        // Instead of converting the `startOffset` to metadata, returning message-only metadata to avoid potential loop
+        emptyFetchDataInfo(new LogOffsetMetadata(startOffset), includeAbortedTxns)

Review Comment:
Agreed, this patch is getting tricky. We want to validate all the scenarios; in particular, when there is no data to read from the server, the fetch request rate from the clients should stay almost the same.

To avoid or reduce these cases, can we always resolve the maxOffsetMetadata to complete metadata? https://github.com/apache/kafka/pull/15825#discussion_r1598841362
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1598841362

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Given a message offset, find its corresponding offset metadata in the log.
-   * If the message offset is out of range, throw an OffsetOutOfRangeException
+   * 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException
+   * 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata
+   * 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata
    */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {

Review Comment:
To solve the infinite loop, instead of returning the message-only `LogOffsetMetadata` when startOffset is beyond the maxOffsetMetadata, can we retain the same behavior without the loop? Something like below:
```java
def read(startOffset: Long,
         maxLength: Int,
         minOneMessage: Boolean,
         maxOffsetMetadata: LogOffsetMetadata,
         includeAbortedTxns: Boolean): FetchDataInfo = {
  maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
    trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
      s"total length ${segments.sizeInBytes} bytes")

    val endOffsetMetadata = nextOffsetMetadata
    val endOffset = endOffsetMetadata.messageOffset
    val segmentOpt = segments.floorSegment(startOffset)
    // return error on attempt to read beyond the log end offset
    if (startOffset > endOffset || !segmentOpt.isPresent)
      throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
        s"but we only have log segments upto $endOffset.")

    if (startOffset == maxOffsetMetadata.messageOffset) {
      emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
    } else if (startOffset > maxOffsetMetadata.messageOffset) {
      // Updated code to avoid the loop:
      val tmpFetchDataInfo = readFetchDataInfo(segmentOpt.get, startOffset, maxLength = 1, minOneMessage = false, nextOffsetMetadata, includeAbortedTxns = false)
      emptyFetchDataInfo(tmpFetchDataInfo.fetchOffsetMetadata, includeAbortedTxns)
    } else {
      readFetchDataInfo(segmentOpt.get, startOffset, maxLength, minOneMessage, maxOffsetMetadata, includeAbortedTxns)
    }
  }
}

private def readFetchDataInfo(segment: LogSegment,
                              startOffset: Long,
                              maxLength: Int,
                              minOneMessage: Boolean,
                              maxOffsetMetadata: LogOffsetMetadata,
                              includeAbortedTxns: Boolean): FetchDataInfo = {
  // Do the read on the segment with a base offset less than the target offset
  // but if that segment doesn't contain any messages with an offset greater than that
  // continue to read from successive segments until we get some messages or we reach the end of the log
  var fetchDataInfo: FetchDataInfo = null
  var segmentOpt: Optional[LogSegment] = Optional.of(segment)
  while (fetchDataInfo == null && segmentOpt.isPresent) {
    val segment = segmentOpt.get
    val baseOffset = segment.baseOffset

    val maxPosition =
      // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
      if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
      else segment.size

    fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
    if (fetchDataInfo != null) {
      if (includeAbortedTxns)
        fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
    } else segmentOpt = segments.higherSegment(baseOffset)
  }

  if (fetchDataInfo != null) fetchDataInfo
  else {
    // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
    // this can happen when all messages with offset larger than start offsets have been deleted.
    // In this case, we will return the empty set with log end offset metadata
    new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
  }
}
```
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1598832131

## core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala: ##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
     assertTrue(delayedFetch.tryComplete())
     assertTrue(delayedFetch.isCompleted)
     assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark minBytes={0}")
+  @ValueSource(ints = Array(1, 2))
+  def testDelayedFetchWithInvalidHighWatermark(minBytes: Int): Unit = {
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val fetchOffset = 450L
+    val logStartOffset = 5L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500, minBytes = minBytes)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      params = fetchParams,
+      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      responseCallback = callback
+    )
+
+    val partition: Partition = mock(classOf[Partition])
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+    // high-watermark is lesser than the log-start-offset
+    val endOffsetMetadata = new LogOffsetMetadata(0L, 0L, 0)
+    when(partition.fetchOffsetSnapshot(
+      currentLeaderEpoch,
+      fetchOnlyFromLeader = true))
+      .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
+    when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+    expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE)
+
+    val expected = minBytes == 1
+    assertEquals(expected, delayedFetch.tryComplete())
+    assertEquals(expected, delayedFetch.isCompleted)

Review Comment:
In the test, the [LogOffsetSnapshot](https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetSnapshot.java) contains message-only offset for logEndOffset, highWatermark, and lastStableOffset in DelayedFetchTest.scala#207. So, the test passed with the newly added condition. In a real scenario, we expect the LogOffsetSnapshot to contain the complete metadata for all the offsets.

## storage/src/test/java/org/apache/kafka/storage/internals/log/LogOffsetMetadataTest.java: ##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.kafka.storage.internals.log.LogOffsetMetadata.UNKNOWN_OFFSET_METADATA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class LogOffsetMetadataTest {
+
+    @Test
+    void testOnOlderSegment() {
+        LogOffsetMetadata metadata1 = new LogOffsetMetadata(1L, 0L, 1);
+        LogOffsetMetadata metadata2 = new LogOffsetMetadata(5L, 4L, 2);
+        LogOffsetMetadata messageOnlyMetadata = new LogOffsetMetadata(1L);
+
+        assertFalse(UNKNOWN_OFFSET_METADATA.onOlderSegment(UNKNOWN_OFFSET_METADATA));
+        assertFalse(metadata1.onOlderSegment(messageOnlyMetadata));
+        assertFalse(messageOnlyMetadata.onOlderSegment(metadata1));
+        assertFalse(metadata1.onOlderSegment(metadata1));
+        assert
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1597073119 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is less than the log-start-offset (or) local-log-start-offset, then it returns the + * message-only metadata. +* 2. If the message offset is beyond the log-end-offset, then it returns the message-only metadata. +* 3. For all other cases, it returns the offset metadata from the log. */ - private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { -checkLogStartOffset(offset) -localLog.convertToOffsetMetadataOrThrow(offset) + private[log] def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = { Review Comment: convertToOffsetMetadata => maybeConvertToOffsetMetadata ? 
## core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala: ## @@ -164,18 +169,71 @@ class DelayedFetchTest { assertTrue(delayedFetch.tryComplete()) assertTrue(delayedFetch.isCompleted) assertTrue(fetchResultOpt.isDefined) + +val fetchResult = fetchResultOpt.get +assertEquals(Errors.NONE, fetchResult.error) + } + + @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark minBytes={0}") + @ValueSource(ints = Array(1, 2)) + def testDelayedFetchWithInvalidHighWatermark(minBytes: Int): Unit = { +val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") +val fetchOffset = 450L +val logStartOffset = 5L +val currentLeaderEpoch = Optional.of[Integer](10) +val replicaId = 1 + +val fetchStatus = FetchPartitionStatus( + startOffsetMetadata = new LogOffsetMetadata(fetchOffset), + fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) +val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500, minBytes = minBytes) + +var fetchResultOpt: Option[FetchPartitionData] = None +def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { + fetchResultOpt = Some(responses.head._2) +} + +val delayedFetch = new DelayedFetch( + params = fetchParams, + fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), + replicaManager = replicaManager, + quota = replicaQuota, + responseCallback = callback +) + +val partition: Partition = mock(classOf[Partition]) + when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition) +// high-watermark is lesser than the log-start-offset +val endOffsetMetadata = new LogOffsetMetadata(0L, 0L, 0) +when(partition.fetchOffsetSnapshot( + currentLeaderEpoch, + fetchOnlyFromLeader = true)) + .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) +when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false) 
+expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE) + +val expected = minBytes == 1 +assertEquals(expected, delayedFetch.tryComplete()) +assertEquals(expected, delayedFetch.isCompleted) Review Comment: This exposes an issue in delayedFetch. If HWM is less than fetchOffset, we haven't gained any bytes. So, we shouldn't complete the delayedFetch immediately. ## storage/src/test/java/org/apache/kafka/storage/internals/log/LogOffsetMetadataTest.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.KafkaException; +import org.junit.jupiter.api.Test; + +import static org.apache.kafka.storage.internals.log.LogOffsetMetadata.UNKNOWN_OFFSET_METADATA; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.ju
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
jsancio commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2105087021 @kamalcph, looks like a bug to me. The predicate should be `if (!hwm.messageOffsetOnly)` or the if/else blocks should be swapped. I suspect that we haven't noticed this bug in the KRaft implementation (`KafkaRaftClient`) because KRaft never looks at the segment and byte position for the HWM. If you are going to fix this code, do you mind adding a test for this case? Since `KafkaMetadataLog` calls `UnifiedLog.fetchOffsetSnapshot`, `hwm.messageOffsetOnly` should always be false.
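Note: the predicate suggested above can be illustrated with a small, self-contained sketch. It is not the code in `KafkaMetadataLog`; the class `HwmConversionSketch`, the type `SegmentPosition`, and the method `segmentPosition` are hypothetical names introduced only to show the intended direction of the check: a segment position is attached only when the high watermark is not message-offset-only.

```java
import java.util.Optional;

// Illustrative sketch only: these types are hypothetical stand-ins, not Kafka's classes.
final class HwmConversionSketch {

    /** Hypothetical pair of segment base offset and byte position within that segment. */
    static final class SegmentPosition {
        final long baseOffset;
        final int relativePosition;

        SegmentPosition(long baseOffset, int relativePosition) {
            this.baseOffset = baseOffset;
            this.relativePosition = relativePosition;
        }
    }

    /**
     * Returns the segment position only when the high watermark carries one,
     * i.e. when it is NOT message-offset-only (the predicate suggested in the comment).
     */
    static Optional<SegmentPosition> segmentPosition(boolean messageOffsetOnly,
                                                     long segmentBaseOffset,
                                                     int relativePositionInSegment) {
        if (!messageOffsetOnly) {
            return Optional.of(new SegmentPosition(segmentBaseOffset, relativePositionInSegment));
        }
        // No segment information is known, so nothing can be attached.
        return Optional.empty();
    }
}
```

With the inverted predicate, the position would be dropped exactly in the case where it is known, which is the behaviour questioned in this thread.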
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2103912972 While going through the usages, it looks to me that the `LogOffsetMetadata` conversion that happens in `KafkaMetadataLog` is not correct. Could someone please double check?

```
org.apache.kafka.storage.internals.log.LogOffsetMetadata -> org.apache.kafka.raft.LogOffsetMetadata
```

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala#L226

Question: Why do we make `org.apache.kafka.raft.LogOffsetMetadata#segmentPosition` empty when `hwm.messageOffsetOnly` is false?
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1596278649 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException Review Comment: Removed the `checkLogStartOffset` from the `convertToOffsetMetadataOrThrow` method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1596276757 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException +* 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata +* 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata */ - private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { + private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { Review Comment: This is a potential loop (not sure when it would be triggered), updated the logic to return the message-only metadata. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
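Note: the "return message-only metadata instead of throwing" behaviour discussed above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the `UnifiedLog` or `LocalLog` implementation: the class `OffsetConversionSketch`, the method `segmentBaseOffsetFor`, the sentinel `UNKNOWN_SEGMENT`, and the idea of looking up segments in a sorted set of base offsets are all hypothetical.

```java
import java.util.NavigableSet;

// Illustrative sketch only: a hypothetical, simplified model of the offset lookup.
final class OffsetConversionSketch {

    /** Assumed sentinel meaning "segment unknown", i.e. message-only metadata. */
    static final long UNKNOWN_SEGMENT = -1L;

    /**
     * Resolves the base offset of the segment that holds the given offset.
     * Offsets below the local log start or beyond the log end are not an error:
     * the sentinel is returned so the caller can build message-only metadata.
     */
    static long segmentBaseOffsetFor(long offset,
                                     long localLogStartOffset,
                                     long logEndOffset,
                                     NavigableSet<Long> segmentBaseOffsets) {
        if (offset < localLogStartOffset || offset > logEndOffset) {
            return UNKNOWN_SEGMENT;
        }
        // The owning segment is the one with the greatest base offset <= offset.
        Long base = segmentBaseOffsets.floor(offset);
        return base == null ? UNKNOWN_SEGMENT : base;
    }
}
```

The design point is that callers must be prepared to receive metadata without segment information, which is why the `DelayedFetch` checks in this thread test for message-only offsets.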
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1596055964 ## core/src/main/scala/kafka/server/DelayedFetch.scala: ## @@ -92,7 +92,10 @@ class DelayedFetch( // has just rolled, then the high watermark offset will remain the same but be on the old segment, // which would incorrectly be seen as an instance of Case F. if (endOffset.messageOffset != fetchOffset.messageOffset) { - if (endOffset.onOlderSegment(fetchOffset)) { + if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) { +// This case is to handle the stale high-watermark on the leader until it gets updated with the correct value Review Comment: Perhaps change to sth like the following. "If we don't know the position of the offset on log segments, just pessimistically assume that we only gained 1 byte. This can happen when the high watermark is stale, but should be rare." ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException +* 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata +* 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata */ - private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { + private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { Review Comment: `LocalLog.read()` also calls `convertToOffsetMetadataOrThrow`. ``` else if (startOffset > maxOffsetMetadata.messageOffset) emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns) ``` It seems this could lead to infinite recursion. 
To avoid that, we could change the above code to avoid calling `convertToOffsetMetadataOrThrow` and return a message-only `LogOffsetMetadata` instead to `emptyFetchDataInfo`. ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException Review Comment: Good point. Since we change the logic such that it's ok not to have the metadata for an offset, we could just skip the call to `checkLogStartOffset`. ## core/src/main/scala/kafka/server/DelayedFetch.scala: ## @@ -92,7 +92,10 @@ class DelayedFetch( // has just rolled, then the high watermark offset will remain the same but be on the old segment, // which would incorrectly be seen as an instance of Case F. if (endOffset.messageOffset != fetchOffset.messageOffset) { - if (endOffset.onOlderSegment(fetchOffset)) { + if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) { Review Comment: `fetchOffset` typically shouldn't be message only. But it doesn't hurt to have the check. ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException +* 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata Review Comment: lesser => less -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
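Note: the "pessimistically assume that we only gained 1 byte" suggestion above can be sketched as below. This is an illustration of the idea at this point in the review, not the merged `DelayedFetch` code: the class `FetchProgressSketch`, its nested `Offset` type, and the method `gainedBytes` are hypothetical, and the sentinel values are assumptions.

```java
import java.util.OptionalInt;

// Illustrative sketch only: hypothetical, simplified stand-ins for the DelayedFetch logic.
final class FetchProgressSketch {

    /** Simplified offset metadata; -1 marks an unknown segment or position (assumed sentinels). */
    static final class Offset {
        final long messageOffset;
        final long segmentBaseOffset;
        final int position;

        Offset(long messageOffset, long segmentBaseOffset, int position) {
            this.messageOffset = messageOffset;
            this.segmentBaseOffset = segmentBaseOffset;
            this.position = position;
        }

        static Offset messageOnly(long messageOffset) {
            return new Offset(messageOffset, -1L, -1);
        }

        boolean messageOffsetOnly() {
            return segmentBaseOffset == -1L && position == -1;
        }
    }

    /**
     * Estimates the bytes available between the fetch offset and the end offset.
     * An empty result means the offsets sit on different segments, where the
     * caller would complete the fetch immediately instead of counting bytes.
     */
    static OptionalInt gainedBytes(Offset fetch, Offset end, int maxBytes) {
        if (end.messageOffset == fetch.messageOffset) {
            return OptionalInt.of(0);
        }
        if (end.messageOffsetOnly() || fetch.messageOffsetOnly()) {
            // Position unknown (e.g. stale high watermark): assume a single byte.
            return OptionalInt.of(1);
        }
        if (end.segmentBaseOffset != fetch.segmentBaseOffset) {
            return OptionalInt.empty();
        }
        if (fetch.messageOffset < end.messageOffset) {
            return OptionalInt.of(Math.min(end.position - fetch.position, maxBytes));
        }
        return OptionalInt.of(0);
    }
}
```

Under this sketch, a fetch with an unknown position is credited one byte, so whether it completes depends on the request's minimum-bytes setting.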
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2103078761 Test failures are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
showuon commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1593525765 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException Review Comment: > While updating the [UnifiedLog#updateLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L535), the HWM also gets updated and it doesn't hit the fetchHighWatermarkMetadata (or) convertToOffsetMetadataOrThrow so the call will succeed even when current-log-start-offset > old-HWM. For [UnifiedLog#updateLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L535), I think it's safe that we don't throw an exception when `current-log-start-offset > old-HWM`, because it is called during initialization or from `maybeIncrementLogStartOffset`. In the latter, we've already checked `newLogStartOffset > logStartOffset`.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
showuon commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1593511224 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException Review Comment: Ah, OK. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1593471183 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException Review Comment: Should we avoid throwing the error and return message-only metadata when the offset is less than the log-start-offset? In [UnifiedLog#updateLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L535), the HWM also gets updated, and that path doesn't hit `fetchHighWatermarkMetadata` or `convertToOffsetMetadataOrThrow`, so the call will succeed even when current-log-start-offset > old-HWM.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2099844734

> Thanks for the PR. One question: So when we temporarily set high-watermark as `LogOffsetMetadata(0)` for the leader, we're waiting for the high-watermark to get updated after followers fetch from the leader, right?

Yes, the call to [maybeIncrementLeaderHW](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/cluster/Partition.scala?L1166) will succeed when the node becomes leader for the partition. Note that if the current node is the only alive replica, then the high-watermark gets updated to the leader-log-end-offset. The behavior is the same for both normal and remote-storage-enabled topics.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1593363337 ## core/src/main/scala/kafka/server/DelayedFetch.scala: ## @@ -92,7 +92,10 @@ class DelayedFetch( // has just rolled, then the high watermark offset will remain the same but be on the old segment, // which would incorrectly be seen as an instance of Case F. if (endOffset.messageOffset != fetchOffset.messageOffset) { - if (endOffset.onOlderSegment(fetchOffset)) { + if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) { Review Comment: `fetchOffset` can be message-only metadata when there is a diverging epoch. However, if there is a diverging epoch in the LogReadResults, then it won't enter the DelayedFetch. We can remove the check if it is not required. ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException Review Comment: `checkLogStartOffset` will throw `OffsetOutOfRangeException` if the offset is less than the `logStartOffset`.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
showuon commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1593331212 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException Review Comment: For case 1, it looks like we never throw `OffsetOutOfRangeException` now, doesn't it? ## core/src/main/scala/kafka/server/DelayedFetch.scala: ## @@ -92,7 +92,10 @@ class DelayedFetch( // has just rolled, then the high watermark offset will remain the same but be on the old segment, // which would incorrectly be seen as an instance of Case F. if (endOffset.messageOffset != fetchOffset.messageOffset) { - if (endOffset.onOlderSegment(fetchOffset)) { + if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) { Review Comment: I can understand the `endOffset.messageOffsetOnly()` case since the leader's high watermark is still not updated. But when will `fetchOffset` be `messageOffsetOnly`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2097869168 @junrao @satishd @chia7712 @showuon Updated the test plan in the summary. Verified that the patch fixes the issue by running both the trunk and the patched build. With the fix, the high-watermark value gets updated to the valid offset. Please take a look when you get a chance.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1586835182 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java: ## @@ -28,6 +28,7 @@ public final class LogOffsetMetadata { //TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage module private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L; +public static final long REMOTE_LOG_UNKNOWN_OFFSET = -2L; Review Comment: We probably don't need this. The existing UNIFIED_LOG_UNKNOWN_OFFSET should be enough. ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java: ## @@ -51,6 +57,8 @@ public LogOffsetMetadata(long messageOffset, // check if this offset is already on an older segment compared with the given offset public boolean onOlderSegment(LogOffsetMetadata that) { +if (this.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET || that.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET) Review Comment: We probably don't need this. If messageOffsetOnly() is true, we can just return false. ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java: ## @@ -65,6 +73,8 @@ private boolean onSameSegment(LogOffsetMetadata that) { // compute the number of bytes between this offset to the given offset // if they are on the same segment and this offset precedes the given offset public int positionDiff(LogOffsetMetadata that) { +if (this.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET || that.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET) +return 1; Review Comment: It's a bit hacky to do this here. I was thinking of doing this in DelayedFetch. ``` if (endOffset.messageOffset != fetchOffset.messageOffset) { if (endOffset.messageOnly() || fetchOffset.messageOnly()) { accumulatedSize += 1 } else if (endOffset.onOlderSegment(fetchOffset)) { // Case F, this can happen when the new fetch operation is on a truncated leader ... 
} ``` ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1428,7 +1428,11 @@ class UnifiedLog(@volatile var logStartOffset: Long, */ private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { checkLogStartOffset(offset) -localLog.convertToOffsetMetadataOrThrow(offset) +if (remoteLogEnabled() && offset < localLogStartOffset()) { + new LogOffsetMetadata(offset, LogOffsetMetadata.REMOTE_LOG_UNKNOWN_OFFSET) +} else { + localLog.convertToOffsetMetadataOrThrow(offset) Review Comment: I was thinking that we could change localLog.convertToOffsetMetadataOrThrow() such that if read() throws an exception, it just returns a message-only LogOffsetMetadata. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
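Note: the suggestion above that `onOlderSegment` can simply return false when either side is message-only may be sketched as follows. This is a simplified stand-in, not the real `LogOffsetMetadata` class: the class name `OffsetMetaSketch` and the two sentinel constants are assumptions introduced for illustration.

```java
// Illustrative sketch only: a hypothetical, simplified stand-in for LogOffsetMetadata.
final class OffsetMetaSketch {

    // Assumed sentinels for "unknown"; the discussion reuses one existing unknown-offset constant.
    static final long UNKNOWN_SEGMENT_BASE_OFFSET = -1L;
    static final int UNKNOWN_FILE_POSITION = -1;

    final long messageOffset;
    final long segmentBaseOffset;
    final int relativePositionInSegment;

    /** Builds message-only metadata: the offset is known, its location on disk is not. */
    OffsetMetaSketch(long messageOffset) {
        this(messageOffset, UNKNOWN_SEGMENT_BASE_OFFSET, UNKNOWN_FILE_POSITION);
    }

    OffsetMetaSketch(long messageOffset, long segmentBaseOffset, int relativePositionInSegment) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePositionInSegment = relativePositionInSegment;
    }

    boolean messageOffsetOnly() {
        return segmentBaseOffset == UNKNOWN_SEGMENT_BASE_OFFSET
            && relativePositionInSegment == UNKNOWN_FILE_POSITION;
    }

    /** Segments cannot be compared without segment information, so answer false. */
    boolean onOlderSegment(OffsetMetaSketch that) {
        if (this.messageOffsetOnly() || that.messageOffsetOnly()) {
            return false;
        }
        return this.segmentBaseOffset < that.segmentBaseOffset;
    }
}
```

Keeping the unknown case inside `messageOffsetOnly()` avoids a second sentinel such as `REMOTE_LOG_UNKNOWN_OFFSET`, which is the simplification requested in the review.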
[PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph opened a new pull request, #15825: URL: https://github.com/apache/kafka/pull/15825 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org