[GitHub] [kafka] lbradstreet commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal
lbradstreet commented on a change in pull request #10914:

URL: https://github.com/apache/kafka/pull/10914#discussion_r702508503

## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala

```diff
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     val cleanableLogs = dirtyLogs.filter { ltc =>
       (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
     }
+    if(cleanableLogs.isEmpty) {
-      None
+      val logsWithTombstonesExpired = dirtyLogs.filter {
+        case ltc =>
+          // in this case, we are probably in a low throughput situation
+          // therefore, we should take advantage of this fact and remove tombstones if we can
+          // under the condition that the log's latest delete horizon is less than the current time
+          // tracked
+          ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds()
```

Review comment:

It seems like whether we track the delete horizon or the number of tombstones, we will need to checkpoint some state; otherwise we will be forced to perform a cleaning pass after every broker restart. Could we track the delete horizon upon each log append, when we clean the log, and when we have to recover the log? I'm not sure where a checkpoint should be stored, given our current checkpoint file formats and the need to support downgrades.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
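[Editor's note] The checkpointing idea in the comment above can be sketched as follows. This is an illustrative sketch only, not code from the PR: `DeleteHorizonCheckpoint`, its `update`/`tombstonesExpired` methods, and keying by a topic-partition string are all assumptions; the only Kafka identifier referenced is `RecordBatch.NO_TIMESTAMP`, mirrored here as a local constant.

```scala
// Illustrative sketch of the reviewer's suggestion: keep a per-log delete
// horizon that could be persisted, so a broker restart does not force a
// fresh cleaning pass. All names here are hypothetical.
object DeleteHorizonCheckpoint {
  // Mirrors RecordBatch.NO_TIMESTAMP (-1) from the Kafka clients library.
  final val NoTimestamp: Long = -1L

  // In-memory view of the checkpoint: topic-partition -> latest delete horizon.
  private var horizons = Map.empty[String, Long]

  // Would be called on each log append, after a clean, and during log recovery,
  // then flushed to a checkpoint file alongside the existing cleaner checkpoints.
  def update(topicPartition: String, horizonMs: Long): Unit =
    horizons += topicPartition -> math.max(horizonMs, horizons.getOrElse(topicPartition, NoTimestamp))

  // A log qualifies for a tombstone-removal pass once its horizon has passed,
  // matching the `latestDeleteHorizon <= time.milliseconds()` check in the diff.
  def tombstonesExpired(topicPartition: String, nowMs: Long): Boolean = {
    val horizon = horizons.getOrElse(topicPartition, NoTimestamp)
    horizon != NoTimestamp && horizon <= nowMs
  }
}
```

On restart, reloading the persisted map would restore eligibility decisions without rescanning every log, which is the point of the reviewer's suggestion.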
[GitHub] [kafka] lbradstreet commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal
lbradstreet commented on a change in pull request #10914:

URL: https://github.com/apache/kafka/pull/10914#discussion_r702503828

## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala

```diff
@@ -198,13 +204,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     val cleanableLogs = dirtyLogs.filter { ltc =>
       (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
     }
+
```

Review comment:

Preexisting nit: missing whitespace between `if` and `(` in `if(cleanableLogs`.
[GitHub] [kafka] lbradstreet commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal
lbradstreet commented on a change in pull request #10914:

URL: https://github.com/apache/kafka/pull/10914#discussion_r702503726

## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala

```diff
@@ -163,17 +168,18 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
    * Choose the log to clean next and add it to the in-progress set. We recompute this
    * each time from the full set of logs to allow logs to be dynamically added to the pool of logs
    * the log manager maintains.
+   * Returns a tuple of an Option of the log selected to be cleaned and the reason it was selected.
    */
-  def grabFilthiestCompactedLog(time: Time, preCleanStats: PreCleanStats = new PreCleanStats()): Option[LogToClean] = {
+  def grabFilthiestCompactedLog(time: Time, preCleanStats: PreCleanStats = new PreCleanStats()): (Option[LogToClean], LogCleaningReason) = {
```

Review comment:

Could `LogCleaningReason` be included as a field in `LogToClean`?
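[Editor's note] The suggestion above can be sketched as follows. This is an illustrative sketch only: the trimmed-down `LogToClean` and the reason names are stand-ins, not the actual fields of `kafka.log.LogToClean`, and the selection logic is reduced to a simple sort for demonstration.

```scala
// Hypothetical reason values; the real PR defines its own LogCleaningReason.
sealed trait LogCleaningReason
case object MinCleanableRatio extends LogCleaningReason
case object TombstonesExpired extends LogCleaningReason

// A trimmed-down stand-in for kafka.log.LogToClean that carries the reason
// it was selected as a field, per the reviewer's suggestion.
final case class LogToClean(topicPartition: String,
                            cleanableRatio: Double,
                            reason: LogCleaningReason)

object CleanerSelection {
  // With the reason on LogToClean, the selection method can keep returning a
  // plain Option instead of widening its return type to a tuple.
  def grabFilthiestCompactedLog(candidates: Seq[LogToClean]): Option[LogToClean] =
    candidates.sortBy(-_.cleanableRatio).headOption
}
```

Callers that need the reason read it off the returned value, so existing call sites that only care about the log itself remain source-compatible.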