divijvaidya commented on code in PR #15472:
URL: https://github.com/apache/kafka/pull/15472#discussion_r1514414426


##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -202,6 +203,9 @@ public List<EpochEntry> read() {
 
     private final UnifiedLog mockLog = mock(UnifiedLog.class);
 
+    Integer maxEntries = 30;

Review Comment:
   `private final static` please
   
   Also, for constants we usually use capital snake case, such as MAX_ENTRIES
   
   (same for base offset)
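   
   A minimal sketch of the suggested declarations (the names and the 45L base-offset value come from this test and are purely illustrative):
   
   ```java
   private static final int MAX_ENTRIES = 30;
   private static final long BASE_OFFSET = 45L;
   ```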



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -722,6 +727,13 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
             File logFile = segment.log().file();
             String logFileName = logFile.getName();
 
+            // Corrupted indexes should not be uploaded to remote storage
+            // Example case: local storage filled up, which caused index corruption
+            // We should avoid uploading such segments
+            segment.timeIndex().sanityCheck();
+            segment.offsetIndex().sanityCheck();
+            segment.txnIndex().sanityCheck();
+
             logger.info("Copying {} to remote storage.", logFileName);

Review Comment:
   This should probably be moved before the sanity checks so that, during debugging, it is easy to understand which segment was being uploaded when the index check failed.
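   
   i.e., roughly (a sketch of the suggested ordering, not exact code):
   
   ```java
   logger.info("Copying {} to remote storage.", logFileName);
   
   // Corrupted indexes should not be uploaded to remote storage; the log line
   // above tells us which segment was in flight if a sanityCheck() throws.
   segment.timeIndex().sanityCheck();
   segment.offsetIndex().sanityCheck();
   segment.txnIndex().sanityCheck();
   ```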



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
         }
     }
 
+    @Test
+    void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes() throws Exception {
+        long segmentStartOffset = 0L;
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create log segment, with 0 as log start offset
+        LogSegment segment = mock(LogSegment.class);
+
+        // Segment does not have a timeIndex(), which is not acceptable to the sanity checks.
+        // In that case the segment won't be copied.
+        when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(segment);
+        when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+        when(mockLog.lastStableOffset()).thenReturn(150L);
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.copyLogSegmentsToRemote(mockLog);
+
+        // verify that remoteLogMetadataManager never adds any metadata and remoteStorageManager never copies log segments,
+        // since a segment with index corruption should not be uploaded
+        verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+        verify(remoteStorageManager, never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+        verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+    }
+
+
+    @Test
+    void testCorruptedTimeIndexes() throws Exception {
+        // copyLogSegment is executed only when we have more than one segment, which is why we create two of them
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offsets
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        // Mock all required segment data to be managed by RLMTask
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        File txnFile1 = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile1.createNewFile();
+
+        File timeindexFile = TestUtils.tempFile();
+
+        TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));

Review Comment:
   May I suggest creating the 4th param as false, i.e. writable = false, since we don't intend to write to this file. Similarly for the offset index.
   
   Also, please close them at the end of the test, else they will continue to be mmapped.
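   
   A rough sketch of both suggestions, assuming the index constructors accept a trailing writable flag:
   
   ```java
   // open read-only since the test never writes to these files
   TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12, false));
   OffsetIndex oi = new OffsetIndex(TestUtils.tempFile(), oldSegmentStartOffset, maxEntries * 8, false);
   try {
       // ... test body ...
   } finally {
       // close so the backing files are unmapped once the test finishes
       ti.close();
       oi.close();
   }
   ```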



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
         }
     }
 
+    @Test
+    void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes() throws Exception {
+        long segmentStartOffset = 0L;
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create log segment, with 0 as log start offset
+        LogSegment segment = mock(LogSegment.class);
+
+        // Segment does not have a timeIndex(), which is not acceptable to the sanity checks.
+        // In that case the segment won't be copied.
+        when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(segment);
+        when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+        when(mockLog.lastStableOffset()).thenReturn(150L);
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.copyLogSegmentsToRemote(mockLog);
+
+        // verify that remoteLogMetadataManager never adds any metadata and remoteStorageManager never copies log segments,
+        // since a segment with index corruption should not be uploaded
+        verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+        verify(remoteStorageManager, never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+        verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+    }
+
+
+    @Test
+    void testCorruptedTimeIndexes() throws Exception {
+        // copyLogSegment is executed only when we have more than one segment, which is why we create two of them
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offsets
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        // Mock all required segment data to be managed by RLMTask
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        File txnFile1 = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile1.createNewFile();
+
+        File timeindexFile = TestUtils.tempFile();
+
+        TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));
+        when(ti.entries()).thenReturn(1);
+        // One of the checks used to detect a corrupted index is:
+        // lastEntry offset < baseOffset; baseOffset is 45, offset is 0
+        when(ti.lastEntry()).thenReturn(new TimestampOffset(0L, 0L));
+
+        // Initialise timeIndex for oldSegment
+        when(oldSegment.timeIndex()).thenReturn(ti);
+        when(oldSegment.txnIndex()).thenReturn(new TransactionIndex(nextSegmentStartOffset, txnFile1));
+        when(oldSegment.offsetIndex()).thenReturn(new OffsetIndex(TestUtils.tempFile(),
+                oldSegmentStartOffset, maxEntries * 8));
+
+        File txnFile2 = UnifiedLog.transactionIndexFile(tempDir, nextSegmentStartOffset, "");
+        txnFile2.createNewFile();
+
+        when(activeSegment.timeIndex()).thenReturn(new TimeIndex(TestUtils.tempFile(), 45L, 1));
+        when(activeSegment.log()).thenReturn(fileRecords);
+        when(activeSegment.txnIndex()).thenReturn(new TransactionIndex(nextSegmentStartOffset, txnFile2));
+        when(activeSegment.offsetIndex()).thenReturn(new OffsetIndex(TestUtils.tempFile(),
+                nextSegmentStartOffset, maxEntries * 8));
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+
+        OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.timeIndex()).thenReturn(timeIdx);
+        when(oldSegment.offsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);

Review Comment:
   do we need these? These functions will never be called, no?



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
         }
     }
 
+    @Test
+    void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes() throws Exception {
+        long segmentStartOffset = 0L;
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create log segment, with 0 as log start offset
+        LogSegment segment = mock(LogSegment.class);
+
+        // Segment does not have a timeIndex(), which is not acceptable to the sanity checks.
+        // In that case the segment won't be copied.
+        when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(segment);
+        when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+        when(mockLog.lastStableOffset()).thenReturn(150L);
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.copyLogSegmentsToRemote(mockLog);
+
+        // verify that remoteLogMetadataManager never adds any metadata and remoteStorageManager never copies log segments,
+        // since a segment with index corruption should not be uploaded
+        verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+        verify(remoteStorageManager, never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+        verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+    }
+
+
+    @Test
+    void testCorruptedTimeIndexes() throws Exception {
+        // copyLogSegment is executed only when we have more than one segment, which is why we create two of them
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offsets
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        // Mock all required segment data to be managed by RLMTask
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        File txnFile1 = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile1.createNewFile();
+
+        File timeindexFile = TestUtils.tempFile();
+
+        TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));
+        when(ti.entries()).thenReturn(1);
+        // One of the checks used to detect a corrupted index is:
+        // lastEntry offset < baseOffset; baseOffset is 45, offset is 0
+        when(ti.lastEntry()).thenReturn(new TimestampOffset(0L, 0L));
+
+        // Initialise timeIndex for oldSegment
+        when(oldSegment.timeIndex()).thenReturn(ti);
+        when(oldSegment.txnIndex()).thenReturn(new TransactionIndex(nextSegmentStartOffset, txnFile1));
+        when(oldSegment.offsetIndex()).thenReturn(new OffsetIndex(TestUtils.tempFile(),
+                oldSegmentStartOffset, maxEntries * 8));
+
+        File txnFile2 = UnifiedLog.transactionIndexFile(tempDir, nextSegmentStartOffset, "");
+        txnFile2.createNewFile();
+
+        when(activeSegment.timeIndex()).thenReturn(new TimeIndex(TestUtils.tempFile(), 45L, 1));
+        when(activeSegment.log()).thenReturn(fileRecords);
+        when(activeSegment.txnIndex()).thenReturn(new TransactionIndex(nextSegmentStartOffset, txnFile2));
+        when(activeSegment.offsetIndex()).thenReturn(new OffsetIndex(TestUtils.tempFile(),
+                nextSegmentStartOffset, maxEntries * 8));
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+

Review Comment:
   do we need this? Our index validation should throw an error before we start 
working with producer state.



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
         }
     }
 
+    @Test
+    void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes() throws Exception {
+        long segmentStartOffset = 0L;
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create log segment, with 0 as log start offset
+        LogSegment segment = mock(LogSegment.class);
+
+        // Segment does not have a timeIndex(), which is not acceptable to the sanity checks.
+        // In that case the segment won't be copied.
+        when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(segment);
+        when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+        when(mockLog.lastStableOffset()).thenReturn(150L);
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.copyLogSegmentsToRemote(mockLog);
+
+        // verify that remoteLogMetadataManager never adds any metadata and remoteStorageManager never copies log segments,
+        // since a segment with index corruption should not be uploaded
+        verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+        verify(remoteStorageManager, never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+        verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+    }
+
+
+    @Test
+    void testCorruptedTimeIndexes() throws Exception {
+        // copyLogSegment is executed only when we have more than one segment, which is why we create two of them
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offsets
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        // Mock all required segment data to be managed by RLMTask
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        File txnFile1 = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile1.createNewFile();
+
+        File timeindexFile = TestUtils.tempFile();
+
+        TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));
+        when(ti.entries()).thenReturn(1);
+        // One of the checks used to detect a corrupted index is:
+        // lastEntry offset < baseOffset; baseOffset is 45, offset is 0
+        when(ti.lastEntry()).thenReturn(new TimestampOffset(0L, 0L));
+
+        // Initialise timeIndex for oldSegment
+        when(oldSegment.timeIndex()).thenReturn(ti);
+        when(oldSegment.txnIndex()).thenReturn(new TransactionIndex(nextSegmentStartOffset, txnFile1));
+        when(oldSegment.offsetIndex()).thenReturn(new OffsetIndex(TestUtils.tempFile(),
+                oldSegmentStartOffset, maxEntries * 8));
+
+        File txnFile2 = UnifiedLog.transactionIndexFile(tempDir, nextSegmentStartOffset, "");
+        txnFile2.createNewFile();
+
+        when(activeSegment.timeIndex()).thenReturn(new TimeIndex(TestUtils.tempFile(), 45L, 1));
+        when(activeSegment.log()).thenReturn(fileRecords);
+        when(activeSegment.txnIndex()).thenReturn(new TransactionIndex(nextSegmentStartOffset, txnFile2));
+        when(activeSegment.offsetIndex()).thenReturn(new OffsetIndex(TestUtils.tempFile(),
+                nextSegmentStartOffset, maxEntries * 8));
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+
+        OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.timeIndex()).thenReturn(timeIdx);
+        when(oldSegment.offsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+        doNothing().when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        // Just simulate the situation when we upload the next segment from another leader
+        task.convertToLeader(2);
+        // Here CorruptIndexException will be thrown and caught by copyLogSegmentsToRemote
+        task.copyLogSegmentsToRemote(mockLog);
+
+        // verify that remoteLogMetadataManager never adds any metadata and remoteStorageManager never copies log segments,
+        // since a segment with index corruption should not be uploaded
+        verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));

Review Comment:
   Can you please verify that copyLogSegment() was called? Otherwise we will miss cases where the upload was skipped for some other reason.
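   
   A rough sketch of one way to do this, assuming copyLogSegment is made visible to the test (it is private today) and the task is wrapped in a spy; otherwise an equivalent signal, such as the failed-copy log/metric, could be asserted instead:
   
   ```java
   RemoteLogManager.RLMTask task = spy(remoteLogManager.new RLMTask(leaderTopicIdPartition, 128));
   task.convertToLeader(2);
   task.copyLogSegmentsToRemote(mockLog);
   
   // prove the copy path was actually entered before it aborted on the corrupt index
   verify(task).copyLogSegment(any(UnifiedLog.class), any(LogSegment.class), anyLong());
   ```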



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
         }
     }
 
+    @Test
+    void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes() throws Exception {
+        long segmentStartOffset = 0L;
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create log segment, with 0 as log start offset
+        LogSegment segment = mock(LogSegment.class);
+
+        // Segment does not have a timeIndex(), which is not acceptable to the sanity checks.
+        // In that case the segment won't be copied.
+        when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(segment);
+        when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+        when(mockLog.lastStableOffset()).thenReturn(150L);
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.copyLogSegmentsToRemote(mockLog);
+
+        // verify that remoteLogMetadataManager never adds any metadata and remoteStorageManager never copies log segments,
+        // since a segment with index corruption should not be uploaded
+        verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+        verify(remoteStorageManager, never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+        verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+    }
+
+
+    @Test
+    void testCorruptedTimeIndexes() throws Exception {
+        // copyLogSegment is executed only when we have more than one segment, which is why we create two of them
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offsets
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        // Mock all required segment data to be managed by RLMTask
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        File txnFile1 = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile1.createNewFile();
+
+        File timeindexFile = TestUtils.tempFile();
+
+        TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));

Review Comment:
   Another suggestion - please name the variable `corruptedTimeIndex`
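   
   e.g., the same construction, just renamed for readability:
   
   ```java
   TimeIndex corruptedTimeIndex = spy(new TimeIndex(timeindexFile, 45L, 12));
   when(corruptedTimeIndex.entries()).thenReturn(1);
   when(corruptedTimeIndex.lastEntry()).thenReturn(new TimestampOffset(0L, 0L));
   ```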



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
         }
     }
 
+    @Test
+    void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes() throws Exception {
+        long segmentStartOffset = 0L;
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create log segment, with 0 as log start offset
+        LogSegment segment = mock(LogSegment.class);
+
+        // Segment does not have a timeIndex(), which is not acceptable to the sanity checks.
+        // In that case the segment won't be copied.
+        when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(segment);
+        when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+        when(mockLog.lastStableOffset()).thenReturn(150L);
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.copyLogSegmentsToRemote(mockLog);
+
+        // verify that remoteLogMetadataManager never adds any metadata and remoteStorageManager never copies log segments,
+        // since a segment with index corruption should not be uploaded
+        verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+        verify(remoteStorageManager, never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+        verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+    }
+
+
+    @Test
+    void testCorruptedTimeIndexes() throws Exception {
+        // copyLogSegment is executed only when we have more than one segment, which is why we create two of them
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offsets
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        // Mock all required segment data to be managed by RLMTask
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        File txnFile1 = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile1.createNewFile();
+
+        File timeindexFile = TestUtils.tempFile();
+
+        TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));

Review Comment:
   Alternatively, can we simply use a Mock of timeIndex here? (instead of spy)
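   
   A sketch of the mock-based variant; since a plain mock's sanityCheck() is a no-op, the corruption would have to be simulated explicitly (assuming CorruptIndexException is what the real sanityCheck() throws):
   
   ```java
   TimeIndex corruptedTimeIndex = mock(TimeIndex.class);
   when(corruptedTimeIndex.entries()).thenReturn(1);
   when(corruptedTimeIndex.lastEntry()).thenReturn(new TimestampOffset(0L, 0L));
   // make the sanity check fail the way a truly corrupted index would
   doThrow(new CorruptIndexException("last entry offset < base offset"))
           .when(corruptedTimeIndex).sanityCheck();
   when(oldSegment.timeIndex()).thenReturn(corruptedTimeIndex);
   ```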



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
         }
     }
 
+    @Test
+    void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes() throws Exception {
+        long segmentStartOffset = 0L;
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create log segment, with 0 as log start offset
+        LogSegment segment = mock(LogSegment.class);
+
+        // Segment does not have a timeIndex(), which is not acceptable to the sanity checks.
+        // In that case the segment won't be copied.
+        when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(segment);
+        when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+        when(mockLog.lastStableOffset()).thenReturn(150L);
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.copyLogSegmentsToRemote(mockLog);
+
+        // verify that remoteLogMetadataManager never adds any metadata and remoteStorageManager never copies log segments,
+        // since a segment with index corruption should not be uploaded
+        verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+        verify(remoteStorageManager, never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+        verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+    }
+
+
+    @Test
+    void testCorruptedTimeIndexes() throws Exception {
+        // copyLogSegment is executed only when we have more than one segment, which is why we create two of them
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offsets
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        // Mock all required segment data to be managed by RLMTask
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        File txnFile1 = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile1.createNewFile();
+
+        File timeindexFile = TestUtils.tempFile();
+
+        TimeIndex ti = spy(new TimeIndex(timeindexFile, 45L, 12));
+        when(ti.entries()).thenReturn(1);
+        // One of the checks used to detect a corrupted index is:
+        // lastEntry offset < baseOffset; baseOffset is 45, offset is 0
+        when(ti.lastEntry()).thenReturn(new TimestampOffset(0L, 0L));
+
+        // Initialise timeIndex for oldSegment
+        when(oldSegment.timeIndex()).thenReturn(ti);
+        when(oldSegment.txnIndex()).thenReturn(new TransactionIndex(nextSegmentStartOffset, txnFile1));
+        when(oldSegment.offsetIndex()).thenReturn(new OffsetIndex(TestUtils.tempFile(),
+                oldSegmentStartOffset, maxEntries * 8));
+
+        File txnFile2 = UnifiedLog.transactionIndexFile(tempDir, nextSegmentStartOffset, "");
+        txnFile2.createNewFile();
+
+        when(activeSegment.timeIndex()).thenReturn(new TimeIndex(TestUtils.tempFile(), 45L, 1));
+        when(activeSegment.log()).thenReturn(fileRecords);
+        when(activeSegment.txnIndex()).thenReturn(new TransactionIndex(nextSegmentStartOffset, txnFile2));
+        when(activeSegment.offsetIndex()).thenReturn(new OffsetIndex(TestUtils.tempFile(),
+                nextSegmentStartOffset, maxEntries * 8));
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+
+        OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.timeIndex()).thenReturn(timeIdx);
+        when(oldSegment.offsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);

Review Comment:
   nit
   
   in Java you can create completed futures directly using 
`CompletableFuture.completedFuture`
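   
   i.e.:
   
   ```java
   CompletableFuture<Void> dummyFuture = CompletableFuture.completedFuture(null);
   ```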



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

