showuon commented on code in PR #14113:
URL: https://github.com/apache/kafka/pull/14113#discussion_r1312968348


##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -754,6 +755,62 @@ void 
testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Excepti
         verify(mockLog, never()).updateHighestOffsetInRemoteStorage(anyLong());
     }
 
+    @Test
+    void 
testRLMTaskDoesNotUploadSegmentsWhenRemoteLogMetadataManagerIsNotInitialized() 
throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        // Throw a retryable exception so indicate that the remote log 
metadata manager is not initialized yet
+        
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt()))
+            .thenThrow(new ReplicaNotAvailableException("Remote log metadata 
cache is not initialized for partition: " + leaderTopicIdPartition));
+
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+        // Ensure the metrics for remote write requests/failures is zero 
before attempt to copy log segment
+        assertEquals(0, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
+        assertEquals(0, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
+        // Ensure aggregate metrics
+        assertEquals(0, 
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
+        assertEquals(0, 
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(0);
+
+        RemoteLogManager.RLMTask spyTask = spy(task);
+        spyTask.run();
+
+        verify(spyTask, times(1)).copyLogSegmentsToRemote(mockLog);

Review Comment:
   @abhijeetk88 , although we added this line to verify it is called, it will 
make this test unstable since sometimes it'll fail here because the RLMTask 
haven't invoked `copyLogSegmentsToRemote`, right? You can either use 
`waitUntil`, or call `RLMTask#copyLogSegmentsToRemote` directly to fix that. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to