Re: [PR] KAFKA-15401 | Segment with corrupted index should not be tiered [kafka]
github-actions[bot] commented on PR #15472:
URL: https://github.com/apache/kafka/pull/15472#issuecomment-2148789357

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
divijvaidya commented on code in PR #15472:
URL: https://github.com/apache/kafka/pull/15472#discussion_r1514414426

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
## @@ -202,6 +203,9 @@ public List read() {

```diff
     private final UnifiedLog mockLog = mock(UnifiedLog.class);
+    Integer maxEntries = 30;
```

Review Comment:
`private final static` please. Also, for constants we usually use upper snake case, such as `MAX_ENTRIES` (same for the base offset).

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -722,6 +727,13 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment

```diff
         File logFile = segment.log().file();
         String logFileName = logFile.getName();
+        // Corrupted indexes should not be uploaded to remote storage
+        // Example case: local storage filled up, which caused index corruption
+        // We should avoid uploading such segments
+        segment.timeIndex().sanityCheck();
+        segment.offsetIndex().sanityCheck();
+        segment.txnIndex().sanityCheck();
+
         logger.info("Copying {} to remote storage.", logFileName);
```

Review Comment:
This should probably be moved before the sanity checks so that, during debugging, it is easy to tell which segment was being uploaded when the index check failed.
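The reordering suggested above can be sketched in plain Java. This is a minimal, self-contained illustration under stated assumptions, not Kafka code: `Index`, `Segment`, `Copier` and the local `CorruptIndexException` are hypothetical stand-ins for Kafka's `LogSegment`, index classes and exception. The log line is emitted before the three `sanityCheck()` calls, so when a check throws, the last log line names the failing segment and nothing is recorded as uploaded.

```java
import java.util.ArrayList;
import java.util.List;

public class CopySegmentSketch {

    // Hypothetical local stand-in for Kafka's CorruptIndexException.
    static class CorruptIndexException extends RuntimeException {
        CorruptIndexException(String message) {
            super(message);
        }
    }

    // Hypothetical minimal index abstraction: sanityCheck() throws if the index is corrupt.
    interface Index {
        void sanityCheck();
    }

    // Hypothetical segment holding a time, offset and transaction index.
    static final class Segment {
        final String name;
        final Index timeIndex;
        final Index offsetIndex;
        final Index txnIndex;

        Segment(String name, Index timeIndex, Index offsetIndex, Index txnIndex) {
            this.name = name;
            this.timeIndex = timeIndex;
            this.offsetIndex = offsetIndex;
            this.txnIndex = txnIndex;
        }
    }

    // Hypothetical copier that records log lines and "uploads" in memory.
    static final class Copier {
        final List<String> log = new ArrayList<>();
        final List<String> uploaded = new ArrayList<>();

        void copyLogSegment(Segment segment) {
            // Log first, so a failing check below can be traced to this segment.
            log.add("Copying " + segment.name + " to remote storage.");
            // Any of these may throw; if one does, the segment is never added to `uploaded`.
            segment.timeIndex.sanityCheck();
            segment.offsetIndex.sanityCheck();
            segment.txnIndex.sanityCheck();
            uploaded.add(segment.name);
        }
    }

    public static void main(String[] args) {
        Index ok = () -> { };
        Index corrupt = () -> {
            throw new CorruptIndexException("Corrupt offset index found");
        };

        Copier copier = new Copier();
        copier.copyLogSegment(new Segment("segment-0", ok, ok, ok));
        try {
            copier.copyLogSegment(new Segment("segment-150", ok, corrupt, ok));
        } catch (CorruptIndexException e) {
            copier.log.add("Index check failed: " + e.getMessage());
        }
        copier.log.forEach(System.out::println);
        System.out.println("Uploaded: " + copier.uploaded);
    }
}
```

With this ordering, the line `Copying segment-150 to remote storage.` appears immediately before the failure message, which is the debugging benefit the reviewer describes.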
## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
## @@ -2482,6 +2486,134 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l

```diff
         }
     }
+    @Test
+    void testCopyLogSegmentsToRemoteShouldNotCopySegmentWithMissingIndexes() throws Exception {
+        long segmentStartOffset = 0L;
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create log segment, with 0 as log start offset
+        LogSegment segment = mock(LogSegment.class);
+
+        // The segment has no timeIndex(), which the sanity checks do not accept.
+        // In that case the segment won't be copied.
+        when(segment.baseOffset()).thenReturn(segmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(segment);
+        when(mockLog.logStartOffset()).thenReturn(segmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment)));
+        when(mockLog.lastStableOffset()).thenReturn(150L);
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.copyLogSegmentsToRemote(mockLog);
+
+        // Verify that remoteLogMetadataManager never adds any metadata and remoteStorageManager never copies log segments,
+        // since a segment with index corruption should not be uploaded
+        verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
+        verify(remoteStorageManager, never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+        verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+    }
+
+
+    @Test
+    void testCorruptedTimeIndexes() throws Exception {
+        // copyLogSegment is executed when we have more than 1 segment, which is why we create 2 of them
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        // Mock all required segment data to be managed by RLMTask
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log(
```
Nickstery commented on code in PR #15472:
URL: https://github.com/apache/kafka/pull/15472#discussion_r1514277995

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -707,6 +708,8 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException

```diff
             this.cancel();
         } catch (InterruptedException | RetriableException ex) {
             throw ex;
+        } catch (CorruptIndexException ex) {
+            logger.error("Error occurred while copying log segments. Index appeared to be corrupted for partition: {} ", topicIdPartition, ex);
```

Review Comment:
Agree and fixed
Nickstery commented on code in PR #15472:
URL: https://github.com/apache/kafka/pull/15472#discussion_r1514266315

## storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java:
## @@ -75,13 +75,14 @@ public void sanityCheck() {

```diff
         TimestampOffset entry = lastEntry();
         long lastTimestamp = entry.timestamp;
         long lastOffset = entry.offset;
-        if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
-            throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has "
-                + "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp "
-                + timestamp(mmap(), 0));
+
         if (entries() != 0 && lastOffset < baseOffset())
             throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has "
                 + "non-zero size but the last offset is " + lastOffset + " which is less than the first offset " + baseOffset());
+        if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
```

Review Comment:
In the test we are not mocking `mmap()`, since that is a bit trickier to do, so when I wrote the test to check index corruption I triggered it by satisfying this condition:

```
if (entries() != 0 && lastOffset < baseOffset())
```
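The testing argument above can be illustrated with a simplified stand-in for the time index. In this sketch, the hypothetical `FakeTimeIndex` (not Kafka's `TimeIndex`) receives the first timestamp through a `LongSupplier` that plays the role of `timestamp(mmap(), 0)`. Because the offset comparison runs first, an index whose last offset is below its base offset is reported as corrupt without the supplier ever being called, which is what lets a test avoid mocking `mmap()`. Any further checks the real `sanityCheck()` performs are omitted.

```java
import java.util.function.LongSupplier;

public class TimeIndexCheckSketch {

    // Hypothetical local stand-in for Kafka's CorruptIndexException.
    static class CorruptIndexException extends RuntimeException {
        CorruptIndexException(String message) {
            super(message);
        }
    }

    // Hypothetical, simplified time index holding only what the two checks need.
    static final class FakeTimeIndex {
        final long baseOffset;
        final int entries;
        final long lastOffset;             // offset of the cached last entry
        final long lastTimestamp;          // timestamp of the cached last entry
        final LongSupplier firstTimestamp; // simulates timestamp(mmap(), 0)

        FakeTimeIndex(long baseOffset, int entries, long lastOffset, long lastTimestamp, LongSupplier firstTimestamp) {
            this.baseOffset = baseOffset;
            this.entries = entries;
            this.lastOffset = lastOffset;
            this.lastTimestamp = lastTimestamp;
            this.firstTimestamp = firstTimestamp;
        }

        void sanityCheck() {
            // Check 1: compares in-memory fields only, so no mapped buffer is needed.
            if (entries != 0 && lastOffset < baseOffset)
                throw new CorruptIndexException("Corrupt time index found: last offset " + lastOffset
                        + " is less than the base offset " + baseOffset);
            // Check 2: needs the first entry, which Kafka reads from the memory-mapped file.
            if (entries != 0) {
                long first = firstTimestamp.getAsLong();
                if (lastTimestamp < first)
                    throw new CorruptIndexException("Corrupt time index found: last timestamp " + lastTimestamp
                            + " is less than the first timestamp " + first);
            }
        }
    }

    public static void main(String[] args) {
        // The supplier throws if called, mimicking an unmocked mmap() in a unit test.
        FakeTimeIndex index = new FakeTimeIndex(100L, 1, 50L, 1_000L,
                () -> { throw new UnsupportedOperationException("mmap() is not mocked"); });
        try {
            index.sanityCheck();
        } catch (CorruptIndexException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If the two checks were in the original order, the same call would fail with `UnsupportedOperationException` from the supplier instead of reporting the corrupt offset.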
divijvaidya commented on code in PR #15472:
URL: https://github.com/apache/kafka/pull/15472#discussion_r1513268220

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -707,6 +708,8 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException

```diff
             this.cancel();
         } catch (InterruptedException | RetriableException ex) {
             throw ex;
+        } catch (CorruptIndexException ex) {
+            logger.error("Error occurred while copying log segments. Index appeared to be corrupted for partition: {} ", topicIdPartition, ex);
```

Review Comment:
I am assuming that the way this error will be monitored is by creating an alarm on `RemoteCopyLagSegments` [1]. Is that right? Can you also please explain why we shouldn't increment the `failedRemoteCopyRequestRate` metrics that are being incremented in the exception catch block below?

[1] https://kafka.apache.org/documentation.html#tiered_storage_monitoring

## storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java:
## @@ -75,13 +75,14 @@ public void sanityCheck() {

```diff
         TimestampOffset entry = lastEntry();
         long lastTimestamp = entry.timestamp;
         long lastOffset = entry.offset;
-        if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
-            throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has "
-                + "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp "
-                + timestamp(mmap(), 0));
+
         if (entries() != 0 && lastOffset < baseOffset())
             throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has "
                 + "non-zero size but the last offset is " + lastOffset + " which is less than the first offset " + baseOffset());
+        if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
```

Review Comment:
I am assuming that the reason for moving this down is to run the less expensive validation first?
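One possible shape for the catch block under discussion is sketched below, assuming that a corrupt index should also count as a failed copy so that alarms on the failure rate fire. That assumption is made for illustration only and is not necessarily what the PR settled on. `CopyTask`, `CopyStep`, the two counters and the local exception classes are hypothetical stand-ins; the `failedCopyRequests` counter plays the role of the `failedRemoteCopyRequestRate` metric.

```java
public class CopyErrorHandlingSketch {

    // Hypothetical local stand-ins for Kafka's exception types.
    static class RetriableException extends RuntimeException {
        RetriableException(String message) {
            super(message);
        }
    }

    static class CorruptIndexException extends RuntimeException {
        CorruptIndexException(String message) {
            super(message);
        }
    }

    // Hypothetical unit of copy work; may be interrupted like the real task.
    @FunctionalInterface
    interface CopyStep {
        void run() throws InterruptedException;
    }

    static final class CopyTask {
        long failedCopyRequests;   // hypothetical stand-in for failedRemoteCopyRequestRate
        long corruptIndexFailures; // hypothetical dedicated counter for corrupt indexes

        void copyLogSegmentsToRemote(CopyStep step) throws InterruptedException {
            try {
                step.run();
            } catch (InterruptedException | RetriableException ex) {
                // Propagate to the caller, as in the quoted diff.
                throw ex;
            } catch (CorruptIndexException ex) {
                // Assumption: a corrupt index is also counted as a failed copy request.
                failedCopyRequests++;
                corruptIndexFailures++;
                System.err.println("Index appeared to be corrupted: " + ex.getMessage());
            } catch (Exception ex) {
                failedCopyRequests++;
                System.err.println("Error occurred while copying log segments: " + ex.getMessage());
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CopyTask task = new CopyTask();
        task.copyLogSegmentsToRemote(() -> { throw new CorruptIndexException("Corrupt time index found"); });
        task.copyLogSegmentsToRemote(() -> { throw new IllegalStateException("upload failed"); });
        // prints failed=2, corruptIndex=1
        System.out.println("failed=" + task.failedCopyRequests + ", corruptIndex=" + task.corruptIndexFailures);
    }
}
```

The order of the catch clauses matters: the specific `CorruptIndexException` handler has to precede the generic `Exception` handler, because the compiler rejects a subclass clause that follows a clause for its superclass.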
Nickstery closed pull request #15471: KAFKA-15401 | Segment with corrupted index should not be tiered
URL: https://github.com/apache/kafka/pull/15471