junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1607381004


##########
core/src/main/scala/kafka/server/DelayedFetch.scala:
##########
@@ -92,7 +92,14 @@ class DelayedFetch(
             // has just rolled, then the high watermark offset will remain the same but be on the old segment,
             // which would incorrectly be seen as an instance of Case F.
             if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
+              if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
+                // If we don't know the position of the offset on log segments, just pessimistically assume that we
+                // only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the
+                // high-watermark is stale, but should be rare.
+                if (fetchOffset.messageOffset < endOffset.messageOffset) {

Review Comment:
   We can organize the code a bit more clearly. I am thinking of something like the following.
   
   ```
            if (fetchOffset.messageOffset > endOffset.messageOffset) {
              // Case F, this can happen when the new fetch operation is on a truncated leader
              debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
              return forceComplete()
            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
              if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
                // If we don't know the position of the offset on log segments, just pessimistically assume that we
                // only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the
                // high-watermark is stale, but should be rare.
                accumulatedSize += 1
              } else if (fetchOffset.onOlderSegment(endOffset)) {
                // Case F, this can happen when the fetch operation is falling behind the current segment
                // or the partition has just rolled a new segment
                debug(s"Satisfying fetch $this immediately since it is fetching older segments.")
                // We will not force complete the fetch request if a replica should be throttled.
                if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
                  return forceComplete()
              } else if (fetchOffset.onSameSegment(endOffset)) {
                // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
                val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
                if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
                  accumulatedSize += bytesAvailable
              }
   ```



##########
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##########
@@ -164,18 +169,71 @@ class DelayedFetchTest {
     assertTrue(delayedFetch.tryComplete())
     assertTrue(delayedFetch.isCompleted)
     assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithInvalidHighWatermark(endOffset: Long): Unit = {
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val fetchOffset = 450L
+    val logStartOffset = 5L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      params = fetchParams,
+      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      responseCallback = callback
+    )
+
+    val partition: Partition = mock(classOf[Partition])
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+    // Note that the high-watermark does not contains the complete metadata

Review Comment:
   does not contains => does not contain



##########
core/src/main/scala/kafka/log/LocalLog.scala:
##########
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
         throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
           s"but we only have log segments upto $endOffset.")
 
-      if (startOffset == maxOffsetMetadata.messageOffset)
+      if (startOffset == maxOffsetMetadata.messageOffset) {
         emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-      else if (startOffset > maxOffsetMetadata.messageOffset)
-        emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
-      else {
+      } else if (startOffset > maxOffsetMetadata.messageOffset ||
+        maxOffsetMetadata.messageOffsetOnly() ||
+        maxOffsetMetadata.segmentBaseOffset < segmentOpt.get().baseOffset()) {
+        // We need to be careful before reading the segment as `maxOffsetMetadata` may not be a complete metadata:
+        // 1. If maxOffsetMetadata is message-offset-only, then return empty fetchDataInfo since
+        // maxOffsetMetadata.offset is not on local log segments.
+        // 2. If maxOffsetMetadata.segmentBaseOffset is smaller than segment.baseOffset, then return empty fetchDataInfo.

Review Comment:
   I am still not sure about calling `convertToOffsetMetadataOrThrow(startOffset)` recursively. If nextOffset doesn't change, we could still get into an infinite loop. It's rare for us to hit this path, but if we do, it's more important to prevent the infinite loop than to get the correct metadata of startOffset. If we want the correct metadata of startOffset, we could break this method into two as Kamal suggested in https://github.com/apache/kafka/pull/15825#discussion_r1598841362.
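
   To make the hazard concrete, here is a minimal, self-contained sketch (illustrative names only, not the actual Kafka classes): if `read()` falls back to resolving the start offset and that resolution calls back into `read()` with unchanged inputs, nothing ever breaks the cycle.

   ```
   // Illustrative stand-ins only; the real methods live on UnifiedLog/LocalLog.
   object RecursionHazardSketch {
     final case class OffsetMetadata(messageOffset: Long, messageOffsetOnly: Boolean)

     // stand-in for read(): with message-offset-only metadata it re-resolves the offset
     def read(startOffset: Long, maxOffset: OffsetMetadata): OffsetMetadata =
       if (maxOffset.messageOffsetOnly) convertToOffsetMetadataOrThrow(startOffset, maxOffset)
       else maxOffset

     // stand-in for convertToOffsetMetadataOrThrow(): reads again with the same inputs
     def convertToOffsetMetadataOrThrow(offset: Long, maxOffset: OffsetMetadata): OffsetMetadata =
       read(offset, maxOffset) // same inputs => same branch => unbounded mutual recursion

     def main(args: Array[String]): Unit = {
       try read(100L, OffsetMetadata(100L, messageOffsetOnly = true))
       catch { case _: StackOverflowError => println("looped until the stack overflowed") }
     }
   }
   ```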



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Given a message offset, find its corresponding offset metadata in the log.
-    * If the message offset is out of range, throw an OffsetOutOfRangeException
+    * 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException
+    * 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata
+    * 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata
     */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {

Review Comment:
   It's true that `read()` will pick up the latest `nextOffsetMetadata` if it 
changes. However, if nextOffsetMetadata doesn't change and somehow startOffset 
> maxOffsetMetadata.messageOffset, then we could loop forever.



##########
storage/src/test/java/org/apache/kafka/tiered/storage/actions/EraseBrokerStorageAction.java:
##########
@@ -19,24 +19,35 @@
 import org.apache.kafka.tiered.storage.TieredStorageTestAction;
 import org.apache.kafka.tiered.storage.TieredStorageTestContext;
 
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.PrintStream;
 
 public final class EraseBrokerStorageAction implements TieredStorageTestAction {
 
     private final int brokerId;
+    private final FilenameFilter filenameFilter;
+    private final boolean isStopped;
 
     public EraseBrokerStorageAction(int brokerId) {
+        this(brokerId, (dir, name) -> true, false);
+    }
+
+    public EraseBrokerStorageAction(int brokerId,
+                                    FilenameFilter filenameFilter,
+                                    boolean isStopped) {
         this.brokerId = brokerId;
+        this.filenameFilter = filenameFilter;
+        this.isStopped = isStopped;
     }
 
     @Override
     public void doExecute(TieredStorageTestContext context) throws IOException {
-        context.eraseBrokerStorage(brokerId);
+        context.eraseBrokerStorage(brokerId, filenameFilter, isStopped);
     }
 
     @Override
     public void describe(PrintStream output) {
-        output.println("erase-broker-storage: " + brokerId);
+        output.println("erase-broker-storage: " + brokerId + ", isStopped: " + isStopped);

Review Comment:
   Should we include filenameFilter?
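
   For example, a sketch only (note that `filenameFilter` is typically a lambda, so it prints as an object reference unless a descriptive label is also passed in):

   ```
       @Override
       public void describe(PrintStream output) {
           // including the filter makes the action description more complete,
           // though a lambda's default toString() is not very readable
           output.println("erase-broker-storage: " + brokerId
                   + ", filenameFilter: " + filenameFilter
                   + ", isStopped: " + isStopped);
       }
   ```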



##########
storage/src/test/java/org/apache/kafka/tiered/storage/integration/FetchFromLeaderWithCorruptedCheckpointTest.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import kafka.log.LogManager;
+import kafka.server.ReplicaManager;
+import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class FetchFromLeaderWithCorruptedCheckpointTest extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 2;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final Integer broker0 = 0;
+        final Integer broker1 = 1;
+        final String topicA = "topicA";
+        final Integer p0 = 0;
+        final Integer partitionCount = 1;
+        final Integer replicationFactor = 2;
+        final Integer maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = true;
+        final Map<Integer, List<Integer>> assignment = mkMap(mkEntry(p0, Arrays.asList(broker0, broker1)));
+        final List<String> checkpointFiles = Arrays.asList(
+                ReplicaManager.HighWatermarkFilename(),
+                LogManager.RecoveryPointCheckpointFile(),
+                CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME);
+
+        builder
+                .createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, assignment,

Review Comment:
   Could we merge this line with the one above?



##########
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##########
@@ -164,18 +169,71 @@ class DelayedFetchTest {
     assertTrue(delayedFetch.tryComplete())
     assertTrue(delayedFetch.isCompleted)
     assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithInvalidHighWatermark(endOffset: Long): Unit = {

Review Comment:
   testDelayedFetchWithInvalidHighWatermark => testDelayedFetchWithMessageOnlyHighWatermark?



##########
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##########
@@ -164,18 +169,71 @@ class DelayedFetchTest {
     assertTrue(delayedFetch.tryComplete())
     assertTrue(delayedFetch.isCompleted)
     assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithInvalidHighWatermark(endOffset: Long): Unit = {
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val fetchOffset = 450L
+    val logStartOffset = 5L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      params = fetchParams,
+      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      responseCallback = callback
+    )
+
+    val partition: Partition = mock(classOf[Partition])
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+    // Note that the high-watermark does not contains the complete metadata
+    val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1)
+    when(partition.fetchOffsetSnapshot(
+      currentLeaderEpoch,
+      fetchOnlyFromLeader = true))
+      .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
+    when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+    expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE)
+
+    val expected = endOffset == 500
+    assertEquals(expected, delayedFetch.tryComplete())

Review Comment:
   When endOffset is 0, we treat it as the case where the broker has truncated, so the fetch should also complete successfully.
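
   In other words, a sketch of the expected assertions in the test above (replacing the conditional `expected` check) would be:

   ```
       // both endOffset = 0 (truncation) and endOffset = 500 should satisfy the delayed fetch
       assertTrue(delayedFetch.tryComplete())
       assertTrue(delayedFetch.isCompleted)
   ```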



##########
core/src/main/scala/kafka/log/LocalLog.scala:
##########
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
         throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
           s"but we only have log segments upto $endOffset.")
 
-      if (startOffset == maxOffsetMetadata.messageOffset)
+      if (startOffset == maxOffsetMetadata.messageOffset) {
         emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-      else if (startOffset > maxOffsetMetadata.messageOffset)
+      } else if (startOffset > maxOffsetMetadata.messageOffset ||
+        maxOffsetMetadata.messageOffsetOnly() ||
+        maxOffsetMetadata.segmentBaseOffset < segmentOpt.get.baseOffset) {

Review Comment:
   For the condition `maxOffsetMetadata.segmentBaseOffset < segmentOpt.get().baseOffset()`, perhaps it's better to fold it into the logic below, because we could iterate through multiple segments later.
   
   ```
             val maxPosition =
               // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
               if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
               else segment.size
   ```
   We can change the logic to something like the following.
   
   ```
             val maxPosition: Option[Int] =
               if (segment.baseOffset < maxOffsetMetadata.segmentBaseOffset)
                 Some(segment.size)
               else if (segment.baseOffset == maxOffsetMetadata.segmentBaseOffset && !maxOffsetMetadata.messageOffsetOnly())
                 Some(maxOffsetMetadata.relativePositionInSegment)
               else
                 None
   ```
   
   We can then change the logic in `segment.read()` to return empty if `maxPosition` is empty.
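
   For illustration, a rough sketch of how the caller could consume the optional bound, handled at the call site here rather than inside `segment.read()`; the surrounding names (`maxLength`, `minOneMessage`, `includeAbortedTxns`) are assumed from `LocalLog.read()`:

   ```
             maxPosition match {
               // a safe upper bound is known, so read the segment as before
               case Some(pos) => segment.read(startOffset, maxLength, pos, minOneMessage)
               // no safe upper bound (stale or message-only maxOffsetMetadata): return empty
               case None => emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
             }
   ```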



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
