[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1308153773

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -761,11 +784,385 @@ public void run() {
             }
         }

+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        if (isLeader()) {
+            logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private final Optional<RetentionSizeData> retentionSizeData;
+        private final Optional<RetentionTimeData> retentionTimeData;
+
+        private long remainingBreachedSize;
+
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData,
+                                         Optional<RetentionTimeData> retentionTimeData) {
+            this.retentionSizeData = retentionSizeData;
+            this.retentionTimeData = retentionTimeData;
+            remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (!retentionSizeData.isPresent()) {
+                return false;
+            }
+
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size >= 0
+                if (remainingBreachedSize > 0) {
+                    long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes();
+                    if (remainingBytes >= 0) {
+                        remainingBreachedSize = remainingBytes;
+                        return true;
+                    }
+                }
+
+                return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+                        metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize,
+                        remainingBreachedSize + retentionSizeData.get().retentionSize);
+            }
+            return isSegmentDeleted;
+        }
+
+        public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (!retentionTimeData.isPresent()) {
+                return false;
+            }
+
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                    x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                // are ascending within an epoch.
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
+                        metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach",
+                        metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            return isSegmentDeleted;
+        }
+
+        // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as
+        // unreferenced because they are not part of the current leader epoch lineage.
+        private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException
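Read in isolation, the remainingBreachedSize bookkeeping in deleteRetentionSizeBreachedSegments is easy to misread, so here is a self-contained sketch of just that accounting with plain longs. The segment sizes and retention.size values below are invented for illustration; only the predicate mirrors the hunk above. Starting from the overshoot totalSize - retention.size, a segment is deleted only while subtracting its size keeps the remaining breach non-negative:

    import java.util.Arrays;
    import java.util.List;

    // Standalone illustration of the remainingBreachedSize accounting above.
    // All numbers are hypothetical; only the predicate mirrors the PR hunk.
    public class RetentionSizeBreachExample {
        public static void main(String[] args) {
            long retentionSize = 250L;                                        // e.g. retention.size
            List<Long> segmentSizes = Arrays.asList(100L, 100L, 100L, 100L); // oldest first
            long totalSize = segmentSizes.stream().mapToLong(Long::longValue).sum(); // 400

            // Bytes that must be removed to get back under retention.size.
            long remainingBreachedSize = totalSize - retentionSize;          // 150
            for (long segmentSize : segmentSizes) {
                if (remainingBreachedSize > 0) {
                    long remainingBytes = remainingBreachedSize - segmentSize;
                    if (remainingBytes >= 0) {               // same check as the handler
                        remainingBreachedSize = remainingBytes;
                        System.out.println("delete segment of " + segmentSize + " bytes");
                        continue;
                    }
                }
                break; // segment is retained; segments are ordered, so stop here
            }
            // Prints one deletion: 150 - 100 = 50, then 50 - 100 < 0, so the second
            // segment survives and the log stays slightly above retention.size.
        }
    }

Note the >= 0 check: a segment is deleted only when its whole size fits inside the remaining breach, so cleanup can stop slightly above retention.size rather than overshooting below it.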
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1304597735

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -696,11 +704,327 @@ public void run() {
(Quotes the same RemoteLogRetentionHandler hunk shown in full at the top of this digest.)
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1301361619

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -1096,6 +1493,43 @@ public void close() {
         }
     }

+    // Visible for testing
+    public static class RetentionSizeData {
+        private final long retentionSize;
+        private final long remainingBreachedSize;
+
+        public RetentionSizeData(long retentionSize, long remainingBreachedSize) {
+            if (retentionSize < 0)
+                throw new IllegalArgumentException("retentionSize should be non negative, but it is " + retentionSize);
+
+            if (remainingBreachedSize <= 0) {
+                throw new IllegalArgumentException("remainingBreachedSize should be more than zero, but it is " + remainingBreachedSize);
+            }
+
+            this.retentionSize = retentionSize;
+            this.remainingBreachedSize = remainingBreachedSize;
+        }
+    }
+
+    // Visible for testing
+    public static class RetentionTimeData {
+
+        private final long retentionMs;
+        private final long cleanupUntilMs;
+
+        public RetentionTimeData(long retentionMs, long cleanupUntilMs) {
+            if (retentionMs < 0)
+                throw new IllegalArgumentException("retentionMs should be non negative, but it is " + retentionMs);
+
+            if (retentionMs < cleanupUntilMs) {

Review Comment:
    Right, I fixed the validation check. Thanks.
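For orientation, these two value classes carry the inputs the RemoteLogRetentionHandler consumes. The exact call sites are outside the quoted hunk, so the wiring below is only a plausible sketch: the helper names, the totalLogSize parameter, and the convention that a disabled config maps to Optional.empty() are assumptions inferred from this thread, not code from the PR.

    import java.util.Optional;

    // Sketch only: a plausible derivation of the constructor arguments for the
    // two classes above. Names like totalLogSize are placeholders.
    final class RetentionDataSketch {
        static Optional<RetentionTimeData> timeData(long retentionMs, long nowMs) {
            // retention.ms = -1 disables time-based retention.
            if (retentionMs < 0) return Optional.empty();
            // Segments whose largest record timestamp is <= this horizon are eligible.
            long cleanupUntilMs = nowMs - retentionMs;
            return Optional.of(new RetentionTimeData(retentionMs, cleanupUntilMs));
        }

        static Optional<RetentionSizeData> sizeData(long retentionSize, long totalLogSize) {
            // retention.size = -1 disables size-based retention; if the log is
            // already under the limit there is no breach and nothing to clean up.
            long remainingBreachedSize = totalLogSize - retentionSize;
            if (retentionSize < 0 || remainingBreachedSize <= 0) return Optional.empty();
            return Optional.of(new RetentionSizeData(retentionSize, remainingBreachedSize));
        }
    }

The remainingBreachedSize <= 0 guard here matches the constructor precondition quoted above, which rejects non-positive values.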
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1300176481

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -761,11 +784,385 @@ public void run() {
(Quotes the same RemoteLogRetentionHandler hunk shown in full above, on the line:)
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);

Review Comment:
    Good point. I think we discussed this earlier also. Let us address this in a follow-up PR covering topics changing their retention from compaction to delete-only retention. Filed https://issues.apache.org/jira/browse/KAFKA-15388
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1300279331

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -761,11 +784,385 @@ public void run() {
(Quotes the same RemoteLogRetentionHandler hunk shown in full above, on the line:)
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);

Review Comment:
    Yes, this is planned for 3.6.0. I did not want to block this PR on it, as we want to unblock other dependent PRs, especially the integration test PRs.
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1300178242

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -1096,6 +1493,43 @@ public void close() {
(Quotes the same RetentionSizeData/RetentionTimeData hunk shown in full above, on the line:)
+            if (retentionMs < cleanupUntilMs) {

Review Comment:
    This check will be true when using system time, but I added it as a defensive check in case tests set the mock time to arbitrary long values.
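To make the mock-time concern concrete: with the system clock, now - retention.ms is a large epoch timestamp, so retentionMs < cleanupUntilMs holds trivially; a test that pins a mock clock near zero can instead produce a horizon that is negative or smaller than retentionMs. A small illustration follows; the numbers and the MockTime scenario are invented, and this shows only the arithmetic, not the final form of the fixed check.

    // Invented numbers showing the two regimes the validation distinguishes.
    public class CleanupHorizonExample {
        public static void main(String[] args) {
            long retentionMs = 100L;

            // System clock: the horizon is a huge epoch timestamp, far above retentionMs.
            long systemNowMs = System.currentTimeMillis();
            System.out.println(retentionMs < (systemNowMs - retentionMs)); // true

            // Mock clock pinned near zero, as a test using a mock Time might do:
            long mockNowMs = 50L;
            long mockHorizon = mockNowMs - retentionMs;                    // -50
            System.out.println(retentionMs < mockHorizon);                 // false
            // A negative horizon would silently classify no segment as time-breached
            // (maxTimestampMs() <= -50 never holds), hence the constructor guard.
        }
    }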
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1299640804

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -696,11 +704,327 @@ public void run() {
(Quotes the same RemoteLogRetentionHandler hunk shown in full at the top of this digest.)
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1299185257

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -698,11 +707,329 @@ public void run() {
(Quotes the same RemoteLogRetentionHandler hunk shown in full at the top of this digest.)
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1299184981

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -696,11 +704,327 @@ public void run() {
(Quotes the same RemoteLogRetentionHandler hunk shown in full at the top of this digest.)
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1298027732

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -696,11 +704,327 @@ public void run() {
(Quotes the same RemoteLogRetentionHandler hunk shown in full at the top of this digest.)
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1298029817

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -696,11 +704,327 @@ public void run() {
(Quotes the same RemoteLogRetentionHandler hunk shown in full at the top of this digest.)
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1296732476

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -696,11 +704,327 @@ public void run() {
(Quotes the same RemoteLogRetentionHandler hunk shown in full at the top of this digest.)
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1296731885

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -696,11 +704,327 @@ public void run() {
(Quotes the same RemoteLogRetentionHandler hunk shown in full at the top of this digest.)
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1295713103 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -696,11 +704,327 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +if (isLeader()) { +logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} +} + +class RemoteLogRetentionHandler { + +private final Optional retentionSizeData; +private final Optional retentionTimeData; + +private long remainingBreachedSize; + +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(Optional retentionSizeData, Optional retentionTimeData) { +this.retentionSizeData = retentionSizeData; +this.retentionTimeData = retentionTimeData; +remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); +} + +private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionSizeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { +// Assumption that segments contain size >= 0 +if (remainingBreachedSize > 0) { +long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes(); +if (remainingBytes >= 0) { +remainingBreachedSize = remainingBytes; +return true; +} +} + +return false; +}); +if (isSegmentDeleted) { +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", +metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); +} +return isSegmentDeleted; +} + +public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) +throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionTimeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, +x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); +if (isSegmentDeleted) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals +// are ascending with in an epoch. +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", +metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); +} +return isSegmentDeleted; +} + +private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) +throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); +if (isSegmentDeleted && retentionSizeData.isPresent()) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); +} + +return isSegmentDeleted; +} + +// It removes the segments beyond the current leader's earliest epoch. 
Those segments are considered as +// unreferenced because they are not part of the current leader epoch lineage. +private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1295701347 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -698,11 +707,329 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +if (isLeader()) { +logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} +} + +class RemoteLogRetentionHandler { + +private final Optional retentionSizeData; +private final Optional retentionTimeData; + +private long remainingBreachedSize; + +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(Optional retentionSizeData, Optional retentionTimeData) { +this.retentionSizeData = retentionSizeData; +this.retentionTimeData = retentionTimeData; +remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); +} + +private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionSizeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { +// Assumption that segments contain size >= 0 +if (remainingBreachedSize > 0) { +long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes(); +if (remainingBytes >= 0) { +remainingBreachedSize = remainingBytes; +return true; +} +} + +return false; +}); +if (isSegmentDeleted) { +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", +metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); +} +return isSegmentDeleted; +} + +public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) +throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionTimeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, +x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); +if (isSegmentDeleted) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals +// are ascending with in an epoch. +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", +metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); +} +return isSegmentDeleted; +} + +private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) +throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); +if (isSegmentDeleted && retentionSizeData.isPresent()) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); +} + +return isSegmentDeleted; +} + +// It removes the segments beyond the current leader's earliest epoch. 
Those segments are considered as +// unreferenced because they are not part of the current leader epoch lineage. +private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1295670320 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -1003,6 +1015,134 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() { } } +private static RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(long startOffset, long endOffset, Map segmentEpochs) { +return new RemoteLogSegmentMetadata( +new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(), +new TopicPartition("topic", 0)), Uuid.randomUuid()), +startOffset, endOffset, +10L, +1, +10L, +1000, +Optional.empty(), +RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs); +} + +@Test +public void testRemoteSegmentWithinLeaderEpochs() { +// Test whether a remote segment is within the leader epochs +final long logEndOffset = 90L; + +TreeMap leaderEpochToStartOffset = new TreeMap() {{ +put(0, 0L); +put(1, 10L); +put(2, 20L); +put(3, 30L); +put(4, 40L); +put(5, 50L); +put(7, 70L); +}}; Review Comment: Thanks @kamalcph for the clarification, good to know about that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
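For illustration, here is a hedged sketch of the kind of containment check that testRemoteSegmentWithinLeaderEpochs exercises: a remote segment is considered referenced only if every leader epoch it spans appears in the current leader epoch lineage with a compatible start offset, and the segment does not reach past the log end offset. This is a simplification under assumed boundary rules, not the PR's actual isRemoteSegmentWithinLeaderEpochs implementation; the class and method names here are hypothetical.

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class LeaderEpochCheckSketch {
    // Returns true when every epoch recorded for the segment exists in the
    // leader lineage and starts no earlier than the lineage says it may.
    static boolean isWithinLeaderEpochs(NavigableMap<Integer, Long> segmentEpochs,
                                        long segmentEndOffset,
                                        long logEndOffset,
                                        NavigableMap<Integer, Long> leaderEpochs) {
        if (segmentEndOffset >= logEndOffset) {
            return false; // segment reaches past the current log end offset
        }
        for (Map.Entry<Integer, Long> entry : segmentEpochs.entrySet()) {
            Long lineageStart = leaderEpochs.get(entry.getKey());
            if (lineageStart == null || entry.getValue() < lineageStart) {
                return false; // epoch missing from lineage, or starts too early
            }
        }
        return true;
    }

    public static void main(String[] args) {
        NavigableMap<Integer, Long> lineage = new TreeMap<>(Map.of(0, 0L, 1, 10L, 2, 20L, 3, 30L));
        NavigableMap<Integer, Long> segment = new TreeMap<>(Map.of(1, 15L, 2, 20L, 3, 30L));
        System.out.println(isWithinLeaderEpochs(segment, 35L, 90L, lineage)); // true
    }
}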
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1295607067 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: SegmentDeletionReason): Int = { def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) && + val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) + + // Check not to delete segments which are not yet copied to tiered storage + val isSegmentTieredToRemoteStorage = Review Comment: Thanks @junrao for the clarification. In the above case with remote storage enabled, the segment will eventually be deleted from both local and remote storage, and the log-start-offset and local-log-start-offset will be updated accordingly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1295387916 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1033,6 +1360,35 @@ public void close() { } } +private static class RetentionSizeData { +private final long retentionSize; +private final long remainingBreachedSize; + +public RetentionSizeData(long retentionSize, long remainingBreachedSize) { +if (retentionSize < remainingBreachedSize) { +throw new IllegalArgumentException("retentionSize must be greater than remainingBreachedSize"); +} Review Comment: Good catch! It was changed while refactoring, added UTs to cover that in the latest commits. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
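As a minimal sketch of the invariant under discussion, assuming the simple two-field holder shown in the diff (note the check permits equality, so the message below is phrased as "at least"; the example values are hypothetical):

final class RetentionSizeData {
    final long retentionSize;
    final long remainingBreachedSize;

    RetentionSizeData(long retentionSize, long remainingBreachedSize) {
        // The size still to be reclaimed can never exceed the configured retention size.
        if (retentionSize < remainingBreachedSize) {
            throw new IllegalArgumentException("retentionSize must be at least remainingBreachedSize");
        }
        this.retentionSize = retentionSize;
        this.remainingBreachedSize = remainingBreachedSize;
    }

    public static void main(String[] args) {
        new RetentionSizeData(100, 50); // ok: e.g. 150 bytes of log over a 100-byte limit
        try {
            new RetentionSizeData(40, 50); // invalid: breach larger than the limit
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}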
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1294843843 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -152,16 +152,42 @@ class UnifiedLog(@volatile var logStartOffset: Long, locally { initializePartitionMetadata() updateLogStartOffset(logStartOffset) +updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.getOrElse(0L))) +if (!remoteLogEnabled()) + logStartOffset = localLogStartOffset() maybeIncrementFirstUnstableOffset() initializeTopicId() logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset) + +info(s"Completed load of log with ${localLog.segments.numberOfSegments} segments, local log start offset ${localLogStartOffset()} and " + + s"log end offset $logEndOffset") } def setLogOffsetsListener(listener: LogOffsetsListener): Unit = { logOffsetsListener = listener } + private def updateLocalLogStartOffset(offset: Long): Unit = { Review Comment: This method is used only from `locally` block and it does not require taking any lock. We moved this method inside the locally block to avoid any confusion and future usage outside of that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1294604024 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -1003,6 +1015,134 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() { } } +private static RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(long startOffset, long endOffset, Map segmentEpochs) { +return new RemoteLogSegmentMetadata( +new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(), +new TopicPartition("topic", 0)), Uuid.randomUuid()), +startOffset, endOffset, +10L, +1, +10L, +1000, +Optional.empty(), +RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs); +} + +@Test +public void testRemoteSegmentWithinLeaderEpochs() { +// Test whether a remote segment is within the leader epochs +final long logEndOffset = 90L; + +TreeMap leaderEpochToStartOffset = new TreeMap() {{ +put(0, 0L); +put(1, 10L); +put(2, 20L); +put(3, 30L); +put(4, 40L); +put(5, 50L); +put(7, 70L); +}}; Review Comment: What is the rationale for this suggestion? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1294603142 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -1003,6 +1015,134 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() { } } +private static RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(long startOffset, long endOffset, Map segmentEpochs) { +return new RemoteLogSegmentMetadata( +new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(), +new TopicPartition("topic", 0)), Uuid.randomUuid()), +startOffset, endOffset, +10L, +1, +10L, +1000, +Optional.empty(), +RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs); +} + +@Test +public void testRemoteSegmentWithinLeaderEpochs() { +// Test whether a remote segment is within the leader epochs +final long logEndOffset = 90L; + +TreeMap leaderEpochToStartOffset = new TreeMap() {{ +put(0, 0L); +put(1, 10L); +put(2, 20L); +put(3, 30L); +put(4, 40L); +put(5, 50L); +put(7, 70L); +}}; + +// Test whether a remote segment's epochs/offsets(multiple) are within the range of leader epochs + assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata( +15, +35, +new TreeMap() {{ +put(1, 15L); +put(2, 20L); +put(3, 30L); +}}), logEndOffset, leaderEpochToStartOffset)); + +// Test whether a remote segment's epochs/offsets(single) are within the range of leader epochs + assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata( +15, +19, +new TreeMap() {{ +put(1, 15L); +}}), logEndOffset, leaderEpochToStartOffset)); + +// Test whether a remote segment's epochs/offsets(single) are within the range of leader epochs + assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata( Review Comment: Good catch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1294602887 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -698,11 +707,329 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +if (isLeader()) { +logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} +} + +class RemoteLogRetentionHandler { + +private final Optional retentionSizeData; +private final Optional retentionTimeData; + +private long remainingBreachedSize; + +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(Optional retentionSizeData, Optional retentionTimeData) { Review Comment: I do not find a strong reason not to use Optional as an argument. :) In the same SO link, there are a few other opinions on why it is a weak argument. Optional as an argument is used in several other places within this project. I do not have strong opinions, and I am fine if we decide to go with that rule across the project when there is a consensus. We can revisit it then. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
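For illustration, the two styles being weighed look roughly like this; the names and signatures are hypothetical, not the PR's API:

import java.util.Optional;

final class OptionalArgStyles {
    // Style kept in the PR: Optional-typed parameters; callers pass
    // Optional.empty() when a retention policy is not configured.
    static void cleanup(Optional<Long> retentionSize, Optional<Long> retentionMs) {
        retentionSize.ifPresent(bytes -> System.out.println("size-based cleanup, limit " + bytes + " bytes"));
        retentionMs.ifPresent(ms -> System.out.println("time-based cleanup, limit " + ms + " ms"));
    }

    // Alternative the cited guidance prefers: separate entry points without
    // Optional parameters, at the cost of more variants as the number of
    // optional inputs grows.
    static void cleanupBySize(long retentionSize) {
        cleanup(Optional.of(retentionSize), Optional.empty());
    }

    static void cleanupByTime(long retentionMs) {
        cleanup(Optional.empty(), Optional.of(retentionMs));
    }
}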
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1293345266 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: SegmentDeletionReason): Int = { def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) && + val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) + + // Check not to delete segments which are not yet copied to tiered storage + val isSegmentTieredToRemoteStorage = Review Comment: Let me rephrase what you mentioned here: retention.bytes = 100MB, segment1 = 90MB. When remote storage is not enabled, this segment is not deleted from the local log segments because of the retention size check. retention.bytes = 100MB, local.retention.bytes = 20MB, segment1 = 90MB. When remote storage is enabled and no segments have been uploaded to remote storage yet, the check introduced in this PR will not allow this segment to be deleted, as it is not yet copied to remote storage. If it is copied to remote storage, that means it is not the active segment and there are one or more local segments after this segment. This segment will then be eligible for deletion based on the local retention policy, as it was already copied to remote storage earlier. @junrao Am I missing anything here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1293344902 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: SegmentDeletionReason): Int = { def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) && + val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) + + // Check not to delete segments which are not yet copied to tiered storage + val isSegmentTieredToRemoteStorage = Review Comment: Let me rephrase what you mentioned here: retention.bytes = 100MB, segment1 = 90MB. When remote storage is not enabled, this segment is not deleted from the local log segments because of the retention size check. retention.bytes = 100MB, local.retention.bytes = 20MB, segment1 = 90MB. When remote storage is enabled and no segments have been uploaded to remote storage yet, the check introduced in this PR will not allow this segment to be deleted, as it is not yet copied to remote storage. If it is copied to remote storage, that means it is not the active segment and there are one or more local segments after this segment. This segment will then be eligible for deletion based on the local retention policy, as it was already copied to remote storage earlier. @junrao Am I missing anything here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
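The scenario above can be made concrete with a small sketch; the offsets, sizes, and the -1 sentinel for highestOffsetInRemoteStorage are assumptions for illustration only:

final class TieredDeletionScenario {
    public static void main(String[] args) {
        long localRetentionBytes = 20L * 1024 * 1024;  // local.retention.bytes = 20MB
        long segmentSize = 90L * 1024 * 1024;          // segment1 = 90MB
        long highestOffsetInRemoteStorage = -1L;       // nothing uploaded to remote storage yet
        long segmentEndOffset = 999L;                  // hypothetical last offset of segment1

        // Mirrors the guard: with remote storage enabled, a segment is only
        // eligible for local deletion once remote storage covers its end offset.
        boolean copiedToRemote = segmentEndOffset <= highestOffsetInRemoteStorage;
        boolean localRetentionBreached = segmentSize > localRetentionBytes;

        // Local retention (20MB) is breached by the 90MB segment, but the
        // segment has not been tiered, so deletion stays blocked.
        System.out.println("eligible = " + (localRetentionBreached && copiedToRemote)); // false
    }
}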
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1293179175 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -618,6 +625,230 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +if (isLeader()) { Review Comment: @kamalcph The case mentioned by you can be addressed in a followup PR. Please file a JIRA. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1292967405 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: SegmentDeletionReason): Int = { def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) && + val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) + + // Check not to delete segments which are not yet copied to tiered storage + val isSegmentTieredToRemoteStorage = +if (remoteLogEnabled()) upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage +else true Review Comment: It was implicit from the condition that it is relevant only when remote storage is enabled. I removed the value and added a condition and the respective comments for better clarity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
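A minimal sketch of the resulting guard, assuming highestOffsetInRemoteStorage is inclusive (confirmed elsewhere in this review) and treating the other deletion predicates as given:

final class TieredDeletionGuard {
    // upperBoundOffset is the base offset of the next segment (or the log end
    // offset for the last segment), so the candidate segment's highest offset
    // is upperBoundOffset - 1.
    static boolean isSegmentTieredToRemoteStorage(boolean remoteLogEnabled,
                                                  long upperBoundOffset,
                                                  long highestOffsetInRemoteStorage) {
        if (!remoteLogEnabled) {
            // Without tiered storage this check must not block deletion;
            // eligibility then rests solely on the other predicates
            // (high watermark, retention).
            return true;
        }
        // highestOffsetInRemoteStorage is inclusive, so the segment is safe to
        // delete locally only when its last offset has already been copied.
        return upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage;
    }
}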
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1292967294 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1464,12 +1513,12 @@ class UnifiedLog(@volatile var logStartOffset: Long, } } -deleteOldSegments(shouldDelete, RetentionSizeBreach(this)) +deleteOldSegments(shouldDelete, RetentionSizeBreach(this, remoteLogEnabled())) } private def deleteLogStartOffsetBreachedSegments(): Int = { def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - nextSegmentOpt.exists(_.baseOffset <= logStartOffset) + nextSegmentOpt.exists(_.baseOffset <= (if(remoteLogEnabled()) localLogStartOffset() else logStartOffset)) Review Comment: When remote log is enabled, it deletes the local segments whose offset is <= local-log-start-offset. The existing condition without tiered storage is to delete the local log segments <= log-start-offset. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
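Sketching the predicate change described here, with plain parameters standing in for UnifiedLog's fields:

final class StartOffsetBreachCheck {
    // With remote storage enabled, local deletion is driven by the
    // local-log-start-offset; otherwise by the log-start-offset, matching the
    // pre-tiered-storage behaviour.
    static boolean shouldDelete(boolean remoteLogEnabled,
                                long nextSegmentBaseOffset,
                                long logStartOffset,
                                long localLogStartOffset) {
        long effectiveStartOffset = remoteLogEnabled ? localLogStartOffset : logStartOffset;
        // The whole segment lies below the effective start offset once the
        // next segment begins at or before it.
        return nextSegmentBaseOffset <= effectiveStartOffset;
    }
}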
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1289577043 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -696,11 +704,327 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +if (isLeader()) { +logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} +} + +class RemoteLogRetentionHandler { + +private final Optional retentionSizeData; +private final Optional retentionTimeData; + +private long remainingBreachedSize; + +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(Optional retentionSizeData, Optional retentionTimeData) { +this.retentionSizeData = retentionSizeData; +this.retentionTimeData = retentionTimeData; +remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); +} + +private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionSizeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { +// Assumption that segments contain size >= 0 +if (remainingBreachedSize > 0) { +long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes(); +if (remainingBytes >= 0) { +remainingBreachedSize = remainingBytes; +return true; +} +} + +return false; +}); +if (isSegmentDeleted) { +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", +metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); +} +return isSegmentDeleted; +} + +public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) +throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionTimeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, +x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); +if (isSegmentDeleted) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals +// are ascending with in an epoch. +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", +metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); +} +return isSegmentDeleted; +} + +private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) +throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); +if (isSegmentDeleted && retentionSizeData.isPresent()) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); +} + +return isSegmentDeleted; +} + +// It removes the segments beyond the current leader's earliest epoch. 
Those segments are considered as +// unreferenced because they are not part of the current leader epoch lineage. +private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1289577612 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1464,12 +1513,12 @@ class UnifiedLog(@volatile var logStartOffset: Long, } } -deleteOldSegments(shouldDelete, RetentionSizeBreach(this)) +deleteOldSegments(shouldDelete, RetentionSizeBreach(this, remoteLogEnabled())) } private def deleteLogStartOffsetBreachedSegments(): Int = { def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - nextSegmentOpt.exists(_.baseOffset <= logStartOffset) + nextSegmentOpt.exists(_.baseOffset <= localLogStartOffset()) Review Comment: Nice catch! Missed it while merging the conflicts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1289566534 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: SegmentDeletionReason): Int = { def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) && + val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) + + // Check not to delete segments which are not yet copied to tiered storage + val isSegmentTieredToRemoteStorage = Review Comment: Local log size is based on the local retention configs and those are always less than or equal to the complete log retention. I'm unclear about the rationale behind retaining data in local storage using an overall retention size where there are no remote log segments. Please provide clarification. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1289488513 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: SegmentDeletionReason): Int = { def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) && + val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) + + // Check not to delete segments which are not yet copied to tiered storage + val isSegmentTieredToRemoteStorage = +if (remoteLogEnabled()) upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage +else true Review Comment: No, this should be true if the remote storage is not enabled as this segment should be eligible based on other checks like `highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt)`. Existing tests in `UnifiedLogTest`, `LogOffsetTest`, `LogLoaderTest`, `LogCleanerTest` already cover those scenarios. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1284280124 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -667,11 +675,323 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +if (isLeader()) { +logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} +} + +class RemoteLogRetentionHandler { + +private final Optional retentionSizeData; +private final Optional retentionTimeData; + +private long remainingBreachedSize; + +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(Optional retentionSizeData, Optional retentionTimeData) { +this.retentionSizeData = retentionSizeData; +this.retentionTimeData = retentionTimeData; +remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); +} + +private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionSizeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { +// Assumption that segments contain size >= 0 +if (remainingBreachedSize > 0) { +long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes(); +if (remainingBytes >= 0) { +remainingBreachedSize = remainingBytes; +return true; +} +} + +return false; +}); +if (isSegmentDeleted) { +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", +metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); +} +return isSegmentDeleted; +} + +public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) +throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionTimeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, +x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); +if (isSegmentDeleted) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals +// are ascending with in an epoch. +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", +metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); +} +return isSegmentDeleted; +} + +private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) +throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); +if (isSegmentDeleted && retentionSizeData.isPresent()) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); +} + +// No need to update the logStartOffset. 
+return isSegmentDeleted; +} + +// It removes the segments beyond the current leader's earliest epoch. Those segments are considered as +// unreferenced because they are not part of the current leader epoch lineage. +private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1284278935 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -2207,7 +2267,7 @@ case class RetentionSizeBreach(log: UnifiedLog) extends SegmentDeletionReason { var size = log.size toDelete.foreach { segment => size -= segment.size - log.info(s"Deleting segment $segment due to retention size ${log.config.retentionSize} breach. Log size " + + log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config)} breach. Local log size " + Review Comment: Good catch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
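A hedged sketch of the corrected logging, mirroring the loop in RetentionSizeBreach with stand-in types (a list of segment sizes instead of LogSegment objects):

import java.util.List;

final class RetentionSizeBreachLogging {
    // Walks the segments chosen for deletion, logging the remaining local log
    // size after each removal against the local retention limit.
    static void logDeletions(List<Long> segmentSizes, long totalLocalLogSize, long localRetentionSize) {
        long size = totalLocalLogSize;
        for (long segmentSize : segmentSizes) {
            size -= segmentSize;
            System.out.printf(
                "Deleting segment of %d bytes due to local log retention size %d breach. "
                    + "Local log size after deletion will be %d.%n",
                segmentSize, localRetentionSize, size);
        }
    }

    public static void main(String[] args) {
        logDeletions(List.of(30L, 30L), 100L, 20L);
    }
}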
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1284241897 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -152,16 +152,42 @@ class UnifiedLog(@volatile var logStartOffset: Long, locally { initializePartitionMetadata() updateLogStartOffset(logStartOffset) +updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.getOrElse(0L))) +if (!remoteLogEnabled()) + logStartOffset = localLogStartOffset() maybeIncrementFirstUnstableOffset() initializeTopicId() logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset) + +info(s"Completed load of log with ${localLog.segments.numberOfSegments} segments, local log start offset ${localLogStartOffset()} and " + + s"log end offset $logEndOffset") } def setLogOffsetsListener(listener: LogOffsetsListener): Unit = { logOffsetsListener = listener } + private def updateLocalLogStartOffset(offset: Long): Unit = { Review Comment: This is not required as the updated code does not use this method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1283024088 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -618,6 +625,230 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +if (isLeader()) { Review Comment: It is not mandatory to update it when this node becomes a follower, as the existing follower fetch protocol makes sure that the follower truncates its log-start-offset based on the leader's log-start-offset. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1281844159 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -945,13 +978,19 @@ class UnifiedLog(@volatile var logStartOffset: Long, localLog.checkIfMemoryMappedBufferClosed() if (newLogStartOffset > logStartOffset) { - updatedLogStartOffset = true - updateLogStartOffset(newLogStartOffset) - _localLogStartOffset = newLogStartOffset - info(s"Incremented log start offset to $newLogStartOffset due to $reason") - leaderEpochCache.foreach(_.truncateFromStart(logStartOffset)) - producerStateManager.onLogStartOffsetIncremented(newLogStartOffset) - maybeIncrementFirstUnstableOffset() + // it should always get updated if tiered-storage is not enabled. + if (!onlyLocalLogStartOffsetUpdate || !remoteLogEnabled()) { Review Comment: Good point! Addressed in the latest commits to keep the logic simpler. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1281672320 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -618,6 +625,230 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +if (isLeader()) { +logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} +} + +class RemoteLogRetentionHandler { + +private final Optional retentionSizeData; +private final Optional retentionTimeData; + +private long remainingBreachedSize; + +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(Optional retentionSizeData, Optional retentionTimeData) { +this.retentionSizeData = retentionSizeData; +this.retentionTimeData = retentionTimeData; +remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); +} + +private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionSizeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { +// Assumption that segments contain size >= 0 +if (retentionSizeData.get().remainingBreachedSize > 0) { +remainingBreachedSize -= x.segmentSizeInBytes(); +return remainingBreachedSize >= 0; +} else return false; +}); +if (isSegmentDeleted) { +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", +metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); +} +return isSegmentDeleted; +} + +public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) +throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionTimeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, +x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); +if (isSegmentDeleted) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals +// are ascending with in an epoch. +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", +metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); +} +return isSegmentDeleted; +} + +private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) +throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); +if (isSegmentDeleted && retentionSizeData.isPresent()) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); +} + +// No need to update the logStartOffset. +return isSegmentDeleted; +} + +// It removes the segments beyond the current leader's earliest epoch. 
Those segments are considered as +// unreferenced because they are not part of the current leader epoch lineage. +private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1234980957 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -152,16 +152,42 @@ class UnifiedLog(@volatile var logStartOffset: Long, locally { initializePartitionMetadata() updateLogStartOffset(logStartOffset) +updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.getOrElse(0L))) +if (!remoteLogEnabled()) + logStartOffset = localLogStartOffset() maybeIncrementFirstUnstableOffset() initializeTopicId() logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset) + +info(s"Completed load of log with ${localLog.segments.numberOfSegments} segments, local log start offset ${localLogStartOffset()} and " + + s"log end offset $logEndOffset") } def setLogOffsetsListener(listener: LogOffsetsListener): Unit = { logOffsetsListener = listener } + private def updateLocalLogStartOffset(offset: Long): Unit = { +_localLogStartOffset = offset + +if (highWatermark < offset) { + updateHighWatermark(offset) +} + +if (this.recoveryPoint < offset) { + localLog.updateRecoveryPoint(offset) Review Comment: The recovery point was already updated based on the log-start-offset in `updateLogStartOffset`, but the local-log-start-offset can be greater than that, so it is updated here again if needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1234980098

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##

@@ -618,6 +625,230 @@ public void run() {

+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+    if (isLeader()) {
+        logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+        updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+    }
+}
+
+class RemoteLogRetentionHandler {
+
+    private final Optional<RetentionSizeData> retentionSizeData;
+    private final Optional<RetentionTimeData> retentionTimeData;
+
+    private long remainingBreachedSize;
+
+    private OptionalLong logStartOffset = OptionalLong.empty();
+
+    public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+        this.retentionSizeData = retentionSizeData;
+        this.retentionTimeData = retentionTimeData;
+        remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
+    }
+
+    private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+        if (!retentionSizeData.isPresent()) {
+            return false;
+        }
+
+        boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+            // Assumption that segments contain size >= 0
+            if (retentionSizeData.get().remainingBreachedSize > 0) {
+                remainingBreachedSize -= x.segmentSizeInBytes();
+                return remainingBreachedSize >= 0;
+            } else return false;
+        });
+        if (isSegmentDeleted) {
+            logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+            logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+                metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
+        }
+        return isSegmentDeleted;
+    }
+
+    public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        if (!retentionTimeData.isPresent()) {
+            return false;
+        }
+
+        boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+            x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+        if (isSegmentDeleted) {
+            remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+            // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+            // are ascending within an epoch.
+            logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+            logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
+                metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
+        }
+        return isSegmentDeleted;
+    }
+
+    private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+        if (isSegmentDeleted && retentionSizeData.isPresent()) {
+            remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+            logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+        }
+
+        // No need to update the logStartOffset.
+        return isSegmentDeleted;
+    }
+
+    // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as
+    // unreferenced because they are not part of the current leader epoch lineage.
+    private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+        boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+            x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+        if (isSegmentDeleted) {
+            logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+        }
+
+        // No need to update the log-start-offset as these epochs/offsets are earlier than that value.
+        return isSegmentDeleted;
+    }
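For readers tracing the size-based accounting above: the handler is seeded with remainingBreachedSize = total log size - retention size, and deletes segments oldest-first while that budget stays non-negative. Below is a minimal, self-contained Java sketch of the same bookkeeping; SegmentInfo and every other name here are hypothetical stand-ins for illustration, not the PR's API.

    import java.util.ArrayList;
    import java.util.List;

    public class SizeRetentionSketch {
        // Hypothetical stand-in for RemoteLogSegmentMetadata; only the size matters here.
        record SegmentInfo(String id, long sizeInBytes) { }

        public static void main(String[] args) {
            List<SegmentInfo> segments = List.of(
                    new SegmentInfo("seg-0", 100), new SegmentInfo("seg-1", 100), new SegmentInfo("seg-2", 100));
            long totalSize = segments.stream().mapToLong(SegmentInfo::sizeInBytes).sum(); // 300 bytes in remote storage
            long retentionSize = 100;                                                     // retention-size analogue

            // Bytes that must still be reclaimed to get back under the limit.
            long remainingBreachedSize = totalSize - retentionSize; // 200

            List<SegmentInfo> toDelete = new ArrayList<>();
            for (SegmentInfo segment : segments) { // oldest first
                if (remainingBreachedSize <= 0)
                    break;
                long remainingBytes = remainingBreachedSize - segment.sizeInBytes();
                if (remainingBytes < 0)
                    break; // deleting this segment would overshoot the budget
                remainingBreachedSize = remainingBytes;
                toDelete.add(segment);
            }
            // Deletes seg-0 and seg-1; the remaining 100 bytes exactly match the size limit.
            System.out.println("Deleting " + toDelete + ", remaining breach: " + remainingBreachedSize);
        }
    }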
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1234978872

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##

@@ -1331,7 +1370,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,

   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: SegmentDeletionReason): Int = {
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-      highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+      val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+      // Check not to delete segments which are not yet copied to tiered storage
+      val isSegmentTieredToRemoteStorage =
+        if (remoteLogEnabled()) upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage

Review Comment: It is inclusive; updated the doc describing the variable.
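A compact sketch of the tiering half of the eligibility rule being discussed, in Java. The parameter names mirror the Scala variables in the diff, but the free-standing method itself is purely illustrative and omits the high-watermark condition that the real predicate also checks.

    // highestOffsetInRemoteStorage is an inclusive offset, so a local segment
    // covering offsets up to upperBoundOffset - 1 is fully tiered exactly when
    // upperBoundOffset - 1 <= highestOffsetInRemoteStorage.
    static boolean safeToDeleteLocally(boolean remoteLogEnabled,
                                       long upperBoundOffset,
                                       long highestOffsetInRemoteStorage) {
        if (!remoteLogEnabled)
            return true; // no tiering constraint without remote storage
        return upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage;
    }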
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1213113458

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##

@@ -1383,20 +1421,30 @@ class UnifiedLog(@volatile var logStartOffset: Long,

+  private[log] def localRetentionMs: Long = {
+    if (config.remoteLogConfig.remoteStorageEnable) config.remoteLogConfig.localRetentionMs else config.retentionMs
+  }
+
   private def deleteRetentionMsBreachedSegments(): Int = {
-    if (config.retentionMs < 0) return 0
+    val retentionMs = localRetentionMs

Review Comment: This is addressed in the latest commits.
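The fallback under review reduces to a one-line selection. A hedged Java rendering with hypothetical parameters that mirror the config fields in the diff:

    // With remote storage enabled, local segments are bounded by the local
    // retention setting; otherwise the topic-level retention applies unchanged.
    static long effectiveLocalRetentionMs(boolean remoteStorageEnable,
                                          long localRetentionMs,
                                          long retentionMs) {
        return remoteStorageEnable ? localRetentionMs : retentionMs;
    }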
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1213112946

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##

@@ -154,16 +154,41 @@ class UnifiedLog(@volatile var logStartOffset: Long,

   locally {
     initializePartitionMetadata()
     updateLogStartOffset(logStartOffset)
+    updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))
+    if (!remoteLogEnabled())
+      logStartOffset = localLogStartOffset()

Review Comment: We already set localLogStartOffset to the max of the passed logStartOffset and the first segment's base offset. When remote log is not enabled, `logStartOffset` is set to the `localLogStartOffset` computed above.
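A small Java sketch of that initialization order, again with hypothetical names, showing why the two start offsets coincide when remote log is disabled:

    import java.util.OptionalLong;

    // localLogStartOffset is floored at the first local segment's base offset;
    // with remote log disabled, logStartOffset is then aligned to it.
    static long[] initStartOffsets(long logStartOffset,
                                   OptionalLong firstSegmentBaseOffset,
                                   boolean remoteLogEnabled) {
        long localLogStartOffset = Math.max(logStartOffset, firstSegmentBaseOffset.orElse(0L));
        if (!remoteLogEnabled)
            logStartOffset = localLogStartOffset;
        return new long[] {logStartOffset, localLogStartOffset};
    }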
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1213107956

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##

@@ -618,6 +629,193 @@ public void run() {

+// There are two cases:
+// 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+//    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+// 2) To remove the unreferenced segments.

Review Comment: Updated the comment to make it clearer.
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1213107255

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##

@@ -618,6 +629,193 @@ public void run() {

+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+    logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+    updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+}
+
+class RemoteLogRetentionHandler {
+
+    private long remainingBreachedSize = 0L;
+    private OptionalLong logStartOffset = OptionalLong.empty();
+
+    public RemoteLogRetentionHandler(long remainingBreachedSize) {
+        this.remainingBreachedSize = remainingBreachedSize;
+    }
+
+    private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+        boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+            // Assumption that segments contain size > 0
+            if (checkSizeRetention && remainingBreachedSize > 0) {
+                remainingBreachedSize -= x.segmentSizeInBytes();
+                return remainingBreachedSize >= 0;
+            } else return false;
+        });
+        if (isSegmentDeleted) {
+            logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+            logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                "${log.config.retentionSize} breach. Log size after deletion will be " +
+                "${remainingBreachedSize + log.config.retentionSize}.");
+        }
+        return isSegmentDeleted;
+    }
+
+    private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+        if (isSegmentDeleted) {
+            remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+            logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+        }
+
+        // No need to update the logStartOffset.
+        return isSegmentDeleted;
+    }
+
+    // There are two cases:
+    // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+    //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+    // 2) To remove the unreferenced segments.
+    private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+        boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+            x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+        if (isSegmentDeleted) {
+            logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+        }
+
+        // No need to update the log-start-offset as these epochs/offsets are earlier than that value.
+        return isSegmentDeleted;
+    }
+
+    private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        if (predicate.test(segmentMetadata)) {
+            // Publish delete segment started event.
+            remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                    RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+            // Delete the segment in remote storage.
+            remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+            // Publish delete segment finished event.
+            remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                    RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+            return true;
+        }
+        return false;
+    }
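The deleteRemoteLogSegment method above brackets the physical deletion with two metadata state transitions. A generic Java sketch of that two-phase pattern follows; MetadataStore and RemoteStore are entirely hypothetical interfaces, not Kafka's RemoteLogMetadataManager/RemoteStorageManager APIs.

    import java.util.function.Predicate;

    interface MetadataStore {
        void markDeleteStarted(String segmentId);
        void markDeleteFinished(String segmentId);
    }

    interface RemoteStore {
        void delete(String segmentId) throws Exception;
    }

    final class TwoPhaseDeleter {
        private final MetadataStore metadata;
        private final RemoteStore remote;

        TwoPhaseDeleter(MetadataStore metadata, RemoteStore remote) {
            this.metadata = metadata;
            this.remote = remote;
        }

        // If the delete fails midway, the STARTED marker lets a later pass detect
        // and retry the orphaned segment instead of leaking it.
        boolean deleteIf(String segmentId, Predicate<String> shouldDelete) throws Exception {
            if (!shouldDelete.test(segmentId))
                return false;
            metadata.markDeleteStarted(segmentId);  // DELETE_SEGMENT_STARTED analogue
            remote.delete(segmentId);               // physical removal in remote storage
            metadata.markDeleteFinished(segmentId); // DELETE_SEGMENT_FINISHED analogue
            return true;
        }
    }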
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1213102668

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##

@@ -618,6 +629,193 @@ public void run() {
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1213100593

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##

@@ -618,6 +629,193 @@ public void run() {
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1181520975

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##

@@ -581,11 +588,18 @@ public void run() {

 if (isLeader()) {
     // Copy log segments to remote storage
     copyLogSegmentsToRemote();
+    // Cleanup/delete expired remote log segments
+    cleanupExpiredRemoteLogSegments();

Review Comment: This is different from local log deletion, which must actually delete segment files from local storage. In the case of remote storage, the call does not wait for the data to be deleted; it marks the file or object for deletion in the respective metadata store, and the storage system's garbage collector deletes the data asynchronously. There is no performance impact from these delete calls, as they take much less time than copying segments, so it is very unlikely that copying segments is affected by segment deletion. Deletion checks happen in every iteration, so there will not be many segments to delete at once. In any case, we can discuss this separately in a dedicated JIRA. On another note, all of this logic will move to UnifiedLog in the future.
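To illustrate the mark-for-delete behavior described in the comment, here is a toy Java object store whose delete call only records a tombstone while a background sweep reclaims data later. This models the described semantics under stated assumptions and is not any real storage plugin.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class MarkForDeleteStore {
        private final Map<String, Long> tombstones = new ConcurrentHashMap<>();

        // Fast: records the deletion intent only; no data is removed yet.
        void deleteLogSegmentData(String objectKey) {
            tombstones.put(objectKey, System.currentTimeMillis());
        }

        // Run asynchronously by the storage system's garbage collector.
        void collectGarbage(long graceMs) {
            long cutoff = System.currentTimeMillis() - graceMs;
            // Physical removal of the object would happen here.
            tombstones.entrySet().removeIf(entry -> entry.getValue() <= cutoff);
        }
    }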
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1181532348

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##

@@ -595,6 +609,193 @@ public void run() {
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1181529149

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##

@@ -595,6 +609,193 @@ public void run() {
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1181528820

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##

@@ -595,6 +609,193 @@ public void run() {

+// There are two cases:
+// 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+//    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+// 2) To remove the unreferenced segments.

Review Comment: Unreferenced segments within the current leader epoch chain will eventually fall earlier than the earliest epoch of that chain after a few retention checks, and those checks will take care of such segments.
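The "beyond the earliest epoch" test discussed here reduces to an allMatch over the segment's leader epochs. A minimal Java sketch, with hypothetical parameters standing in for the segment metadata in the diff:

    import java.util.Set;

    // A remote segment is unreferenced once every leader epoch it carries is
    // older than the earliest epoch the current leader still retains.
    static boolean beyondEarliestEpoch(Set<Integer> segmentLeaderEpochs, int earliestEpoch) {
        return segmentLeaderEpochs.stream().allMatch(epoch -> epoch < earliestEpoch);
    }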
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1181527119

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##

@@ -595,6 +609,193 @@ public void run() {
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1181526976

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##

@@ -595,6 +609,193 @@ public void run() {