divijvaidya commented on code in PR #15472:
URL: https://github.com/apache/kafka/pull/15472#discussion_r1514414426


##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -202,6 +203,9 @@ public List<EpochEntry> read() {
     private final UnifiedLog mockLog = mock(UnifiedLog.class);
+    Integer maxEntries = 30;

Review Comment:
   `private final static`, please. Also, for constants we usually use capital snake case, such as MAX_ENTRIES (same for the base offset).
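A minimal sketch of the suggested declaration. The field name and value come from the diff; the base-offset constant is only mentioned in the review and not shown in the hunk, so its name and value here are hypothetical:

```java
// Suggested style for test constants: private, static, final, capital snake case.
private static final Integer MAX_ENTRIES = 30;
// Hypothetical counterpart for the base-offset constant mentioned in the review;
// the actual name and value are not visible in the hunk.
private static final long BASE_OFFSET = 45L;
```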
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -722,6 +727,13 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
         File logFile = segment.log().file();
         String logFileName = logFile.getName();
 
+        // Corrupted indexes should not be uploaded to remote storage.
+        // Example case: local storage was filled up, which caused index corruption.
+        // We should avoid uploading such segments.
+        segment.timeIndex().sanityCheck();
+        segment.offsetIndex().sanityCheck();
+        segment.txnIndex().sanityCheck();
+
         logger.info("Copying {} to remote storage.", logFileName);

Review Comment:
   This should probably be moved before the sanity checks so that during debugging it is easy to understand which segment was being uploaded when the index check failed.
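Applied to the hunk above, the suggested reordering would look roughly like this (a sketch, not the final patch):

```java
// Log first, so a sanity-check failure can be correlated with the segment being uploaded.
logger.info("Copying {} to remote storage.", logFileName);

// Corrupted indexes should not be uploaded to remote storage; sanityCheck()
// throws CorruptIndexException when an index is corrupted.
segment.timeIndex().sanityCheck();
segment.offsetIndex().sanityCheck();
segment.txnIndex().sanityCheck();
```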
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
         }
     }
 
+    @Test
+    void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes() throws Exception {
+        long segmentStartOffset = 0L;
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create log segment, with 0 as log start offset
+        LogSegment segment = mock(LogSegment.class);
+
+        // The segment does not have a timeIndex(), which the sanity checks do not accept.
+        // In that case the segment won't be copied.
+        when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(segment);
+        when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+        when(mockLog.lastStableOffset()).thenReturn(150L);
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.copyLogSegmentsToRemote(mockLog);
+
+        // verify that remoteLogMetadataManager never adds any metadata and remoteStorageManager never copies log segments,
+        // since a segment with index corruption should not be uploaded
+        verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+        verify(remoteStorageManager, never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+        verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+    }
+
+    @Test
+    void testCorruptedTimeIndexes() throws Exception {
+        // copyLogSegment is executed only when we have more than 1 segment, which is why we create 2 of them
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offsets
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        // Mock all required segment data to be managed by RLMTask
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        File txnFile1 = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile1.createNewFile();
+
+        File timeindexFile = TestUtils.tempFile();
+
+        TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));

Review Comment:
   May I suggest creating the 4th param as false, i.e. writable = false, since we don't intend to write to this file? Similar for the offset index.
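A sketch of what that could look like, assuming the `TimeIndex`/`OffsetIndex` constructors accept the trailing `writable` flag the reviewer refers to, and using the variables already defined in the test:

```java
// Open the indexes read-only (writable = false) since the test never writes to them.
TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12, false));
OffsetIndex oi = new OffsetIndex(TestUtils.tempFile(), oldSegmentStartOffset, maxEntries * 8, false);
try {
    // ... exercise the RLMTask against the corrupted index ...
} finally {
    // Close the indexes at the end of the test, else the files stay mmapped.
    ti.close();
    oi.close();
}
```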
   Also, please close them at the end of the test, else they will continue to be mmapped.

##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
+        when(ti.entries()).thenReturn(1);
+        // One of the corruption checks is that the last entry's offset must not be below baseOffset:
+        // here baseOffset is 45, while the stubbed offset is 0
+        when(ti.lastEntry()).thenReturn(new TimestampOffset(0L, 0L));
+
+        // Initialise timeIndex for oldSegment
+        when(oldSegment.timeIndex()).thenReturn(ti);
+        when(oldSegment.txnIndex()).thenReturn(new TransactionIndex(nextSegmentStartOffset, txnFile1));
+        when(oldSegment.offsetIndex()).thenReturn(new OffsetIndex(TestUtils.tempFile(),
+                oldSegmentStartOffset, maxEntries * 8));
+
+        File txnFile2 = UnifiedLog.transactionIndexFile(tempDir, nextSegmentStartOffset, "");
+        txnFile2.createNewFile();
+
+        when(activeSegment.timeIndex()).thenReturn(new TimeIndex(TestUtils.tempFile(), 45L, 1));
+        when(activeSegment.log()).thenReturn(fileRecords);
+        when(activeSegment.txnIndex()).thenReturn(new TransactionIndex(nextSegmentStartOffset, txnFile2));
+        when(activeSegment.offsetIndex()).thenReturn(new OffsetIndex(TestUtils.tempFile(),
+                nextSegmentStartOffset, maxEntries * 8));
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+        OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.timeIndex()).thenReturn(timeIdx);
+        when(oldSegment.offsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);

Review Comment:
   Do we need these? These functions will never be called, no?
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+        when(mockLog.lastStableOffset()).thenReturn(250L);

Review Comment:
   Do we need this? Our index validation should throw an error before we start working with the producer state.
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
+        doNothing().when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        // Just simulate the situation where we upload the next segment from another leader
+        task.convertToLeader(2);
+        // Here CorruptIndexException will be thrown and caught by copyLogSegmentsToRemote
+        task.copyLogSegmentsToRemote(mockLog);
+
+        // verify that remoteLogMetadataManager never adds any metadata and remoteStorageManager never copies log segments,
+        // since a segment with index corruption should not be uploaded
+        verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));

Review Comment:
   Can you please verify that copyLogSegment() was called?
   Otherwise we will miss cases where the upload was skipped for some other reason.
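`copyLogSegment()` is private to the RLMTask, so it cannot be verified directly; one hedged way to assert that the copy path was actually reached (and skipped specifically because of the corrupted index) is to verify the spied index, for example:

```java
// The sanity check runs inside the copy path, so this fails the test if the
// upload was skipped before copyLogSegment() ever looked at the index.
verify(ti, atLeastOnce()).sanityCheck();
```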
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
+        TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));

Review Comment:
   Another suggestion: please name the variable corruptedTimeIndex.
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
+        TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));

Review Comment:
   Alternatively, can we simply use a mock of the time index here (instead of a spy)?
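A sketch of the mock-based alternative, folding in the `corruptedTimeIndex` naming suggestion from the previous comment; it assumes `sanityCheck()` signals corruption by throwing `CorruptIndexException`, as the test comments describe:

```java
// A plain mock can simulate corruption directly, without a temp file or a
// carefully crafted last entry: just make sanityCheck() throw.
TimeIndex corruptedTimeIndex = mock(TimeIndex.class);
doThrow(new CorruptIndexException("corrupted time index for test"))
        .when(corruptedTimeIndex).sanityCheck();
when(oldSegment.timeIndex()).thenReturn(corruptedTimeIndex);
```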
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);

Review Comment:
   nit: in Java you can create completed futures directly using `CompletableFuture.completedFuture`.


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org