[GitHub] [kafka] lbradstreet commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

2021-09-05 Thread GitBox


lbradstreet commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r702508503



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   val cleanableLogs = dirtyLogs.filter { ltc =>
 (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
   }
+
   if(cleanableLogs.isEmpty) {
-None
+val logsWithTombstonesExpired = dirtyLogs.filter {
+  case ltc => 
+// in this case, we are probably in a low throughput situation
+// therefore, we should take advantage of this fact and remove tombstones if we can
+// under the condition that the log's latest delete horizon is less than the current time
+// tracked
+ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds()

Review comment:
   It seems that whether we track the delete horizon or the number of
   tombstones, we will need to checkpoint some state. Otherwise we will be
   forced to perform a cleaning pass after every broker restart. Could we track
   the delete horizon on each log append, when we clean the log, and when we
   have to recover the log?
   
   I'm not sure where such a checkpoint should be stored, given our current
   checkpoint file formats and the need to support downgrades.
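
To make the tracking question concrete, here is a minimal in-memory sketch of the three update points (append, clean, recover). Everything in it is hypothetical: `DeleteHorizonTracker`, `observe`, `rebuild` and `NoTimestamp` are illustrative names, not code from this PR, and it assumes "latest" means the maximum horizon seen and that each batch can report its delete horizon or a sentinel. It deliberately leaves open where the value would be checkpointed.

```scala
// Hypothetical sketch only; none of these names exist in Kafka.
object DeleteHorizonSketch {
  // Stand-in for RecordBatch.NO_TIMESTAMP (assumed sentinel for "no horizon").
  val NoTimestamp: Long = -1L

  final class DeleteHorizonTracker {
    private var latest: Long = NoTimestamp

    def latestDeleteHorizon: Long = latest

    // Append and clean paths: fold in one batch's horizon, ignoring the sentinel.
    def observe(batchDeleteHorizonMs: Long): Unit =
      if (batchDeleteHorizonMs != NoTimestamp && batchDeleteHorizonMs > latest)
        latest = batchDeleteHorizonMs

    // Recovery path: discard the old value and rebuild it from the batches scanned.
    def rebuild(batchDeleteHorizonsMs: Iterable[Long]): Unit = {
      latest = NoTimestamp
      batchDeleteHorizonsMs.foreach(observe)
    }

    // Same shape as the condition in the diff above.
    def tombstonesExpired(nowMs: Long): Boolean =
      latest != NoTimestamp && latest <= nowMs
  }
}
```

Without a persisted copy of `latest`, a restart would still need `rebuild` to scan the log, which is the cost the comment above is trying to avoid.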




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

2021-09-05 Thread GitBox


lbradstreet commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r702503828



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -198,13 +204,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   val cleanableLogs = dirtyLogs.filter { ltc =>
 (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
   }
+

Review comment:
   Preexisting nit: missing whitespace between `if` and `(` in
`if(cleanableLogs`.








[GitHub] [kafka] lbradstreet commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

2021-09-05 Thread GitBox


lbradstreet commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r702503726



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -163,17 +168,18 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
 * Choose the log to clean next and add it to the in-progress set. We recompute this
 * each time from the full set of logs to allow logs to be dynamically added to the pool of logs
 * the log manager maintains.
+* Returns a tuple of an Option of the log selected to be cleaned and the reason it was selected.
 */
-  def grabFilthiestCompactedLog(time: Time, preCleanStats: PreCleanStats = new PreCleanStats()): Option[LogToClean] = {
+  def grabFilthiestCompactedLog(time: Time, preCleanStats: PreCleanStats = new PreCleanStats()): (Option[LogToClean], LogCleaningReason) = {

Review comment:
   Could LogCleaningReason be included as a field in LogToClean?
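
A rough sketch of what that could look like, so the method keeps returning `Option[LogToClean]`. The types here are simplified stand-ins, not the real `kafka.log` classes: `Candidate`, the two reason objects, and the `grabFilthiest` signature are all assumptions made for illustration.

```scala
// Hypothetical sketch only; simplified stand-ins for the real classes.
object CleaningReasonSketch {
  sealed trait LogCleaningReason
  case object DirtyRatioExceeded extends LogCleaningReason
  case object TombstonesExpired extends LogCleaningReason

  // Assumed sentinel, standing in for RecordBatch.NO_TIMESTAMP.
  val NoTimestamp: Long = -1L

  // Just the fields the selection logic below needs.
  final case class Candidate(
    name: String,
    cleanableRatio: Double,
    minCleanableRatio: Double,
    latestDeleteHorizonMs: Long
  )

  // The reason travels with the log instead of in a separate tuple element.
  final case class LogToClean(candidate: Candidate, reason: LogCleaningReason)

  def grabFilthiest(dirtyLogs: Seq[Candidate], nowMs: Long): Option[LogToClean] = {
    val cleanable = dirtyLogs.filter(c => c.cleanableRatio > c.minCleanableRatio)
    if (cleanable.nonEmpty)
      Some(LogToClean(cleanable.maxBy(_.cleanableRatio), DirtyRatioExceeded))
    else
      // Fallback: pick a log whose latest delete horizon has already passed.
      dirtyLogs
        .find(c => c.latestDeleteHorizonMs != NoTimestamp && c.latestDeleteHorizonMs <= nowMs)
        .map(c => LogToClean(c, TombstonesExpired))
  }
}
```

Callers that only need the log are unaffected by this shape, and a `None` result no longer has to be paired with a reason.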



