Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-27 Thread via GitHub


showuon merged PR #15825:
URL: https://github.com/apache/kafka/pull/15825


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-24 Thread via GitHub


junrao commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2130475760

   @showuon : Any other comments from you?





Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-24 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2129976827

   Test failures are unrelated. 





Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-23 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1612186225


##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -103,7 +103,7 @@ class DelayedFetch(
 // We will not force complete the fetch request if a replica 
should be throttled.
 if (!params.isFromFollower || 
!replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
   return forceComplete()
-  } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+  } else if (fetchOffset.onSameSegment(endOffset) && 
fetchOffset.messageOffset < endOffset.messageOffset) {

Review Comment:
   Hmm, we assume that `fetchOffset.messageOffset > endOffset.messageOffset` is
the truncated leader case. In that case, we should always call
`forceComplete()` immediately, right? The current code only calls
`forceComplete()` when the offset metadata is available, but it's more
consistent to do that regardless of the availability of the offset metadata.
Should we do something like the following?
   
   ```
   if (fetchOffset.messageOffset > endOffset.messageOffset) {
     // Case F, this can happen when the new fetch operation is on a truncated leader
     debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
     return forceComplete()
   } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
     if (fetchOffset.onOlderSegment(endOffset)) {
       // Case F, this can happen when the fetch operation is falling behind the current segment
       // or the partition has just rolled a new segment
       debug(s"Satisfying fetch $this immediately since it is fetching older segments.")
       // We will not force complete the fetch request if a replica should be throttled.
       if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
         return forceComplete()
     } else if (fetchOffset.onSameSegment(endOffset)) {
       // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
       val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
       if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
         accumulatedSize += bytesAvailable
     }
   }
   ```
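
The branch structure discussed in this comment can be sketched in isolation. What follows is a minimal Java model under stated assumptions, not the actual `DelayedFetch` code: `OffsetMeta`, `FetchDecisionSketch`, the `Action` enum, and the `throttled` flag (standing in for `params.isFromFollower && replicaManager.shouldLeaderThrottle(...)`) are hypothetical names, and `-1` is assumed to mark unknown segment metadata. Byte accumulation is reduced to a single action for brevity.

```java
// Hypothetical, simplified stand-in for LogOffsetMetadata (not the Kafka class).
final class OffsetMeta {
    final long messageOffset;
    final long segmentBaseOffset; // assumption: -1 means "unknown"
    final int relativePosition;   // assumption: -1 means "unknown"

    OffsetMeta(long messageOffset, long segmentBaseOffset, int relativePosition) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePosition = relativePosition;
    }

    boolean messageOffsetOnly() {
        return segmentBaseOffset == -1L && relativePosition == -1;
    }

    // Both comparisons answer "false" when either side lacks segment metadata.
    boolean onOlderSegment(OffsetMeta that) {
        if (messageOffsetOnly() || that.messageOffsetOnly()) return false;
        return segmentBaseOffset < that.segmentBaseOffset;
    }

    boolean onSameSegment(OffsetMeta that) {
        if (messageOffsetOnly() || that.messageOffsetOnly()) return false;
        return segmentBaseOffset == that.segmentBaseOffset;
    }
}

final class FetchDecisionSketch {
    enum Action { FORCE_COMPLETE, ACCUMULATE_BYTES, KEEP_WAITING }

    static Action decide(OffsetMeta fetchOffset, OffsetMeta endOffset, boolean throttled) {
        if (fetchOffset.messageOffset > endOffset.messageOffset) {
            // Truncated-leader case: complete regardless of segment metadata.
            return Action.FORCE_COMPLETE;
        } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
            if (fetchOffset.onOlderSegment(endOffset)) {
                // Fetch is behind the current segment: complete unless throttled.
                return throttled ? Action.KEEP_WAITING : Action.FORCE_COMPLETE;
            } else if (fetchOffset.onSameSegment(endOffset)) {
                // Same segment: count the available bytes unless throttled.
                return throttled ? Action.KEEP_WAITING : Action.ACCUMULATE_BYTES;
            }
            // Neither helper matched (metadata missing): fall through and wait.
        }
        return Action.KEEP_WAITING;
    }

    public static void main(String[] args) {
        OffsetMeta fetch = new OffsetMeta(450L, -1L, -1);
        System.out.println(decide(fetch, new OffsetMeta(0L, -1L, -1), false));
        System.out.println(decide(fetch, new OffsetMeta(500L, -1L, -1), false));
    }
}
```

In this model, a message-only end offset ahead of the fetch offset makes both segment helpers return `false`, so the request simply keeps waiting rather than throwing.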






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-23 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1612025298


##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
 assertTrue(delayedFetch.tryComplete())
 assertTrue(delayedFetch.isCompleted)
 assertTrue(fetchResultOpt.isDefined)
+
+val fetchResult = fetchResultOpt.get
+assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark 
endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = {
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+val fetchOffset = 450L
+val logStartOffset = 5L
+val currentLeaderEpoch = Optional.of[Integer](10)
+val replicaId = 1
+
+val fetchStatus = FetchPartitionStatus(
+  startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+  fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, 
fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+var fetchResultOpt: Option[FetchPartitionData] = None
+def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+  fetchResultOpt = Some(responses.head._2)
+}
+
+val delayedFetch = new DelayedFetch(
+  params = fetchParams,
+  fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+  replicaManager = replicaManager,
+  quota = replicaQuota,
+  responseCallback = callback
+)
+
+val partition: Partition = mock(classOf[Partition])
+
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+// Note that the high-watermark does not contain the complete metadata
+val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1)
+when(partition.fetchOffsetSnapshot(
+  currentLeaderEpoch,
+  fetchOnlyFromLeader = true))
+  .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, 
endOffsetMetadata, endOffsetMetadata))
+when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+expectReadFromReplica(fetchParams, topicIdPartition, 
fetchStatus.fetchInfo, Errors.NONE)
+
+// 1. When `endOffset` is 0, it refers to the truncation case
+// 2. When `endOffset` is 500, it refers to the normal case
+val expected = endOffset == 0

Review Comment:
   Got it. We won't complete when `endOffset` is 500 because it doesn't contain
offset metadata. But this is not the normal case, so we should adjust the
comment "it refers to the normal case" accordingly.



##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -91,19 +91,23 @@ class DelayedFetch(
 // Go directly to the check for Case G if the message offsets are 
the same. If the log segment
 // has just rolled, then the high watermark offset will remain the 
same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
-if (endOffset.messageOffset != fetchOffset.messageOffset) {
-  if (endOffset.onOlderSegment(fetchOffset)) {
-// Case F, this can happen when the new fetch operation is on 
a truncated leader
-debug(s"Satisfying fetch $this since it is fetching later 
segments of partition $topicIdPartition.")
-return forceComplete()
+if (fetchOffset.messageOffset > endOffset.messageOffset) {
+  // Case F, this can happen when the new fetch operation is on a 
truncated leader
+  debug(s"Satisfying fetch $this since it is fetching later 
segments of partition $topicIdPartition.")
+  return forceComplete()
+} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+  if (fetchOffset.messageOffsetOnly() || 
endOffset.messageOffsetOnly()) {

Review Comment:
   Yes. Since `onOlderSegment()` and `onSameSegment()` already do the
`messageOffsetOnly()` check, we could just get rid of this condition.






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-23 Thread via GitHub


showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1611591188


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java:
##
@@ -51,23 +50,24 @@ public LogOffsetMetadata(long messageOffset,
 
 // check if this offset is already on an older segment compared with the 
given offset
 public boolean onOlderSegment(LogOffsetMetadata that) {
-if (messageOffsetOnly())
-throw new KafkaException(this + " cannot compare its segment info 
with " + that + " since it only has message offset info");
-
+if (messageOffsetOnly() || that.messageOffsetOnly())
+return false;
 return this.segmentBaseOffset < that.segmentBaseOffset;
 }
 
 // check if this offset is on the same segment with the given offset
-private boolean onSameSegment(LogOffsetMetadata that) {
+public boolean onSameSegment(LogOffsetMetadata that) {
+if (messageOffsetOnly() || that.messageOffsetOnly())
+return false;
 return this.segmentBaseOffset == that.segmentBaseOffset;
 }
 
 // compute the number of bytes between this offset to the given offset
 // if they are on the same segment and this offset precedes the given 
offset
 public int positionDiff(LogOffsetMetadata that) {
-if (messageOffsetOnly())
+if (messageOffsetOnly() || that.messageOffsetOnly())
 throw new KafkaException(this + " cannot compare its segment 
position with " + that + " since it only has message offset info");
-if (!onSameSegment(that))
+if (this.segmentBaseOffset != that.segmentBaseOffset)

Review Comment:
   Fair enough!






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-23 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1611570051


##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -91,19 +91,23 @@ class DelayedFetch(
 // Go directly to the check for Case G if the message offsets are 
the same. If the log segment
 // has just rolled, then the high watermark offset will remain the 
same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
-if (endOffset.messageOffset != fetchOffset.messageOffset) {
-  if (endOffset.onOlderSegment(fetchOffset)) {
-// Case F, this can happen when the new fetch operation is on 
a truncated leader
-debug(s"Satisfying fetch $this since it is fetching later 
segments of partition $topicIdPartition.")
-return forceComplete()
+if (fetchOffset.messageOffset > endOffset.messageOffset) {
+  // Case F, this can happen when the new fetch operation is on a 
truncated leader
+  debug(s"Satisfying fetch $this since it is fetching later 
segments of partition $topicIdPartition.")
+  return forceComplete()
+} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+  if (fetchOffset.messageOffsetOnly() || 
endOffset.messageOffsetOnly()) {

Review Comment:
   With the latest changes in LogOffsetMetadata, it seems we only have to add
one check at
[DelayedFetch.scala#106](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedFetch.scala#L106)
compared to trunk, and that would suffice:
   
   ```
   else if (fetchOffset.onSameSegment(endOffset) && fetchOffset.messageOffset < endOffset.messageOffset)
   ```






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-23 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1611559594


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java:
##
@@ -51,23 +50,24 @@ public LogOffsetMetadata(long messageOffset,
 
 // check if this offset is already on an older segment compared with the 
given offset
 public boolean onOlderSegment(LogOffsetMetadata that) {
-if (messageOffsetOnly())
-throw new KafkaException(this + " cannot compare its segment info 
with " + that + " since it only has message offset info");
-
+if (messageOffsetOnly() || that.messageOffsetOnly())
+return false;
 return this.segmentBaseOffset < that.segmentBaseOffset;
 }
 
 // check if this offset is on the same segment with the given offset
-private boolean onSameSegment(LogOffsetMetadata that) {
+public boolean onSameSegment(LogOffsetMetadata that) {
+if (messageOffsetOnly() || that.messageOffsetOnly())
+return false;
 return this.segmentBaseOffset == that.segmentBaseOffset;
 }
 
 // compute the number of bytes between this offset to the given offset
 // if they are on the same segment and this offset precedes the given 
offset
 public int positionDiff(LogOffsetMetadata that) {
-if (messageOffsetOnly())
+if (messageOffsetOnly() || that.messageOffsetOnly())
 throw new KafkaException(this + " cannot compare its segment 
position with " + that + " since it only has message offset info");
-if (!onSameSegment(that))
+if (this.segmentBaseOffset != that.segmentBaseOffset)

Review Comment:
   We had already performed the `messageOffsetOnly` check at that point, so I
tried to avoid repeating it. It was a micro-optimization; I reverted it since
the cost is trivial.
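
To illustrate why the two forms are interchangeable, here is a minimal Java sketch of the reverted shape of `positionDiff`, in which the same-segment test goes through `onSameSegment` again. `SegmentOffsetSketch` is a hypothetical stand-in for `LogOffsetMetadata`, `IllegalStateException` stands in for `KafkaException`, and `-1` is assumed to mark unknown metadata. Once the first guard has passed, `!onSameSegment(that)` and a direct `segmentBaseOffset != that.segmentBaseOffset` comparison give the same answer; the helper merely repeats a cheap check.

```java
// Hypothetical stand-in for LogOffsetMetadata; not the Kafka implementation.
final class SegmentOffsetSketch {
    final long messageOffset;
    final long segmentBaseOffset; // assumption: -1 means "unknown"
    final int relativePosition;   // assumption: -1 means "unknown"

    SegmentOffsetSketch(long messageOffset, long segmentBaseOffset, int relativePosition) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePosition = relativePosition;
    }

    boolean messageOffsetOnly() {
        return segmentBaseOffset == -1L && relativePosition == -1;
    }

    // Returns false, rather than throwing, when segment info is missing.
    boolean onSameSegment(SegmentOffsetSketch that) {
        if (messageOffsetOnly() || that.messageOffsetOnly()) return false;
        return segmentBaseOffset == that.segmentBaseOffset;
    }

    // Bytes between the two offsets; still throws when the inputs cannot be compared.
    int positionDiff(SegmentOffsetSketch that) {
        if (messageOffsetOnly() || that.messageOffsetOnly())
            throw new IllegalStateException("only message offset info is available");
        if (!onSameSegment(that)) // repeats the guard above; the cost is negligible
            throw new IllegalStateException("offsets are on different segments");
        return relativePosition - that.relativePosition;
    }

    public static void main(String[] args) {
        SegmentOffsetSketch end = new SegmentOffsetSketch(500L, 400L, 300);
        SegmentOffsetSketch fetch = new SegmentOffsetSketch(450L, 400L, 100);
        System.out.println(end.positionDiff(fetch));
    }
}
```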






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-23 Thread via GitHub


showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1611046740


##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -144,6 +144,35 @@ class LogSegmentTest {
 checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
 
+  @Test
+  def testReadWhenNoMaxPosition(): Unit = {
+val maxPosition: Optional[java.lang.Long] = Optional.empty()
+val maxSize = 1
+val seg = createSegment(40)
+val ms = records(50, "hello", "there")
+seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+for (minOneMessage <- Array(true, false)) {

Review Comment:
   Could we use `@ParameterizedTest` for this? Otherwise, if the test fails,
it's not clear which case caused the failure.
   ```
 @ParameterizedTest
 @ValueSource(booleans = Array(true, false))
   ```



##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -4211,6 +4211,46 @@ class UnifiedLogTest {
 assertEquals(31, log.localLogStartOffset())
   }
 
+  @Test
+  def testConvertToOffsetMetadataDoesNotThrowOffsetOutOfRangeError(): Unit = {
+val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, 
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+var offset = 0L
+for(_ <- 0 until 50) {
+  val records = TestUtils.singletonRecords("test".getBytes())
+  val info = log.appendAsLeader(records, leaderEpoch = 0)
+  offset = info.lastOffset
+  if (offset != 0 && offset % 10 == 0)
+log.roll()
+}
+assertEquals(5, log.logSegments.size)
+log.updateHighWatermark(log.logEndOffset)
+// simulate calls to upload 3 segments to remote storage
+log.updateHighestOffsetInRemoteStorage(30)
+
+log.deleteOldSegments()
+assertEquals(2, log.logSegments.size())
+assertEquals(0, log.logStartOffset)
+assertEquals(31, log.localLogStartOffset())
+
+log.updateLogStartOffsetFromRemoteTier(15)
+assertEquals(15, log.logStartOffset)
+
+// case-1: offset is higher than the local-log-start-offset.
+// log-start-offset < local-log-start-offset < offset-to-be-converted < 
log-end-offset
+assertEquals(new LogOffsetMetadata(35, 31, 288), 
log.maybeConvertToOffsetMetadata(35))
+// case-2: offset is lesser than the local-log-start-offset

Review Comment:
   less than



##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -4211,6 +4211,46 @@ class UnifiedLogTest {
 assertEquals(31, log.localLogStartOffset())
   }
 
+  @Test
+  def testConvertToOffsetMetadataDoesNotThrowOffsetOutOfRangeError(): Unit = {
+val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, 
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+var offset = 0L
+for(_ <- 0 until 50) {
+  val records = TestUtils.singletonRecords("test".getBytes())
+  val info = log.appendAsLeader(records, leaderEpoch = 0)
+  offset = info.lastOffset
+  if (offset != 0 && offset % 10 == 0)
+log.roll()
+}
+assertEquals(5, log.logSegments.size)
+log.updateHighWatermark(log.logEndOffset)
+// simulate calls to upload 3 segments to remote storage
+log.updateHighestOffsetInRemoteStorage(30)
+
+log.deleteOldSegments()
+assertEquals(2, log.logSegments.size())
+assertEquals(0, log.logStartOffset)
+assertEquals(31, log.localLogStartOffset())
+
+log.updateLogStartOffsetFromRemoteTier(15)
+assertEquals(15, log.logStartOffset)
+
+// case-1: offset is higher than the local-log-start-offset.
+// log-start-offset < local-log-start-offset < offset-to-be-converted < 
log-end-offset
+assertEquals(new LogOffsetMetadata(35, 31, 288), 
log.maybeConvertToOffsetMetadata(35))
+// case-2: offset is lesser than the local-log-start-offset
+// log-start-offset < offset-to-be-converted < local-log-start-offset < 
log-end-offset
+assertEquals(new LogOffsetMetadata(29, -1L, -1), 
log.maybeConvertToOffsetMetadata(29))
+// case-3: offset is higher than the log-end-offset
+// log-start-offset < local-log-start-offset < log-end-offset < 
offset-to-be-converted
+assertEquals(new LogOffsetMetadata(log.logEndOffset + 1, -1L, -1), 
log.maybeConvertToOffsetMetadata(log.logEndOffset + 1))
+// case-4: offset is lesser than the log-start-offset

Review Comment:
   nit: lesser than -> less than
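
The cases enumerated in the quoted test can be modelled with a small Java sketch. It assumes a hypothetical `OffsetConversionSketch` class that knows only the base offsets of its local segments and its log end offset, uses `IndexOutOfBoundsException` as a stand-in for the out-of-range error, and marks unknown metadata with `-1`. The fixed record size of 72 bytes is an assumption chosen purely so that offset 35 in a segment starting at 31 lands at position 288, as in the quoted assertion; it says nothing about the real log layout.

```java
import java.util.TreeSet;

// Hypothetical model of converting an offset to metadata without throwing.
final class OffsetConversionSketch {
    static final class Meta {
        final long messageOffset;
        final long segmentBaseOffset; // -1 when unknown (assumption)
        final int relativePosition;   // -1 when unknown (assumption)

        Meta(long messageOffset, long segmentBaseOffset, int relativePosition) {
            this.messageOffset = messageOffset;
            this.segmentBaseOffset = segmentBaseOffset;
            this.relativePosition = relativePosition;
        }

        @Override
        public String toString() {
            return "(" + messageOffset + ", " + segmentBaseOffset + ", " + relativePosition + ")";
        }
    }

    private final TreeSet<Long> segmentBaseOffsets = new TreeSet<>();
    private final long logEndOffset;
    private final int recordSizeBytes;

    OffsetConversionSketch(long logEndOffset, int recordSizeBytes, long... baseOffsets) {
        this.logEndOffset = logEndOffset;
        this.recordSizeBytes = recordSizeBytes;
        for (long base : baseOffsets) segmentBaseOffsets.add(base);
    }

    // Strict variant: fails when the offset is not covered by a local segment.
    Meta convertOrThrow(long offset) {
        Long base = segmentBaseOffsets.floor(offset);
        if (base == null || offset > logEndOffset)
            throw new IndexOutOfBoundsException("offset " + offset + " is outside the local log");
        return new Meta(offset, base, (int) ((offset - base) * recordSizeBytes));
    }

    // Lenient variant: falls back to message-only metadata instead of throwing.
    Meta maybeConvert(long offset) {
        try {
            return convertOrThrow(offset);
        } catch (IndexOutOfBoundsException e) {
            return new Meta(offset, -1L, -1);
        }
    }

    public static void main(String[] args) {
        // Two local segments starting at 31 and 41, log end offset 50.
        OffsetConversionSketch log = new OffsetConversionSketch(50L, 72, 31L, 41L);
        System.out.println(log.maybeConvert(35L)); // inside the local log
        System.out.println(log.maybeConvert(29L)); // below the local log start
        System.out.println(log.maybeConvert(51L)); // beyond the log end
    }
}
```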



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java:
##
@@ -51,23 +50,24 @@ public LogOffsetMetadata(long messageOffset,
 
 // check if this offset is already on an older segment compared with the 
given offset
 public boolean onOlderSegment(LogOffsetMetadata that) {
-if (messageOffsetOnly())
-throw new KafkaException(this + " cannot compare its segment info 

Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610893769


##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
 assertTrue(delayedFetch.tryComplete())
 assertTrue(delayedFetch.isCompleted)
 assertTrue(fetchResultOpt.isDefined)
+
+val fetchResult = fetchResultOpt.get
+assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark 
endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = {
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+val fetchOffset = 450L
+val logStartOffset = 5L
+val currentLeaderEpoch = Optional.of[Integer](10)
+val replicaId = 1
+
+val fetchStatus = FetchPartitionStatus(
+  startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+  fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, 
fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+var fetchResultOpt: Option[FetchPartitionData] = None
+def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+  fetchResultOpt = Some(responses.head._2)
+}
+
+val delayedFetch = new DelayedFetch(
+  params = fetchParams,
+  fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+  replicaManager = replicaManager,
+  quota = replicaQuota,
+  responseCallback = callback
+)
+
+val partition: Partition = mock(classOf[Partition])
+
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+// Note that the high-watermark does not contain the complete metadata
+val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1)
+when(partition.fetchOffsetSnapshot(
+  currentLeaderEpoch,
+  fetchOnlyFromLeader = true))
+  .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, 
endOffsetMetadata, endOffsetMetadata))
+when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+expectReadFromReplica(fetchParams, topicIdPartition, 
fetchStatus.fetchInfo, Errors.NONE)
+
+// 1. When `endOffset` is 0, it refers to the truncation case
+// 2. When `endOffset` is 500, it refers to the normal case
+val expected = endOffset == 0

Review Comment:
   When fetchOffset > endOffset, `forceComplete` gets called. In this case,
450 > 0, so it triggers force completion.
   
   This is the only behavior change after the refactor. Previously, when
fetchOffset > endOffset:
   
   1. If the offsets lay on the same segment, we waited for the end offset
to move or for the request to time out.
   2. If the endOffset lay on an older segment than the fetch offset,
we completed the request by calling `forceComplete`.
   
   We can retain the same behavior if required.
   
   ```
   if (fetchOffset.messageOffset > endOffset.messageOffset) {
     if (endOffset.onOlderSegment(fetchOffset)) {
       // Case F, this can happen when the new fetch operation is on a truncated leader
       debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
       return forceComplete()
     }
   } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
     ...
   ```
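
The difference between the two behaviors can be made concrete with a short Java sketch. It is a simplified model, not the Kafka code: `TruncationBranchSketch` and its method names are hypothetical, and both offsets are assumed to carry full segment metadata so that the segment comparison is meaningful.

```java
// Hypothetical comparison of the "fetch offset ahead of end offset" branch
// before and after the refactor described above.
final class TruncationBranchSketch {
    // Before: complete only when the end offset sits on an older segment.
    static boolean completesBefore(long fetchOffset, long fetchSegmentBase,
                                   long endOffset, long endSegmentBase) {
        return fetchOffset > endOffset && endSegmentBase < fetchSegmentBase;
    }

    // After: complete whenever the fetch offset is ahead of the end offset.
    static boolean completesAfter(long fetchOffset, long endOffset) {
        return fetchOffset > endOffset;
    }

    public static void main(String[] args) {
        // Same segment (base 400), fetch offset ahead of the end offset.
        System.out.println(completesBefore(450L, 400L, 440L, 400L));
        System.out.println(completesAfter(450L, 440L));
    }
}
```

Only the same-segment case differs: the earlier logic would keep waiting there, while the refactored logic completes immediately.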






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610893769


##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
 assertTrue(delayedFetch.tryComplete())
 assertTrue(delayedFetch.isCompleted)
 assertTrue(fetchResultOpt.isDefined)
+
+val fetchResult = fetchResultOpt.get
+assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark 
endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = {
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+val fetchOffset = 450L
+val logStartOffset = 5L
+val currentLeaderEpoch = Optional.of[Integer](10)
+val replicaId = 1
+
+val fetchStatus = FetchPartitionStatus(
+  startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+  fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, 
fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+var fetchResultOpt: Option[FetchPartitionData] = None
+def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+  fetchResultOpt = Some(responses.head._2)
+}
+
+val delayedFetch = new DelayedFetch(
+  params = fetchParams,
+  fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+  replicaManager = replicaManager,
+  quota = replicaQuota,
+  responseCallback = callback
+)
+
+val partition: Partition = mock(classOf[Partition])
+
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+// Note that the high-watermark does not contain the complete metadata
+val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1)
+when(partition.fetchOffsetSnapshot(
+  currentLeaderEpoch,
+  fetchOnlyFromLeader = true))
+  .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, 
endOffsetMetadata, endOffsetMetadata))
+when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+expectReadFromReplica(fetchParams, topicIdPartition, 
fetchStatus.fetchInfo, Errors.NONE)
+
+// 1. When `endOffset` is 0, it refers to the truncation case
+// 2. When `endOffset` is 500, it refers to the normal case
+val expected = endOffset == 0

Review Comment:
   When fetchOffset > endOffset, `forceComplete` gets called. In this case,
450 > 0, so it triggers force completion.
   
   This is the only behavior change after the refactor. Previously, when
fetchOffset > endOffset:
   
   1. If the offsets lay on the same segment, we waited for the end offset
to move or for the request to time out.
   2. If they lay on different segments, we completed the request by
calling `forceComplete`.
   
   We can retain the same behavior if required.
   
   ```
   if (fetchOffset.messageOffset > endOffset.messageOffset) {
     if (endOffset.onOlderSegment(fetchOffset)) {
       // Case F, this can happen when the new fetch operation is on a truncated leader
       debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
       return forceComplete()
     }
   } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
     ...
   ```






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610895466


##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -91,19 +91,23 @@ class DelayedFetch(
 // Go directly to the check for Case G if the message offsets are 
the same. If the log segment
 // has just rolled, then the high watermark offset will remain the 
same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
-if (endOffset.messageOffset != fetchOffset.messageOffset) {
-  if (endOffset.onOlderSegment(fetchOffset)) {
-// Case F, this can happen when the new fetch operation is on 
a truncated leader
-debug(s"Satisfying fetch $this since it is fetching later 
segments of partition $topicIdPartition.")
-return forceComplete()
+if (fetchOffset.messageOffset > endOffset.messageOffset) {
+  // Case F, this can happen when the new fetch operation is on a 
truncated leader
+  debug(s"Satisfying fetch $this since it is fetching later 
segments of partition $topicIdPartition.")
+  return forceComplete()
+} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+  if (fetchOffset.messageOffsetOnly() || 
endOffset.messageOffsetOnly()) {

Review Comment:
   I'm not clear on this one. The current `if` checks are easy to read; we can
add a debug log to avoid the empty `if` block.






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610893769


##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
 assertTrue(delayedFetch.tryComplete())
 assertTrue(delayedFetch.isCompleted)
 assertTrue(fetchResultOpt.isDefined)
+
+val fetchResult = fetchResultOpt.get
+assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark 
endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = {
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+val fetchOffset = 450L
+val logStartOffset = 5L
+val currentLeaderEpoch = Optional.of[Integer](10)
+val replicaId = 1
+
+val fetchStatus = FetchPartitionStatus(
+  startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+  fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, 
fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+var fetchResultOpt: Option[FetchPartitionData] = None
+def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+  fetchResultOpt = Some(responses.head._2)
+}
+
+val delayedFetch = new DelayedFetch(
+  params = fetchParams,
+  fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+  replicaManager = replicaManager,
+  quota = replicaQuota,
+  responseCallback = callback
+)
+
+val partition: Partition = mock(classOf[Partition])
+
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+// Note that the high-watermark does not contain the complete metadata
+val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1)
+when(partition.fetchOffsetSnapshot(
+  currentLeaderEpoch,
+  fetchOnlyFromLeader = true))
+  .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, 
endOffsetMetadata, endOffsetMetadata))
+when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+expectReadFromReplica(fetchParams, topicIdPartition, 
fetchStatus.fetchInfo, Errors.NONE)
+
+// 1. When `endOffset` is 0, it refers to the truncation case
+// 2. When `endOffset` is 500, it refers to the normal case
+val expected = endOffset == 0

Review Comment:
   When fetchOffset > endOffset, `forceComplete` gets called. In this case,
450 > 0, so it triggers force completion.






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610572095


##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -91,19 +91,23 @@ class DelayedFetch(
             // Go directly to the check for Case G if the message offsets are the same. If the log segment
             // has just rolled, then the high watermark offset will remain the same but be on the old segment,
             // which would incorrectly be seen as an instance of Case F.
-            if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
-                // Case F, this can happen when the new fetch operation is on a truncated leader
-                debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
-                return forceComplete()
+            if (fetchOffset.messageOffset > endOffset.messageOffset) {
+              // Case F, this can happen when the new fetch operation is on a truncated leader
+              debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
+              return forceComplete()
+            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+              if (fetchOffset.messageOffsetOnly() || endOffset.messageOffsetOnly()) {

Review Comment:
   Could we fold this into the condition above?



##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
     assertTrue(delayedFetch.tryComplete())
     assertTrue(delayedFetch.isCompleted)
     assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = {
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val fetchOffset = 450L
+    val logStartOffset = 5L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      params = fetchParams,
+      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      responseCallback = callback
+    )
+
+    val partition: Partition = mock(classOf[Partition])
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+    // Note that the high-watermark does not contain the complete metadata
+    val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1)
+    when(partition.fetchOffsetSnapshot(
+      currentLeaderEpoch,
+      fetchOnlyFromLeader = true))
+      .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
+    when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+    expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE)
+
+    // 1. When `endOffset` is 0, it refers to the truncation case
+    // 2. When `endOffset` is 500, it refers to the normal case
+    val expected = endOffset == 0

Review Comment:
   Hmm, in both cases, we are not forcing the completion of the delayed fetch, 
right?



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610452888


##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -91,19 +91,24 @@ class DelayedFetch(
             // Go directly to the check for Case G if the message offsets are the same. If the log segment
             // has just rolled, then the high watermark offset will remain the same but be on the old segment,
             // which would incorrectly be seen as an instance of Case F.
-            if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
-                // Case F, this can happen when the new fetch operation is on a truncated leader
-                debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
-                return forceComplete()
+            if (fetchOffset.messageOffset > endOffset.messageOffset) {
+              // Case F, this can happen when the new fetch operation is on a truncated leader
+              debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
+              return forceComplete()
+            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+              if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
+                // If we don't know the position of the offset on log segments, just pessimistically assume that we
+                // only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the
+                // high-watermark is stale, but should be rare.
+                accumulatedSize += 1

Review Comment:
   Yes, in that case, we just wait until endOffset moves to within the local 
log segments. If the timeout hits, we will just return empty.



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2125471280

   @junrao 
   
   Thanks for the review! Addressed all your comments. PTAL.


Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610404165


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -445,7 +445,7 @@ public FetchDataInfo read(long startOffset, int maxSize, long maxPosition, boole
 adjustedMaxSize = Math.max(maxSize, startOffsetAndSize.size);
 
 // return a log segment but with zero size in the case below
-if (adjustedMaxSize == 0)
+if (adjustedMaxSize == 0 || maxPosition == -1)

Review Comment:
   `LogSegment` is in Java, and using `Optional` as a method parameter in `LogSegment#read` triggers a warning in IntelliJ and requires some refactoring (an `int` argument is implicitly widened to a `long` parameter, but not to an `Optional`), so I went with the negative-value approach:
   
   ```
   'Optional' used as type for parameter 'maxPosition' 
   ```
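
The trade-off mentioned here can be illustrated with a small, self-contained Java sketch. `SentinelMaxPositionSketch`, `NO_MAX_POSITION` and `readableBytes` are hypothetical names used only for illustration; this is not the actual `LogSegment#read` implementation. It assumes, as in the diff above, that a `maxPosition` of -1 means the position is unknown and yields a zero-size read.

```java
// Illustrative only: a stand-in for the sentinel approach, not Kafka's LogSegment#read.
final class SentinelMaxPositionSketch {
    // Assumed sentinel meaning "the max position is unknown".
    static final long NO_MAX_POSITION = -1L;

    // Returns how many bytes may be read starting at startPosition.
    static int readableBytes(int startPosition, int maxSize, long maxPosition) {
        if (maxSize == 0 || maxPosition == NO_MAX_POSITION) {
            return 0; // zero-size read, mirroring the condition in the diff above
        }
        long available = Math.max(0L, maxPosition - startPosition);
        return (int) Math.min((long) maxSize, available);
    }

    public static void main(String[] args) {
        int intPosition = 100; // an int argument is widened to long implicitly
        System.out.println(readableBytes(40, 50, intPosition));
        System.out.println(readableBytes(40, 50, NO_MAX_POSITION));
    }
}
```

With an `Optional<Long>` parameter, the first call would need an explicit conversion such as `Optional.of((long) intPosition)`, since `Optional.of(intPosition)` produces an `Optional<Integer>`.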



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610401875


##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -91,19 +91,24 @@ class DelayedFetch(
             // Go directly to the check for Case G if the message offsets are the same. If the log segment
             // has just rolled, then the high watermark offset will remain the same but be on the old segment,
             // which would incorrectly be seen as an instance of Case F.
-            if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
-                // Case F, this can happen when the new fetch operation is on a truncated leader
-                debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
-                return forceComplete()
+            if (fetchOffset.messageOffset > endOffset.messageOffset) {
+              // Case F, this can happen when the new fetch operation is on a truncated leader
+              debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
+              return forceComplete()
+            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+              if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
+                // If we don't know the position of the offset on log segments, just pessimistically assume that we
+                // only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the
+                // high-watermark is stale, but should be rare.
+                accumulatedSize += 1

Review Comment:
   To confirm, should we avoid accumulating the 1 byte when fetchOffset < endOffset? The FETCH request will be parked in the purgatory for 500 ms; I don't see any issues with it.



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610399596


##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -143,6 +143,39 @@ class LogSegmentTest {
 checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
 
+  @Test
+  def testReadWhenNoMaxPosition(): Unit = {
+val maxPosition = -1
+val maxSize = 1
+val seg = createSegment(40)
+val ms = records(50, "hello", "there")
+seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+for (minOneMessage <- Array(true, false)) {
+  // read before first offset
+  var read = seg.read(48, maxSize, maxPosition, minOneMessage)

Review Comment:
   Yes, when maxSize is negative (-1), the following error is thrown:
   
   ```
   java.lang.IllegalArgumentException: Invalid max size -1 for log read from segment FileRecords(size=78, file=/var/folders/bq/w6tnkbq964q8sqpvj4fbmjr4gq/T/kafka-18165220027855018994/0040.log, start=0, end=2147483647)
       at org.apache.kafka.storage.internals.log.LogSegment.read(LogSegment.java:432)
   ```
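
For context, the failure above is an argument check. The following is a minimal Java sketch of such a check, assuming only what the stack trace shows: an `IllegalArgumentException` whose message starts with `Invalid max size`. `MaxSizeCheckSketch` and `checkMaxSize` are hypothetical names, and the `maxSize < 0` condition is an assumption rather than a quote from `LogSegment`.

```java
// Illustrative only: a guard of the kind the stack trace suggests, not Kafka's code.
final class MaxSizeCheckSketch {
    // Rejects negative sizes and returns the size unchanged otherwise.
    static int checkMaxSize(int maxSize) {
        if (maxSize < 0) {
            throw new IllegalArgumentException("Invalid max size " + maxSize + " for log read");
        }
        return maxSize;
    }

    public static void main(String[] args) {
        System.out.println(checkMaxSize(1));
        try {
            checkMaxSize(-1);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This is why the test passes `maxSize = 1` together with `maxPosition = -1`: the size stays valid while the position carries the "unknown" marker.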



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610310799


##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -143,6 +143,39 @@ class LogSegmentTest {
 checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
 
+  @Test
+  def testReadWhenNoMaxPosition(): Unit = {
+val maxPosition = -1
+val maxSize = 1
+val seg = createSegment(40)
+val ms = records(50, "hello", "there")
+seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+for (minOneMessage <- Array(true, false)) {
+  // read before first offset
+  var read = seg.read(48, maxSize, maxPosition, minOneMessage)

Review Comment:
   Hmm, maxSize=-1 is an invalid input, right?



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Given a message offset, find its corresponding offset metadata in the log.
-   * If the message offset is out of range, throw an OffsetOutOfRangeException
+   * 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException
+   * 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata
+   * 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata
    */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {

Review Comment:
   Thanks for the explanation. Yes, it makes sense to me now.
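
The three cases listed in the Javadoc of this hunk can be sketched in plain Java as follows. This is a minimal model under stated assumptions, not the `UnifiedLog` implementation: `OffsetConversionSketch`, `OffsetMeta` and `convert` are hypothetical names, `IllegalArgumentException` stands in for `OffsetOutOfRangeException`, -1 is assumed to mark unknown segment data, and the final branch returns made-up placeholder values instead of doing a real segment lookup.

```java
// Illustrative only: models the three documented cases, not Kafka's UnifiedLog.
final class OffsetConversionSketch {
    // Hypothetical stand-in for LogOffsetMetadata; -1 marks unknown segment data.
    static final class OffsetMeta {
        final long messageOffset;
        final long segmentBaseOffset;
        final int relativePosition;

        OffsetMeta(long messageOffset, long segmentBaseOffset, int relativePosition) {
            this.messageOffset = messageOffset;
            this.segmentBaseOffset = segmentBaseOffset;
            this.relativePosition = relativePosition;
        }

        static OffsetMeta messageOnly(long offset) {
            return new OffsetMeta(offset, -1L, -1);
        }

        boolean messageOffsetOnly() {
            return segmentBaseOffset == -1L && relativePosition == -1;
        }
    }

    static OffsetMeta convert(long offset, long logStartOffset,
                              long localLogStartOffset, long logEndOffset) {
        if (offset < logStartOffset) {
            // Case 1: IllegalArgumentException stands in for OffsetOutOfRangeException.
            throw new IllegalArgumentException(
                "Offset " + offset + " is below the log start offset " + logStartOffset);
        }
        if (offset < localLogStartOffset || offset > logEndOffset) {
            // Cases 2 and 3: only the message offset is known.
            return OffsetMeta.messageOnly(offset);
        }
        // Placeholder for a real segment lookup: the segment base offset and
        // position below are made up for the sketch.
        return new OffsetMeta(offset, localLogStartOffset, 0);
    }

    public static void main(String[] args) {
        System.out.println(convert(15L, 10L, 20L, 100L).messageOffsetOnly());
        System.out.println(convert(50L, 10L, 20L, 100L).messageOffsetOnly());
    }
}
```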



##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -143,6 +143,39 @@ class LogSegmentTest {
 checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
 
+  @Test
+  def testReadWhenNoMaxPosition(): Unit = {
+val maxPosition = -1
+val maxSize = 1
+val seg = createSegment(40)
+val ms = records(50, "hello", "there")
+seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+for (minOneMessage <- Array(true, false)) {
+  // read before first offset
+  var read = seg.read(48, maxSize, maxPosition, minOneMessage)
+  assertEquals(new LogOffsetMetadata(48, 40, 0), read.fetchOffsetMetadata)
+  assertTrue(read.records.records().iterator().asScala.isEmpty)
+
+  // read at first offset
+  read = seg.read(50, maxSize, maxPosition, minOneMessage)
+  assertEquals(new LogOffsetMetadata(50, 40, 0), read.fetchOffsetMetadata)
+  assertTrue(read.records.records().iterator().asScala.isEmpty)
+
+  // read beyond first offset
+  read = seg.read(51, maxSize, maxPosition, minOneMessage)
+  assertEquals(new LogOffsetMetadata(51, 40, 39), read.fetchOffsetMetadata)
+  assertTrue(read.records.records().iterator().asScala.isEmpty)
+
+  // read at last offset

Review Comment:
   Hmm, 51 is the last offset, right?



##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -91,19 +91,24 @@ class DelayedFetch(
             // Go directly to the check for Case G if the message offsets are the same. If the log segment
             // has just rolled, then the high watermark offset will remain the same but be on the old segment,
             // which would incorrectly be seen as an instance of Case F.
-            if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
-                // Case F, this can happen when the new fetch operation is on a truncated leader
-                debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
-                return forceComplete()
+            if (fetchOffset.messageOffset > endOffset.messageOffset) {
+              // Case F, this can happen when the new fetch operation is on a truncated leader
+              debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
+              return forceComplete()
+            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+              if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
+                // If we don't know the position of the offset on log segments, just pessimistically assume that we
+                // only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the
+                // high-watermark is stale, but should be rare.
+                accumulatedSize += 1

Review Comment:
   If endOffset or fetchOffset is message only, we return empty records when 
reading from the log. So, to be consistent, we want to avoid accumulating new 
bytes in this case.
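
One way to picture the consistency argument is a byte estimate that returns 0 whenever either offset lacks position data. The Java sketch below is an assumption-laden model, not the code from the PR: `AccumulatedBytesSketch`, `OffsetMeta` and `bytesToAccumulate` are hypothetical names, -1 is assumed to mark unknown segment data, and offsets on different segments are simply not modeled.

```java
// Illustrative only: models "accumulate nothing when the position is unknown".
final class AccumulatedBytesSketch {
    // Hypothetical stand-in for LogOffsetMetadata; -1 marks unknown segment data.
    static final class OffsetMeta {
        final long messageOffset;
        final long segmentBaseOffset;
        final int relativePosition;

        OffsetMeta(long messageOffset, long segmentBaseOffset, int relativePosition) {
            this.messageOffset = messageOffset;
            this.segmentBaseOffset = segmentBaseOffset;
            this.relativePosition = relativePosition;
        }

        boolean messageOffsetOnly() {
            return segmentBaseOffset == -1L && relativePosition == -1;
        }
    }

    // Estimates how many new bytes a delayed fetch could count for one partition.
    static int bytesToAccumulate(OffsetMeta fetchOffset, OffsetMeta endOffset, int maxBytes) {
        if (fetchOffset.messageOffset >= endOffset.messageOffset) {
            return 0; // nothing new to read
        }
        if (fetchOffset.messageOffsetOnly() || endOffset.messageOffsetOnly()) {
            return 0; // position unknown: a read would return empty records
        }
        if (fetchOffset.segmentBaseOffset != endOffset.segmentBaseOffset) {
            return 0; // different segments are outside the scope of this sketch
        }
        return Math.min(endOffset.relativePosition - fetchOffset.relativePosition, maxBytes);
    }

    public static void main(String[] args) {
        OffsetMeta fetch = new OffsetMeta(450L, 400L, 100);
        System.out.println(bytesToAccumulate(fetch, new OffsetMeta(500L, 400L, 300), 1024));
        System.out.println(bytesToAccumulate(fetch, new OffsetMeta(500L, -1L, -1), 1024));
    }
}
```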



##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,11 @@ class LocalLog(@volatile private var _dir: File,
 throw new 

Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2123824613

   @junrao @showuon 
   Thanks for the review! Addressed all the review comments. PTAL. 


Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609227442


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
         throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
           s"but we only have log segments upto $endOffset.")
 
-      if (startOffset == maxOffsetMetadata.messageOffset)
+      if (startOffset == maxOffsetMetadata.messageOffset) {
         emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-      else if (startOffset > maxOffsetMetadata.messageOffset)
-        emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
-      else {
+      } else if (startOffset > maxOffsetMetadata.messageOffset ||
+        maxOffsetMetadata.messageOffsetOnly() ||
+        maxOffsetMetadata.segmentBaseOffset < segmentOpt.get().baseOffset()) {
+        // We need to be careful before reading the segment as `maxOffsetMetadata` may not be a complete metadata:
+        // 1. If maxOffsetMetadata is message-offset-only, then return empty fetchDataInfo since
+        // maxOffsetMetadata.offset is not on local log segments.
+        // 2. If maxOffsetMetadata.segmentBaseOffset is smaller than segment.baseOffset, then return empty fetchDataInfo.

Review Comment:
   ack, kept the same behavior. 
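
The branch condition in the hunk above can be restated as a small decision function. This Java sketch only mirrors the boolean structure of the diff: `EmptyFetchDecisionSketch`, `Decision` and `decide` are hypothetical names, the flattened parameter list replaces `LogOffsetMetadata`, and -1 is assumed to mark unknown segment data.

```java
// Illustrative only: restates the branch condition from the diff, not LocalLog#read.
final class EmptyFetchDecisionSketch {
    enum Decision { EMPTY_AT_MAX_OFFSET, EMPTY_INCOMPLETE_OR_BEYOND_MAX, READ_SEGMENT }

    static Decision decide(long startOffset,
                           long maxMessageOffset,
                           long maxSegmentBaseOffset,
                           int maxRelativePosition,
                           long floorSegmentBaseOffset) {
        // Assumed meaning of "message-offset-only": no segment data is attached.
        boolean messageOffsetOnly = maxSegmentBaseOffset == -1L && maxRelativePosition == -1;
        if (startOffset == maxMessageOffset) {
            return Decision.EMPTY_AT_MAX_OFFSET;
        }
        if (startOffset > maxMessageOffset
                || messageOffsetOnly
                || maxSegmentBaseOffset < floorSegmentBaseOffset) {
            return Decision.EMPTY_INCOMPLETE_OR_BEYOND_MAX;
        }
        return Decision.READ_SEGMENT;
    }

    public static void main(String[] args) {
        System.out.println(decide(5L, 10L, -1L, -1, 0L)); // message-only max offset
        System.out.println(decide(5L, 10L, 0L, 100, 0L)); // complete metadata
    }
}
```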



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609227180


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
         throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
           s"but we only have log segments upto $endOffset.")
 
-      if (startOffset == maxOffsetMetadata.messageOffset)
+      if (startOffset == maxOffsetMetadata.messageOffset) {
         emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-      else if (startOffset > maxOffsetMetadata.messageOffset)
+      } else if (startOffset > maxOffsetMetadata.messageOffset ||
+        maxOffsetMetadata.messageOffsetOnly() ||
+        maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset) {

Review Comment:
   Addressed with the latest commit 77f90f63408ebe5b3da5c3305ad2affb7901eff4



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Given a message offset, find its corresponding offset metadata in the log.
-   * If the message offset is out of range, throw an OffsetOutOfRangeException
+   * 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException
+   * 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata
+   * 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata
    */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {

Review Comment:
   > However, if nextOffsetMetadata doesn't change and somehow startOffset > maxOffsetMetadata.messageOffset, then we could loop forever.
   
   I still don't see why the infinite loop would happen. In `LocalLog#read`, we check whether `startOffset > nextOffsetMetadata.messageOffset`; if so, an `OffsetOutOfRangeException` is thrown.
   
   ```
     def read(startOffset: Long,
              maxLength: Int,
              minOneMessage: Boolean,
              maxOffsetMetadata: LogOffsetMetadata,
              includeAbortedTxns: Boolean): FetchDataInfo = {
   
       // [Luke] We will check if startOffset > nextOffsetMetadata.messageOffset (endOffset),
       // so if the provided `maxOffsetMetadata == nextOffsetMetadata`,
       // the `OffsetOutOfRangeException` should be thrown before entering another `convertToOffsetMetadataOrThrow` loop
   
       val endOffsetMetadata = nextOffsetMetadata
       val endOffset = endOffsetMetadata.messageOffset
       var segmentOpt = segments.floorSegment(startOffset)
   
       // return error on attempt to read beyond the log end offset
       if (startOffset > endOffset || !segmentOpt.isPresent)
         throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, but we only have log segments upto $endOffset.")
   
       if (startOffset == maxOffsetMetadata.messageOffset)
         emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
   
       // [Luke] The first time, the provided `maxOffsetMetadata` might be the highWaterMark or LastStableOffset,
       // so it could enter this `else if` branch because HWM/LSO should always be less than LEO. The second time it won't enter here.
   
       else if (startOffset > maxOffsetMetadata.messageOffset)
         emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
   ```
   So when `LocalLog#read` is called with `maxOffsetMetadata = nextOffsetMetadata`, there should be no chance of entering another `convertToOffsetMetadataOrThrow` loop, because this time `if (startOffset > maxOffsetMetadata.messageOffset)` is equivalent to `if (startOffset > endOffset)`.
   
   If this did happen, it would mean that `logEndOffset < highWaterMarkOffset` or `logEndOffset < lastStableOffset`, which I don't think can happen.
   
   Does that make sense?
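
The argument can be checked mechanically with a toy model of the branch order. The Java sketch below is hypothetical (`ReadBranchOrderSketch`, `Branch` and `branchFor` are made-up names), it ignores the `!segmentOpt.isPresent` part of the first check, and it returns an enum value where the real code throws `OffsetOutOfRangeException`.

```java
// Illustrative only: a toy model of the branch order in the quoted read method.
final class ReadBranchOrderSketch {
    enum Branch { OUT_OF_RANGE, EMPTY_AT_MAX, EMPTY_BEYOND_MAX, READ }

    static Branch branchFor(long startOffset, long maxOffset, long logEndOffset) {
        if (startOffset > logEndOffset) {
            return Branch.OUT_OF_RANGE; // stands in for throwing OffsetOutOfRangeException
        }
        if (startOffset == maxOffset) {
            return Branch.EMPTY_AT_MAX;
        }
        if (startOffset > maxOffset) {
            return Branch.EMPTY_BEYOND_MAX; // the branch that converts the offset again
        }
        return Branch.READ;
    }

    public static void main(String[] args) {
        long logEndOffset = 10L;
        // With maxOffset == logEndOffset, EMPTY_BEYOND_MAX never shows up.
        for (long start = 0L; start <= 12L; start++) {
            System.out.println(start + " -> " + branchFor(start, logEndOffset, logEndOffset));
        }
    }
}
```

When `maxOffset` equals `logEndOffset`, every `startOffset` above it is caught by the first check, so the `EMPTY_BEYOND_MAX` branch is only reachable when `maxOffset < logEndOffset`, for example `branchFor(8, 5, 10)`.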
   
   
   



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Given a message offset, find its corresponding offset metadata in the log.
-   * If the message offset is out of range, throw an OffsetOutOfRangeException
+   * 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException
+   * 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata
+   * 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata
    */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {

Review Comment:
   > However, if nextOffsetMetadata doesn't change and somehow startOffset > maxOffsetMetadata.messageOffset, then we could loop forever.
   
   I still don't see how the infinite loop could happen. In `LocalLog#read`, we check whether `startOffset > nextOffsetMetadata.messageOffset`; if so, an `OffsetOutOfRangeException` is thrown.
   
   ```
   def read(startOffset: Long,
            maxLength: Int,
            minOneMessage: Boolean,
            maxOffsetMetadata: LogOffsetMetadata,
            includeAbortedTxns: Boolean): FetchDataInfo = {
   
     // [Luke] We will check if startOffset > nextOffsetMetadata.messageOffset (endOffset),
     // so if the provided `maxOffsetMetadata == nextOffsetMetadata`,
     // the `OffsetOutOfRangeException` should be thrown before entering another `convertToOffsetMetadataOrThrow` loop
   
     val endOffsetMetadata = nextOffsetMetadata
     val endOffset = endOffsetMetadata.messageOffset
     var segmentOpt = segments.floorSegment(startOffset)
   
     // return error on attempt to read beyond the log end offset
     if (startOffset > endOffset || !segmentOpt.isPresent)
       throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, but we only have log segments upto $endOffset.")
   
     if (startOffset == maxOffsetMetadata.messageOffset)
       emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
   
     // [Luke] So for the first time, the provided `maxOffsetMetadata` might be highWaterMark or LastStableOffset,
     // it could enter this `else if` condition because HWM/LSO should always be less than LEO. But the 2nd time won't enter here.
   
     else if (startOffset > maxOffsetMetadata.messageOffset)
       emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
   ```
   So when `LocalLog#read` is called with `maxOffsetMetadata = nextOffsetMetadata`, there should be no chance of entering another `convertToOffsetMetadataOrThrow` loop.
   
   If this did happen, it would mean that `logEndOffset < highWaterMarkOffset` or `logEndOffset < lastStableOffset`, which I don't think can happen.
   
   Does that make sense?
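
To make the argument concrete, here is a minimal sketch of the control flow only, not the actual `LocalLog` code. `ToyLog`, `OffsetMetadata`, `OffsetOutOfRange` and the `readCalls` counter are hypothetical stand-ins, and the sketch assumes the log end offset does not change between the two calls:

```scala
object ReadRecursionSketch {
  // Hypothetical stand-in for LogOffsetMetadata: only the message offset is modelled.
  final case class OffsetMetadata(messageOffset: Long)

  // Hypothetical stand-in for OffsetOutOfRangeException.
  final class OffsetOutOfRange(msg: String) extends RuntimeException(msg)

  // Toy log whose log end offset (nextOffsetMetadata) stays fixed across calls.
  final class ToyLog(logEndOffset: Long) {
    val nextOffsetMetadata: OffsetMetadata = OffsetMetadata(logEndOffset)
    var readCalls = 0 // counts how often read() is entered

    def read(startOffset: Long, maxOffsetMetadata: OffsetMetadata): OffsetMetadata = {
      readCalls += 1
      val endOffset = nextOffsetMetadata.messageOffset
      // Guard: anything beyond the log end offset is rejected up front.
      if (startOffset > endOffset)
        throw new OffsetOutOfRange(s"offset $startOffset is beyond log end offset $endOffset")

      if (startOffset == maxOffsetMetadata.messageOffset)
        maxOffsetMetadata
      else if (startOffset > maxOffsetMetadata.messageOffset)
        convertToOffsetMetadataOrThrow(startOffset) // re-enters read() with max = log end offset
      else
        OffsetMetadata(startOffset) // the real segment read is elided here
    }

    // Re-enters read() with maxOffsetMetadata = nextOffsetMetadata, as described above.
    def convertToOffsetMetadataOrThrow(offset: Long): OffsetMetadata =
      read(offset, maxOffsetMetadata = nextOffsetMetadata)
  }
}
```

With a log end offset of 10 and a max offset (HWM) of 5, reading from offset 7 enters `read` twice and then returns, because on the second call `startOffset > maxOffsetMetadata.messageOffset` would require `startOffset > endOffset`, which the guard has already excluded.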
   
   
   






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608986312


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
 throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
   s"but we only have log segments upto $endOffset.")
 
-  if (startOffset == maxOffsetMetadata.messageOffset)
+  if (startOffset == maxOffsetMetadata.messageOffset) {
 emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-  else if (startOffset > maxOffsetMetadata.messageOffset)
-emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
-  else {
+  } else if (startOffset > maxOffsetMetadata.messageOffset ||
+maxOffsetMetadata.messageOffsetOnly() ||
+maxOffsetMetadata.segmentBaseOffset < segmentOpt.get().baseOffset()) {
+// We need to be careful before reading the segment as 
`maxOffsetMetadata` may not be a complete metadata:
+// 1. If maxOffsetMetadata is message-offset-only, then return empty 
fetchDataInfo since
+// maxOffsetMetadata.offset is not on local log segments.
+// 2. If maxOffsetMetadata.segmentBaseOffset is smaller than 
segment.baseOffset, then return empty fetchDataInfo.

Review Comment:
   Looking at the code again, @showuon is correct. `convertToOffsetMetadataOrThrow(startOffset)` uses `nextOffsetMetadata`, which always has the complete offset metadata. So, we can keep the `convertToOffsetMetadataOrThrow(startOffset)` call.






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608984545


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
 throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
   s"but we only have log segments upto $endOffset.")
 
-  if (startOffset == maxOffsetMetadata.messageOffset)
+  if (startOffset == maxOffsetMetadata.messageOffset) {
 emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-  else if (startOffset > maxOffsetMetadata.messageOffset)
+  } else if (startOffset > maxOffsetMetadata.messageOffset ||
+maxOffsetMetadata.messageOffsetOnly() ||
+maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset) {

Review Comment:
   Returning the current offset is correct. I am wondering why the suggested 
approach returns nextOffset. `segment.read(startOffset, maxLength, maxPosition, 
minOneMessage)` should return a non-null fetchDataInfo since the startOffset 
exists in the first segment, right? 






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608834195


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
 throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
   s"but we only have log segments upto $endOffset.")
 
-  if (startOffset == maxOffsetMetadata.messageOffset)
+  if (startOffset == maxOffsetMetadata.messageOffset) {
 emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-  else if (startOffset > maxOffsetMetadata.messageOffset)
-emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
-  else {
+  } else if (startOffset > maxOffsetMetadata.messageOffset ||
+maxOffsetMetadata.messageOffsetOnly() ||
+maxOffsetMetadata.segmentBaseOffset < segmentOpt.get().baseOffset()) {
+// We need to be careful before reading the segment as 
`maxOffsetMetadata` may not be a complete metadata:
+// 1. If maxOffsetMetadata is message-offset-only, then return empty 
fetchDataInfo since
+// maxOffsetMetadata.offset is not on local log segments.
+// 2. If maxOffsetMetadata.segmentBaseOffset is smaller than 
segment.baseOffset, then return empty fetchDataInfo.

Review Comment:
   This comment is pending. I will wait for @showuon's review before applying the change.






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608831211


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
 throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
   s"but we only have log segments upto $endOffset.")
 
-  if (startOffset == maxOffsetMetadata.messageOffset)
+  if (startOffset == maxOffsetMetadata.messageOffset) {
 emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-  else if (startOffset > maxOffsetMetadata.messageOffset)
+  } else if (startOffset > maxOffsetMetadata.messageOffset ||
+maxOffsetMetadata.messageOffsetOnly() ||
+maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset) {

Review Comment:
   When applying this change, the newly added LocalLog#testWhenFetchOffsetHigherThanMaxOffset test fails for case-3 and case-4. That is, which offset should be returned in the FetchDataInfo when the conditions are not met?
   
   1. In the current approach, the same fetch-offset is returned in the FetchDataInfo.
   2. In the suggested approach, the next-offset/log-end-offset is returned in the FetchDataInfo.
   
   I'm not sure which one is correct. Please suggest. Thanks!






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608823231


##
storage/src/test/java/org/apache/kafka/tiered/storage/actions/EraseBrokerStorageAction.java:
##
@@ -19,24 +19,35 @@
 import org.apache.kafka.tiered.storage.TieredStorageTestAction;
 import org.apache.kafka.tiered.storage.TieredStorageTestContext;
 
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.PrintStream;
 
 public final class EraseBrokerStorageAction implements TieredStorageTestAction 
{
 
 private final int brokerId;
+private final FilenameFilter filenameFilter;
+private final boolean isStopped;
 
 public EraseBrokerStorageAction(int brokerId) {
+this(brokerId, (dir, name) -> true, false);
+}
+
+public EraseBrokerStorageAction(int brokerId,
+FilenameFilter filenameFilter,
+boolean isStopped) {
 this.brokerId = brokerId;
+this.filenameFilter = filenameFilter;
+this.isStopped = isStopped;
 }
 
 @Override
 public void doExecute(TieredStorageTestContext context) throws IOException 
{
-context.eraseBrokerStorage(brokerId);
+context.eraseBrokerStorage(brokerId, filenameFilter, isStopped);
 }
 
 @Override
 public void describe(PrintStream output) {
-output.println("erase-broker-storage: " + brokerId);
+output.println("erase-broker-storage: " + brokerId + ", isStopped: " + 
isStopped);

Review Comment:
   `filenameFilter` is a lambda expression; printing it would only produce the object name in the log, so I omitted it from the output.
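
For illustration, a small sketch of why printing a lambda-based `FilenameFilter` is unhelpful, and one possible alternative. It assumes Scala 2.12+ (SAM conversion); `NamedFilter` and `describe` are hypothetical helpers and not part of this PR:

```scala
import java.io.{File, FilenameFilter}

object FilterDescribeSketch {
  // A lambda-based filter, comparable to the default `(dir, name) -> true` in the diff.
  // Its toString is the JVM-generated class name plus a hash code, not the predicate.
  val acceptAll: FilenameFilter = (dir: File, name: String) => true

  // Hypothetical wrapper that carries a human-readable label for log output.
  final class NamedFilter(label: String, delegate: FilenameFilter) extends FilenameFilter {
    override def accept(dir: File, name: String): Boolean = delegate.accept(dir, name)
    override def toString: String = label
  }

  // Hypothetical variant of describe() that includes the filter in the message.
  def describe(brokerId: Int, filter: FilenameFilter, isStopped: Boolean): String =
    s"erase-broker-storage: $brokerId, filter: $filter, isStopped: $isStopped"
}
```

Passing `acceptAll` directly to `describe` would print an opaque generated name, which is why leaving the filter out of the output is a reasonable choice unless a labelled wrapper like the one above is introduced.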






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1607381004


##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -92,7 +92,14 @@ class DelayedFetch(
 // has just rolled, then the high watermark offset will remain the 
same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
 if (endOffset.messageOffset != fetchOffset.messageOffset) {
-  if (endOffset.onOlderSegment(fetchOffset)) {
+  if (endOffset.messageOffsetOnly() || 
fetchOffset.messageOffsetOnly()) {
+// If we don't know the position of the offset on log 
segments, just pessimistically assume that we
+// only gained 1 byte when fetchOffset < endOffset, otherwise 
do nothing. This can happen when the
+// high-watermark is stale, but should be rare.
+if (fetchOffset.messageOffset < endOffset.messageOffset) {

Review Comment:
   We can organize the code a bit more clearly. I am thinking of something like the following.
   
   ```
   if (fetchOffset.messageOffset > endOffset.messageOffset) {
     // Case F, this can happen when the new fetch operation is on a truncated leader
     debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
     return forceComplete()
   } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
     if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
       // If we don't know the position of the offset on log segments, just pessimistically assume that we
       // only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the
       // high-watermark is stale, but should be rare.
       accumulatedSize += 1
     } else if (fetchOffset.onOlderSegment(endOffset)) {
       // Case F, this can happen when the fetch operation is falling behind the current segment
       // or the partition has just rolled a new segment
       debug(s"Satisfying fetch $this immediately since it is fetching older segments.")
       // We will not force complete the fetch request if a replica should be throttled.
       if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
         return forceComplete()
     } else if (fetchOffset.onSameSegment(endOffset)) {
       // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
       val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
       if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
         accumulatedSize += bytesAvailable
     }
   }
   
   ```
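
The branch structure above can be exercised in isolation with a self-contained sketch. This is only a model under stated assumptions: `OffsetMeta`, `Outcome`, `ForceComplete`, `Accumulate` and `evaluate` are hypothetical names, `-1` is assumed to mark an unknown segment position, and the throttling checks are left out:

```scala
object DelayedFetchSketch {
  // Hypothetical stand-in for LogOffsetMetadata; -1 marks an unknown segment position.
  final case class OffsetMeta(messageOffset: Long,
                              segmentBaseOffset: Long = -1L,
                              relativePositionInSegment: Int = -1) {
    def messageOffsetOnly: Boolean = segmentBaseOffset < 0 && relativePositionInSegment < 0
    def onOlderSegment(that: OffsetMeta): Boolean = segmentBaseOffset < that.segmentBaseOffset
    def onSameSegment(that: OffsetMeta): Boolean = segmentBaseOffset == that.segmentBaseOffset
    def positionDiff(that: OffsetMeta): Int = relativePositionInSegment - that.relativePositionInSegment
  }

  sealed trait Outcome
  case object ForceComplete extends Outcome                // complete the delayed fetch now
  final case class Accumulate(bytes: Int) extends Outcome  // bytes to add to accumulatedSize

  // Mirrors the if/else ordering of the suggestion; throttling is intentionally omitted.
  def evaluate(fetchOffset: OffsetMeta, endOffset: OffsetMeta, maxBytes: Int): Outcome =
    if (fetchOffset.messageOffset > endOffset.messageOffset) ForceComplete
    else if (fetchOffset.messageOffset < endOffset.messageOffset) {
      if (endOffset.messageOffsetOnly || fetchOffset.messageOffsetOnly) Accumulate(1)
      else if (fetchOffset.onOlderSegment(endOffset)) ForceComplete
      else if (fetchOffset.onSameSegment(endOffset))
        Accumulate(math.min(endOffset.positionDiff(fetchOffset), maxBytes))
      else Accumulate(0)
    } else Accumulate(0)
}
```

For example, `evaluate(OffsetMeta(450), OffsetMeta(500), 1024)` takes the message-offset-only branch and accumulates a single byte, while `evaluate(OffsetMeta(450), OffsetMeta(0), 1024)` completes immediately because the fetch offset is past the end offset.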



##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
 assertTrue(delayedFetch.tryComplete())
 assertTrue(delayedFetch.isCompleted)
 assertTrue(fetchResultOpt.isDefined)
+
+val fetchResult = fetchResultOpt.get
+assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark 
endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithInvalidHighWatermark(endOffset: Long): Unit = {
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+val fetchOffset = 450L
+val logStartOffset = 5L
+val currentLeaderEpoch = Optional.of[Integer](10)
+val replicaId = 1
+
+val fetchStatus = FetchPartitionStatus(
+  startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+  fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, 
fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+var fetchResultOpt: Option[FetchPartitionData] = None
+def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+  fetchResultOpt = Some(responses.head._2)
+}
+
+val delayedFetch = new DelayedFetch(
+  params = fetchParams,
+  fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+  replicaManager = replicaManager,
+  quota = replicaQuota,
+  responseCallback = callback
+)
+
+val partition: Partition = mock(classOf[Partition])
+
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+// Note that the high-watermark does not contains the complete metadata

Review Comment:
   does not contains => does not contain




Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608327384


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
 throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
   s"but we only have log segments upto $endOffset.")
 
-  if (startOffset == maxOffsetMetadata.messageOffset)
+  if (startOffset == maxOffsetMetadata.messageOffset) {
 emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-  else if (startOffset > maxOffsetMetadata.messageOffset)
+  } else if (startOffset > maxOffsetMetadata.messageOffset ||
+maxOffsetMetadata.messageOffsetOnly() ||
+maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset) {

Review Comment:
   Not sure when we need this condition (added as per Jun's suggestion):
   ```
   maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset
   ```






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2122625313

   > @kamalcph , had another look, left some comments. Also, could we add an 
integration test for this case like you described in the PR description? 
`FetchFromFollowerIntegrationTest` should be a good place to do that. There are 
some examples to shutdown nodes and then restart it, and then verify 
producer/consumer still work well.
   
   Added `FetchFromLeaderWithCorruptedCheckpointTest` integration test and 
ensured that the produce fails without the patch (ran the test against trunk 
and this branch).





Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608010033


##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -391,8 +398,10 @@ class UnifiedLogTest {
 assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
   }
 
-  (log.highWatermark to log.logEndOffset).foreach { offset =>
-assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
+  assertEmptyFetch(log, log.highWatermark, FetchIsolation.HIGH_WATERMARK)
+
+  (log.highWatermark + 1 to log.logEndOffset).foreach { offset =>

Review Comment:
   With my 
[comment](https://github.com/apache/kafka/pull/15825#discussion_r1608007659), I 
think we don't need this change now, which makes this change smaller and safer. 






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-21 Thread via GitHub


showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608007659


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException
+* 2. If the message offset is lesser than the local-log-start-offset, then 
it returns the message-only metadata
+* 3. If the message offset is greater than the log-end-offset, then it 
returns the message-only metadata
 */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata 
= {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): 
LogOffsetMetadata = {

Review Comment:
   I don't think we need to change it, because it won't cause infinite recursion
in `LocalLog.read()`:
   ```
   else if (startOffset > maxOffsetMetadata.messageOffset)
     emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
   ```

   The reason is that `LocalLog#convertToOffsetMetadataOrThrow` calls `read()`
with `maxOffsetMetadata` set to `nextOffsetMetadata`, i.e. the `logEndOffset`:

   ```
   read(offset,
     maxLength = 1,
     minOneMessage = false,
     maxOffsetMetadata = nextOffsetMetadata,
     includeAbortedTxns = false)
   ```
   So even if it enters `LocalLog.read()` again, this time it takes a different
if/else branch. It won't enter the
`else if (startOffset > maxOffsetMetadata.messageOffset)` block, because the
earlier check catches it first:
   ```
   if (startOffset > endOffset || !segmentOpt.isPresent)
     throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
       s"but we only have log segments upto $endOffset.")
   ```
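
   A minimal sketch of the branch order described above. It is a simplified
model and not Kafka code: `ReadOutcome` and `classifyRead` are hypothetical
names, offsets are plain `Long`s, and the `segmentOpt.isPresent` check is
left out.

```scala
// Simplified model of the branch order in LocalLog.read(); hypothetical names, not Kafka code.
sealed trait ReadOutcome
case object OutOfRange extends ReadOutcome     // stands in for OffsetOutOfRangeException
case object EmptyAtMax extends ReadOutcome     // startOffset == max offset
case object EmptyBeyondMax extends ReadOutcome // startOffset > max offset (the branch that re-converts)
case object ReadData extends ReadOutcome       // normal segment read

def classifyRead(startOffset: Long, maxOffset: Long, logEndOffset: Long): ReadOutcome =
  if (startOffset > logEndOffset) OutOfRange
  else if (startOffset == maxOffset) EmptyAtMax
  else if (startOffset > maxOffset) EmptyBeyondMax
  else ReadData

// First call: the max offset is the high watermark (10), so an offset between
// the high watermark and the log end offset (20) lands in the "beyond max" branch.
val first = classifyRead(startOffset = 15L, maxOffset = 10L, logEndOffset = 20L)

// Nested call made while converting the offset: the max offset is now the log
// end offset, so "startOffset > maxOffset" would already have been rejected above.
val nested = classifyRead(startOffset = 15L, maxOffset = 20L, logEndOffset = 20L)
```

   With the same `startOffset`, the first call takes the "beyond max" branch
while the nested call falls through to a normal read, so under these
assumptions the re-conversion cannot repeat.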
   
   
   






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-20 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2121704898

   Test failures are unrelated.





Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-20 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1606925582


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,12 @@ class LocalLog(@volatile private var _dir: File,
 throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
   s"but we only have log segments upto $endOffset.")
 
-  if (startOffset == maxOffsetMetadata.messageOffset)
+  if (startOffset == maxOffsetMetadata.messageOffset) {
 emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-  else if (startOffset > maxOffsetMetadata.messageOffset)
-emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
-  else {
+  } else if (startOffset > maxOffsetMetadata.messageOffset) {
+// Instead of converting the `startOffset` to metadata, returning 
message-only metadata to avoid potential loop
+emptyFetchDataInfo(new LogOffsetMetadata(startOffset), 
includeAbortedTxns)

Review Comment:
   With the newly added checks at L376 and L377, an empty fetch-data-info is
returned, and the actual data is returned only once `maxOffsetMetadata`
resolves to complete metadata.

   Previously, the condition below returned `segment.size` for a message-only
`maxOffsetMetadata`:
   ```
   if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
   else segment.size
   ```
   so we might have returned data beyond the allowed max offset.
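
   To make the concern concrete, here is a small sketch of that condition.
`OffsetMeta` is a hypothetical stand-in for `LogOffsetMetadata`, and the use of
`-1` as the "unknown" sentinel for message-only metadata is an assumption of
this sketch.

```scala
// Hypothetical stand-in for LogOffsetMetadata; -1 marks "unknown" in this sketch.
final case class OffsetMeta(messageOffset: Long,
                            segmentBaseOffset: Long = -1L,
                            relativePosition: Int = -1) {
  // True when only the message offset is known (no segment or file position).
  def messageOffsetOnly: Boolean = segmentBaseOffset < 0 && relativePosition < 0
}

// Mirrors the quoted condition: bound the read by the max offset's position only
// when it lies on the segment being read; otherwise the whole segment is readable.
def maxPosition(max: OffsetMeta, segmentBaseOffset: Long, segmentSize: Int): Int =
  if (max.segmentBaseOffset == segmentBaseOffset) max.relativePosition
  else segmentSize

val complete = OffsetMeta(messageOffset = 100L, segmentBaseOffset = 0L, relativePosition = 4096)
val messageOnly = OffsetMeta(messageOffset = 100L)

// Complete metadata caps the read at the known position within the segment.
val bounded = maxPosition(complete, segmentBaseOffset = 0L, segmentSize = 8192)
// Message-only metadata never matches a segment, so the cap is the full segment size.
val unbounded = maxPosition(messageOnly, segmentBaseOffset = 0L, segmentSize = 8192)
```

   In this model the message-only case can never match a real segment base
offset, so the read limit silently becomes the whole segment.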






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-20 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1606711171


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,12 @@ class LocalLog(@volatile private var _dir: File,
 throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
   s"but we only have log segments upto $endOffset.")
 
-  if (startOffset == maxOffsetMetadata.messageOffset)
+  if (startOffset == maxOffsetMetadata.messageOffset) {
 emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-  else if (startOffset > maxOffsetMetadata.messageOffset)
-emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
-  else {
+  } else if (startOffset > maxOffsetMetadata.messageOffset) {
+// Instead of converting the `startOffset` to metadata, returning 
message-only metadata to avoid potential loop
+emptyFetchDataInfo(new LogOffsetMetadata(startOffset), 
includeAbortedTxns)

Review Comment:
   Addressed the review comment in 6452a452c40001ca32ea4272e98e0691b69e9d28 and 
51b6bb8d208754985196a401d13a27c282422c8c






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-20 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2120351527

   > @kamalcph , do we have any update for this PR?
   
   Sorry for the delay. I've addressed all the review comments. PTAL.





Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-20 Thread via GitHub


showuon commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2119875404

   @kamalcph , do we have any update for this PR?





Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-13 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1599161815


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,12 @@ class LocalLog(@volatile private var _dir: File,
 throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
   s"but we only have log segments upto $endOffset.")
 
-  if (startOffset == maxOffsetMetadata.messageOffset)
+  if (startOffset == maxOffsetMetadata.messageOffset) {
 emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-  else if (startOffset > maxOffsetMetadata.messageOffset)
-emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
-  else {
+  } else if (startOffset > maxOffsetMetadata.messageOffset) {
+// Instead of converting the `startOffset` to metadata, returning 
message-only metadata to avoid potential loop
+emptyFetchDataInfo(new LogOffsetMetadata(startOffset), 
includeAbortedTxns)

Review Comment:
   Hmm, I thought the design of this PR is to allow maxOffsetMetadata to be 
message-only in some of the rare cases, right?



##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
 assertTrue(delayedFetch.tryComplete())
 assertTrue(delayedFetch.isCompleted)
 assertTrue(fetchResultOpt.isDefined)
+
+val fetchResult = fetchResultOpt.get
+assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark 
minBytes={0}")
+  @ValueSource(ints = Array(1, 2))
+  def testDelayedFetchWithInvalidHighWatermark(minBytes: Int): Unit = {
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+val fetchOffset = 450L
+val logStartOffset = 5L
+val currentLeaderEpoch = Optional.of[Integer](10)
+val replicaId = 1
+
+val fetchStatus = FetchPartitionStatus(
+  startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+  fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, 
fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500, 
minBytes = minBytes)
+
+var fetchResultOpt: Option[FetchPartitionData] = None
+def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+  fetchResultOpt = Some(responses.head._2)
+}
+
+val delayedFetch = new DelayedFetch(
+  params = fetchParams,
+  fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+  replicaManager = replicaManager,
+  quota = replicaQuota,
+  responseCallback = callback
+)
+
+val partition: Partition = mock(classOf[Partition])
+
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+// high-watermark is lesser than the log-start-offset
+val endOffsetMetadata = new LogOffsetMetadata(0L, 0L, 0)
+when(partition.fetchOffsetSnapshot(
+  currentLeaderEpoch,
+  fetchOnlyFromLeader = true))
+  .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, 
endOffsetMetadata, endOffsetMetadata))
+when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+expectReadFromReplica(fetchParams, topicIdPartition, 
fetchStatus.fetchInfo, Errors.NONE)
+
+val expected = minBytes == 1
+assertEquals(expected, delayedFetch.tryComplete())
+assertEquals(expected, delayedFetch.isCompleted)

Review Comment:
   Yes, I understand that the test passes as it is. I am just saying that the 
logic in DelayedFetch is not consistent.
   
   If the offset metadata is available, we accumulate bytes only
`if (fetchOffset.messageOffset < endOffset.messageOffset)`. To be consistent, we
need to apply the same `if` check when the offset metadata is not available.
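
   A rough sketch of the consistency being asked for, not the merged
`DelayedFetch` code. The function name, the `positionDiff` option, and the
`fallbackBytes` placeholder are all hypothetical; the point is only that both
cases sit behind the same offset comparison.

```scala
// Hypothetical, simplified view of the byte accounting in a delayed fetch.
// `positionDiff` is Some(bytes) when both offsets carry complete
// (segment, position) metadata and None when either one is message-only.
def accumulatedBytes(fetchOffset: Long,
                     endOffset: Long,
                     positionDiff: Option[Int],
                     fallbackBytes: Int): Int =
  if (fetchOffset < endOffset) positionDiff.getOrElse(fallbackBytes) // same guard for both cases
  else 0 // nothing to read at or beyond the end offset, with or without metadata

// Complete metadata: count the real byte distance.
val withMetadata = accumulatedBytes(5L, 10L, Some(300), fallbackBytes = 1)
// Message-only metadata, data available: count the placeholder amount.
val withoutMetadata = accumulatedBytes(5L, 10L, None, fallbackBytes = 1)
// Message-only metadata, fetch offset beyond the end offset: count nothing.
val beyondEnd = accumulatedBytes(450L, 0L, None, fallbackBytes = 1)
```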






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-13 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1598838884


##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -391,8 +398,10 @@ class UnifiedLogTest {
 assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
   }
 
-  (log.highWatermark to log.logEndOffset).foreach { offset =>
-assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
+  assertEmptyFetch(log, log.highWatermark, FetchIsolation.HIGH_WATERMARK)
+
+  (log.highWatermark + 1 to log.logEndOffset).foreach { offset =>

Review Comment:
   Since `log.highWatermark` contains the full metadata, I split the check into
two cases for reads at or beyond the high watermark:

   1) When the fetch offset equals the high watermark, we return empty records
with the complete offset metadata.
   2) When the fetch offset is beyond the
high-watermark/max-offset-allowed-to-read, we return empty records with
message-only metadata.

   We can reconsider whether to return message-only or complete offset metadata
when the fetch offset is beyond the `max-offset-allowed-to-read`
([comment](https://github.com/apache/kafka/pull/15825#discussion_r1598841362)).
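
   The two cases can be sketched as follows. `OffsetMeta` and
`emptyFetchMetadata` are hypothetical names used for illustration, with `-1`
assumed as the "unknown" sentinel; this is not the `UnifiedLog` implementation.

```scala
// Hypothetical stand-in for LogOffsetMetadata; -1 marks "unknown" in this sketch.
final case class OffsetMeta(messageOffset: Long,
                            segmentBaseOffset: Long = -1L,
                            relativePosition: Int = -1) {
  def messageOffsetOnly: Boolean = segmentBaseOffset < 0 && relativePosition < 0
}

// Returns the metadata attached to an empty fetch, or None when the fetch
// offset is below the max offset and a normal read would happen instead.
def emptyFetchMetadata(fetchOffset: Long, maxOffset: OffsetMeta): Option[OffsetMeta] =
  if (fetchOffset == maxOffset.messageOffset) Some(maxOffset)                   // case 1: complete metadata
  else if (fetchOffset > maxOffset.messageOffset) Some(OffsetMeta(fetchOffset)) // case 2: message-only metadata
  else None

val highWatermark = OffsetMeta(messageOffset = 10L, segmentBaseOffset = 0L, relativePosition = 512)
val atHw = emptyFetchMetadata(10L, highWatermark)
val beyondHw = emptyFetchMetadata(11L, highWatermark)
```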
   
   
   






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-13 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1598848735


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,12 @@ class LocalLog(@volatile private var _dir: File,
 throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
   s"but we only have log segments upto $endOffset.")
 
-  if (startOffset == maxOffsetMetadata.messageOffset)
+  if (startOffset == maxOffsetMetadata.messageOffset) {
 emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-  else if (startOffset > maxOffsetMetadata.messageOffset)
-emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
-  else {
+  } else if (startOffset > maxOffsetMetadata.messageOffset) {
+// Instead of converting the `startOffset` to metadata, returning 
message-only metadata to avoid potential loop
+emptyFetchDataInfo(new LogOffsetMetadata(startOffset), 
includeAbortedTxns)

Review Comment:
   Agreed, this patch is getting tricky. We want to validate all the scenarios;
in particular, when there is no data to read from the server, the rate of fetch
requests from the clients should stay almost the same.

   To avoid/reduce the cases, can we always resolve the `maxOffsetMetadata` to
complete metadata?
https://github.com/apache/kafka/pull/15825#discussion_r1598841362






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-13 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1598841362


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException
+* 2. If the message offset is lesser than the local-log-start-offset, then 
it returns the message-only metadata
+* 3. If the message offset is greater than the log-end-offset, then it 
returns the message-only metadata
 */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata 
= {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): 
LogOffsetMetadata = {

Review Comment:
   To solve the infinite loop, instead of returning the message-only
`LogOffsetMetadata` when `startOffset` is beyond `maxOffsetMetadata`, can we
retain the existing behavior without the loop? Something like the below:

   ```scala
   def read(startOffset: Long,
            maxLength: Int,
            minOneMessage: Boolean,
            maxOffsetMetadata: LogOffsetMetadata,
            includeAbortedTxns: Boolean): FetchDataInfo = {
     maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
       trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
         s"total length ${segments.sizeInBytes} bytes")

       val endOffsetMetadata = nextOffsetMetadata
       val endOffset = endOffsetMetadata.messageOffset
       val segmentOpt = segments.floorSegment(startOffset)

       // return error on attempt to read beyond the log end offset
       if (startOffset > endOffset || !segmentOpt.isPresent)
         throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
           s"but we only have log segments upto $endOffset.")

       if (startOffset == maxOffsetMetadata.messageOffset) {
         emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
       } else if (startOffset > maxOffsetMetadata.messageOffset) {
         // Updated code to avoid the loop:
         val tmpFetchDataInfo = readFetchDataInfo(segmentOpt.get, startOffset, maxLength = 1,
           minOneMessage = false, nextOffsetMetadata, includeAbortedTxns = false)
         emptyFetchDataInfo(tmpFetchDataInfo.fetchOffsetMetadata, includeAbortedTxns)
       } else {
         readFetchDataInfo(segmentOpt.get, startOffset, maxLength, minOneMessage, maxOffsetMetadata, includeAbortedTxns)
       }
     }
   }

   private def readFetchDataInfo(segment: LogSegment,
                                 startOffset: Long,
                                 maxLength: Int,
                                 minOneMessage: Boolean,
                                 maxOffsetMetadata: LogOffsetMetadata,
                                 includeAbortedTxns: Boolean): FetchDataInfo = {
     // Do the read on the segment with a base offset less than the target offset
     // but if that segment doesn't contain any messages with an offset greater than that
     // continue to read from successive segments until we get some messages or we reach the end of the log
     var fetchDataInfo: FetchDataInfo = null
     var segmentOpt: Optional[LogSegment] = Optional.of(segment)
     while (fetchDataInfo == null && segmentOpt.isPresent) {
       val segment = segmentOpt.get
       val baseOffset = segment.baseOffset

       val maxPosition =
         // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
         if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
         else segment.size

       fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
       if (fetchDataInfo != null) {
         if (includeAbortedTxns)
           fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
       } else segmentOpt = segments.higherSegment(baseOffset)
     }

     if (fetchDataInfo != null) fetchDataInfo
     else {
       // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
       // this can happen when all messages with offset larger than start offsets have been deleted.
       // In this case, we will return the empty set with log end offset metadata
       new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
     }
   }

   ```




Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-13 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1598832131


##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
 assertTrue(delayedFetch.tryComplete())
 assertTrue(delayedFetch.isCompleted)
 assertTrue(fetchResultOpt.isDefined)
+
+val fetchResult = fetchResultOpt.get
+assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark minBytes={0}")
+  @ValueSource(ints = Array(1, 2))
+  def testDelayedFetchWithInvalidHighWatermark(minBytes: Int): Unit = {
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+val fetchOffset = 450L
+val logStartOffset = 5L
+val currentLeaderEpoch = Optional.of[Integer](10)
+val replicaId = 1
+
+val fetchStatus = FetchPartitionStatus(
+  startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+  fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500, minBytes = minBytes)
+
+var fetchResultOpt: Option[FetchPartitionData] = None
+def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+  fetchResultOpt = Some(responses.head._2)
+}
+
+val delayedFetch = new DelayedFetch(
+  params = fetchParams,
+  fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+  replicaManager = replicaManager,
+  quota = replicaQuota,
+  responseCallback = callback
+)
+
+val partition: Partition = mock(classOf[Partition])
+when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+// high-watermark is lesser than the log-start-offset
+val endOffsetMetadata = new LogOffsetMetadata(0L, 0L, 0)
+when(partition.fetchOffsetSnapshot(
+  currentLeaderEpoch,
+  fetchOnlyFromLeader = true))
+  .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
+when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE)
+
+val expected = minBytes == 1
+assertEquals(expected, delayedFetch.tryComplete())
+assertEquals(expected, delayedFetch.isCompleted)

Review Comment:
   In the test, the 
[LogOffsetSnapshot](https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetSnapshot.java)
 contains message-only offsets for logEndOffset, highWatermark, and 
lastStableOffset in DelayedFetchTest.scala#207. So, the test passed with the 
newly added condition.
   
   In a real scenario, we expect the LogOffsetSnapshot to contain the complete 
metadata for all the offsets.



##
storage/src/test/java/org/apache/kafka/storage/internals/log/LogOffsetMetadataTest.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.kafka.storage.internals.log.LogOffsetMetadata.UNKNOWN_OFFSET_METADATA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class LogOffsetMetadataTest {
+
+@Test
+void testOnOlderSegment() {
+LogOffsetMetadata metadata1 = new LogOffsetMetadata(1L, 0L, 1);
+LogOffsetMetadata metadata2 = new LogOffsetMetadata(5L, 4L, 2);
+LogOffsetMetadata messageOnlyMetadata = new LogOffsetMetadata(1L);
+assertFalse(UNKNOWN_OFFSET_METADATA.onOlderSegment(UNKNOWN_OFFSET_METADATA));
+assertFalse(metadata1.onOlderSegment(messageOnlyMetadata));
+assertFalse(messageOnlyMetadata.onOlderSegment(metadata1));
+assertFalse(metadata1.onOlderSegment(metadata1));
+




Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-10 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1597073119


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is less than the log-start-offset (or) local-log-start-offset, then it returns the
+   * message-only metadata.
+* 2. If the message offset is beyond the log-end-offset, then it returns the message-only metadata.
+* 3. For all other cases, it returns the offset metadata from the log.
 */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
-checkLogStartOffset(offset)
-localLog.convertToOffsetMetadataOrThrow(offset)
+  private[log] def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {

Review Comment:
   convertToOffsetMetadata => maybeConvertToOffsetMetadata ?
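
The three cases listed in the updated doc comment can be sketched as a small standalone Java model. This is not the `UnifiedLog` implementation: the class `OffsetConversionSketch`, its `Meta` type, the segment lookup, and the fixed 100-byte record size used to derive a position are all assumptions made purely for illustration.

```java
// Simplified standalone model; NOT Kafka's UnifiedLog. All names are hypothetical.
final class OffsetConversionSketch {

    /** Stand-in for LogOffsetMetadata; -1 marks an unknown segment or position. */
    static final class Meta {
        final long messageOffset;
        final long segmentBaseOffset;
        final int relativePosition;

        Meta(long messageOffset) {
            this(messageOffset, -1L, -1);
        }

        Meta(long messageOffset, long segmentBaseOffset, int relativePosition) {
            this.messageOffset = messageOffset;
            this.segmentBaseOffset = segmentBaseOffset;
            this.relativePosition = relativePosition;
        }

        boolean messageOffsetOnly() {
            return segmentBaseOffset < 0 && relativePosition < 0;
        }
    }

    private final long logStartOffset;
    private final long localLogStartOffset;
    private final long logEndOffset;
    private final long[] segmentBaseOffsets; // ascending and non-empty

    OffsetConversionSketch(long logStartOffset, long localLogStartOffset,
                           long logEndOffset, long[] segmentBaseOffsets) {
        this.logStartOffset = logStartOffset;
        this.localLogStartOffset = localLogStartOffset;
        this.logEndOffset = logEndOffset;
        this.segmentBaseOffsets = segmentBaseOffsets;
    }

    Meta convertToOffsetMetadata(long offset) {
        // Case 1: below the log-start-offset or the local-log-start-offset.
        if (offset < logStartOffset || offset < localLogStartOffset) {
            return new Meta(offset);
        }
        // Case 2: beyond the log-end-offset.
        if (offset > logEndOffset) {
            return new Meta(offset);
        }
        // Case 3: find the last segment whose base offset is <= offset.
        long base = segmentBaseOffsets[0];
        for (long candidate : segmentBaseOffsets) {
            if (candidate <= offset) {
                base = candidate;
            }
        }
        // Assumption for illustration only: every record occupies 100 bytes.
        int position = (int) (offset - base) * 100;
        return new Meta(offset, base, position);
    }
}
```

In this model the caller never sees an exception for an out-of-range offset; it has to check `messageOffsetOnly()` before relying on the segment or position fields.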



##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
 assertTrue(delayedFetch.tryComplete())
 assertTrue(delayedFetch.isCompleted)
 assertTrue(fetchResultOpt.isDefined)
+
+val fetchResult = fetchResultOpt.get
+assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark minBytes={0}")
+  @ValueSource(ints = Array(1, 2))
+  def testDelayedFetchWithInvalidHighWatermark(minBytes: Int): Unit = {
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+val fetchOffset = 450L
+val logStartOffset = 5L
+val currentLeaderEpoch = Optional.of[Integer](10)
+val replicaId = 1
+
+val fetchStatus = FetchPartitionStatus(
+  startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+  fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500, minBytes = minBytes)
+
+var fetchResultOpt: Option[FetchPartitionData] = None
+def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+  fetchResultOpt = Some(responses.head._2)
+}
+
+val delayedFetch = new DelayedFetch(
+  params = fetchParams,
+  fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+  replicaManager = replicaManager,
+  quota = replicaQuota,
+  responseCallback = callback
+)
+
+val partition: Partition = mock(classOf[Partition])
+when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+// high-watermark is lesser than the log-start-offset
+val endOffsetMetadata = new LogOffsetMetadata(0L, 0L, 0)
+when(partition.fetchOffsetSnapshot(
+  currentLeaderEpoch,
+  fetchOnlyFromLeader = true))
+  .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
+when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE)
+
+val expected = minBytes == 1
+assertEquals(expected, delayedFetch.tryComplete())
+assertEquals(expected, delayedFetch.isCompleted)

Review Comment:
   This exposes an issue in delayedFetch. If HWM is less than fetchOffset, we 
haven't gained any bytes. So, we shouldn't complete the delayedFetch 
immediately.
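
To make the concern concrete, here is a minimal Java sketch that plugs in the values from the test above (fetch offset 450, end offset 0, `minBytes` of 1 or 2). The helper names are hypothetical and this is not the `DelayedFetch` code; it only contrasts crediting one byte whenever the offsets differ with crediting it only when the end offset is ahead of the fetch offset.

```java
// Hypothetical helpers for illustration; this is not the DelayedFetch code.
final class DelayedFetchGuardSketch {

    /** Rule as tested: when positions are unknown, any offset difference counts as one byte. */
    static long bytesWithoutGuard(long fetchOffset, long endOffset) {
        return endOffset != fetchOffset ? 1L : 0L;
    }

    /** Guarded variant: credit the byte only if the end offset is ahead of the fetch offset. */
    static long bytesWithGuard(long fetchOffset, long endOffset) {
        return fetchOffset < endOffset ? 1L : 0L;
    }

    /** A delayed fetch can complete once enough bytes have accumulated. */
    static boolean canComplete(long accumulatedBytes, int minBytes) {
        return accumulatedBytes >= minBytes;
    }

    public static void main(String[] args) {
        long fetchOffset = 450L; // values taken from the test above
        long endOffset = 0L;
        for (int minBytes : new int[] {1, 2}) {
            System.out.println("minBytes=" + minBytes
                + " withoutGuard=" + canComplete(bytesWithoutGuard(fetchOffset, endOffset), minBytes)
                + " withGuard=" + canComplete(bytesWithGuard(fetchOffset, endOffset), minBytes));
        }
    }
}
```

Without the guard the fetch completes exactly when `minBytes` is 1, which is what the test asserts via `expected = minBytes == 1`. With the guard it stays pending for both values, since no bytes have actually been gained.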



##
storage/src/test/java/org/apache/kafka/storage/internals/log/LogOffsetMetadataTest.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.kafka.storage.internals.log.LogOffsetMetadata.UNKNOWN_OFFSET_METADATA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static 

Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-10 Thread via GitHub


jsancio commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2105087021

   @kamalcph, looks like a bug to me. The predicate should be `if 
(!hwm.messageOffsetOnly)` or the if/else blocks should be swapped. I suspect 
that we haven't noticed this bug in the KRaft implementation 
(`KafkaRaftClient`) because kraft never looks at the segment and byte position 
for the HWM.
   
   If you are going to fix this code, do you mind adding a test for this case? 
Since `KafkaMetadataLog` calls `UnifiedLog.fetchOffsetSnapshot`, 
`hwm.messageOffsetOnly` should always be false.
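
The corrected predicate can be sketched in Java as follows. `StorageMeta` and `SegmentPosition` are hypothetical stand-ins for the storage-side and raft-side types, not the real classes; the point is only the direction of the condition: a segment position is produced when the metadata is complete, and left empty when it is message-only.

```java
import java.util.Optional;

// Hypothetical stand-ins for the storage and raft metadata types.
final class HwmConversionSketch {

    /** Storage-side metadata; -1 marks an unknown segment or position. */
    static final class StorageMeta {
        final long messageOffset;
        final long segmentBaseOffset;
        final int relativePosition;

        StorageMeta(long messageOffset) {
            this(messageOffset, -1L, -1);
        }

        StorageMeta(long messageOffset, long segmentBaseOffset, int relativePosition) {
            this.messageOffset = messageOffset;
            this.segmentBaseOffset = segmentBaseOffset;
            this.relativePosition = relativePosition;
        }

        boolean messageOffsetOnly() {
            return segmentBaseOffset < 0 && relativePosition < 0;
        }
    }

    /** Raft-side position within a segment. */
    static final class SegmentPosition {
        final long baseOffset;
        final int relativePosition;

        SegmentPosition(long baseOffset, int relativePosition) {
            this.baseOffset = baseOffset;
            this.relativePosition = relativePosition;
        }
    }

    static Optional<SegmentPosition> toSegmentPosition(StorageMeta hwm) {
        // Complete metadata carries a usable segment position ...
        if (!hwm.messageOffsetOnly()) {
            return Optional.of(new SegmentPosition(hwm.segmentBaseOffset, hwm.relativePosition));
        }
        // ... message-only metadata does not, so the position stays empty.
        return Optional.empty();
    }
}
```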





Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-09 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2103912972

   While going through the usages, it looks to me like the LogOffsetMetadata 
conversion that happens in KafkaMetadataLog is not correct. Could someone please 
double check?
   
   ```
   org.apache.kafka.storage.internals.log.LogOffsetMetadata -> 
org.apache.kafka.raft.LogOffsetMetadata
   ```
   
   
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala#L226
   
   Question: Why do we make 
org.apache.kafka.raft.LogOffsetMetadata#segmentPosition empty when 
hwm.messageOffsetOnly is false?





Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-09 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1596278649


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException

Review Comment:
   Removed the `checkLogStartOffset` from the `convertToOffsetMetadataOrThrow` 
method.






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-09 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1596276757


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException
+* 2. If the message offset is lesser than the local-log-start-offset, then 
it returns the message-only metadata
+* 3. If the message offset is greater than the log-end-offset, then it 
returns the message-only metadata
 */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata 
= {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): 
LogOffsetMetadata = {

Review Comment:
   This is a potential loop (not sure when it would be triggered). Updated the 
logic to return the message-only metadata.






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-09 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1596055964


##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -92,7 +92,10 @@ class DelayedFetch(
 // has just rolled, then the high watermark offset will remain the 
same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
 if (endOffset.messageOffset != fetchOffset.messageOffset) {
-  if (endOffset.onOlderSegment(fetchOffset)) {
+  if (endOffset.messageOffsetOnly() || 
fetchOffset.messageOffsetOnly()) {
+// This case is to handle the stale high-watermark on the 
leader until it gets updated with the correct value

Review Comment:
   Perhaps change to something like the following.
   
   "If we don't know the position of the offset on log segments, just 
pessimistically assume that we only gained 1 byte. 
This can happen when the high watermark is stale, but should be rare."  
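
A rough Java sketch of where such a check sits relative to the segment comparison is shown below. It is a simplified model, not the `DelayedFetch` code: `Meta`, `Outcome`, `classify`, and `creditedBytes` are hypothetical names, and the handling of offsets on different segments is deliberately left to the caller.

```java
// Simplified model of the decision order; all names are hypothetical.
final class FetchProgressSketch {

    /** Stand-in for LogOffsetMetadata; -1 marks an unknown segment or position. */
    static final class Meta {
        final long messageOffset;
        final long segmentBaseOffset;
        final int relativePosition;

        Meta(long messageOffset) {
            this(messageOffset, -1L, -1);
        }

        Meta(long messageOffset, long segmentBaseOffset, int relativePosition) {
            this.messageOffset = messageOffset;
            this.segmentBaseOffset = segmentBaseOffset;
            this.relativePosition = relativePosition;
        }

        boolean messageOffsetOnly() {
            return segmentBaseOffset < 0 && relativePosition < 0;
        }
    }

    enum Outcome { NO_PROGRESS, ASSUME_ONE_BYTE, DIFFERENT_SEGMENT, SAME_SEGMENT }

    static Outcome classify(Meta fetchOffset, Meta endOffset) {
        if (endOffset.messageOffset == fetchOffset.messageOffset) {
            return Outcome.NO_PROGRESS;
        }
        // Segment and position are unknown, so they cannot be compared:
        // this check has to come before any segment-based comparison.
        if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
            return Outcome.ASSUME_ONE_BYTE;
        }
        if (endOffset.segmentBaseOffset != fetchOffset.segmentBaseOffset) {
            return Outcome.DIFFERENT_SEGMENT;
        }
        return Outcome.SAME_SEGMENT;
    }

    static long creditedBytes(Meta fetchOffset, Meta endOffset) {
        switch (classify(fetchOffset, endOffset)) {
            case ASSUME_ONE_BYTE:
                return 1L; // pessimistic estimate
            case SAME_SEGMENT:
                return Math.max(0, endOffset.relativePosition - fetchOffset.relativePosition);
            default:
                return 0L; // other outcomes are left to the caller in this sketch
        }
    }
}
```

Crediting a single byte keeps the estimate as small as possible while still letting a fetch with `minBytes` of 1 make progress.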



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException
+* 2. If the message offset is lesser than the local-log-start-offset, then 
it returns the message-only metadata
+* 3. If the message offset is greater than the log-end-offset, then it 
returns the message-only metadata
 */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata 
= {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): 
LogOffsetMetadata = {

Review Comment:
   `LocalLog.read()` also calls `convertToOffsetMetadataOrThrow`.
   
   ```
 else if (startOffset > maxOffsetMetadata.messageOffset)
   emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
   ```
   It seems this could lead to infinite recursion. To avoid that, we could 
change the above code to avoid calling `convertToOffsetMetadataOrThrow` and 
pass a message-only `LogOffsetMetadata` to `emptyFetchDataInfo` instead.
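
The recursion and its fix can be modelled in a few lines of Java. This sketch assumes, as the comment implies, that the conversion is itself implemented through `read()`; the class, the counter, and the single-segment lookup with 10-byte records are hypothetical and only serve to show that the out-of-range branch no longer re-enters `read()`.

```java
// Simplified model of the call chain; names and lookup logic are hypothetical.
final class ReadRecursionSketch {

    /** Stand-in for LogOffsetMetadata; -1 marks an unknown segment or position. */
    static final class Meta {
        final long messageOffset;
        final long segmentBaseOffset;
        final int relativePosition;

        Meta(long messageOffset) {
            this(messageOffset, -1L, -1);
        }

        Meta(long messageOffset, long segmentBaseOffset, int relativePosition) {
            this.messageOffset = messageOffset;
            this.segmentBaseOffset = segmentBaseOffset;
            this.relativePosition = relativePosition;
        }

        boolean messageOffsetOnly() {
            return segmentBaseOffset < 0 && relativePosition < 0;
        }
    }

    private final long logEndOffset;
    int readCalls = 0; // counts entries into read() to show there is no re-entry

    ReadRecursionSketch(long logEndOffset) {
        this.logEndOffset = logEndOffset;
    }

    /** Assumed here: the conversion is implemented as a lookup through read(). */
    Meta convertToOffsetMetadata(long offset) {
        return read(offset, logEndOffset);
    }

    /** Returns only the start-offset metadata of the (omitted) fetch result. */
    Meta read(long startOffset, long maxOffset) {
        readCalls++;
        if (startOffset > maxOffset) {
            // Calling convertToOffsetMetadata(startOffset) here would re-enter read()
            // with the same arguments and never terminate. Message-only metadata
            // needs no lookup, so the recursion stops.
            return new Meta(startOffset);
        }
        // Hypothetical lookup: one segment with base offset 0 and 10-byte records.
        return new Meta(startOffset, 0L, (int) startOffset * 10);
    }
}
```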



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException

Review Comment:
   Good point. Since we change the logic such that it's ok not to have the 
metadata for an offset, we could just skip the call to `checkLogStartOffset`.



##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -92,7 +92,10 @@ class DelayedFetch(
 // has just rolled, then the high watermark offset will remain the 
same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
 if (endOffset.messageOffset != fetchOffset.messageOffset) {
-  if (endOffset.onOlderSegment(fetchOffset)) {
+  if (endOffset.messageOffsetOnly() || 
fetchOffset.messageOffsetOnly()) {

Review Comment:
   `fetchOffset` typically shouldn't be message only. But it doesn't hurt to 
have the check.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException
+* 2. If the message offset is lesser than the local-log-start-offset, then 
it returns the message-only metadata

Review Comment:
   lesser => less






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-09 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2103078761

   Test failures are unrelated.





Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-08 Thread via GitHub


showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593525765


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException

Review Comment:
   > While updating the 
[UnifiedLog#updateLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L535),
 the HWM also gets updated and it doesn't hit the fetchHighWatermarkMetadata 
(or) convertToOffsetMetadataOrThrow so the call will succeed even when 
current-log-start-offset > old-HWM.
   
   For 
[UnifiedLog#updateLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L535),
 I think it's safe that we don't throw an exception when 
`current-log-start-offset > old-HWM`, because it is called during 
initialization or from `maybeIncrementLogStartOffset`. In the latter, we've 
already checked `newLogStartOffset > logStartOffset`.






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-08 Thread via GitHub


showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593511224


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException

Review Comment:
   Ah, OK. Thanks.






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-08 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593471183


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException

Review Comment:
   Should we avoid throwing the error and return message-only metadata when the 
offset is less than the log-start-offset?
   
   While updating the 
[UnifiedLog#updateLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L535),
 the HWM also gets updated and it doesn't hit the `fetchHighWatermarkMetadata` 
(or) `convertToOffsetMetadataOrThrow` so the call will succeed even when 
current-log-start-offset > old-HWM.












Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-08 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2099844734

   > Thanks for the PR. One question: So when we temporarily set high-watermark 
as ` LogOffsetMetadata(0)` for the leader, we're waiting for the high-watermark 
gets updated after followers fetch from the leader, right?
   
   Yes, the call to 
[maybeIncrementLeaderHW](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/cluster/Partition.scala?L1166)
 will succeed when the node becomes leader for the partition. Note that if the 
current node is the only alive replica, then the high-watermark gets updated to 
the leader-log-end-offset. The behavior is the same for both normal and 
remote-storage-enabled topics.
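
As a rough illustration of the single-replica remark, the sketch below computes a candidate high watermark as the smallest log-end-offset among the leader and its in-sync followers, and only ever moves the watermark forward. This is a simplified model with hypothetical method names, not the `Partition` code; with no followers the candidate is simply the leader's log-end-offset.

```java
// Hypothetical helper; a simplified view of how a leader picks its high watermark.
final class HighWatermarkSketch {

    /** Candidate HW: smallest log-end-offset among the leader and its in-sync followers. */
    static long candidateHighWatermark(long leaderLogEndOffset, long... inSyncFollowerLogEndOffsets) {
        long min = leaderLogEndOffset;
        for (long leo : inSyncFollowerLogEndOffsets) {
            min = Math.min(min, leo);
        }
        return min;
    }

    /** In this sketch the high watermark only moves forward. */
    static long maybeIncrement(long currentHighWatermark, long candidate) {
        return Math.max(currentHighWatermark, candidate);
    }

    public static void main(String[] args) {
        // Leader is the only alive replica: a stale HW of 0 jumps to the leader's LEO.
        long stale = 0L;
        System.out.println(maybeIncrement(stale, candidateHighWatermark(120L)));
    }
}
```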





Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-07 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593363337


##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -92,7 +92,10 @@ class DelayedFetch(
 // has just rolled, then the high watermark offset will remain the 
same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
 if (endOffset.messageOffset != fetchOffset.messageOffset) {
-  if (endOffset.onOlderSegment(fetchOffset)) {
+  if (endOffset.messageOffsetOnly() || 
fetchOffset.messageOffsetOnly()) {

Review Comment:
   `fetchOffset` can be message-only metadata when there is a diverging-epoch. 
If there is a diverged-epoch in the LogReadResults, then it won't enter the 
DelayedFetch. We can remove the check if it is not required. 



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException

Review Comment:
   `checkLogStartOffset` will throw `OffsetOutOfRangeException` if the offset 
is less than the `logStartOffset`.






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-07 Thread via GitHub


showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593331212


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException

Review Comment:
   For case 1, it looks like we never throw `OffsetOutOfRangeException` now, 
do we?



##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -92,7 +92,10 @@ class DelayedFetch(
 // has just rolled, then the high watermark offset will remain the same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
 if (endOffset.messageOffset != fetchOffset.messageOffset) {
-  if (endOffset.onOlderSegment(fetchOffset)) {
+  if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {

Review Comment:
   I can understand the `endOffset.messageOffsetOnly()` case, since the leader's 
high watermark has not been updated yet. But when will `fetchOffset` be 
`messageOffsetOnly`?






Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-07 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2097869168

   @junrao @satishd @chia7712 @showuon 
   
   Updated the test plan in the summary. Verified that the patch fixes the 
issue by running both the trunk and patched builds. With the fix, the 
high-watermark value gets updated to the valid offset. Please take a look when 
you get a chance.





Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-01 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1586835182


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java:
##
@@ -28,6 +28,7 @@ public final class LogOffsetMetadata {
 
 //TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage module
 private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L;
+public static final long REMOTE_LOG_UNKNOWN_OFFSET = -2L;

Review Comment:
   We probably don't need this. The existing `UNIFIED_LOG_UNKNOWN_OFFSET` should 
be enough.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java:
##
@@ -51,6 +57,8 @@ public LogOffsetMetadata(long messageOffset,
 
 // check if this offset is already on an older segment compared with the given offset
 public boolean onOlderSegment(LogOffsetMetadata that) {
+if (this.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET || that.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET)

Review Comment:
   We probably don't need this. If messageOffsetOnly() is true, we can just 
return false.
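
A minimal sketch of that suggestion, under stated assumptions. `OffsetMetadataSketch` is a hypothetical, simplified stand-in for `LogOffsetMetadata`; the sentinel values and the choice to return `false` when either side is message-only are assumptions made for illustration, not the merged implementation.

```java
// Hypothetical, simplified stand-in for LogOffsetMetadata; not the Kafka class itself.
final class OffsetMetadataSketch {
    // Assumed sentinels meaning "segment or file position is not known".
    static final long UNKNOWN_OFFSET = -1L;
    static final int UNKNOWN_FILE_POSITION = -1;

    final long messageOffset;
    final long segmentBaseOffset;
    final int relativePositionInSegment;

    /** Builds metadata that carries only the message offset. */
    OffsetMetadataSketch(long messageOffset) {
        this(messageOffset, UNKNOWN_OFFSET, UNKNOWN_FILE_POSITION);
    }

    OffsetMetadataSketch(long messageOffset, long segmentBaseOffset, int relativePositionInSegment) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePositionInSegment = relativePositionInSegment;
    }

    /** True when neither the segment nor the position within it is known. */
    boolean messageOffsetOnly() {
        return segmentBaseOffset == UNKNOWN_OFFSET
            && relativePositionInSegment == UNKNOWN_FILE_POSITION;
    }

    /** Without segment information on both sides, the ordering cannot be decided, so answer false. */
    boolean onOlderSegment(OffsetMetadataSketch that) {
        if (this.messageOffsetOnly() || that.messageOffsetOnly()) {
            return false;
        }
        return this.segmentBaseOffset < that.segmentBaseOffset;
    }
}
```

With this shape, no second sentinel such as `REMOTE_LOG_UNKNOWN_OFFSET` is needed: message-only metadata is recognised through the existing unknown-offset marker.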



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java:
##
@@ -65,6 +73,8 @@ private boolean onSameSegment(LogOffsetMetadata that) {
 // compute the number of bytes between this offset to the given offset
 // if they are on the same segment and this offset precedes the given offset
 public int positionDiff(LogOffsetMetadata that) {
+if (this.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET || that.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET)
+return 1;

Review Comment:
   It's a bit hacky to do this here. I was thinking of doing this in 
DelayedFetch.
   
   ```
   if (endOffset.messageOffset != fetchOffset.messageOffset) {
     if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
       // Segment information is missing on one side, so skip the position diff
       accumulatedSize += 1
     } else if (endOffset.onOlderSegment(fetchOffset)) {
       // Case F, this can happen when the new fetch operation is on a truncated leader
       ...
     }
   }
   ```



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1428,7 +1428,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 */
   private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
 checkLogStartOffset(offset)
-localLog.convertToOffsetMetadataOrThrow(offset)
+if (remoteLogEnabled() && offset < localLogStartOffset()) {
+  new LogOffsetMetadata(offset, LogOffsetMetadata.REMOTE_LOG_UNKNOWN_OFFSET)
+} else {
+  localLog.convertToOffsetMetadataOrThrow(offset)

Review Comment:
   I was thinking that we could change 
localLog.convertToOffsetMetadataOrThrow() such that if read() throws an 
exception, it just returns a message-only LogOffsetMetadata.
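
A rough sketch of that fallback, for illustration only. Every type here (`ConvertOffsetSketch`, `Metadata`, `LocalRead`, `OutOfRangeException`) is a hypothetical stand-in; the real `localLog` API and exception types differ, and the `-1` sentinels are assumptions.

```java
// Hypothetical, simplified model of the fallback; none of these types are Kafka's.
final class ConvertOffsetSketch {
    /** Stand-in for the out-of-range exception raised by a local-log read. */
    static final class OutOfRangeException extends RuntimeException {
        OutOfRangeException(String message) {
            super(message);
        }
    }

    /** Stand-in for LogOffsetMetadata; -1 marks an unknown segment or position. */
    static final class Metadata {
        final long messageOffset;
        final long segmentBaseOffset;
        final int relativePosition;

        Metadata(long messageOffset, long segmentBaseOffset, int relativePosition) {
            this.messageOffset = messageOffset;
            this.segmentBaseOffset = segmentBaseOffset;
            this.relativePosition = relativePosition;
        }

        static Metadata messageOnly(long offset) {
            return new Metadata(offset, -1L, -1);
        }

        boolean messageOffsetOnly() {
            return segmentBaseOffset == -1L && relativePosition == -1;
        }
    }

    /** Stand-in for the local-log read that resolves an offset to segment metadata. */
    interface LocalRead {
        Metadata read(long offset);
    }

    /** Returns full metadata when the read succeeds, message-only metadata when it throws. */
    static Metadata convertToOffsetMetadata(long offset, LocalRead localLog) {
        try {
            return localLog.read(offset);
        } catch (OutOfRangeException e) {
            // The offset is not readable from the local log, so keep only the message offset.
            return Metadata.messageOnly(offset);
        }
    }
}
```

Compared with the explicit `remoteLogEnabled() && offset < localLogStartOffset()` branch in the diff, this keeps the decision inside the conversion itself, and callers only need to check `messageOffsetOnly()`.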






[PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-04-28 Thread via GitHub


kamalcph opened a new pull request, #15825:
URL: https://github.com/apache/kafka/pull/15825

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

