Re: [PR] KAFKA-15401 | Segment with corrupted index should not be tiered [kafka]

2024-06-04 Thread via GitHub


github-actions[bot] commented on PR #15472:
URL: https://github.com/apache/kafka/pull/15472#issuecomment-2148789357

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15401 | Segment with corrupted index should not be tiered [kafka]

2024-03-06 Thread via GitHub


divijvaidya commented on code in PR #15472:
URL: https://github.com/apache/kafka/pull/15472#discussion_r1514414426


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -202,6 +203,9 @@ public List read() {
 
 private final UnifiedLog mockLog = mock(UnifiedLog.class);
 
+Integer maxEntries = 30;

Review Comment:
   `private final static` please
   
   Also, for constants we usually use capital snake case, such as MAX_ENTRIES
   
   (same for base offset)
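
A minimal sketch of the convention requested above. The class name, the accessor methods, and the `BASE_OFFSET` value are hypothetical (the base offset declaration is not shown in this hunk); only the value 30 comes from the diff. A primitive `int` is used instead of `Integer`, which is an assumption about how the value is consumed. Java's customary modifier order is `private static final`, which compiles identically to `private final static`.

```java
class ConstantNamingSketch {
    // Shared, immutable test values: static, final, and named in UPPER_SNAKE_CASE.
    private static final int MAX_ENTRIES = 30;
    // Hypothetical value, chosen only for illustration.
    private static final long BASE_OFFSET = 45L;

    static int maxEntries() {
        return MAX_ENTRIES;
    }

    static long baseOffset() {
        return BASE_OFFSET;
    }

    public static void main(String[] args) {
        System.out.println("MAX_ENTRIES=" + MAX_ENTRIES + ", BASE_OFFSET=" + BASE_OFFSET);
    }
}
```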



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -722,6 +727,13 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
 File logFile = segment.log().file();
 String logFileName = logFile.getName();
 
+// Corrupted indexes should not be uploaded to remote storage
+// Example case: local storage filled up, which caused index corruption
+// We should avoid uploading such segments
+segment.timeIndex().sanityCheck();
+segment.offsetIndex().sanityCheck();
+segment.txnIndex().sanityCheck();
+
 logger.info("Copying {} to remote storage.", logFileName);

Review Comment:
   This should probably be moved before the sanity checks so that, during 
debugging, it is easy to see which segment was being uploaded when the index 
check failed.
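
The ordering suggested above can be sketched roughly as follows. This is not the RemoteLogManager code: `Index`, `LOG`, and `copyLogSegment` here are hypothetical stand-ins, and an in-memory list replaces the real logger so the effect is observable. The point is only that the file name is recorded before any check can throw.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class LogBeforeCheckSketch {
    /** Hypothetical stand-in for an index whose sanityCheck() throws on corruption. */
    interface Index {
        void sanityCheck();
    }

    /** Collected log lines; a real implementation would write to a logger instead. */
    static final List<String> LOG = new ArrayList<>();

    static void copyLogSegment(String logFileName, List<Index> indexes) {
        // Log first, so the segment name is on record even if a check below throws.
        LOG.add("Copying " + logFileName + " to remote storage.");
        for (Index index : indexes) {
            index.sanityCheck();
        }
        LOG.add("Copied " + logFileName + " to remote storage.");
    }

    public static void main(String[] args) {
        Index corrupt = () -> {
            throw new IllegalStateException("Corrupt index");
        };
        try {
            copyLogSegment("00000000000000000000.log", Collections.singletonList(corrupt));
        } catch (IllegalStateException e) {
            // The log still names the segment that was being processed.
            System.out.println(LOG);
        }
    }
}
```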



##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
 }
 }
 
+@Test
+void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes() throws Exception {
+long segmentStartOffset = 0L;
+
+// leader epoch preparation
+checkpoint.write(totalEpochEntries);
+LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+// create log segment, with 0 as log start offset
+LogSegment segment = mock(LogSegment.class);
+
+// The segment does not have a timeIndex(), which is not acceptable to the sanity checks.
+// In that case the segment won't be copied.
+when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+when(mockLog.activeSegment()).thenReturn(segment);
+when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+when(mockLog.lastStableOffset()).thenReturn(150L);
+
+RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+task.copyLogSegmentsToRemote(mockLog);
+
+// Verify that remoteLogMetadataManager never adds any metadata and remoteStorageManager never copies log segments,
+// since a segment with index corruption should not be uploaded
+verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+verify(remoteStorageManager, never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+}
+
+
+@Test
+void testCorruptedTimeIndexes() throws Exception {
+// copyLogSegment is executed only when we have more than 1 segment, which is why we create 2 of them
+long oldSegmentStartOffset = 0L;
+long nextSegmentStartOffset = 150L;
+
+when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+// leader epoch preparation
+checkpoint.write(totalEpochEntries);
+LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+File tempFile = TestUtils.tempFile();
+File mockProducerSnapshotIndex = TestUtils.tempFile();
+File tempDir = TestUtils.tempDirectory();
+// create 2 log segments, with 0 and 150 as log start offset
+LogSegment oldSegment = mock(LogSegment.class);
+LogSegment activeSegment = mock(LogSegment.class);
+
+when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+// Mock all required segment data to be managed by RLMTask
+FileRecords fileRecords = mock(FileRecords.class);
+when(oldSegment.log(

Re: [PR] KAFKA-15401 | Segment with corrupted index should not be tiered [kafka]

2024-03-06 Thread via GitHub


Nickstery commented on code in PR #15472:
URL: https://github.com/apache/kafka/pull/15472#discussion_r1514277995


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -707,6 +708,8 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 this.cancel();
 } catch (InterruptedException | RetriableException ex) {
 throw ex;
+} catch (CorruptIndexException ex) {
+logger.error("Error occurred while copying log segments. Index appeared to be corrupted for partition: {}  ", topicIdPartition, ex);

Review Comment:
   Agree and fixed






Re: [PR] KAFKA-15401 | Segment with corrupted index should not be tiered [kafka]

2024-03-06 Thread via GitHub


Nickstery commented on code in PR #15472:
URL: https://github.com/apache/kafka/pull/15472#discussion_r1514277995


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -707,6 +708,8 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 this.cancel();
 } catch (InterruptedException | RetriableException ex) {
 throw ex;
+} catch (CorruptIndexException ex) {
+logger.error("Error occurred while copying log segments. Index appeared to be corrupted for partition: {}  ", topicIdPartition, ex);

Review Comment:
   Agree






Re: [PR] KAFKA-15401 | Segment with corrupted index should not be tiered [kafka]

2024-03-06 Thread via GitHub


Nickstery commented on code in PR #15472:
URL: https://github.com/apache/kafka/pull/15472#discussion_r1514266315


##
storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java:
##
@@ -75,13 +75,14 @@ public void sanityCheck() {
 TimestampOffset entry = lastEntry();
 long lastTimestamp = entry.timestamp;
 long lastOffset = entry.offset;
-if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
-throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has "
-+ "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp "
-+ timestamp(mmap(), 0));
+
 if (entries() != 0 && lastOffset < baseOffset())
 throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has "
 + "non-zero size but the last offset is " + lastOffset + " which is less than the first offset " + baseOffset());
+if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))

Review Comment:
   In the test we are not mocking mmap(), since that is a bit trickier to do. 
So when I write a test to check index corruption, I trigger it by satisfying 
this condition:
   ```
   if (entries() != 0 && lastOffset < baseOffset())
   ```
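
A rough sketch of how a test can trip the offset condition quoted above using only plain values, with no memory-mapped buffer involved. Everything here is a hypothetical stand-in: `CorruptIndexSketchException` replaces Kafka's `CorruptIndexException`, and `checkOffsets` takes the three values as parameters instead of reading them from a real TimeIndex.

```java
class OffsetCorruptionSketch {
    /** Hypothetical stand-in for Kafka's CorruptIndexException. */
    static class CorruptIndexSketchException extends RuntimeException {
        CorruptIndexSketchException(String message) {
            super(message);
        }
    }

    /** Same shape as the quoted condition, with plain values instead of index accessors. */
    static void checkOffsets(int entries, long lastOffset, long baseOffset) {
        if (entries != 0 && lastOffset < baseOffset) {
            throw new CorruptIndexSketchException(
                "last offset " + lastOffset + " is less than the first offset " + baseOffset);
        }
    }

    /** Returns true when the given values are reported as corrupt. */
    static boolean isCorrupt(int entries, long lastOffset, long baseOffset) {
        try {
            checkOffsets(entries, lastOffset, baseOffset);
            return false;
        } catch (CorruptIndexSketchException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(isCorrupt(1, 10L, 45L)); // true: last offset is below the base offset
        System.out.println(isCorrupt(0, 10L, 45L)); // false: an empty index is never flagged
        System.out.println(isCorrupt(1, 50L, 45L)); // false: offsets are consistent
    }
}
```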






Re: [PR] KAFKA-15401 | Segment with corrupted index should not be tiered [kafka]

2024-03-05 Thread via GitHub


divijvaidya commented on code in PR #15472:
URL: https://github.com/apache/kafka/pull/15472#discussion_r1513268220


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -707,6 +708,8 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
 this.cancel();
 } catch (InterruptedException | RetriableException ex) {
 throw ex;
+} catch (CorruptIndexException ex) {
+logger.error("Error occurred while copying log segments. Index appeared to be corrupted for partition: {}  ", topicIdPartition, ex);

Review Comment:
   I am assuming that the way this error will be monitored is by creating an 
alarm on `RemoteCopyLagSegments` [1]. Is that right?
   
   Can you also please explain why we shouldn't increment the 
`failedRemoteCopyRequestRate` and `failedRemoteCopyRequestRate` metrics that are 
being incremented in the catch exception block below?
   
   
   
   [1] https://kafka.apache.org/documentation.html#tiered_storage_monitoring




##
storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java:
##
@@ -75,13 +75,14 @@ public void sanityCheck() {
 TimestampOffset entry = lastEntry();
 long lastTimestamp = entry.timestamp;
 long lastOffset = entry.offset;
-if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
-throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has "
-+ "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp "
-+ timestamp(mmap(), 0));
+
 if (entries() != 0 && lastOffset < baseOffset())
 throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has "
 + "non-zero size but the last offset is " + lastOffset + " which is less than the first offset " + baseOffset());
+if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))

Review Comment:
   I am assuming that the reason for moving this down is to run the less 
expensive validation first?
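
If that assumption holds, the effect of the reordering can be sketched as below. This is an illustration only, not the TimeIndex implementation: the class and parameter names are hypothetical, and a `LongSupplier` stands in for the read of the first timestamp so the sketch can show that the read is skipped when the offset check already fails. Whether that read is actually more expensive in Kafka is not established in this thread.

```java
import java.util.function.LongSupplier;

class CheapCheckFirstSketch {
    static void sanityCheck(int entries, long lastOffset, long baseOffset,
                            long lastTimestamp, LongSupplier firstTimestamp) {
        // Cheap comparison on values that are already in hand.
        if (entries != 0 && lastOffset < baseOffset) {
            throw new IllegalStateException(
                "last offset " + lastOffset + " is less than the first offset " + baseOffset);
        }
        // The supplier is only invoked if the offset check above did not throw.
        if (entries != 0 && lastTimestamp < firstTimestamp.getAsLong()) {
            throw new IllegalStateException(
                "last timestamp " + lastTimestamp + " is less than the first timestamp");
        }
    }

    public static void main(String[] args) {
        int[] reads = {0};
        LongSupplier counting = () -> {
            reads[0]++;
            return 100L;
        };
        try {
            sanityCheck(1, 10L, 45L, 200L, counting);
        } catch (IllegalStateException e) {
            System.out.println("timestamp reads after offset failure: " + reads[0]);
        }
    }
}
```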






Re: [PR] KAFKA-15401 | Segment with corrupted index should not be tiered [kafka]

2024-03-05 Thread via GitHub


Nickstery closed pull request #15471: KAFKA-15401 | Segment with corrupted 
index should not be tiered
URL: https://github.com/apache/kafka/pull/15471

