showuon commented on code in PR #14113: URL: https://github.com/apache/kafka/pull/14113#discussion_r1312804527
########## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ########## @@ -754,6 +755,59 @@ void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Excepti verify(mockLog, never()).updateHighestOffsetInRemoteStorage(anyLong()); } + @Test + void testRLMTaskDoesNotUploadSegmentsWhenRemoteLogMetadataManagerIsNotInitialized() throws Exception { + long oldSegmentStartOffset = 0L; + long nextSegmentStartOffset = 150L; + + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + + // leader epoch preparation + checkpoint.write(totalEpochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + // Throw a retryable exception so indicate that the remote log metadata manager is not initialized yet + when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())) + .thenThrow(new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + leaderTopicIdPartition)); + + // create 2 log segments, with 0 and 150 as log start offset + LogSegment oldSegment = mock(LogSegment.class); + LogSegment activeSegment = mock(LogSegment.class); + + when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset); + when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset); + + when(mockLog.activeSegment()).thenReturn(activeSegment); + when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset); + when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment))); + when(mockLog.lastStableOffset()).thenReturn(250L); + + // Ensure the metrics for remote write requests/failures is zero before attempt to copy log segment + assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count()); + assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count()); + // Ensure aggregate metrics + assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count()); + assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count()); + + RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); + task.convertToLeader(0); + task.run(); + + // verify the remoteLogMetadataManager never add any metadata and remoteStorageManager never copy log segments + verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class)); + verify(remoteStorageManager, never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)); Review Comment: We can't assure if the RLMTask thread is already run once or not when verifying these things under current testing. Could we just invoke `task.copyLogSegmentsToRemote` and verify it throws exception we expected. And then, we verify the following things. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org