[jira] [Commented] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)
[ https://issues.apache.org/jira/browse/KAFKA-7321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838668#comment-16838668 ]

ASF GitHub Bot commented on KAFKA-7321:
---

jjkoshy commented on pull request #6009: KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354)
URL: https://github.com/apache/kafka/pull/6009

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> ensure timely processing of deletion requests in Kafka topic (Time-based log
> compaction)
>
> Key: KAFKA-7321
> URL: https://issues.apache.org/jira/browse/KAFKA-7321
> Project: Kafka
> Issue Type: Improvement
> Components: log
> Reporter: xiongqi wu
> Assignee: xiongqi wu
> Priority: Major
>
> _Compaction enables Kafka to remove old messages that are flagged for
> deletion while other messages can be retained for a relatively longer time.
> Today, a log segment may remain uncompacted for a long time, since
> eligibility for log compaction is determined by the dirty ratio
> ("min.cleanable.dirty.ratio") and the min compaction lag
> ("min.compaction.lag.ms") settings. The ability to delete a log message
> through compaction in a timely manner has become an important requirement
> in some use cases (e.g., GDPR). For example, one use case is to delete PII
> (Personally Identifiable Information) within 7 days while keeping non-PII
> data indefinitely in compacted form. The goal of this change is to provide
> a time-based compaction policy that ensures the cleanable section is
> compacted after the specified time interval regardless of the dirty ratio
> and "min compaction lag". However, the dirty ratio and "min compaction lag"
> are still honored as long as the time-based compaction rule is not
> violated. In other words, if Kafka receives a deletion request on a key
> (e.g., a key with a null value), the corresponding log segment will be
> picked up for compaction after the configured time interval to remove the
> key._
>
> _This issue tracks the effort for KIP-354:_
> _https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy_

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
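To make the rule in the issue description concrete, here is a minimal Java sketch of the eligibility decision it describes. The class and method names (`CompactionEligibility`, `shouldCompact`) are hypothetical and are not Kafka's `LogCleanerManager` API. The model is also simplified: it treats the log as one dirty section with a single oldest timestamp and a single dirty ratio, whereas the broker computes these per log from its segments.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the KIP-354 rule; not Kafka's actual cleaner code.
final class CompactionEligibility {

    private CompactionEligibility() {}

    /**
     * Decides whether a compacted log should be picked up by the cleaner.
     *
     * @param dirtyRatio             fraction of the log not yet compacted (0.0 to 1.0)
     * @param minCleanableDirtyRatio the topic's min.cleanable.dirty.ratio
     * @param firstDirtyTimestampMs  timestamp of the oldest record in the dirty section
     * @param minCompactionLagMs     the topic's min.compaction.lag.ms
     * @param maxCompactionLagMs     the topic's max.compaction.lag.ms
     * @param nowMs                  current time in milliseconds
     */
    static boolean shouldCompact(double dirtyRatio, double minCleanableDirtyRatio,
                                 long firstDirtyTimestampMs, long minCompactionLagMs,
                                 long maxCompactionLagMs, long nowMs) {
        long ageMs = nowMs - firstDirtyTimestampMs;
        // Time-based rule: once the oldest dirty record has waited longer than
        // max.compaction.lag.ms, compact regardless of the dirty ratio.
        if (ageMs > maxCompactionLagMs) {
            return true;
        }
        // Otherwise the existing rules apply. Simplification: the broker uses the min
        // lag to limit which segments are cleanable; here it is a single age check.
        return dirtyRatio >= minCleanableDirtyRatio && ageMs >= minCompactionLagMs;
    }

    public static void main(String[] args) {
        long now = TimeUnit.DAYS.toMillis(30);
        long maxLag = TimeUnit.DAYS.toMillis(7); // the 7-day PII deadline from the issue
        // Tombstone written 8 days ago, log only 10% dirty: deadline passed, so compact.
        System.out.println(shouldCompact(0.10, 0.50, now - TimeUnit.DAYS.toMillis(8), 0L, maxLag, now)); // true
        // Tombstone written 1 day ago, same log: neither rule fires yet.
        System.out.println(shouldCompact(0.10, 0.50, now - TimeUnit.DAYS.toMillis(1), 0L, maxLag, now)); // false
    }
}
```

With a 7-day maximum lag, an 8-day-old tombstone forces compaction even though the log is only 10% dirty, which is the GDPR scenario the issue is motivated by; when the deadline has not passed, the sketch falls back to the pre-KIP-354 dirty-ratio behavior.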
[jira] [Commented] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)
[ https://issues.apache.org/jira/browse/KAFKA-7321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16712085#comment-16712085 ]

ASF GitHub Bot commented on KAFKA-7321:
---

xiowu0 opened a new pull request #6009: KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354)
URL: https://github.com/apache/kafka/pull/6009

Implement the change described in KIP-354.
Added unit tests.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)
[ https://issues.apache.org/jira/browse/KAFKA-7321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16712082#comment-16712082 ]

ASF GitHub Bot commented on KAFKA-7321:
---

xiowu0 closed pull request #5611: KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354)
URL: https://github.com/apache/kafka/pull/5611

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index 4410c971a16..927e1032e3c 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -113,6 +113,10 @@
         "higher ratio will mean fewer, more efficient cleanings but will mean more wasted " +
         "space in the log.";

+    public static final String MAX_COMPACTION_LAG_MS_CONFIG = "max.compaction.lag.ms";
+    public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a message will remain " +
+        "uncompacted in the log. Only applicable for logs that are being compacted.";
+
     public static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
     public static final String CLEANUP_POLICY_COMPACT = "compact";
     public static final String CLEANUP_POLICY_DELETE = "delete";
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index bc328d77efc..710419bb2c1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -157,7 +157,7 @@ case class RollParams(maxSegmentMs: Long,

 object RollParams {
   def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = {
-    new RollParams(config.segmentMs,
+    new RollParams(config.maxSegmentMs,
                    config.segmentSize,
                    appendInfo.maxTimestamp,
                    appendInfo.lastOffset,
@@ -1905,6 +1905,13 @@ class Log(@volatile var dir: File,
     }
   }

+  @threadsafe
+  private[log] def getFirstBatchTimestampForSegment(segment: LogSegment): Long = {
+    lock synchronized {
+      segment.getFirstBatchTimestamp()
+    }
+  }
+
   /**
    * remove deleted log metrics
    */
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 8449e39d581..43b954ac7e8 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -132,6 +132,12 @@ class LogCleaner(initialConfig: CleanerConfig,
            new Gauge[Int] {
              def value: Int = cleaners.map(_.lastStats).map(_.elapsedSecs).max.toInt
            })
+  // a metric to track delay between the time when a log is required to be compacted
+  // as determined by max compaction lag and the time of last cleaner run.
+  newGauge("max-compaction-delay-secs",
+           new Gauge[Int] {
+             def value: Int = Math.max(0, (cleaners.map(_.lastPreCleanStats).map(_.maxCompactionDelayMs).max / 1000).toInt)
+           })

   /**
    * Start the background cleaning
@@ -285,6 +291,7 @@ class LogCleaner(initialConfig: CleanerConfig,
                              checkDone = checkDone)

     @volatile var lastStats: CleanerStats = new CleanerStats()
+    @volatile var lastPreCleanStats: PreCleanStats = new PreCleanStats()

     private def checkDone(topicPartition: TopicPartition) {
       if (!isRunning)
@@ -310,10 +317,12 @@ class LogCleaner(initialConfig: CleanerConfig,
       var currentLog: Option[Log] = None

       try {
-        val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
+        val preCleanStats = new PreCleanStats()
+        val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match {
           case None =>
             false
           case Some(cleanable) =>
+            this.lastPreCleanStats = preCleanStats
             // there's a log, clean it
             currentLog = Some(cleanable.log)
             cleanLog(cleanable)
@@ -930,6 +939,17 @@ private[log] class Cleaner(val id: Int,
   }
 }

+/**
+  * A simple struct for collecting pre-clean stats
+  */
+private class PreCleanStats() {
+  var maxCompactionDelayMs = 0L
+
+  def updateMaxCompactionDelay(delayMs: Long): Unit = {
+    maxCompactionDelayMs = Math.max(maxCompactionDelayMs, delayMs)
+  }
+}
+
 /**
   * A simple struct for collecting stats about log cleaning
   */
@@ -983,9 +1003,11 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
 }

 /**
-  * Helper class for a log, its topic/partition, the first cleanable position, and the first uncleanable dirty position
-  */
-private case class LogToClean(topicPartition:
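The new `max-compaction-delay-secs` gauge reports, in seconds, how far past its max-compaction-lag deadline the most overdue log was at each cleaner thread's last pre-clean pass. The following is a minimal Java transcription of that arithmetic for readers less familiar with Scala. `PreCleanStats` and `updateMaxCompactionDelay` mirror the LogCleaner.scala diff above; `compactionDelayMs` is a hypothetical helper, because the truncated diff does not show where the delay is computed, and its formula (now minus the first dirty batch timestamp minus `max.compaction.lag.ms`) is an assumption drawn from the comment on the gauge.

```java
import java.util.List;

// Hypothetical Java rendering of the PreCleanStats / gauge logic; not Kafka code.
final class CompactionDelayMetricSketch {

    /** Mirrors the PreCleanStats struct added to LogCleaner.scala. */
    static final class PreCleanStats {
        long maxCompactionDelayMs = 0L;

        void updateMaxCompactionDelay(long delayMs) {
            maxCompactionDelayMs = Math.max(maxCompactionDelayMs, delayMs);
        }
    }

    /**
     * ASSUMED definition: how far past its deadline a log is when the cleaner examines it
     * (negative if the deadline is still in the future).
     */
    static long compactionDelayMs(long firstDirtyBatchTimestampMs, long maxCompactionLagMs, long nowMs) {
        return nowMs - (firstDirtyBatchTimestampMs + maxCompactionLagMs);
    }

    /** Same arithmetic as the gauge: max over cleaners, converted to seconds, floored at 0. */
    static int maxCompactionDelaySecs(List<PreCleanStats> lastPreCleanStatsPerCleaner) {
        long maxMs = lastPreCleanStatsPerCleaner.stream()
                .mapToLong(s -> s.maxCompactionDelayMs)
                .max()
                .orElse(0L); // Scala's .max would throw on an empty collection instead
        return Math.max(0, (int) (maxMs / 1000));
    }

    public static void main(String[] args) {
        long now = 1_000_000_000L;
        long maxLagMs = 60_000L; // 1 minute, for illustration only

        PreCleanStats cleaner0 = new PreCleanStats();
        // Log A: first dirty batch is 90 s old, so it is 30 s past its deadline.
        cleaner0.updateMaxCompactionDelay(compactionDelayMs(now - 90_000L, maxLagMs, now));
        // Log B: first dirty batch is 20 s old; the -40 s delay leaves the max unchanged.
        cleaner0.updateMaxCompactionDelay(compactionDelayMs(now - 20_000L, maxLagMs, now));

        PreCleanStats cleaner1 = new PreCleanStats();
        // Log C: 15.5 s past its deadline.
        cleaner1.updateMaxCompactionDelay(compactionDelayMs(now - 75_500L, maxLagMs, now));

        System.out.println(maxCompactionDelaySecs(List.of(cleaner0, cleaner1))); // prints 30
    }
}
```

Because `maxCompactionDelayMs` starts at 0 and only grows, logs that are still within their deadline never push the value negative; the outer `Math.max(0, ...)` in the Scala gauge acts as an additional guard.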
[jira] [Commented] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)
[ https://issues.apache.org/jira/browse/KAFKA-7321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16603656#comment-16603656 ]

ASF GitHub Bot commented on KAFKA-7321:
---

xiowu0 opened a new pull request #5611: KAFKA-7321: time based log compaction (KIP-354)
URL: https://github.com/apache/kafka/pull/5611

This is to implement KIP-354. More detailed information can be found in KIP-354.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)