Nickstery commented on code in PR #14104: URL: https://github.com/apache/kafka/pull/14104#discussion_r1303966365
########## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ########## @@ -558,6 +614,134 @@ void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Excepti verify(mockLog, never()).updateHighestOffsetInRemoteStorage(anyLong()); } + @Test + void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes() throws Exception { + long segmentStartOffset = 0L; + + // leader epoch preparation + checkpoint.write(totalEpochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L)); + + // create log segment, with 0 as log start offset + LogSegment segment = mock(LogSegment.class); + + // Segment does not have timeIndex(), which is not acceptable to sanityChecks. + // In that case the segment won't be copied. + when(segment.baseOffset()).thenReturn(segmentStartOffset); + + when(mockLog.activeSegment()).thenReturn(segment); + when(mockLog.logStartOffset()).thenReturn(segmentStartOffset); + when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment))); + when(mockLog.lastStableOffset()).thenReturn(150L); + + RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition); + task.copyLogSegmentsToRemote(mockLog); + + // verify the remoteLogMetadataManager never adds any metadata and remoteStorageManager never copies log segments, + // since a segment with index corruption should not be uploaded + verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class)); + verify(remoteStorageManager, never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)); + verify(remoteLogMetadataManager, 
never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)); + } + + + @Test + void testCorruptedTimeIndexes() throws Exception { + // copyLogSegment is executed in case we have more than 1 segment, which is why we create 2 of them + long oldSegmentStartOffset = 0L; + long nextSegmentStartOffset = 150L; + + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + + // leader epoch preparation + checkpoint.write(totalEpochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L)); + + File tempFile = TestUtils.tempFile(); + File mockProducerSnapshotIndex = TestUtils.tempFile(); + File tempDir = TestUtils.tempDirectory(); + // create 2 log segments, with 0 and 150 as log start offsets + LogSegment oldSegment = mock(LogSegment.class); + LogSegment activeSegment = mock(LogSegment.class); + + when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset); + when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset); + + // Mock all required segment data to be managed by RLMTask + FileRecords fileRecords = mock(FileRecords.class); + when(oldSegment.log()).thenReturn(fileRecords); + when(fileRecords.file()).thenReturn(tempFile); + when(fileRecords.sizeInBytes()).thenReturn(10); + when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset); + + File txnFile1 = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, ""); + txnFile1.createNewFile(); + + File timeindexFile = TestUtils.tempFile(); + + TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12)); + when(ti.entries()).thenReturn(1); + // One of the checks to detect index being corrupted is checking that: + // offset < baseOffset, BaseOffset is 45, offset => 0 + 
when(ti.lastEntry()).thenReturn(new TimestampOffset(0L, 0L)); Review Comment: Will cover where it is possible -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org