junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1597073119


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
     * Given a message offset, find its corresponding offset metadata in the 
log.
-    * If the message offset is out of range, throw an OffsetOutOfRangeException
+    * 1. If the message offset is less than the log-start-offset (or) 
local-log-start-offset, then it returns the
+   *     message-only metadata.
+    * 2. If the message offset is beyond the log-end-offset, then it returns 
the message-only metadata.
+    * 3. For all other cases, it returns the offset metadata from the log.
     */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata 
= {
-    checkLogStartOffset(offset)
-    localLog.convertToOffsetMetadataOrThrow(offset)
+  private[log] def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {

Review Comment:
   Should we rename convertToOffsetMetadata to maybeConvertToOffsetMetadata?



##########
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##########
@@ -164,18 +169,71 @@ class DelayedFetchTest {
     assertTrue(delayedFetch.tryComplete())
     assertTrue(delayedFetch.isCompleted)
     assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark 
minBytes={0}")
+  @ValueSource(ints = Array(1, 2))
+  def testDelayedFetchWithInvalidHighWatermark(minBytes: Int): Unit = {
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val fetchOffset = 450L
+    val logStartOffset = 5L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, 
fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500, 
minBytes = minBytes)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      params = fetchParams,
+      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      responseCallback = callback
+    )
+
+    val partition: Partition = mock(classOf[Partition])
+    
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+    // high-watermark is lesser than the log-start-offset
+    val endOffsetMetadata = new LogOffsetMetadata(0L, 0L, 0)
+    when(partition.fetchOffsetSnapshot(
+      currentLeaderEpoch,
+      fetchOnlyFromLeader = true))
+      .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, 
endOffsetMetadata, endOffsetMetadata))
+    when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+    expectReadFromReplica(fetchParams, topicIdPartition, 
fetchStatus.fetchInfo, Errors.NONE)
+
+    val expected = minBytes == 1
+    assertEquals(expected, delayedFetch.tryComplete())
+    assertEquals(expected, delayedFetch.isCompleted)

Review Comment:
   This exposes an issue in delayedFetch. If the HWM is less than the 
fetchOffset, no bytes have accumulated, so we shouldn't complete the 
delayedFetch immediately.



##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogOffsetMetadataTest.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.kafka.storage.internals.log.LogOffsetMetadata.UNKNOWN_OFFSET_METADATA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class LogOffsetMetadataTest {
+
+    @Test
+    void testOnOlderSegment() {
+        LogOffsetMetadata metadata1 = new LogOffsetMetadata(1L, 0L, 1);
+        LogOffsetMetadata metadata2 = new LogOffsetMetadata(5L, 4L, 2);
+        LogOffsetMetadata messageOnlyMetadata = new LogOffsetMetadata(1L);
+        
assertFalse(UNKNOWN_OFFSET_METADATA.onOlderSegment(UNKNOWN_OFFSET_METADATA));
+        assertFalse(metadata1.onOlderSegment(messageOnlyMetadata));
+        assertFalse(messageOnlyMetadata.onOlderSegment(metadata1));
+        assertFalse(metadata1.onOlderSegment(metadata1));
+        assertFalse(metadata2.onOlderSegment(metadata1));
+        assertTrue(metadata1.onOlderSegment(metadata2));
+    }
+
+    @Test
+    void testPositionDiff() {
+        LogOffsetMetadata metadata1 = new LogOffsetMetadata(1L);
+        LogOffsetMetadata metadata2 = new LogOffsetMetadata(5L, 0L, 5);
+        KafkaException exception = assertThrows(KafkaException.class, () -> 
metadata1.positionDiff(metadata2));
+        assertTrue(exception.getMessage().endsWith("since it only has message 
offset info"));
+
+        exception = assertThrows(KafkaException.class, () -> 
metadata2.positionDiff(metadata1));
+        assertTrue(exception.getMessage().endsWith("since they are not on the 
same segment"));
+
+        LogOffsetMetadata metadata3 = new LogOffsetMetadata(15L, 10L, 5);
+        exception = assertThrows(KafkaException.class, () -> 
metadata3.positionDiff(metadata2));
+        assertTrue(exception.getMessage().endsWith("since they are not on the 
same segment"));
+
+        LogOffsetMetadata metadata4 = new LogOffsetMetadata(40L, 10L, 100);
+        assertEquals(95, metadata4.positionDiff(metadata3));
+    }
+
+    @Test
+    void testMessageOffsetOnly() {
+        LogOffsetMetadata metadata1 = new LogOffsetMetadata(1L);
+        LogOffsetMetadata metadata2 = new LogOffsetMetadata(1L, 0L, 1);
+        assertFalse(UNKNOWN_OFFSET_METADATA.messageOffsetOnly());

Review Comment:
   It's more natural for UNKNOWN_OFFSET_METADATA to be message offset only. 



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -391,8 +398,10 @@ class UnifiedLogTest {
         assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
       }
 
-      (log.highWatermark to log.logEndOffset).foreach { offset =>
-        assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
+      assertEmptyFetch(log, log.highWatermark, FetchIsolation.HIGH_WATERMARK)
+
+      (log.highWatermark + 1 to log.logEndOffset).foreach { offset =>

Review Comment:
   Why are we changing to start from log.highWatermark + 1 instead of 
log.highWatermark?



##########
core/src/main/scala/kafka/log/LocalLog.scala:
##########
@@ -370,11 +370,12 @@ class LocalLog(@volatile private var _dir: File,
         throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
           s"but we only have log segments upto $endOffset.")
 
-      if (startOffset == maxOffsetMetadata.messageOffset)
+      if (startOffset == maxOffsetMetadata.messageOffset) {
         emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-      else if (startOffset > maxOffsetMetadata.messageOffset)
-        emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
-      else {
+      } else if (startOffset > maxOffsetMetadata.messageOffset) {
+        // Instead of converting the `startOffset` to metadata, returning 
message-only metadata to avoid potential loop
+        emptyFetchDataInfo(new LogOffsetMetadata(startOffset), 
includeAbortedTxns)

Review Comment:
   We have the following code below.
   
   ```
               if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) 
maxOffsetMetadata.relativePositionInSegment
               else segment.size
   ```
   
   We need to be more careful there, given that maxOffsetMetadata may not have 
a corresponding relativePositionInSegment.
   
   If maxOffsetMetadata.segmentBaseOffset is -1, we can return empty since 
maxOffsetMetadata.messageOffset is not in the local log segments.
   
   If maxOffsetMetadata.segmentBaseOffset is equal to segment.baseOffset, we can 
use maxOffsetMetadata.relativePositionInSegment.
   
   If maxOffsetMetadata.segmentBaseOffset is larger than segment.baseOffset, we 
can use segment.size.
   
   If maxOffsetMetadata.segmentBaseOffset is smaller than segment.baseOffset, 
we return empty.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to