junrao commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1287764297
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -956,6 +981,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, } } + private def maybeIncrementLocalLogStartOffset(newLocalLogStartOffset: Long, reason: LogStartOffsetIncrementReason): Unit = { + lock synchronized { + if (newLocalLogStartOffset > localLogStartOffset()) { + _localLogStartOffset = math.max(newLocalLogStartOffset, localLogStartOffset()); Review Comment: Since newLocalLogStartOffset is larger than localLogStartOffset(), could we just assign newLocalLogStartOffset to _localLogStartOffset? ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -966,7 +1000,8 @@ class UnifiedLog(@volatile var logStartOffset: Long, * @throws OffsetOutOfRangeException if the log start offset is greater than the high watermark * @return true if the log start offset was updated; otherwise false */ - def maybeIncrementLogStartOffset(newLogStartOffset: Long, reason: LogStartOffsetIncrementReason): Boolean = { + def maybeIncrementLogStartOffset(newLogStartOffset: Long, + reason: LogStartOffsetIncrementReason): Boolean = { Review Comment: Indentation doesn't match other places in this file. ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: SegmentDeletionReason): Int = { def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) && + val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) + + // Check not to delete segments which are not yet copied to tiered storage + val isSegmentTieredToRemoteStorage = Review Comment: Here is a corner case. Let's say remote log is enabled, but there is no remote segment (all have been deleted due to retention). 
The new logic will do retention based on `localRetentionBytes`, but it should actually do the retention based on `retentionSize`. If that happens, we need to advance logStartOffset, in addition to localLogStartOffset. ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -1464,12 +1513,12 @@ class UnifiedLog(@volatile var logStartOffset: Long, } } - deleteOldSegments(shouldDelete, RetentionSizeBreach(this)) + deleteOldSegments(shouldDelete, RetentionSizeBreach(this, remoteLogEnabled())) } private def deleteLogStartOffsetBreachedSegments(): Int = { def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - nextSegmentOpt.exists(_.baseOffset <= logStartOffset) + nextSegmentOpt.exists(_.baseOffset <= localLogStartOffset()) Review Comment: This doesn't look right. If remote log is not enabled, it seems that we should delete based on logStartOffset, not localLogStartOffset. ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: SegmentDeletionReason): Int = { def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) && + val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) + + // Check not to delete segments which are not yet copied to tiered storage + val isSegmentTieredToRemoteStorage = + if (remoteLogEnabled()) upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage + else true Review Comment: Hmm, this should be false, right? Do we have a test case to cover that? 
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -147,11 +147,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, def localLogStartOffset(): Long = _localLogStartOffset + // This is the offset(inclusive) until which segments are copied to the remote storage. @volatile private var highestOffsetInRemoteStorage: Long = -1L locally { initializePartitionMetadata() updateLogStartOffset(logStartOffset) + updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.getOrElse(0L))) Review Comment: This is an existing issue. But there is one direct reference to `_localLogStartOffset` in `fetchOffsetByTimestamp()`. Should we change that to use `localLogStartOffset()` instead? ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -696,11 +704,327 @@ public void run() { } } + public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { + if (isLeader()) { + logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); + updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); + } + } + + class RemoteLogRetentionHandler { + + private final Optional<RetentionSizeData> retentionSizeData; + private final Optional<RetentionTimeData> retentionTimeData; + + private long remainingBreachedSize; + + private OptionalLong logStartOffset = OptionalLong.empty(); + + public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) { + this.retentionSizeData = retentionSizeData; + this.retentionTimeData = retentionTimeData; + remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); + } + + private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionSizeData.isPresent()) { + return false; + } + + 
boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { + // Assumption that segments contain size >= 0 + if (remainingBreachedSize > 0) { + long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes(); + if (remainingBytes >= 0) { + remainingBreachedSize = remainingBytes; + return true; + } + } + + return false; + }); + if (isSegmentDeleted) { + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", + metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); + } + return isSegmentDeleted; + } + + public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionTimeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, + x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); + if (isSegmentDeleted) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals + // are ascending with in an epoch. 
+ logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", + metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); + } + return isSegmentDeleted; + } + + private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) + throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); + if (isSegmentDeleted && retentionSizeData.isPresent()) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); + } + + return isSegmentDeleted; + } + + // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as + // unreferenced because they are not part of the current leader epoch lineage. + private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> + x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch)); + if (isSegmentDeleted) { + logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}", + metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet()); + } + + // No need to update the log-start-offset as these epochs/offsets are earlier to that value. 
+ return isSegmentDeleted; + } + + private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (predicate.test(segmentMetadata)) { + logger.info("Deleting remote log segment {}", segmentMetadata.remoteLogSegmentId()); + // Publish delete segment started event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get(); + + // Delete the segment in remote storage. + remoteLogStorageManager.deleteLogSegmentData(segmentMetadata); + + // Publish delete segment finished event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get(); + logger.info("Deleted remote log segment {}", segmentMetadata.remoteLogSegmentId()); + return true; + } + + return false; + } + + } + + private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException { + if (isCancelled() || !isLeader()) { + logger.info("Returning from remote log segments cleanup as the task state is changed"); + return; + } + + // Cleanup remote log segments and update the log start offset if applicable. 
+ final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition); + if (!segmentMetadataIter.hasNext()) { + logger.debug("No remote log segments available on remote storage for partition: {}", topicIdPartition); + return; + } + + final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition()); + if (!logOptional.isPresent()) { + logger.debug("No UnifiedLog instance available for partition: {}", topicIdPartition); + return; + } + + final UnifiedLog log = logOptional.get(); + final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache(); + if (leaderEpochCacheOption.isEmpty()) { + logger.debug("No leader epoch cache available for partition: {}", topicIdPartition); + return; + } + + final Set<Integer> epochsSet = new HashSet<>(); + // Good to have an API from RLMM to get all the remote leader epochs of all the segments of a partition + // instead of going through all the segments and building it here. 
+ while (segmentMetadataIter.hasNext()) { + RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next(); + epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet()); + } + + // All the leader epochs in sorted order that exists in remote storage + final List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet); + Collections.sort(remoteLeaderEpochs); + + LeaderEpochFileCache leaderEpochCache = leaderEpochCacheOption.get(); + NavigableMap<Integer, Long> epochWithOffsets = leaderEpochCache.epochWithOffsets(); + Optional<EpochEntry> earliestEpochEntryOptional = leaderEpochCache.earliestEntry(); + + Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(log.config().retentionSize, + log.onlyLocalLogSegmentsSize(), log.logEndOffset(), epochWithOffsets); + Optional<RetentionTimeData> retentionTimeData = buildRetentionTimeData(log.config().retentionMs); + + RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData); + Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator(); + boolean isSegmentDeleted = true; + while (isSegmentDeleted && epochIterator.hasNext()) { + Integer epoch = epochIterator.next(); + Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch); + while (isSegmentDeleted && segmentsIterator.hasNext()) { + if (isCancelled() || !isLeader()) { + logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed."); + return; + } + RemoteLogSegmentMetadata metadata = segmentsIterator.next(); + + // check whether the segment contains the required epoch range with in the current leader epoch lineage. 
+ if (isRemoteSegmentWithinLeaderEpochs(metadata, log.logEndOffset(), epochWithOffsets)) { + isSegmentDeleted = + remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) || + remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) || + remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, log.logStartOffset()); + } + } + } + + // Remove the remote log segments whose segment-leader-epochs are less than the earliest-epoch known Review Comment: > All the size/time/startOffset handlers run based on the current leader’s leader epochs. Here, we are removing the segments which have leader epochs earlier to the lowest leader epoch on this broker(partition leader). Hmm, I still don't quite understand this part. The leader's epoch chain only gets trimmed from the beginning when segments are deleted due to retention or the advancement of the startOffset by `deleteRecord()` call. These are covered by the size/time based retention and logStartOffset based retention. So what additional cases does the following code cover? 
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -696,11 +704,327 @@ public void run() { } } + public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { + if (isLeader()) { + logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); + updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); + } + } + + class RemoteLogRetentionHandler { + + private final Optional<RetentionSizeData> retentionSizeData; + private final Optional<RetentionTimeData> retentionTimeData; + + private long remainingBreachedSize; + + private OptionalLong logStartOffset = OptionalLong.empty(); + + public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) { + this.retentionSizeData = retentionSizeData; + this.retentionTimeData = retentionTimeData; + remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); + } + + private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionSizeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { + // Assumption that segments contain size >= 0 + if (remainingBreachedSize > 0) { + long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes(); + if (remainingBytes >= 0) { + remainingBreachedSize = remainingBytes; + return true; + } + } + + return false; + }); + if (isSegmentDeleted) { + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention size {} breach. 
Log size after deletion will be {}.", + metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); + } + return isSegmentDeleted; + } + + public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionTimeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, + x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); + if (isSegmentDeleted) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals + // are ascending with in an epoch. + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", + metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); + } + return isSegmentDeleted; + } + + private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) + throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); + if (isSegmentDeleted && retentionSizeData.isPresent()) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); + } + + return isSegmentDeleted; + } + + // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as + // unreferenced because they are not part of the current leader epoch lineage. 
+ private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> + x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch)); + if (isSegmentDeleted) { + logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}", + metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet()); + } + + // No need to update the log-start-offset as these epochs/offsets are earlier to that value. + return isSegmentDeleted; + } + + private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (predicate.test(segmentMetadata)) { + logger.info("Deleting remote log segment {}", segmentMetadata.remoteLogSegmentId()); + // Publish delete segment started event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get(); + + // Delete the segment in remote storage. + remoteLogStorageManager.deleteLogSegmentData(segmentMetadata); + + // Publish delete segment finished event. 
+ remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get(); + logger.info("Deleted remote log segment {}", segmentMetadata.remoteLogSegmentId()); + return true; + } + + return false; + } + + } + + private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException { + if (isCancelled() || !isLeader()) { + logger.info("Returning from remote log segments cleanup as the task state is changed"); + return; + } + + // Cleanup remote log segments and update the log start offset if applicable. + final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition); + if (!segmentMetadataIter.hasNext()) { + logger.debug("No remote log segments available on remote storage for partition: {}", topicIdPartition); + return; + } + + final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition()); + if (!logOptional.isPresent()) { + logger.debug("No UnifiedLog instance available for partition: {}", topicIdPartition); + return; + } + + final UnifiedLog log = logOptional.get(); + final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache(); + if (leaderEpochCacheOption.isEmpty()) { + logger.debug("No leader epoch cache available for partition: {}", topicIdPartition); + return; + } + + final Set<Integer> epochsSet = new HashSet<>(); + // Good to have an API from RLMM to get all the remote leader epochs of all the segments of a partition + // instead of going through all the segments and building it here. 
+ while (segmentMetadataIter.hasNext()) { + RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next(); + epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet()); + } + + // All the leader epochs in sorted order that exists in remote storage + final List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet); + Collections.sort(remoteLeaderEpochs); + + LeaderEpochFileCache leaderEpochCache = leaderEpochCacheOption.get(); + NavigableMap<Integer, Long> epochWithOffsets = leaderEpochCache.epochWithOffsets(); + Optional<EpochEntry> earliestEpochEntryOptional = leaderEpochCache.earliestEntry(); + + Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(log.config().retentionSize, + log.onlyLocalLogSegmentsSize(), log.logEndOffset(), epochWithOffsets); + Optional<RetentionTimeData> retentionTimeData = buildRetentionTimeData(log.config().retentionMs); + + RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData); + Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator(); + boolean isSegmentDeleted = true; + while (isSegmentDeleted && epochIterator.hasNext()) { + Integer epoch = epochIterator.next(); + Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch); + while (isSegmentDeleted && segmentsIterator.hasNext()) { + if (isCancelled() || !isLeader()) { + logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed."); + return; + } + RemoteLogSegmentMetadata metadata = segmentsIterator.next(); + + // check whether the segment contains the required epoch range with in the current leader epoch lineage. 
+ if (isRemoteSegmentWithinLeaderEpochs(metadata, log.logEndOffset(), epochWithOffsets)) { + isSegmentDeleted = + remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) || + remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) || + remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, log.logStartOffset()); + } + } + } + + // Remove the remote log segments whose segment-leader-epochs are less than the earliest-epoch known Review Comment: > All the size/time/startOffset handlers run based on the current leader’s leader epochs. Here, we are removing the segments which have leader epochs earlier to the lowest leader epoch on this broker(partition leader). Hmm, I still don't quite understand this part. The leader's epoch chain only gets trimmed from the beginning when segments are deleted due to retention or the advancement of the startOffset by `deleteRecord()` call. These are covered by the size/time based retention and logStartOffset based retention. So what additional cases does the following code cover? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org