showuon commented on code in PR #13999:
URL: https://github.com/apache/kafka/pull/13999#discussion_r1268886912


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3379,6 +3396,246 @@ class ReplicaManagerTest {
     testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, true, false, Errors.NONE)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(booleans = Array(true, false))
+  def testOffsetOutOfRangeExceptionWhenReadFromLog(isFromFollower: Boolean): Unit = {
+    val replicaId = if (isFromFollower) 1 else -1
+    val tp0 = new TopicPartition(topic, 0)
+    val tidp0 = new TopicIdPartition(topicId, tp0)
+    // create a replicaManager with remote log storage enabled
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
+    try {
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val topicIds = Map(tp0.topic -> topicId).asJava
+      val leaderEpoch = 0
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(
+          new LeaderAndIsrPartitionState()
+            .setTopicName(tp0.topic)
+            .setPartitionIndex(tp0.partition)
+            .setControllerEpoch(0)
+            .setLeader(0)
+            .setLeaderEpoch(leaderEpoch)
+            .setIsr(partition0Replicas)
+            .setPartitionEpoch(0)
+            .setReplicas(partition0Replicas)
+            .setIsNew(true)
+        ).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+
+      val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 1000, 0, 100, FetchIsolation.LOG_END, None.asJava)
+      // when reading the log, it'll throw OffsetOutOfRangeException, which will be handled separately
+      val result = replicaManager.readFromLog(params, Seq(tidp0 -> new PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UnboundedQuota, false)
+
+      if (isFromFollower) {
+        // expect an OFFSET_MOVED_TO_TIERED_STORAGE error for a follower fetch, since the data is already available in the remote log
+        assertEquals(Errors.OFFSET_MOVED_TO_TIERED_STORAGE, result.head._2.error)
+      } else {
+        assertEquals(Errors.NONE, result.head._2.error)
+      }
+      assertEquals(startOffset, result.head._2.leaderLogStartOffset)
+      assertEquals(endOffset, result.head._2.leaderLogEndOffset)
+      assertEquals(highHW, result.head._2.highWatermark)
+      if (isFromFollower) {
+        assertFalse(result.head._2.info.delayedRemoteStorageFetch.isPresent)
+      } else {
+        // for a consumer fetch, we should return a delayedRemoteStorageFetch to wait for the remote fetch
+        assertTrue(result.head._2.info.delayedRemoteStorageFetch.isPresent)
+      }
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(booleans = Array(true, false))
+  def testOffsetOutOfRangeExceptionWhenFetchMessages(isFromFollower: Boolean): Unit = {
+    val replicaId = if (isFromFollower) 1 else -1
+    val tp0 = new TopicPartition(topic, 0)
+    val tidp0 = new TopicIdPartition(topicId, tp0)
+    // create a replicaManager with remote log storage enabled
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
+    try {
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val topicIds = Map(tp0.topic -> topicId).asJava
+      val leaderEpoch = 0
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(
+          new LeaderAndIsrPartitionState()
+            .setTopicName(tp0.topic)
+            .setPartitionIndex(tp0.partition)
+            .setControllerEpoch(0)
+            .setLeader(0)
+            .setLeaderEpoch(leaderEpoch)
+            .setIsr(partition0Replicas)
+            .setPartitionEpoch(0)
+            .setReplicas(partition0Replicas)
+            .setIsNew(true)
+        ).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+
+      val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
+      val fetchOffset = 1
+
+      def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+        assertEquals(1, responseStatus.size)
+        assertEquals(tidp0, responseStatus.toMap.keySet.head)
+        val fetchPartitionData: FetchPartitionData = responseStatus.toMap.get(tidp0).get
+        // only a follower fetch should enter this callback, since a consumer fetch will enter the remote fetch purgatory
+        assertTrue(isFromFollower)
+        assertEquals(Errors.OFFSET_MOVED_TO_TIERED_STORAGE, fetchPartitionData.error)
+        assertEquals(startOffset, fetchPartitionData.logStartOffset)
+        assertEquals(highHW, fetchPartitionData.highWatermark)
+      }
+
+      // when reading the log, it'll throw OffsetOutOfRangeException, which will be handled separately
+      replicaManager.fetchMessages(params, Seq(tidp0 -> new PartitionData(topicId, fetchOffset, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UnboundedQuota, fetchCallback)
+
+      val remoteStorageFetchInfoArg: ArgumentCaptor[RemoteStorageFetchInfo] = ArgumentCaptor.forClass(classOf[RemoteStorageFetchInfo])
+      if (isFromFollower) {
+        verify(mockRemoteLogManager, never()).asyncRead(remoteStorageFetchInfoArg.capture(), any())
+      } else {
+        verify(mockRemoteLogManager).asyncRead(remoteStorageFetchInfoArg.capture(), any())
+        val remoteStorageFetchInfo = remoteStorageFetchInfoArg.getValue
+        assertEquals(tp0, remoteStorageFetchInfo.topicPartition)
+        assertEquals(fetchOffset, remoteStorageFetchInfo.fetchInfo.fetchOffset)
+        assertEquals(topicId, remoteStorageFetchInfo.fetchInfo.topicId)
+        assertEquals(startOffset, remoteStorageFetchInfo.fetchInfo.logStartOffset)
+        assertEquals(leaderEpoch, remoteStorageFetchInfo.fetchInfo.currentLeaderEpoch.get())
+      }
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testRemoteLogReaderMetrics(): Unit = {
+    val replicaId = -1
+    val tp0 = new TopicPartition(topic, 0)
+    val tidp0 = new TopicIdPartition(topicId, tp0)
+
+    val props = new Properties()
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true)
+    props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
+    // set the remote log reader thread count to 2
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, 2)
+    val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props)
+    val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
+    val mockLog = mock(classOf[UnifiedLog])
+    val brokerTopicStats = new BrokerTopicStats
+    val remoteLogManager = new RemoteLogManager(
+      remoteLogManagerConfig,
+      0,
+      TestUtils.tempRelativeDir("data").getAbsolutePath,
+      "clusterId",
+      time,
+      _ => Optional.of(mockLog),
+      brokerTopicStats)
+    val spyRLM = spy(remoteLogManager)
+
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(spyRLM))
+    try {
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val topicIds = Map(tp0.topic -> topicId).asJava
+      val leaderEpoch = 0
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(
+          new LeaderAndIsrPartitionState()
+            .setTopicName(tp0.topic)
+            .setPartitionIndex(tp0.partition)
+            .setControllerEpoch(0)
+            .setLeader(0)
+            .setLeaderEpoch(leaderEpoch)
+            .setIsr(partition0Replicas)
+            .setPartitionEpoch(0)
+            .setReplicas(partition0Replicas)
+            .setIsNew(true)
+        ).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+
+      val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
+      val fetchOffset = 1
+
+      def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+        assertEquals(1, responseStatus.size)
+        assertEquals(tidp0, responseStatus.toMap.keySet.head)
+      }
+
+      assertEquals(1.0, yammerMetricValue("RemoteLogReaderAvgIdlePercent").asInstanceOf[Double])
+      assertEquals(0, yammerMetricValue("RemoteLogReaderTaskQueueSize").asInstanceOf[Int])
+
+      // the reader thread pool size is 2
+      val queueLatch = new CountDownLatch(2)
+      val doneLatch = new CountDownLatch(1)
+
+      doAnswer(_ => {
+        queueLatch.countDown()
+        // block until the verification below completes
+        doneLatch.await()
+        new FetchDataInfo(new LogOffsetMetadata(startOffset), mock(classOf[Records]))
+      }).when(spyRLM).read(any())
+
+      // create 5 asyncRead tasks; with 2 reader threads, 3 tasks should be enqueued
+      for (i <- 1 to 5)
+        replicaManager.fetchMessages(params, Seq(tidp0 -> new PartitionData(topicId, fetchOffset, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UnboundedQuota, fetchCallback)
+
+      // wait until at least 2 tasks have started, so all the available threads are in use
+      queueLatch.await()
+      // make sure all tasks are submitted
+      Thread.sleep(100)
+      // the RemoteLogReader threads should not all be idle
+      assertTrue(yammerMetricValue("RemoteLogReaderAvgIdlePercent").asInstanceOf[Double] < 1.0)
+      // the RemoteLogReader should have some queued tasks
+      assertTrue(yammerMetricValue("RemoteLogReaderTaskQueueSize").asInstanceOf[Int] > 0)

Review Comment:
   Good question, @clolov!
   For `RemoteLogReaderAvgIdlePercent`, it should be `0.0`, since all threads are blocked by the latch.
   For `RemoteLogReaderTaskQueueSize`, yes, it should be 3.
   
   I verified them loosely as a conservative way to tolerate some timing uncertainty. But maybe I can assert the second one to be exactly 3; if that turns out to be flaky, we can relax the assertion.
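   A minimal sketch of the tightened assertions, assuming both reader threads are parked on `doneLatch` and all 5 reads have been submitted (so 5 - 2 = 3 tasks remain queued):
   
   ```scala
   // sketch only: with 2 reader threads blocked on doneLatch and 5 reads
   // submitted, no thread is idle and 3 tasks remain in the queue
   assertEquals(0.0, yammerMetricValue("RemoteLogReaderAvgIdlePercent").asInstanceOf[Double])
   assertEquals(3, yammerMetricValue("RemoteLogReaderTaskQueueSize").asInstanceOf[Int])
   ```
   
   If the exact values prove timing-sensitive, the looser `< 1.0` / `> 0` checks can stay as a fallback.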


