showuon commented on code in PR #14212:
URL: https://github.com/apache/kafka/pull/14212#discussion_r1302906694


##########
core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala:
##########
@@ -85,6 +85,73 @@ class ReplicaFetcherTierStateMachineTest {
     assertEquals(9L, replicaState.logEndOffset)
   }
 
+  /**
+   * This test verifies the following scenario:
+   * 1. Leader is archiving to tiered storage and has a follower.
+   * 2. Follower has caught up to offset X (exclusive).
+   * 3. While follower is offline, leader moves X to tiered storage and expires data locally till Y, such that,
+   *    `Y = leaderLocalLogStartOffset` and `leaderLocalLogStartOffset > X`. Meanwhile, X has been expired from
+   *    tiered storage as well. Hence, `X < globalLogStartOffset`.
+   * 4. Follower comes online and tries to fetch X from leader.
+   */
+  @Test
+  def testFollowerFetchOffsetOutOfRangeWithTieredStore(): Unit = {
+    val partition = new TopicPartition("topic", 0)
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+    val replicaState = PartitionState(replicaLog, leaderEpoch = 7, 
highWatermark = 0L, rlmEnabled = true)
+
+    val mockLeaderEndpoint = new MockLeaderEndPoint
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndpoint, 
mockTierStateMachine)
+
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> 
initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 7)))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 7, leaderEpoch = 7, new SimpleRecord("h".getBytes)),
+      mkBatch(baseOffset = 8, leaderEpoch = 7, new SimpleRecord("i".getBytes)),
+      mkBatch(baseOffset = 9, leaderEpoch = 7, new SimpleRecord("j".getBytes)),
+      mkBatch(baseOffset = 10, leaderEpoch = 7, new 
SimpleRecord("k".getBytes)))
+
+    val leaderState = PartitionState(leaderLog, leaderEpoch = 7, highWatermark 
= 10L, rlmEnabled = true)
+    // Overriding the log start offset to 5 for mocking the scenario of 
segments 5-6 moved to remote store and
+    // segments 0-4 expired.
+    leaderState.logStartOffset = 5
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    assertEquals(3L, replicaState.logEndOffset)
+    val expectedState = if (truncateOnFetch) Option(Fetching) else 
Option(Truncating)
+    assertEquals(expectedState, fetcher.fetchState(partition).map(_.state))
+
+    fetcher.doWork()
+    // Verify that the out of range error is triggered and the fetch offset is 
reset to the global log start offset.
+    assertEquals(0L, replicaState.logStartOffset)
+    assertEquals(5L, replicaState.localLogStartOffset)
+    assertEquals(5L, replicaState.highWatermark)
+    assertEquals(5L, replicaState.logEndOffset)
+
+    fetcher.doWork()
+    // Verify that the offset moved to tiered store error is triggered and 
respective states are truncated to expected.

Review Comment:
   nit: the sentence is missing a noun; it should read "truncated to expected [position]".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to