Re: [PR] MINOR; Log reason for deleting a kraft snapshot [kafka]

2024-03-18 Thread via GitHub


jsancio merged PR #15478:
URL: https://github.com/apache/kafka/pull/15478


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR; Log reason for deleting a kraft snapshot [kafka]

2024-03-13 Thread via GitHub


jsancio commented on code in PR #15478:
URL: https://github.com/apache/kafka/pull/15478#discussion_r1523954492


##
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##
@@ -348,14 +357,15 @@ final class KafkaMetadataLog private (
   snapshotId.offset <= latestSnapshotId.offset &&
   log.maybeIncrementLogStartOffset(snapshotId.offset, LogStartOffsetIncrementReason.SnapshotGenerated) =>
 // Delete all segments that have a "last offset" less than the log start offset
-log.deleteOldSegments()
+val deletedSegments = log.deleteOldSegments()
 // Remove older snapshots from the snapshots cache
-(true, forgetSnapshotsBefore(snapshotId))
+val forgottenSnapshots = forgetSnapshotsBefore(snapshotId)
+(deletedSegments != 0 || forgottenSnapshots.nonEmpty, forgottenSnapshots)

Review Comment:
   The old code was probably correct in practice. I just wanted to make it clearer 
that either condition (segments deleted or snapshots forgotten) is enough for this function to return true.
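
   As a rough illustration of that return expression, here is a minimal sketch. `DeleteResultSketch`, `result`, and the use of `Seq[Long]` in place of the real snapshot collection are assumptions made for this example only; they are not taken from `KafkaMetadataLog`.

```scala
object DeleteResultSketch {
  // Hypothetical helper: it mirrors only the shape of the new return expression.
  // deletedSegments stands in for the count returned by log.deleteOldSegments(),
  // forgottenSnapshots for whatever forgetSnapshotsBefore(snapshotId) returned.
  def result(deletedSegments: Int, forgottenSnapshots: Seq[Long]): (Boolean, Seq[Long]) =
    (deletedSegments != 0 || forgottenSnapshots.nonEmpty, forgottenSnapshots)
}
```

   With this shape the first element is `false` only when no segment was deleted and no snapshot was forgotten.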






Re: [PR] MINOR; Log reason for deleting a kraft snapshot [kafka]

2024-03-13 Thread via GitHub


jsancio commented on code in PR #15478:
URL: https://github.com/apache/kafka/pull/15478#discussion_r1523951275


##
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##
@@ -404,21 +414,33 @@ final class KafkaMetadataLog private (
* all cases.
*
* For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+   * The predicate returns a Some with the reason if the snapshot should be deleted and a None if the snapshot
+   * should not be deleted
*/
-  private def cleanSnapshots(predicate: OffsetAndEpoch => Boolean): Boolean = {
-if (snapshots.size < 2)
+  private def cleanSnapshots(predicate: OffsetAndEpoch => Option[SnapshotDeletionReason]): Boolean = {
+if (snapshots.size < 2) {
   return false
+}
 
 var didClean = false
 snapshots.keys.toSeq.sliding(2).foreach {
   case Seq(snapshot: OffsetAndEpoch, nextSnapshot: OffsetAndEpoch) =>
-if (predicate(snapshot) && deleteBeforeSnapshot(nextSnapshot)) {
-  didClean = true
-} else {
-  return didClean
+predicate(snapshot) match {
+  case Some(reason) =>
+if (deleteBeforeSnapshot(nextSnapshot, reason)) {

Review Comment:
   Yes. `&&` only evaluates the right argument if the left argument is `true`, 
which is what we want because we don't want to delete the snapshot if the 
predicate is false.
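
   To make the short-circuit point concrete, here is a minimal sketch comparing the old `&&` shape with the new `Option` match. Everything in it (`ShortCircuitSketch`, the call counter, the simplified `deleteBeforeSnapshot` signature, `String` as the reason type) is a hypothetical stand-in for this example, not the real `KafkaMetadataLog` code.

```scala
object ShortCircuitSketch {
  // Hypothetical stand-in that counts how often a delete is attempted.
  var deleteCalls = 0
  def deleteBeforeSnapshot(nextOffset: Long): Boolean = {
    deleteCalls += 1
    true
  }

  // Old shape: Boolean predicate combined with &&.
  // The right-hand side runs only when shouldDelete is true.
  def oldStyle(shouldDelete: Boolean, nextOffset: Long): Boolean =
    shouldDelete && deleteBeforeSnapshot(nextOffset)

  // New shape: Option-returning predicate.
  // The delete is attempted only when a reason is present.
  def newStyle(reason: Option[String], nextOffset: Long): Boolean =
    reason match {
      case Some(_) => deleteBeforeSnapshot(nextOffset)
      case None => false
    }
}
```

   In both shapes `deleteBeforeSnapshot` is never invoked when the predicate says the snapshot should be kept.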






Re: [PR] MINOR; Log reason for deleting a kraft snapshot [kafka]

2024-03-13 Thread via GitHub


mumrah commented on code in PR #15478:
URL: https://github.com/apache/kafka/pull/15478#discussion_r1523630806


##
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##
@@ -404,21 +414,33 @@ final class KafkaMetadataLog private (
* all cases.
*
* For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+   * The predicate returns a Some with the reason if the snapshot should be deleted and a None if the snapshot
+   * should not be deleted
*/
-  private def cleanSnapshots(predicate: OffsetAndEpoch => Boolean): Boolean = {
-if (snapshots.size < 2)
+  private def cleanSnapshots(predicate: OffsetAndEpoch => Option[SnapshotDeletionReason]): Boolean = {
+if (snapshots.size < 2) {
   return false
+}
 
 var didClean = false
 snapshots.keys.toSeq.sliding(2).foreach {
   case Seq(snapshot: OffsetAndEpoch, nextSnapshot: OffsetAndEpoch) =>
-if (predicate(snapshot) && deleteBeforeSnapshot(nextSnapshot)) {
-  didClean = true
-} else {
-  return didClean
+predicate(snapshot) match {
+  case Some(reason) =>
+if (deleteBeforeSnapshot(nextSnapshot, reason)) {

Review Comment:
   I assume `&&` will short-circuit in Scala like it does in Java (in which 
case this change looks correct)



##
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##
@@ -677,4 +675,38 @@ object KafkaMetadataLog extends Logging {
   Snapshots.deleteIfExists(logDir, snapshotId)
 }
   }
+
+  private sealed trait SnapshotDeletionReason {
+def reason(snapshotId: OffsetAndEpoch): String
+  }
+
+  private final case class RetentionMsBreach(now: Long, timestamp: Long, retentionMillis: Long) extends SnapshotDeletionReason {
+override def reason(snapshotId: OffsetAndEpoch): String = {
+  s"""Marking snapshot $snapshotId for deletion because it timestamp ($timestamp) is now ($now) older than the
+  |retention ($retentionMillis""".stripMargin

Review Comment:
   is there a missing ")" here, or should we not have the opening "("?
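
   For reference, a minimal sketch of how the message reads with the parenthesis closed (and with "its" instead of "it"). This is an assumption about one possible fix, not the code in the PR; `ReasonMessageSketch` is a hypothetical name and a plain `String` stands in for `OffsetAndEpoch`.

```scala
object ReasonMessageSketch {
  // Hypothetical simplification: snapshotId is a String rather than OffsetAndEpoch.
  // stripMargin removes the leading whitespace and the '|' on the second line,
  // but the line break inside the triple-quoted string is kept.
  def reason(snapshotId: String, timestamp: Long, now: Long, retentionMillis: Long): String =
    s"""Marking snapshot $snapshotId for deletion because its timestamp ($timestamp) is now ($now) older than the
       |retention ($retentionMillis)""".stripMargin
}
```

   Note that the resulting log message spans two lines, since `stripMargin` does not join them.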



##
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##
@@ -348,14 +357,15 @@ final class KafkaMetadataLog private (
   snapshotId.offset <= latestSnapshotId.offset &&
   log.maybeIncrementLogStartOffset(snapshotId.offset, LogStartOffsetIncrementReason.SnapshotGenerated) =>
 // Delete all segments that have a "last offset" less than the log start offset
-log.deleteOldSegments()
+val deletedSegments = log.deleteOldSegments()
 // Remove older snapshots from the snapshots cache
-(true, forgetSnapshotsBefore(snapshotId))
+val forgottenSnapshots = forgetSnapshotsBefore(snapshotId)
+(deletedSegments != 0 || forgottenSnapshots.nonEmpty, forgottenSnapshots)

Review Comment:
   Interesting, did we have a bug here previously? Looks like before we would 
always report `deleted=true` even if nothing was deleted. I wonder if we even 
use this return value 🤔






Re: [PR] MINOR; Log reason for deleting a kraft snapshot [kafka]

2024-03-06 Thread via GitHub


hni61223 commented on code in PR #15478:
URL: https://github.com/apache/kafka/pull/15478#discussion_r1514278368


##
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##
@@ -677,4 +675,38 @@ object KafkaMetadataLog extends Logging {
   Snapshots.deleteIfExists(logDir, snapshotId)
 }
   }
+
+  private sealed trait SnapshotDeletionReason {
+def reason(snapshotId: OffsetAndEpoch): String
+  }
+
+  private final case class RetentionMsBreach(now: Long, timestamp: Long, retentionMillis: Long) extends SnapshotDeletionReason {
+override def reason(snapshotId: OffsetAndEpoch): String = {
+  s"""Marking snapshot $snapshotId for deletion because it timestamp ($timestamp) is now ($now) older than the

Review Comment:
   nit: its timestamp






Re: [PR] MINOR; Log reason for deleting a kraft snapshot [kafka]

2024-03-06 Thread via GitHub


hni61223 commented on code in PR #15478:
URL: https://github.com/apache/kafka/pull/15478#discussion_r1514278368


##
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##
@@ -677,4 +675,38 @@ object KafkaMetadataLog extends Logging {
   Snapshots.deleteIfExists(logDir, snapshotId)
 }
   }
+
+  private sealed trait SnapshotDeletionReason {
+def reason(snapshotId: OffsetAndEpoch): String
+  }
+
+  private final case class RetentionMsBreach(now: Long, timestamp: Long, retentionMillis: Long) extends SnapshotDeletionReason {
+override def reason(snapshotId: OffsetAndEpoch): String = {
+  s"""Marking snapshot $snapshotId for deletion because it timestamp ($timestamp) is now ($now) older than the

Review Comment:
   nit: it's timestamp


