junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1599161815
##########
core/src/main/scala/kafka/log/LocalLog.scala:
##########
@@ -370,11 +370,12 @@ class LocalLog(@volatile private var _dir: File,
throw new OffsetOutOfRangeException(s"Received request for offset
$startOffset for partition $topicPartition, " +
s"but we only have log segments upto $endOffset.")
- if (startOffset == maxOffsetMetadata.messageOffset)
+ if (startOffset == maxOffsetMetadata.messageOffset) {
emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
- else if (startOffset > maxOffsetMetadata.messageOffset)
- emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset),
includeAbortedTxns)
- else {
+ } else if (startOffset > maxOffsetMetadata.messageOffset) {
+ // Instead of converting the `startOffset` to metadata, returning
message-only metadata to avoid potential loop
+ emptyFetchDataInfo(new LogOffsetMetadata(startOffset),
includeAbortedTxns)
Review Comment:
Hmm, I thought the design of this PR is to allow maxOffsetMetadata to be
message-only in some of the rare cases, right?
##########
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##########
@@ -164,18 +169,71 @@ class DelayedFetchTest {
assertTrue(delayedFetch.tryComplete())
assertTrue(delayedFetch.isCompleted)
assertTrue(fetchResultOpt.isDefined)
+
+ val fetchResult = fetchResultOpt.get
+ assertEquals(Errors.NONE, fetchResult.error)
+ }
+
+ @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark
minBytes={0}")
+ @ValueSource(ints = Array(1, 2))
+ def testDelayedFetchWithInvalidHighWatermark(minBytes: Int): Unit = {
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+ val fetchOffset = 450L
+ val logStartOffset = 5L
+ val currentLeaderEpoch = Optional.of[Integer](10)
+ val replicaId = 1
+
+ val fetchStatus = FetchPartitionStatus(
+ startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+ fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId,
fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+ val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500,
minBytes = minBytes)
+
+ var fetchResultOpt: Option[FetchPartitionData] = None
+ def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit
= {
+ fetchResultOpt = Some(responses.head._2)
+ }
+
+ val delayedFetch = new DelayedFetch(
+ params = fetchParams,
+ fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+ replicaManager = replicaManager,
+ quota = replicaQuota,
+ responseCallback = callback
+ )
+
+ val partition: Partition = mock(classOf[Partition])
+
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+ // high-watermark is lesser than the log-start-offset
+ val endOffsetMetadata = new LogOffsetMetadata(0L, 0L, 0)
+ when(partition.fetchOffsetSnapshot(
+ currentLeaderEpoch,
+ fetchOnlyFromLeader = true))
+ .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata,
endOffsetMetadata, endOffsetMetadata))
+ when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+ expectReadFromReplica(fetchParams, topicIdPartition,
fetchStatus.fetchInfo, Errors.NONE)
+
+ val expected = minBytes == 1
+ assertEquals(expected, delayedFetch.tryComplete())
+ assertEquals(expected, delayedFetch.isCompleted)
Review Comment:
Yes, I understand that the test passes as it is. I am just saying that the
logic in DelayedFetch is not consistent.
If the offset metadata is available, we accumulate bytes only `if
(fetchOffset.messageOffset < endOffset.messageOffset)`. To be consistent, we
need to do the same `if` test if the offset metadata is not available.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]