[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-28 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1308153773


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -761,11 +784,385 @@ public void run() {
         }
     }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+    if (isLeader()) {
+        logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+        updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+    }
+}
+
+class RemoteLogRetentionHandler {
+
+    private final Optional<RetentionSizeData> retentionSizeData;
+    private final Optional<RetentionTimeData> retentionTimeData;
+
+    private long remainingBreachedSize;
+
+    private OptionalLong logStartOffset = OptionalLong.empty();
+
+    public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+        this.retentionSizeData = retentionSizeData;
+        this.retentionTimeData = retentionTimeData;
+        remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
+    }
+
+    private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+        if (!retentionSizeData.isPresent()) {
+            return false;
+        }
+
+        boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+            // Assumption that segments contain size >= 0
+            if (remainingBreachedSize > 0) {
+                long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes();
+                if (remainingBytes >= 0) {
+                    remainingBreachedSize = remainingBytes;
+                    return true;
+                }
+            }
+
+            return false;
+        });
+        if (isSegmentDeleted) {
+            logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+            logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+                metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
+        }
+        return isSegmentDeleted;
+    }
+
+    public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        if (!retentionTimeData.isPresent()) {
+            return false;
+        }
+
+        boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+            x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+        if (isSegmentDeleted) {
+            remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+            // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+            // are ascending within an epoch.
+            logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+            logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
+                metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
+        }
+        return isSegmentDeleted;
+    }
+
+    private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+        if (isSegmentDeleted && retentionSizeData.isPresent()) {
+            remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+            logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+        }
+
+        return isSegmentDeleted;
+    }
+
+    // It removes the segments beyond the current leader's earliest epoch. Those segments are considered
+    // unreferenced because they are not part of the current leader epoch lineage.
+    private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageExcept
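
One subtlety in the size-based path above is easy to miss: a segment is deleted only while removing the whole segment still fits inside the breached amount, so a small breach may delete nothing at all. A self-contained sketch of that accounting with plain longs (the sizes and the class name RetentionSizeSketch are illustrative, not from the PR):

    import java.util.ArrayList;
    import java.util.List;

    public class RetentionSizeSketch {
        public static void main(String[] args) {
            long retentionSize = 100L;                        // assumed retention.bytes value
            List<Long> segmentSizes = List.of(40L, 40L, 40L); // oldest-first segment sizes
            long totalSize = segmentSizes.stream().mapToLong(Long::longValue).sum();

            // Cleanup applies only when the log exceeds the retention size.
            long remaining = totalSize > retentionSize ? totalSize - retentionSize : 0L; // 20 here

            List<Long> deleted = new ArrayList<>();
            for (long segmentSize : segmentSizes) {
                // Mirrors the predicate in deleteRetentionSizeBreachedSegments: delete
                // only while the whole segment fits in the remaining breached size.
                if (remaining > 0 && remaining - segmentSize >= 0) {
                    remaining -= segmentSize;
                    deleted.add(segmentSize);
                }
            }
            // Prints "deleted=[] remaining=20": a 20-byte breach cannot be cleared
            // by deleting a 40-byte segment without overshooting the budget.
            System.out.println("deleted=" + deleted + " remaining=" + remaining);
        }
    }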

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-24 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1304597735


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -696,11 +704,327 @@ public void run() {

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-22 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1301361619


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1096,6 +1493,43 @@ public void close() {
         }
     }
 
+// Visible for testing
+public static class RetentionSizeData {
+    private final long retentionSize;
+    private final long remainingBreachedSize;
+
+    public RetentionSizeData(long retentionSize, long remainingBreachedSize) {
+        if (retentionSize < 0)
+            throw new IllegalArgumentException("retentionSize should be non negative, but it is " + retentionSize);
+
+        if (remainingBreachedSize <= 0) {
+            throw new IllegalArgumentException("remainingBreachedSize should be more than zero, but it is " + remainingBreachedSize);
+        }
+
+        this.retentionSize = retentionSize;
+        this.remainingBreachedSize = remainingBreachedSize;
+    }
+}
+
+// Visible for testing
+public static class RetentionTimeData {
+
+    private final long retentionMs;
+    private final long cleanupUntilMs;
+
+    public RetentionTimeData(long retentionMs, long cleanupUntilMs) {
+        if (retentionMs < 0)
+            throw new IllegalArgumentException("retentionMs should be non negative, but it is " + retentionMs);
+
+        if (retentionMs < cleanupUntilMs) {
Review Comment:
   Right, I fixed the validation check. Thanks. 
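
The thread does not show the corrected check itself. One plausible shape for it, validating cleanupUntilMs on its own instead of comparing it against retentionMs, would be the following sketch (an assumption on the editor's part, not necessarily the merged code):

    public RetentionTimeData(long retentionMs, long cleanupUntilMs) {
        // Sketch of a plausible corrected validation; the merged code may differ.
        if (retentionMs < 0)
            throw new IllegalArgumentException("retentionMs should be non negative, but it is " + retentionMs);

        if (cleanupUntilMs < 0)
            throw new IllegalArgumentException("cleanupUntilMs should be non negative, but it is " + cleanupUntilMs);

        this.retentionMs = retentionMs;
        this.cleanupUntilMs = cleanupUntilMs;
    }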






[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-21 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1300176481


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -761,11 +784,385 @@ public void run() {
+        if (isSegmentDeleted) {
+            logStartOffset = OptionalLong.of(metadata.endOffset() + 1);

Review Comment:
   Good point. I think we discussed this earlier as well. Let us address it in a followup PR covering topics that change their cleanup policy from compaction to delete-only retention. Filed https://issues.apache.org/jira/browse/KAFKA-15388
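
For context, the scenario deferred to KAFKA-15388 is a topic whose cleanup.policy is switched from compact to delete. A minimal sketch of that switch via the Kafka Admin client (the topic name and broker address are illustrative):

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class SwitchCleanupPolicy {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address

            try (Admin admin = Admin.create(props)) {
                // "my-compacted-topic" is a hypothetical, previously compacted topic.
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-compacted-topic");
                AlterConfigOp op = new AlterConfigOp(new ConfigEntry("cleanup.policy", "delete"),
                        AlterConfigOp.OpType.SET);
                admin.incrementalAlterConfigs(Map.of(topic, List.of(op))).all().get();
            }
        }
    }

Segments written while the topic was compacted can contain offset gaps, so they do not satisfy the same assumptions as delete-only segments, which is why that transition gets separate treatment in the follow-up.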






[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-21 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1300279331


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -761,11 +784,385 @@ public void run() {
+        if (isSegmentDeleted) {
+            logStartOffset = OptionalLong.of(metadata.endOffset() + 1);

Review Comment:
   Yes, this is planned for 3.6.0. I did not want to block this PR on that, as we want to unblock other dependent PRs, especially the integration test PRs.






[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-21 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1300178242


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1096,6 +1493,43 @@ public void close() {
+// Visible for testing
+public static class RetentionTimeData {
+
+    private final long retentionMs;
+    private final long cleanupUntilMs;
+
+    public RetentionTimeData(long retentionMs, long cleanupUntilMs) {
+        if (retentionMs < 0)
+            throw new IllegalArgumentException("retentionMs should be non negative, but it is " + retentionMs);
+
+        if (retentionMs < cleanupUntilMs) {

Review Comment:
   This check will be true when using system time, but I added it as a defensive check in case tests set the mock time to arbitrary long values.
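
A worked example of the two clocks, assuming cleanupUntilMs is computed as the current time minus retentionMs (the concrete values are illustrative):

    public class CleanupUntilExample {
        public static void main(String[] args) {
            long retentionMs = 7L * 24 * 60 * 60 * 1000;          // 7 days = 604_800_000 ms

            // System time: `now` dwarfs any retention window, so the check holds.
            long now = 1_692_230_400_000L;                        // an Aug 2023 wall-clock timestamp
            long cleanupUntilMs = now - retentionMs;              // ~1_691_625_600_000
            System.out.println(retentionMs < cleanupUntilMs);     // true

            // A mock clock starting near zero flips the relation; this is the
            // case the defensive check is meant to catch.
            long mockNow = 1_000L;
            long mockCleanupUntilMs = mockNow - retentionMs;      // negative
            System.out.println(retentionMs < mockCleanupUntilMs); // false
        }
    }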






[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-20 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1299640804


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -696,11 +704,327 @@ public void run() {

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-19 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1299185257


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -698,11 +707,329 @@ public void run() {

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-19 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1299184981


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -696,11 +704,327 @@ public void run() {

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-17 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1298027732


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -696,11 +704,327 @@ public void run() {

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-17 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1298029817


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -696,11 +704,327 @@ public void run() {

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-16 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1296732476


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -696,11 +704,327 @@ public void run() {

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-16 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1296731885


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -696,11 +704,327 @@ public void run() {

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-16 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1295713103


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -696,11 +704,327 @@ public void run() {

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-16 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1295670320


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -1003,6 +1015,134 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
 }
 }
 
+private static RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(long startOffset, long endOffset, Map<Integer, Long> segmentEpochs) {
+return new RemoteLogSegmentMetadata(
+new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(),
+new TopicPartition("topic", 0)), Uuid.randomUuid()),
+startOffset, endOffset,
+10L,
+1,
+10L,
+1000,
+Optional.empty(),
+RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs);
+}
+
+@Test
+public void testRemoteSegmentWithinLeaderEpochs() {
+// Test whether a remote segment is within the leader epochs
+final long logEndOffset = 90L;
+
+TreeMap<Integer, Long> leaderEpochToStartOffset = new TreeMap<Integer, Long>() {{
+put(0, 0L);
+put(1, 10L);
+put(2, 20L);
+put(3, 30L);
+put(4, 40L);
+put(5, 50L);
+put(7, 70L);
+}};

Review Comment:
   Thanks @kamalcph for the clarification, good to know about that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-16 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1295607067


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
 reason: SegmentDeletionReason): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-  highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+  val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+  // Check not to delete segments which are not yet copied to tiered storage
+  val isSegmentTieredToRemoteStorage =

Review Comment:
   Thanks @junrao for the clarification.
   
   In the above case with remote storage enabled, the segment will eventually be deleted from both local and remote storage, updating the log-start-offset and local-log-start-offset respectively.
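
To make the offset bookkeeping in that reply concrete: remote deletions advance the log-start-offset while local deletions advance the local-log-start-offset, and the former should never overtake the latter. A tiny illustration with invented offsets (the invariant is implied by the discussion; the numbers are not from the PR):

    // Illustration only; offsets are invented.
    class StartOffsetInvariantExample {
        public static void main(String[] args) {
            long logStartOffset = 100L;      // advanced when remote segments are deleted
            long localLogStartOffset = 450L; // advanced when local segments are deleted

            // Remote storage covers [logStartOffset, localLogStartOffset) and the
            // local log covers [localLogStartOffset, logEndOffset), so:
            System.out.println(logStartOffset <= localLogStartOffset); // true
        }
    }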



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-15 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1295387916


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1033,6 +1360,35 @@ public void close() {
 }
 }
 
+private static class RetentionSizeData {
+private final long retentionSize;
+private final long remainingBreachedSize;
+
+public RetentionSizeData(long retentionSize, long remainingBreachedSize) {
+if (retentionSize < remainingBreachedSize) {
+throw new IllegalArgumentException("retentionSize must be greater than remainingBreachedSize");
+}

Review Comment:
   Good catch! It was changed while refactoring; I added UTs to cover it in the latest commits.
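
A sketch of the kind of unit test the reply refers to, assuming the RetentionSizeData constructor is reachable from the test (it is private in the diff, so the PR's actual tests may exercise it indirectly and look quite different):

    import static org.junit.jupiter.api.Assertions.assertThrows;

    import org.junit.jupiter.api.Test;

    class RetentionSizeDataTest {
        @Test
        void rejectsBreachLargerThanRetentionSize() {
            // 100 bytes of retention cannot have 200 bytes of remaining breach.
            assertThrows(IllegalArgumentException.class,
                    () -> new RetentionSizeData(100L, 200L));
        }
    }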



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-15 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1294843843


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -152,16 +152,42 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   locally {
 initializePartitionMetadata()
 updateLogStartOffset(logStartOffset)
+updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))
+if (!remoteLogEnabled())
+  logStartOffset = localLogStartOffset()
 maybeIncrementFirstUnstableOffset()
 initializeTopicId()
 
 logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset)
+
+info(s"Completed load of log with ${localLog.segments.numberOfSegments} segments, local log start offset ${localLogStartOffset()} and " +
+  s"log end offset $logEndOffset")
   }
 
   def setLogOffsetsListener(listener: LogOffsetsListener): Unit = {
 logOffsetsListener = listener
   }
 
+  private def updateLocalLogStartOffset(offset: Long): Unit = {

Review Comment:
   This method is used only from the `locally` block and does not require taking any lock. We moved it inside the locally block to avoid any confusion and future usage outside of it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-15 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1294604024


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -1003,6 +1015,134 @@ public RemoteLogMetadataManager 
createRemoteLogMetadataManager() {
 }
 }
 
+private static RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(long startOffset, long endOffset, Map<Integer, Long> segmentEpochs) {
+return new RemoteLogSegmentMetadata(
+new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(),
+new TopicPartition("topic", 0)), Uuid.randomUuid()),
+startOffset, endOffset,
+10L,
+1,
+10L,
+1000,
+Optional.empty(),
+RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs);
+}
+
+@Test
+public void testRemoteSegmentWithinLeaderEpochs() {
+// Test whether a remote segment is within the leader epochs
+final long logEndOffset = 90L;
+
+TreeMap<Integer, Long> leaderEpochToStartOffset = new TreeMap<Integer, Long>() {{
+put(0, 0L);
+put(1, 10L);
+put(2, 20L);
+put(3, 30L);
+put(4, 40L);
+put(5, 50L);
+put(7, 70L);
+}};

Review Comment:
   What is the rationale for this suggestion?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-15 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1294603142


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -1003,6 +1015,134 @@ public RemoteLogMetadataManager 
createRemoteLogMetadataManager() {
 }
 }
 
+private static RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(long startOffset, long endOffset, Map<Integer, Long> segmentEpochs) {
+return new RemoteLogSegmentMetadata(
+new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(),
+new TopicPartition("topic", 0)), Uuid.randomUuid()),
+startOffset, endOffset,
+10L,
+1,
+10L,
+1000,
+Optional.empty(),
+RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs);
+}
+
+@Test
+public void testRemoteSegmentWithinLeaderEpochs() {
+// Test whether a remote segment is within the leader epochs
+final long logEndOffset = 90L;
+
+TreeMap<Integer, Long> leaderEpochToStartOffset = new TreeMap<Integer, Long>() {{
+put(0, 0L);
+put(1, 10L);
+put(2, 20L);
+put(3, 30L);
+put(4, 40L);
+put(5, 50L);
+put(7, 70L);
+}};
+
+// Test whether a remote segment's epochs/offsets(multiple) are within the range of leader epochs
+assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
+15,
+35,
+new TreeMap<Integer, Long>() {{
+put(1, 15L);
+put(2, 20L);
+put(3, 30L);
+}}), logEndOffset, leaderEpochToStartOffset));
+
+// Test whether a remote segment's epochs/offsets(single) are within the range of leader epochs
+assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
+15,
+19,
+new TreeMap<Integer, Long>() {{
+put(1, 15L);
+}}), logEndOffset, leaderEpochToStartOffset));
+
+// Test whether a remote segment's epochs/offsets(single) are within the range of leader epochs
+assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(

Review Comment:
   Good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-15 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1294602887


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -698,11 +707,329 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+if (isLeader()) {
+logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+}
+}
+
+class RemoteLogRetentionHandler {
+
+private final Optional<RetentionSizeData> retentionSizeData;
+private final Optional<RetentionTimeData> retentionTimeData;
+
+private long remainingBreachedSize;
+
+private OptionalLong logStartOffset = OptionalLong.empty();
+
+public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {

Review Comment:
   I do not find a strong reason not to use Optional as an argument. :) In the same SO link, there are a few other opinions on why it is a weak argument.
   
   Optional as an argument is used in several other places within this project. I do not have strong opinions, and I am fine if we decide to go with that rule across the project when there is a consensus. We can revisit it when we do that.
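
For readers following the style debate: the alternative usually proposed is to keep Optional out of parameter lists and push absence handling into factory methods. A hedged sketch of that shape (the names are illustrative, not the PR's API):

    import java.util.Optional;

    // Illustrative only: one common way to avoid Optional-typed parameters.
    final class RetentionPolicyExample {
        private final Optional<Long> retentionBytes;

        private RetentionPolicyExample(Optional<Long> retentionBytes) {
            this.retentionBytes = retentionBytes;
        }

        // Callers pick a factory instead of passing Optional.empty() around.
        static RetentionPolicyExample unbounded() {
            return new RetentionPolicyExample(Optional.empty());
        }

        static RetentionPolicyExample withRetentionBytes(long bytes) {
            return new RetentionPolicyExample(Optional.of(bytes));
        }

        Optional<Long> retentionBytes() {
            return retentionBytes;
        }
    }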



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-14 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1293345266


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
 reason: SegmentDeletionReason): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-  highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+  val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+  // Check not to delete segments which are not yet copied to tiered storage
+  val isSegmentTieredToRemoteStorage =

Review Comment:
   Let me rephrase what you mentioned here:
   
   retention.bytes = 100MB
   segment1 - 90MB
   
   When remote storage is not enabled, this segment is not deleted from the local log segments because of the retention size check.
   
   retention.bytes = 100MB
   local.retention.bytes = 20MB
   segment1 - 90MB
   
   When remote storage is enabled and no segments have been uploaded to remote storage, the check introduced in this PR will not allow this segment to be deleted, as it has not yet been copied to remote storage.
   
   If it has been copied to remote storage, it is not the active segment and there are one or more local segments after it. The segment is then eligible for deletion based on the local retention policy, as it was already copied to remote storage earlier.
   
   @junrao Am I missing anything here?
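
The two scenarios can be played through with the guard from the diff. A simplified, self-contained restatement (the guard expression follows the diff; using -1 as the "nothing uploaded yet" sentinel for highestOffsetInRemoteStorage is an assumption):

    // Worked form of the scenario in the comment; sizes come from the example.
    class TieredDeletionGuardExample {
        static boolean isSegmentTieredToRemoteStorage(boolean remoteLogEnabled,
                                                      long upperBoundOffset,
                                                      long highestOffsetInRemoteStorage) {
            if (!remoteLogEnabled) {
                return true; // the check only applies when tiered storage is on
            }
            return upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage;
        }

        public static void main(String[] args) {
            // retention.bytes = 100MB, local.retention.bytes = 20MB, segment1 = 90MB,
            // remote storage enabled but nothing uploaded yet:
            boolean tiered = isSegmentTieredToRemoteStorage(true, 1000L, -1L);
            System.out.println(tiered); // false -> segment1 must stay local for now
        }
    }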



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-14 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1293344902


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
 reason: SegmentDeletionReason): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-  highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+  val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+  // Check not to delete segments which are not yet copied to tiered storage
+  val isSegmentTieredToRemoteStorage =

Review Comment:
   Let me rephrase what you mentioned here:
   
   retention.bytes = 100MB
   segment1 - 90MB
   
   When remote storage is not enabled, this segment is not deleted from the local log segments because of the retention size check.
   
   retention.bytes = 100MB
   local.retention.bytes = 20MB
   segment1 - 90MB
   
   When remote storage is enabled and no segments have been uploaded to remote storage, the check introduced in this PR will not allow this segment to be deleted, as it has not yet been copied to remote storage.
   
   If it has been copied to remote storage, it is not the active segment and there are one or more local segments after it. The segment is then eligible for deletion based on the local retention policy, as it was already copied to remote storage earlier.
   
   @junrao Am I missing anything here?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-14 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1293179175


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -618,6 +625,230 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+if (isLeader()) {

Review Comment:
   @kamalcph The case mentioned by you can be addressed in a follow-up PR. Please file a JIRA.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-13 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1292967405


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
 reason: SegmentDeletionReason): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-  highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+  val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+  // Check not to delete segments which are not yet copied to tiered storage
+  val isSegmentTieredToRemoteStorage =
+if (remoteLogEnabled()) upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage
+else true

Review Comment:
   It was implicit from the condition that it is relevant only when remote storage is enabled. I removed the value and added a condition and the respective comments for better clarity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-13 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1292967294


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1464,12 +1513,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 }
 
-deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
+deleteOldSegments(shouldDelete, RetentionSizeBreach(this, remoteLogEnabled()))
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-  nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
+  nextSegmentOpt.exists(_.baseOffset <= (if (remoteLogEnabled()) localLogStartOffset() else logStartOffset))

Review Comment:
   When remote log is enabled, it deletes the local segments whose offset is <= local-log-start-offset. The existing condition without tiered storage is to delete the local log segments <= log-start-offset.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-09 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1289577043


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -696,11 +704,327 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+if (isLeader()) {
+logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+}
+}
+
+class RemoteLogRetentionHandler {
+
+private final Optional<RetentionSizeData> retentionSizeData;
+private final Optional<RetentionTimeData> retentionTimeData;
+
+private long remainingBreachedSize;
+
+private OptionalLong logStartOffset = OptionalLong.empty();
+
+public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+this.retentionSizeData = retentionSizeData;
+this.retentionTimeData = retentionTimeData;
+remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
+}
+
+private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+if (!retentionSizeData.isPresent()) {
+return false;
+}
+
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+// Assumption that segments contain size >= 0
+if (remainingBreachedSize > 0) {
+long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes();
+if (remainingBytes >= 0) {
+remainingBreachedSize = remainingBytes;
+return true;
+}
+}
+
+return false;
+});
+if (isSegmentDeleted) {
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
+}
+return isSegmentDeleted;
+}
+
+public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+throws RemoteStorageException, ExecutionException, InterruptedException {
+if (!retentionTimeData.isPresent()) {
+return false;
+}
+
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+if (isSegmentDeleted) {
+remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+// are ascending within an epoch.
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
+metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
+}
+return isSegmentDeleted;
+}
+
+private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+throws RemoteStorageException, ExecutionException, InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+if (isSegmentDeleted && retentionSizeData.isPresent()) {
+remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+}
+
+return isSegmentDeleted;
+}
+
+// It removes the segments beyond the current leader's earliest epoch. Those segments are considered as
+// unreferenced because they are not part of the current leader epoch lineage.
+private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageExcept
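
The size-based pass in the hunk above debits each deleted segment from a running remainingBreachedSize. A worked trace with invented segment sizes (illustration only; not the PR's code):

    // Illustration of the remainingBreachedSize accounting with invented sizes.
    class RetentionSizeAccountingExample {
        public static void main(String[] args) {
            long remainingBreachedSize = 150L; // bytes over retention.bytes
            long[] segmentSizes = {100L, 60L, 40L};

            for (long size : segmentSizes) {
                if (remainingBreachedSize > 0 && remainingBreachedSize - size >= 0) {
                    remainingBreachedSize -= size;
                    System.out.println("delete segment of " + size + " bytes");
                } else {
                    // A segment larger than the remaining breach is kept.
                    System.out.println("keep segment of " + size + " bytes");
                }
            }
            // Prints: delete 100, keep 60, delete 40; remainingBreachedSize ends at 10.
        }
    }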

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-09 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1289577612


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1464,12 +1513,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 }
 
-deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
+deleteOldSegments(shouldDelete, RetentionSizeBreach(this, remoteLogEnabled()))
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-  nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
+  nextSegmentOpt.exists(_.baseOffset <= localLogStartOffset())

Review Comment:
   Nice catch! Missed it while merging the conflicts.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-09 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1289566534


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
 reason: SegmentDeletionReason): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-  highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+  val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+  // Check not to delete segments which are not yet copied to tiered storage
+  val isSegmentTieredToRemoteStorage =

Review Comment:
   Local log size is based on the local retention configs, and those are always less than or equal to the complete log retention.
   
   I'm unclear about the rationale behind retaining data in local storage using the overall retention size when there are no remote log segments. Please provide clarification.
provide clarification.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-09 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1289488513


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
 reason: SegmentDeletionReason): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-  highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+  val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+  // Check not to delete segments which are not yet copied to tiered storage
+  val isSegmentTieredToRemoteStorage =
+if (remoteLogEnabled()) upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage
+else true

Review Comment:
   No, this should be true if the remote storage is not enabled, as this segment should be eligible based on the other checks like `highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt)`. Existing tests in `UnifiedLogTest`, `LogOffsetTest`, `LogLoaderTest`, and `LogCleanerTest` already cover those scenarios.
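
To spell out how the new guard composes with the pre-existing checks named in that reply, a simplified sketch (the Scala from the diff restated in Java; `retentionPredicate` stands in for `predicate(segment, nextSegmentOpt)`):

    import java.util.function.LongPredicate;

    // Sketch of the overall eligibility check described in the comment; the real
    // code lives in UnifiedLog.deleteOldSegments and differs in detail.
    class ShouldDeleteExample {
        static boolean shouldDelete(long highWatermark,
                                    long upperBoundOffset,
                                    boolean remoteLogEnabled,
                                    long highestOffsetInRemoteStorage,
                                    LongPredicate retentionPredicate) {
            boolean isSegmentTieredToRemoteStorage = !remoteLogEnabled
                    || (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage);
            // With remote storage disabled the guard is always true, so the
            // existing highWatermark and retention checks decide alone.
            return highWatermark >= upperBoundOffset
                    && retentionPredicate.test(upperBoundOffset - 1)
                    && isSegmentTieredToRemoteStorage;
        }
    }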



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-04 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1284280124


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -667,11 +675,323 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+if (isLeader()) {
+logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+}
+}
+
+class RemoteLogRetentionHandler {
+
+private final Optional<RetentionSizeData> retentionSizeData;
+private final Optional<RetentionTimeData> retentionTimeData;
+
+private long remainingBreachedSize;
+
+private OptionalLong logStartOffset = OptionalLong.empty();
+
+public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+this.retentionSizeData = retentionSizeData;
+this.retentionTimeData = retentionTimeData;
+remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
+}
+
+private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+if (!retentionSizeData.isPresent()) {
+return false;
+}
+
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+// Assumption that segments contain size >= 0
+if (remainingBreachedSize > 0) {
+long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes();
+if (remainingBytes >= 0) {
+remainingBreachedSize = remainingBytes;
+return true;
+}
+}
+
+return false;
+});
+if (isSegmentDeleted) {
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
+}
+return isSegmentDeleted;
+}
+
+public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+throws RemoteStorageException, ExecutionException, InterruptedException {
+if (!retentionTimeData.isPresent()) {
+return false;
+}
+
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+if (isSegmentDeleted) {
+remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+// are ascending within an epoch.
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
+metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
+}
+return isSegmentDeleted;
+}
+
+private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+throws RemoteStorageException, ExecutionException, InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+if (isSegmentDeleted && retentionSizeData.isPresent()) {
+remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+}
+
+// No need to update the logStartOffset.
+return isSegmentDeleted;
+}
+
+// It removes the segments beyond the current leader's earliest epoch. Those segments are considered as
+// unreferenced because they are not part of the current leader epoch lineage.
+private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, Rem

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-04 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1284278935


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -2207,7 +2267,7 @@ case class RetentionSizeBreach(log: UnifiedLog) extends SegmentDeletionReason {
 var size = log.size
 toDelete.foreach { segment =>
   size -= segment.size
-  log.info(s"Deleting segment $segment due to retention size ${log.config.retentionSize} breach. Log size " +
+  log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config)} breach. Local log size " +

Review Comment:
   Good catch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-04 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1284241897


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -152,16 +152,42 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   locally {
 initializePartitionMetadata()
 updateLogStartOffset(logStartOffset)
+updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))
+if (!remoteLogEnabled())
+  logStartOffset = localLogStartOffset()
 maybeIncrementFirstUnstableOffset()
 initializeTopicId()
 
 logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset)
+
+info(s"Completed load of log with ${localLog.segments.numberOfSegments} segments, local log start offset ${localLogStartOffset()} and " +
+  s"log end offset $logEndOffset")
   }
 
   def setLogOffsetsListener(listener: LogOffsetsListener): Unit = {
 logOffsetsListener = listener
   }
 
+  private def updateLocalLogStartOffset(offset: Long): Unit = {

Review Comment:
   This is not required as the updated code does not use this method. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-03 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1283024088


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -618,6 +625,230 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+if (isLeader()) {

Review Comment:
   It is not mandatory to update it when this node becomes a follower, as the existing follower fetch protocol makes sure that the follower truncates its log-start-offset based on the leader's log-start-offset.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-02 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1281844159


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -945,13 +978,19 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
 localLog.checkIfMemoryMappedBufferClosed()
 if (newLogStartOffset > logStartOffset) {
-  updatedLogStartOffset = true
-  updateLogStartOffset(newLogStartOffset)
-  _localLogStartOffset = newLogStartOffset
-  info(s"Incremented log start offset to $newLogStartOffset due to 
$reason")
-  leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
-  producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
-  maybeIncrementFirstUnstableOffset()
+  // it should always get updated  if tiered-storage is not enabled.
+  if (!onlyLocalLogStartOffsetUpdate || !remoteLogEnabled()) {

Review Comment:
   Good point! Addressed in the latest commits to keep the logic simpler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-02 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1281672320


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -618,6 +625,230 @@ public void run() {
         }
     }
 
+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        if (isLeader()) {
+            logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private final Optional<RetentionSizeData> retentionSizeData;
+        private final Optional<RetentionTimeData> retentionTimeData;
+
+        private long remainingBreachedSize;
+
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData,
+                                         Optional<RetentionTimeData> retentionTimeData) {
+            this.retentionSizeData = retentionSizeData;
+            this.retentionTimeData = retentionTimeData;
+            remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (!retentionSizeData.isPresent()) {
+                return false;
+            }
+
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size >= 0
+                if (retentionSizeData.get().remainingBreachedSize > 0) {
+                    remainingBreachedSize -= x.segmentSizeInBytes();
+                    return remainingBreachedSize >= 0;
+                } else return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+                        metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize,
+                        remainingBreachedSize + retentionSizeData.get().retentionSize);
+            }
+            return isSegmentDeleted;
+        }
+
+        public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (!retentionTimeData.isPresent()) {
+                return false;
+            }
+
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                    x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                // are ascending within an epoch.
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
+                        metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach",
+                        metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            // No need to update the logStartOffset.
+            return isSegmentDeleted;
+        }
+
+        // It removes the segments beyond the current leader's earliest epoch. Those segments are considered
+        // unreferenced because they are not part of the current leader epoch lineage.
+        private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry,
+                                                                         RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                    x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
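
To make the size-breach accounting in deleteRetentionSizeBreachedSegments concrete, here is a minimal standalone sketch (plain Java with hypothetical sizes, not the PR's code). Segments are visited oldest first and deleted only while the cumulative deleted bytes stay within the amount by which the log exceeds retention.bytes:

    import java.util.Arrays;
    import java.util.List;

    public class SizeRetentionSketch {
        public static void main(String[] args) {
            long retentionSize = 100L;                    // retention.bytes (hypothetical)
            long totalLogSize = 130L;                     // remote + local log size (hypothetical)
            long remainingBreachedSize = totalLogSize - retentionSize; // 30 bytes over

            // Candidate segment sizes, oldest first (hypothetical).
            List<Long> segmentSizes = Arrays.asList(20L, 15L, 25L);
            for (long size : segmentSizes) {
                long remainingBytes = remainingBreachedSize - size;
                if (remainingBytes >= 0) {
                    // Safe to delete: cumulative deleted bytes are still within the breach.
                    remainingBreachedSize = remainingBytes;
                    System.out.println("delete " + size + "-byte segment; still over by " + remainingBreachedSize);
                } else {
                    // Deleting this segment would remove more than the breached amount; stop.
                    break;
                }
            }
            // Prints: delete 20-byte segment; still over by 10
        }
    }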

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-20 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1234980957


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -152,16 +152,42 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   locally {
     initializePartitionMetadata()
     updateLogStartOffset(logStartOffset)
+    updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))
+    if (!remoteLogEnabled())
+      logStartOffset = localLogStartOffset()
     maybeIncrementFirstUnstableOffset()
     initializeTopicId()
 
     logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset)
+
+    info(s"Completed load of log with ${localLog.segments.numberOfSegments} segments, local log start offset ${localLogStartOffset()} and " +
+      s"log end offset $logEndOffset")
   }
 
   def setLogOffsetsListener(listener: LogOffsetsListener): Unit = {
     logOffsetsListener = listener
   }
 
+  private def updateLocalLogStartOffset(offset: Long): Unit = {
+    _localLogStartOffset = offset
+
+    if (highWatermark < offset) {
+      updateHighWatermark(offset)
+    }
+
+    if (this.recoveryPoint < offset) {
+      localLog.updateRecoveryPoint(offset)

Review Comment:
   The recovery point was already updated based on the log-start-offset in `updateLogStartOffset`, but the local-log-start-offset can be greater than that, so it is updated here when needed.
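
As an illustration of that relationship, a small sketch with hypothetical values (the checkpointed start offset versus the first on-disk segment's base offset):

    public class StartOffsetSketch {
        public static void main(String[] args) {
            // Hypothetical: checkpointed log start offset is 100, but local segments
            // begin at 250 because older segments now live only in remote storage.
            long logStartOffset = 100L;
            long firstSegmentBaseOffset = 250L;

            long localLogStartOffset = Math.max(logStartOffset, firstSegmentBaseOffset); // 250

            // Without tiered storage there is no remote prefix, so the two must coincide.
            boolean remoteLogEnabled = false;
            if (!remoteLogEnabled) {
                logStartOffset = localLogStartOffset; // 250
            }
            System.out.println(logStartOffset + " " + localLogStartOffset);
        }
    }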






[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-20 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1234980098


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -618,6 +625,230 @@ public void run() {
         }
     }
 
+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        if (isLeader()) {
+            logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private final Optional<RetentionSizeData> retentionSizeData;
+        private final Optional<RetentionTimeData> retentionTimeData;
+
+        private long remainingBreachedSize;
+
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData,
+                                         Optional<RetentionTimeData> retentionTimeData) {
+            this.retentionSizeData = retentionSizeData;
+            this.retentionTimeData = retentionTimeData;
+            remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (!retentionSizeData.isPresent()) {
+                return false;
+            }
+
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size >= 0
+                if (retentionSizeData.get().remainingBreachedSize > 0) {
+                    remainingBreachedSize -= x.segmentSizeInBytes();
+                    return remainingBreachedSize >= 0;
+                } else return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+                        metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize,
+                        remainingBreachedSize + retentionSizeData.get().retentionSize);
+            }
+            return isSegmentDeleted;
+        }
+
+        public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (!retentionTimeData.isPresent()) {
+                return false;
+            }
+
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                    x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                // are ascending within an epoch.
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
+                        metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach",
+                        metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            // No need to update the logStartOffset.
+            return isSegmentDeleted;
+        }
+
+        // It removes the segments beyond the current leader's earliest epoch. Those segments are considered
+        // unreferenced because they are not part of the current leader epoch lineage.
+        private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry,
+                                                                         RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                    x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
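
As a companion note on the time-breach check quoted above, a condensed sketch with hypothetical timestamps: a segment qualifies once the largest record timestamp it contains falls at or before cleanupUntilMs = now - retention.ms.

    public class TimeRetentionSketch {
        public static void main(String[] args) {
            long nowMs = 1_700_000_000_000L;          // current wall clock (hypothetical)
            long retentionMs = 86_400_000L;           // retention.ms = 1 day (hypothetical)
            long cleanupUntilMs = nowMs - retentionMs;

            long segmentMaxTimestampMs = nowMs - 172_800_000L; // newest record is 2 days old
            boolean breached = segmentMaxTimestampMs <= cleanupUntilMs;
            System.out.println("retention time breached: " + breached); // true
        }
    }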

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-20 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1234978872


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1331,7 +1370,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
                                 reason: SegmentDeletionReason): Int = {
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-      highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+      val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+      // Check not to delete segments which are not yet copied to tiered storage
+      val isSegmentTieredToRemoteStorage =
+        if (remoteLogEnabled()) upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage

Review Comment:
   It is inclusive; updated with a doc comment describing the variable.
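
A sketch of that inclusive boundary with hypothetical offsets (highestOffsetInRemoteStorage being the inclusive last offset already copied to remote storage):

    public class TieredGuardSketch {
        public static void main(String[] args) {
            long highestOffsetInRemoteStorage = 499L; // inclusive last tiered offset (hypothetical)

            // upperBoundOffset is the next segment's base offset, so the candidate
            // segment covers offsets [base, upperBoundOffset - 1].
            long upperBoundOffset = 500L;
            boolean isSegmentTieredToRemoteStorage =
                    upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage;
            System.out.println(isSegmentTieredToRemoteStorage); // true: 499 <= 499

            // If only offsets up to 498 had been tiered, the segment must be kept locally.
            System.out.println(upperBoundOffset - 1 <= 498L); // false
        }
    }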






[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1213113458


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1383,20 +1421,30 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     }
   }
 
+  private[log] def localRetentionMs: Long = {
+    if (config.remoteLogConfig.remoteStorageEnable) config.remoteLogConfig.localRetentionMs else config.retentionMs
+  }
+
   private def deleteRetentionMsBreachedSegments(): Int = {
-    if (config.retentionMs < 0) return 0
+    val retentionMs = localRetentionMs

Review Comment:
   This is addressed with the latest commits. 
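
For illustration, the localRetentionMs selection reduces to a simple fallback; a sketch with hypothetical config values:

    public class LocalRetentionSketch {
        public static void main(String[] args) {
            boolean remoteStorageEnable = true;
            long retentionMs = 7L * 24 * 60 * 60 * 1000;   // retention.ms: 7 days (hypothetical)
            long localRetentionMs = 24L * 60 * 60 * 1000;  // local.retention.ms: 1 day (hypothetical)

            // local.retention.ms only applies when remote storage is enabled;
            // otherwise local deletion follows the overall retention.ms.
            long effectiveLocalRetentionMs = remoteStorageEnable ? localRetentionMs : retentionMs;
            System.out.println(effectiveLocalRetentionMs); // 86400000
        }
    }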






[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1213112946


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -154,16 +154,41 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   locally {
     initializePartitionMetadata()
     updateLogStartOffset(logStartOffset)
+    updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))
+    if (!remoteLogEnabled())
+      logStartOffset = localLogStartOffset()

Review Comment:
   We already set the localLogStartOffset to the max of the passed logStartOffset and the first segment's base offset.
   When remote log is not enabled, `logStartOffset` is set to the `localLogStartOffset` computed above.






[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1213107956


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -618,6 +629,193 @@ public void run() {
         }
     }
 
+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+        updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private long remainingBreachedSize = 0L;
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(long remainingBreachedSize) {
+            this.remainingBreachedSize = remainingBreachedSize;
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size > 0
+                if (checkSizeRetention && remainingBreachedSize > 0) {
+                    remainingBreachedSize -= x.segmentSizeInBytes();
+                    return remainingBreachedSize >= 0;
+                } else return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                        "${log.config.retentionSize} breach. Log size after deletion will be " +
+                        "${remainingBreachedSize + log.config.retentionSize}.");
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach",
+                        metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            // No need to update the logStartOffset.
+            return isSegmentDeleted;
+        }
+
+        // There are two cases:
+        // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the

Review Comment:
   Updated the comment to make it more clear.
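
A sketch of the epoch-lineage test driving this path, with hypothetical epochs: a segment is unreferenced when every leader epoch it spans precedes the earliest epoch of the current leader chain.

    import java.util.Set;

    public class EpochTruncationSketch {
        public static void main(String[] args) {
            int earliestEpoch = 5; // earliest epoch in the current leader epoch chain (hypothetical)

            // Segment written entirely under older epochs: not part of the lineage, delete.
            Set<Integer> staleSegmentEpochs = Set.of(2, 3, 4);
            System.out.println(staleSegmentEpochs.stream().allMatch(e -> e < earliestEpoch)); // true

            // Segment that overlaps the current chain: keep.
            Set<Integer> liveSegmentEpochs = Set.of(4, 5);
            System.out.println(liveSegmentEpochs.stream().allMatch(e -> e < earliestEpoch)); // false
        }
    }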






[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1213107255


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -618,6 +629,193 @@ public void run() {
         }
     }
 
+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+        updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private long remainingBreachedSize = 0L;
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(long remainingBreachedSize) {
+            this.remainingBreachedSize = remainingBreachedSize;
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size > 0
+                if (checkSizeRetention && remainingBreachedSize > 0) {
+                    remainingBreachedSize -= x.segmentSizeInBytes();
+                    return remainingBreachedSize >= 0;
+                } else return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                        "${log.config.retentionSize} breach. Log size after deletion will be " +
+                        "${remainingBreachedSize + log.config.retentionSize}.");
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach",
+                        metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            // No need to update the logStartOffset.
+            return isSegmentDeleted;
+        }
+
+        // There are two cases:
+        // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+        //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+        // 2) To remove the unreferenced segments.
+        private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry,
+                                                                         RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                    x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+            if (isSegmentDeleted) {
+                logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                        metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+            }
+
+            // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata,
+                                               Predicate<RemoteLogSegmentMetadata> predicate)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (predicate.test(segmentMetadata)) {
+                // Publish delete segment started event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                // Delete the segment in remote storage.
+                remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                // Publish delete segment finished event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                return true;
+            }
+            return false;
+        }

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1213102668


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -618,6 +629,193 @@ public void run() {
         }
     }
 
+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+        updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private long remainingBreachedSize = 0L;
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(long remainingBreachedSize) {
+            this.remainingBreachedSize = remainingBreachedSize;
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size > 0
+                if (checkSizeRetention && remainingBreachedSize > 0) {
+                    remainingBreachedSize -= x.segmentSizeInBytes();
+                    return remainingBreachedSize >= 0;
+                } else return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                        "${log.config.retentionSize} breach. Log size after deletion will be " +
+                        "${remainingBreachedSize + log.config.retentionSize}.");
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach",
+                        metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            // No need to update the logStartOffset.
+            return isSegmentDeleted;
+        }
+
+        // There are two cases:
+        // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+        //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+        // 2) To remove the unreferenced segments.
+        private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry,
+                                                                         RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                    x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+            if (isSegmentDeleted) {
+                logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                        metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+            }
+
+            // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata,
+                                               Predicate<RemoteLogSegmentMetadata> predicate)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (predicate.test(segmentMetadata)) {
+                // Publish delete segment started event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                // Delete the segment in remote storage.
+                remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                // Publish delete segment finished event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                return true;
+            }
+            return false;
+        }

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1213100593


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -618,6 +629,193 @@ public void run() {
         }
     }
 
+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+        updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private long remainingBreachedSize = 0L;
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(long remainingBreachedSize) {
+            this.remainingBreachedSize = remainingBreachedSize;
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size > 0
+                if (checkSizeRetention && remainingBreachedSize > 0) {
+                    remainingBreachedSize -= x.segmentSizeInBytes();
+                    return remainingBreachedSize >= 0;
+                } else return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                        "${log.config.retentionSize} breach. Log size after deletion will be " +
+                        "${remainingBreachedSize + log.config.retentionSize}.");
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach",
+                        metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            // No need to update the logStartOffset.
+            return isSegmentDeleted;
+        }
+
+        // There are two cases:
+        // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+        //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+        // 2) To remove the unreferenced segments.
+        private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry,
+                                                                         RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                    x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+            if (isSegmentDeleted) {
+                logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                        metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+            }
+
+            // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata,
+                                               Predicate<RemoteLogSegmentMetadata> predicate)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (predicate.test(segmentMetadata)) {
+                // Publish delete segment started event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                // Delete the segment in remote storage.
+                remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                // Publish delete segment finished event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                return true;
+            }
+            return false;
+        }

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-05-23 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1181520975


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -581,11 +588,18 @@ public void run() {
             if (isLeader()) {
                 // Copy log segments to remote storage
                 copyLogSegmentsToRemote();
+                // Cleanup/delete expired remote log segments
+                cleanupExpiredRemoteLogSegments();

Review Comment:
   This is different from local log deletion, which requires deleting segments from local storage and hence really deleting the files.
   In the case of remote storage, we do not wait for the data to be deleted; the file or object is only marked for deletion in the respective metadata store, and the storage's garbage collector takes care of deleting the data asynchronously. There is no perf impact from these delete calls as they take much less time than copying segments, so it is very unlikely that copying segments is affected by segment deletion. Deletion checks happen in every iteration, so there will not be many segments that need to be deleted.
   Anyway, we can discuss this separately in a separate JIRA. On another note, all this logic will move to UnifiedLog in the future.









[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-05-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1181532348


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -595,6 +609,193 @@ public void run() {
         }
     }
 
+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+        updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private long remainingBreachedSize = 0L;
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(long remainingBreachedSize) {
+            this.remainingBreachedSize = remainingBreachedSize;
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size > 0
+                if (checkSizeRetention && remainingBreachedSize > 0) {
+                    remainingBreachedSize -= x.segmentSizeInBytes();
+                    return remainingBreachedSize >= 0;
+                } else return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                        "${log.config.retentionSize} breach. Log size after deletion will be " +
+                        "${remainingBreachedSize + log.config.retentionSize}.");
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach",
+                        metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            // No need to update the logStartOffset.
+            return isSegmentDeleted;
+        }
+
+        // There are two cases:
+        // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+        //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+        // 2) To remove the unreferenced segments.
+        private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry,
+                                                                         RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                    x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+            if (isSegmentDeleted) {
+                logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                        metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+            }
+
+            // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata,
+                                               Predicate<RemoteLogSegmentMetadata> predicate)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (predicate.test(segmentMetadata)) {
+                // Publish delete segment started event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                // Delete the segment in remote storage.
+                remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                // Publish delete segment finished event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                return true;
+            }
+            return false;
+        }

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-05-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1181529149


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -595,6 +609,193 @@ public void run() {
         }
     }
 
+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+        updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private long remainingBreachedSize = 0L;
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(long remainingBreachedSize) {
+            this.remainingBreachedSize = remainingBreachedSize;
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size > 0
+                if (checkSizeRetention && remainingBreachedSize > 0) {
+                    remainingBreachedSize -= x.segmentSizeInBytes();
+                    return remainingBreachedSize >= 0;
+                } else return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                        "${log.config.retentionSize} breach. Log size after deletion will be " +
+                        "${remainingBreachedSize + log.config.retentionSize}.");
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach",
+                        metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            // No need to update the logStartOffset.
+            return isSegmentDeleted;
+        }
+
+        // There are two cases:
+        // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+        //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+        // 2) To remove the unreferenced segments.
+        private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry,
+                                                                         RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                    x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+            if (isSegmentDeleted) {
+                logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                        metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+            }
+
+            // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata,
+                                               Predicate<RemoteLogSegmentMetadata> predicate)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (predicate.test(segmentMetadata)) {
+                // Publish delete segment started event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                // Delete the segment in remote storage.
+                remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                // Publish delete segment finished event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                return true;
+            }
+            return false;
+        }

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-05-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1181528820


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -595,6 +609,193 @@ public void run() {
         }
     }
 
+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+        updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private long remainingBreachedSize = 0L;
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(long remainingBreachedSize) {
+            this.remainingBreachedSize = remainingBreachedSize;
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size > 0
+                if (checkSizeRetention && remainingBreachedSize > 0) {
+                    remainingBreachedSize -= x.segmentSizeInBytes();
+                    return remainingBreachedSize >= 0;
+                } else return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                        "${log.config.retentionSize} breach. Log size after deletion will be " +
+                        "${remainingBreachedSize + log.config.retentionSize}.");
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach",
+                        metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            // No need to update the logStartOffset.
+            return isSegmentDeleted;
+        }
+
+        // There are two cases:
+        // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+        //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+        // 2) To remove the unreferenced segments.
Review Comment:
   Unreferenced segments within the current leader epoch chain will eventually fall before the earliest epoch of that chain after a few retention checks, and that will take care of those kinds of segments.






[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-05-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1181527119


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -595,6 +609,193 @@ public void run() {
         }
     }
 
+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+        updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private long remainingBreachedSize = 0L;
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(long remainingBreachedSize) {
+            this.remainingBreachedSize = remainingBreachedSize;
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size > 0
+                if (checkSizeRetention && remainingBreachedSize > 0) {
+                    remainingBreachedSize -= x.segmentSizeInBytes();
+                    return remainingBreachedSize >= 0;
+                } else return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                        "${log.config.retentionSize} breach. Log size after deletion will be " +
+                        "${remainingBreachedSize + log.config.retentionSize}.");
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach",
+                        metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            // No need to update the logStartOffset.
+            return isSegmentDeleted;
+        }
+
+        // There are two cases:
+        // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+        //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+        // 2) To remove the unreferenced segments.
+        private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry,
+                                                                         RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                    x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+            if (isSegmentDeleted) {
+                logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                        metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+            }
+
+            // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata,
+                                               Predicate<RemoteLogSegmentMetadata> predicate)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (predicate.test(segmentMetadata)) {
+                // Publish delete segment started event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                // Delete the segment in remote storage.
+                remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                // Publish delete segment finished event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                return true;
+            }
+            return false;
+        }

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-05-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1181526976


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -595,6 +609,193 @@ public void run() {
         }
     }
 
+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+        updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private long remainingBreachedSize = 0L;
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(long remainingBreachedSize) {
+            this.remainingBreachedSize = remainingBreachedSize;
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size > 0
+                if (checkSizeRetention && remainingBreachedSize > 0) {
+                    remainingBreachedSize -= x.segmentSizeInBytes();
+                    return remainingBreachedSize >= 0;
+                } else return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                        "${log.config.retentionSize} breach. Log size after deletion will be " +
+                        "${remainingBreachedSize + log.config.retentionSize}.");
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach",
+                        metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            // No need to update the logStartOffset.
+            return isSegmentDeleted;
+        }
+
+        // There are two cases:
+        // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+        //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+        // 2) To remove the unreferenced segments.
+        private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry,
+                                                                         RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                    x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+            if (isSegmentDeleted) {
+                logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                        metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+            }
+
+            // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata,
+                                               Predicate<RemoteLogSegmentMetadata> predicate)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (predicate.test(segmentMetadata)) {
+                // Publish delete segment started event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                // Delete the segment in remote storage.
+                remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                // Publish delete segment finished event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                return true;
+            }
+            return false;
+        }

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-05-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1181520975


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -581,11 +588,18 @@ public void run() {
             if (isLeader()) {
                 // Copy log segments to remote storage
                 copyLogSegmentsToRemote();
+                // Cleanup/delete expired remote log segments
+                cleanupExpiredRemoteLogSegments();

Review Comment:
   This is different from local log deletion, which requires deleting segments from local storage and hence really deleting the files. In the case of remote storage, we do not wait for the data to be deleted; the file or object is only marked for deletion in the respective metadata store, and the storage's garbage collector takes care of deleting the data asynchronously. There is no perf impact from these delete calls as they take much less time than copying segments, so it is very unlikely that copying segments is affected by segment deletion. Deletion checks happen in every iteration, so there will not be many segments that need to be deleted. Anyway, we can discuss this separately in a separate JIRA. On another note, all this logic will move to UnifiedLog in the future.


