nikramakrishnan commented on code in PR #14212:
URL: https://github.com/apache/kafka/pull/14212#discussion_r1295970964


##########
core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala:
##########
@@ -85,6 +85,64 @@ class ReplicaFetcherTierStateMachineTest {
     assertEquals(9L, replicaState.logEndOffset)
   }
 
+  @Test
+  def testFollowerFetchMovedToAndDeletedFromTieredStore(): Unit = {
+    val partition = new TopicPartition("topic", 0)
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+    val replicaState = PartitionState(replicaLog, leaderEpoch = 7, 
highWatermark = 0L, rlmEnabled = true)
+
+    val mockLeaderEndpoint = new MockLeaderEndPoint
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndpoint, 
mockTierStateMachine)
+
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> 
initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 7)))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 7, leaderEpoch = 7, new SimpleRecord("h".getBytes)),
+      mkBatch(baseOffset = 8, leaderEpoch = 7, new SimpleRecord("i".getBytes)),
+      mkBatch(baseOffset = 9, leaderEpoch = 7, new SimpleRecord("j".getBytes)),
+      mkBatch(baseOffset = 10, leaderEpoch = 7, new 
SimpleRecord("k".getBytes)))
+
+    val leaderState = PartitionState(leaderLog, leaderEpoch = 7, highWatermark 
= 10L, rlmEnabled = true)
+    // Overriding the log start offset to 5 for mocking the scenario of 
segments 5-6 moved to remote store and
+    // segments 0-4 expired.
+    leaderState.logStartOffset = 5
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    assertEquals(3L, replicaState.logEndOffset)
+    val expectedState = if (truncateOnFetch) Option(Fetching) else 
Option(Truncating)
+    assertEquals(expectedState, fetcher.fetchState(partition).map(_.state))
+
+    fetcher.doWork()
+    // Verify that the out of range error is triggered and the fetch offset is 
reset to the global log start offset.
+    assertEquals(0L, replicaState.logStartOffset)

Review Comment:
   1. The first fetch simply truncates and resets the fetch offsets (local start offset, high watermark, and end offset) to 5.
   2. The second fetch builds the auxiliary state and advances the same offsets to 7.
   3. Actual data is replicated starting with the third fetch call, which also updates the log start offset to 5.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to