[jira] [Commented] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)

2019-05-13 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-7321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16838668#comment-16838668 ]

ASF GitHub Bot commented on KAFKA-7321:
---

jjkoshy commented on pull request #6009: KAFKA-7321: Add a Maximum Log 
Compaction Lag (KIP-354)
URL: https://github.com/apache/kafka/pull/6009
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ensure timely processing of deletion requests in Kafka topic (Time-based log 
> compaction)
> 
>
> Key: KAFKA-7321
> URL: https://issues.apache.org/jira/browse/KAFKA-7321
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> _Compaction enables Kafka to remove old messages that are flagged for
> deletion while other messages can be retained for a relatively longer time.
> Today, a log segment may remain uncompacted for a long time because its
> eligibility for log compaction is determined by the compaction ratio
> ("min.cleanable.dirty.ratio") and minimum compaction lag
> ("min.compaction.lag.ms") settings. The ability to delete a log message
> through compaction in a timely manner has become an important requirement in
> some use cases (e.g., GDPR). For example, one use case is to delete PII
> (Personally Identifiable Information) data within 7 days while keeping
> non-PII data indefinitely in compacted format. The goal of this change is to
> provide a time-based compaction policy that ensures the cleanable section is
> compacted after the specified time interval, regardless of the dirty ratio
> and "min compaction lag". However, the dirty ratio and "min compaction lag"
> are still honored if the time-based compaction rule is not violated. In other
> words, if Kafka receives a deletion request on a key (e.g., a key with a null
> value), the corresponding log segment will be picked up for compaction after
> the configured time interval to remove the key._
>
> _This is to track the effort in KIP-354:_
> _https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy_
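
As a minimal sketch (not part of the issue or the patch above), a topic could
opt in to the proposed time bound once KIP-354 is available. The topic name,
partition and replica counts, and broker address below are hypothetical; the
7-day value mirrors the GDPR example above, and "max.compaction.lag.ms" is the
setting the KIP introduces.

    // Sketch: create a compacted topic whose dirty data must be compacted
    // within 7 days via the proposed "max.compaction.lag.ms" setting.
    import java.util.Properties

    import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
    import org.apache.kafka.common.config.TopicConfig

    import scala.collection.JavaConverters._

    object CreatePiiTopic extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")  // hypothetical broker
      val admin = AdminClient.create(props)
      try {
        val configs = Map(
          TopicConfig.CLEANUP_POLICY_CONFIG -> TopicConfig.CLEANUP_POLICY_COMPACT,
          // existing knobs are still honored while the time bound is not violated
          TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG -> "0",
          // proposed by KIP-354: compact the cleanable section within 7 days
          "max.compaction.lag.ms" -> (7L * 24 * 60 * 60 * 1000).toString
        ).asJava
        val topic = new NewTopic("pii-events", 3, 3.toShort).configs(configs)
        admin.createTopics(List(topic).asJava).all().get()
      } finally {
        admin.close()
      }
    }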



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-7321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712085#comment-16712085 ]

ASF GitHub Bot commented on KAFKA-7321:
---

xiowu0 opened a new pull request #6009: KAFKA-7321: Add a Maximum Log 
Compaction Lag (KIP-354)
URL: https://github.com/apache/kafka/pull/6009
 
 
   Implement the change described in KIP-354
   Added unit tests. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-7321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712082#comment-16712082 ]

ASF GitHub Bot commented on KAFKA-7321:
---

xiowu0 closed pull request #5611: KAFKA-7321: Add a Maximum Log Compaction Lag 
(KIP-354)
URL: https://github.com/apache/kafka/pull/5611
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index 4410c971a16..927e1032e3c 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -113,6 +113,10 @@
         "higher ratio will mean fewer, more efficient cleanings but will mean more wasted " +
         "space in the log.";
 
+    public static final String MAX_COMPACTION_LAG_MS_CONFIG = "max.compaction.lag.ms";
+    public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a message will remain " +
+        "uncompacted in the log. Only applicable for logs that are being compacted.";
+
     public static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
     public static final String CLEANUP_POLICY_COMPACT = "compact";
     public static final String CLEANUP_POLICY_DELETE = "delete";
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index bc328d77efc..710419bb2c1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -157,7 +157,7 @@ case class RollParams(maxSegmentMs: Long,
 
 object RollParams {
   def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = {
-    new RollParams(config.segmentMs,
+    new RollParams(config.maxSegmentMs,
       config.segmentSize,
       appendInfo.maxTimestamp,
       appendInfo.lastOffset,
@@ -1905,6 +1905,13 @@ class Log(@volatile var dir: File,
     }
   }
 
+  @threadsafe
+  private[log] def getFirstBatchTimestampForSegment(segment: LogSegment): Long = {
+    lock synchronized {
+      segment.getFirstBatchTimestamp()
+    }
+  }
+
   /**
    * remove deleted log metrics
    */
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 8449e39d581..43b954ac7e8 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -132,6 +132,12 @@ class LogCleaner(initialConfig: CleanerConfig,
     new Gauge[Int] {
       def value: Int = cleaners.map(_.lastStats).map(_.elapsedSecs).max.toInt
     })
+  // a metric to track delay between the time when a log is required to be compacted
+  // as determined by max compaction lag and the time of last cleaner run.
+  newGauge("max-compaction-delay-secs",
+    new Gauge[Int] {
+      def value: Int = Math.max(0, (cleaners.map(_.lastPreCleanStats).map(_.maxCompactionDelayMs).max / 1000).toInt)
+    })
 
   /**
    * Start the background cleaning
@@ -285,6 +291,7 @@ class LogCleaner(initialConfig: CleanerConfig,
       checkDone = checkDone)
 
     @volatile var lastStats: CleanerStats = new CleanerStats()
+    @volatile var lastPreCleanStats: PreCleanStats = new PreCleanStats()
 
     private def checkDone(topicPartition: TopicPartition) {
       if (!isRunning)
@@ -310,10 +317,12 @@ class LogCleaner(initialConfig: CleanerConfig,
       var currentLog: Option[Log] = None
 
       try {
-        val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
+        val preCleanStats = new PreCleanStats()
+        val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match {
           case None =>
             false
           case Some(cleanable) =>
+            this.lastPreCleanStats = preCleanStats
             // there's a log, clean it
             currentLog = Some(cleanable.log)
             cleanLog(cleanable)
@@ -930,6 +939,17 @@ private[log] class Cleaner(val id: Int,
   }
 }
 
+/**
+  * A simple struct for collecting pre-clean stats
+  */
+private class PreCleanStats() {
+  var maxCompactionDelayMs = 0L
+
+  def updateMaxCompactionDelay(delayMs: Long): Unit = {
+    maxCompactionDelayMs = Math.max(maxCompactionDelayMs, delayMs)
+  }
+}
+
 /**
  * A simple struct for collecting stats about log cleaning
  */
@@ -983,9 +1003,11 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
 }
 
 /**
- * Helper class for a log, its topic/partition, the first cleanable position, and the first uncleanable dirty position
- */
-private case class LogToClean(topicPartition:

[jira] [Commented] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)

2018-09-04 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-7321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16603656#comment-16603656 ]

ASF GitHub Bot commented on KAFKA-7321:
---

xiowu0 opened a new pull request #5611: KAFKA-7321: time based log compaction 
(KIP-354)
URL: https://github.com/apache/kafka/pull/5611
 
 
   This is to implement KIP-354.
   More detailed information can be found in KIP-354.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)