Nickstery commented on code in PR #14104:
URL: https://github.com/apache/kafka/pull/14104#discussion_r1303966365


##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -558,6 +614,134 @@ void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Excepti
         verify(mockLog, never()).updateHighestOffsetInRemoteStorage(anyLong());
     }
 
+    @Test
+    void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes() throws Exception {
+        long segmentStartOffset = 0L;
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create log segment, with 0 as log start offset
+        LogSegment segment = mock(LogSegment.class);
+
+        // The segment does not have a timeIndex(), which is not acceptable to the sanity checks.
+        // In that case the segment won't be copied.
+        when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(segment);
+        when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+        when(mockLog.lastStableOffset()).thenReturn(150L);
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
+        task.copyLogSegmentsToRemote(mockLog);
+
+        // verify that remoteLogMetadataManager never adds any metadata and remoteStorageManager never copies log segments,
+        // since a segment with index corruption should not be uploaded
+        verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+        verify(remoteStorageManager, never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+        verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+    }
+
+
+    @Test
+    void testCorruptedTimeIndexes() throws Exception {
+        // copyLogSegmentsToRemote only copies segments when there is more than one, which is why we create two of them
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        // Mock all required segment data to be managed by RLMTask
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        File txnFile1 = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile1.createNewFile();
+
+        File timeindexFile = TestUtils.tempFile();
+
+        TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));
+        when(ti.entries()).thenReturn(1);
+        // One of the checks used to detect a corrupted index is offset < baseOffset:
+        // here baseOffset is 45 and the last entry's offset is 0
+        when(ti.lastEntry()).thenReturn(new TimestampOffset(0L, 0L));
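
These tests hinge on the index sanity checks that run before a segment is copied: a missing time index, or one whose last entry points below the segment's base offset, marks the segment as not safe to upload. A minimal, self-contained sketch of that kind of check (the class and exception here are illustrative assumptions, not Kafka's actual TimeIndex code):

    // Sketch only: models the invariant the test exercises, not the real TimeIndex.
    class TimeIndexSanitySketch {
        private final long baseOffset;      // first offset covered by the segment, 45 in the test
        private final long lastEntryOffset; // offset recorded in the index's last entry, 0 in the test

        TimeIndexSanitySketch(long baseOffset, long lastEntryOffset) {
            this.baseOffset = baseOffset;
            this.lastEntryOffset = lastEntryOffset;
        }

        // An index entry can never precede the segment it belongs to; if it does,
        // the index is corrupt and the segment must not be uploaded.
        void sanityCheck() {
            if (lastEntryOffset < baseOffset) {
                throw new IllegalStateException("Corrupt time index: last entry offset "
                        + lastEntryOffset + " is below base offset " + baseOffset);
            }
        }
    }

With the values mocked above (new TimestampOffset(0L, 0L) against a base offset of 45), such a check fails, which is what should keep copyLogSegmentsToRemote from handing the segment to the remote storage manager.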

Review Comment:
   Will cover it where possible.


