[ https://issues.apache.org/jira/browse/KAFKA-7321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712082#comment-16712082 ]

ASF GitHub Bot commented on KAFKA-7321:
---------------------------------------

xiowu0 closed pull request #5611: KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354)
URL: https://github.com/apache/kafka/pull/5611
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it after the merge:

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index 4410c971a16..927e1032e3c 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -113,6 +113,10 @@
         "higher ratio will mean fewer, more efficient cleanings but will mean 
more wasted " +
         "space in the log.";
 
+    public static final String MAX_COMPACTION_LAG_MS_CONFIG = 
"max.compaction.lag.ms";
+    public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a 
message will remain " +
+        "uncompacted in the log. Only applicable for logs that are being 
compacted.";
+
     public static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
     public static final String CLEANUP_POLICY_COMPACT = "compact";
     public static final String CLEANUP_POLICY_DELETE = "delete";
diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index bc328d77efc..710419bb2c1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -157,7 +157,7 @@ case class RollParams(maxSegmentMs: Long,
 
 object RollParams {
   def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, 
now: Long): RollParams = {
-   new RollParams(config.segmentMs,
+   new RollParams(config.maxSegmentMs,
      config.segmentSize,
      appendInfo.maxTimestamp,
      appendInfo.lastOffset,
@@ -1905,6 +1905,13 @@ class Log(@volatile var dir: File,
     }
   }
 
+  @threadsafe
+  private[log] def getFirstBatchTimestampForSegment(segment: LogSegment): Long 
= {
+    lock synchronized {
+      segment.getFirstBatchTimestamp()
+    }
+  }
+
   /**
    * remove deleted log metrics
    */
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index 8449e39d581..43b954ac7e8 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -132,6 +132,12 @@ class LogCleaner(initialConfig: CleanerConfig,
            new Gauge[Int] {
              def value: Int = 
cleaners.map(_.lastStats).map(_.elapsedSecs).max.toInt
            })
+  // a metric to track delay between the time when a log is required to be 
compacted
+  // as determined by max compaction lag and the time of last cleaner run.
+  newGauge("max-compaction-delay-secs",
+          new Gauge[Int] {
+          def value: Int = Math.max(0, 
(cleaners.map(_.lastPreCleanStats).map(_.maxCompactionDelayMs).max / 
1000).toInt)
+          })
 
   /**
    * Start the background cleaning
@@ -285,6 +291,7 @@ class LogCleaner(initialConfig: CleanerConfig,
                               checkDone = checkDone)
 
     @volatile var lastStats: CleanerStats = new CleanerStats()
+    @volatile var lastPreCleanStats: PreCleanStats = new PreCleanStats()
 
     private def checkDone(topicPartition: TopicPartition) {
       if (!isRunning)
@@ -310,10 +317,12 @@ class LogCleaner(initialConfig: CleanerConfig,
       var currentLog: Option[Log] = None
 
       try {
-        val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
+        val preCleanStats = new PreCleanStats()
+        val cleaned = cleanerManager.grabFilthiestCompactedLog(time, 
preCleanStats) match {
           case None =>
             false
           case Some(cleanable) =>
+            this.lastPreCleanStats = preCleanStats
             // there's a log, clean it
             currentLog = Some(cleanable.log)
             cleanLog(cleanable)
@@ -930,6 +939,17 @@ private[log] class Cleaner(val id: Int,
   }
 }
 
+/**
+  * A simple struct for collecting pre-clean stats
+  */
+private class PreCleanStats() {
+  var maxCompactionDelayMs = 0L
+
+  def updateMaxCompactionDelay(delayMs: Long): Unit = {
+    maxCompactionDelayMs = Math.max(maxCompactionDelayMs, delayMs)
+  }
+}
+
 /**
  * A simple struct for collecting stats about log cleaning
  */
@@ -983,9 +1003,11 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
 }
 
 /**
- * Helper class for a log, its topic/partition, the first cleanable position, 
and the first uncleanable dirty position
- */
-private case class LogToClean(topicPartition: TopicPartition, log: Log, 
firstDirtyOffset: Long, uncleanableOffset: Long) extends Ordered[LogToClean] {
+  * Helper class for a log, its topic/partition, the first cleanable position, 
the first uncleanable dirty position,
+  * and whether it needs compaction immediately.
+  */
+private case class LogToClean(topicPartition: TopicPartition, log: Log, 
firstDirtyOffset: Long,
+                              uncleanableOffset: Long, needCompactionNow: 
Boolean = false) extends Ordered[LogToClean] {
   val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size.toLong).sum
   val (firstUncleanableOffset, cleanableBytes) = 
LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset)
   val totalBytes = cleanBytes + cleanableBytes
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala 
b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 2fc7b749852..7bb5cdfe540 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -168,7 +168,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     * each time from the full set of logs to allow logs to be dynamically 
added to the pool of logs
     * the log manager maintains.
     */
-  def grabFilthiestCompactedLog(time: Time): Option[LogToClean] = {
+  def grabFilthiestCompactedLog(time: Time, preCleanStats: PreCleanStats = new 
PreCleanStats()): Option[LogToClean] = {
     inLock(lock) {
       val now = time.milliseconds
       this.timeOfLastRun = now
@@ -181,14 +181,21 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
           inProgress.contains(topicPartition) || isUncleanablePartition(log, 
topicPartition)
       }.map {
         case (topicPartition, log) => // create a LogToClean instance for each
-          val (firstDirtyOffset, firstUncleanableDirtyOffset) = 
LogCleanerManager.cleanableOffsets(log, topicPartition,
-            lastClean, now)
-          LogToClean(topicPartition, log, firstDirtyOffset, 
firstUncleanableDirtyOffset)
+          val (firstDirtyOffset, firstUncleanableDirtyOffset) =
+            LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, 
now)
+
+          val firstDirtySegmentCreateTime = 
LogCleanerManager.getFirstDirtySegmentEstimatedCreateTime(log, firstDirtyOffset)
+          val compactionDelayMs = LogCleanerManager.getCompactionDelay(log, 
firstDirtySegmentCreateTime, now)
+          preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
+
+          LogToClean(topicPartition, log, firstDirtyOffset, 
firstUncleanableDirtyOffset, compactionDelayMs > 0)
       }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
 
       this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) 
dirtyLogs.max.cleanableRatio else 0
-      // and must meet the minimum threshold for dirty byte ratio
-      val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > 
ltc.log.config.minCleanableRatio)
+      // and must meet the minimum threshold for dirty byte ratio or have some 
bytes required to be compacted
+      val cleanableLogs = dirtyLogs.filter { ltc =>
+        (ltc.needCompactionNow && ltc.cleanableBytes > 0) || 
ltc.cleanableRatio > ltc.log.config.minCleanableRatio
+      }
       if(cleanableLogs.isEmpty) {
         None
       } else {
@@ -479,6 +486,41 @@ private[log] object LogCleanerManager extends Logging {
     log.config.compact && log.config.delete
   }
 
+  /**
+    * return first dirty segment estimated createTime
+    */
+  def getFirstDirtySegmentEstimatedCreateTime(log: Log, firstDirtyOffset: 
Long) : Long = {
+    val largestCleanTimestamp = log.logSegments(-1, 
firstDirtyOffset).lastOption.map(_.largestTimestamp).getOrElse(0L)
+    val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, 
log.activeSegment.baseOffset)
+
+    val firstDirtySegmentCreateTime: Long = dirtyNonActiveSegments.headOption 
match {
+      case None => -1
+      case Some(segment) =>
+        if (largestCleanTimestamp > 0)
+          largestCleanTimestamp
+        else {
+          val firstBatchTimestamp = 
log.getFirstBatchTimestampForSegment(segment)
+          // if timestamp of the first batch is unavailable,
+          // use largestTimestamp instead
+          if (firstBatchTimestamp > 0) firstBatchTimestamp
+          else segment.largestTimestamp
+        }
+    }
+    firstDirtySegmentCreateTime
+  }
+
+  /**
+    * get delay between the time when log is required to be compacted as 
determined
+    * by maxCompactionLagMs and the current time.
+    */
+  def getCompactionDelay(log: Log, firstDirtySegmentCreateTime: Long, now: 
Long) : Long = {
+    val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L)
+    val cleanUtilTime = now - maxCompactionLagMs
+    if (firstDirtySegmentCreateTime < cleanUtilTime && 
firstDirtySegmentCreateTime > 0)
+      cleanUtilTime - firstDirtySegmentCreateTime
+    else
+      0L
+  }
 
   /**
     * Returns the range of dirty offsets that can be cleaned.
@@ -508,7 +550,7 @@ private[log] object LogCleanerManager extends Logging {
       }
     }
 
-    val compactionLagMs = math.max(log.config.compactionLagMs, 0L)
+    val minCompactionLagMs = math.max(log.config.compactionLagMs, 0L)
 
     // find first segment that cannot be cleaned
     // neither the active segment, nor segments with any messages closer to 
the head of the log than the minimum compaction lag time
@@ -522,12 +564,12 @@ private[log] object LogCleanerManager extends Logging {
       Option(log.activeSegment.baseOffset),
 
       // the first segment whose largest message timestamp is within a minimum 
time lag from now
-      if (compactionLagMs > 0) {
+      if (minCompactionLagMs > 0) {
         // dirty log segments
         val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, 
log.activeSegment.baseOffset)
         dirtyNonActiveSegments.find { s =>
-          val isUncleanable = s.largestTimestamp > now - compactionLagMs
-          debug(s"Checking if log segment may be cleaned: log='${log.name}' 
segment.baseOffset=${s.baseOffset} 
segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - 
compactionLagMs}; is uncleanable=$isUncleanable")
+          val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
+          debug(s"Checking if log segment may be cleaned: log='${log.name}' 
segment.baseOffset=${s.baseOffset} 
segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - 
minCompactionLagMs}; is uncleanable=$isUncleanable")
           isUncleanable
         }.map(_.baseOffset)
       } else None
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala 
b/core/src/main/scala/kafka/log/LogConfig.scala
index d872e09ed79..5f96dc81c91 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -46,6 +46,7 @@ object Defaults {
   val FileDeleteDelayMs = kafka.server.Defaults.LogDeleteDelayMs
   val DeleteRetentionMs = kafka.server.Defaults.LogCleanerDeleteRetentionMs
   val MinCompactionLagMs = kafka.server.Defaults.LogCleanerMinCompactionLagMs
+  val MaxCompactionLagMs = kafka.server.Defaults.LogCleanerMaxCompactionLagMs
   val MinCleanableDirtyRatio = kafka.server.Defaults.LogCleanerMinCleanRatio
 
   @deprecated(message = "This is a misleading variable name as it actually 
refers to the 'delete' cleanup policy. Use " +
@@ -85,6 +86,7 @@ case class LogConfig(props: java.util.Map[_, _], 
overriddenConfigs: Set[String]
   val fileDeleteDelayMs = getLong(LogConfig.FileDeleteDelayMsProp)
   val deleteRetentionMs = getLong(LogConfig.DeleteRetentionMsProp)
   val compactionLagMs = getLong(LogConfig.MinCompactionLagMsProp)
+  val maxCompactionLagMs = getLong(LogConfig.MaxCompactionLagMsProp)
   val minCleanableRatio = getDouble(LogConfig.MinCleanableDirtyRatioProp)
   val compact = 
getList(LogConfig.CleanupPolicyProp).asScala.map(_.toLowerCase(Locale.ROOT)).contains(LogConfig.Compact)
   val delete = 
getList(LogConfig.CleanupPolicyProp).asScala.map(_.toLowerCase(Locale.ROOT)).contains(LogConfig.Delete)
@@ -101,6 +103,11 @@ case class LogConfig(props: java.util.Map[_, _], 
overriddenConfigs: Set[String]
 
   def randomSegmentJitter: Long =
     if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % 
math.min(segmentJitterMs, segmentMs)
+
+  def maxSegmentMs :Long = {
+    if (compact && maxCompactionLagMs > 0) math.min(maxCompactionLagMs, 
segmentMs)
+    else segmentMs
+  }
 }
 
 object LogConfig {
@@ -121,6 +128,7 @@ object LogConfig {
   val IndexIntervalBytesProp = TopicConfig.INDEX_INTERVAL_BYTES_CONFIG
   val DeleteRetentionMsProp = TopicConfig.DELETE_RETENTION_MS_CONFIG
   val MinCompactionLagMsProp = TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG
+  val MaxCompactionLagMsProp = TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG
   val FileDeleteDelayMsProp = TopicConfig.FILE_DELETE_DELAY_MS_CONFIG
   val MinCleanableDirtyRatioProp = TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG
   val CleanupPolicyProp = TopicConfig.CLEANUP_POLICY_CONFIG
@@ -152,6 +160,7 @@ object LogConfig {
   val FileDeleteDelayMsDoc = TopicConfig.FILE_DELETE_DELAY_MS_DOC
   val DeleteRetentionMsDoc = TopicConfig.DELETE_RETENTION_MS_DOC
   val MinCompactionLagMsDoc = TopicConfig.MIN_COMPACTION_LAG_MS_DOC
+  val MaxCompactionLagMsDoc = TopicConfig.MAX_COMPACTION_LAG_MS_DOC
   val MinCleanableRatioDoc = TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_DOC
   val CompactDoc = TopicConfig.CLEANUP_POLICY_DOC
   val UncleanLeaderElectionEnableDoc = 
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_DOC
@@ -242,6 +251,8 @@ object LogConfig {
         DeleteRetentionMsDoc, KafkaConfig.LogCleanerDeleteRetentionMsProp)
       .define(MinCompactionLagMsProp, LONG, Defaults.MinCompactionLagMs, 
atLeast(0), MEDIUM, MinCompactionLagMsDoc,
         KafkaConfig.LogCleanerMinCompactionLagMsProp)
+      .define(MaxCompactionLagMsProp, LONG, Defaults.MaxCompactionLagMs, 
atLeast(1), MEDIUM, MaxCompactionLagMsDoc,
+        KafkaConfig.LogCleanerMaxCompactionLagMsProp)
       .define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs, 
atLeast(0), MEDIUM, FileDeleteDelayMsDoc,
         KafkaConfig.LogDeleteDelayMsProp)
       .define(MinCleanableDirtyRatioProp, DOUBLE, 
Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM,
@@ -297,12 +308,21 @@ object LogConfig {
         throw new InvalidConfigurationException(s"Unknown topic config name: 
$name")
   }
 
+  def crossValidateCheck(props: java.util.Map[_, _]): Unit = {
+    val minCompactionLag =  
props.get(MinCompactionLagMsProp).asInstanceOf[Long]
+    val maxCompactionLag =  
props.get(MaxCompactionLagMsProp).asInstanceOf[Long]
+    if (minCompactionLag > maxCompactionLag) {
+      throw new InvalidConfigurationException(s"conflict topic config setting 
$MinCompactionLagMsProp " +
+        s"($minCompactionLag) > $MaxCompactionLagMsProp ($maxCompactionLag)")
+    }
+  }
   /**
    * Check that the given properties contain only valid log config names and 
that all values can be parsed and are valid
    */
   def validate(props: Properties) {
     validateNames(props)
-    configDef.parse(props)
+    val valueMaps = configDef.parse(props)
+    crossValidateCheck(valueMaps)
   }
 
   /**
@@ -322,6 +342,7 @@ object LogConfig {
     IndexIntervalBytesProp -> KafkaConfig.LogIndexIntervalBytesProp,
     DeleteRetentionMsProp -> KafkaConfig.LogCleanerDeleteRetentionMsProp,
     MinCompactionLagMsProp -> KafkaConfig.LogCleanerMinCompactionLagMsProp,
+    MaxCompactionLagMsProp -> KafkaConfig.LogCleanerMaxCompactionLagMsProp,
     FileDeleteDelayMsProp -> KafkaConfig.LogDeleteDelayMsProp,
     MinCleanableDirtyRatioProp -> KafkaConfig.LogCleanerMinCleanRatioProp,
     CleanupPolicyProp -> KafkaConfig.LogCleanupPolicyProp,
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala 
b/core/src/main/scala/kafka/log/LogSegment.scala
index d910a29100c..ebc7521c6c9 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -91,7 +91,7 @@ class LogSegment private[log] (val log: FileRecords,
   private var bytesSinceLastIndexEntry = 0
 
   /* The timestamp we used for time based log rolling */
-  private var rollingBasedTimestamp: Option[Long] = None
+  @volatile private var rollingBasedTimestamp: Option[Long] = None
 
   /* The maximum timestamp we see so far */
   @volatile private var maxTimestampSoFar: Long = timeIndex.lastEntry.timestamp
@@ -503,6 +503,18 @@ class LogSegment private[log] (val log: FileRecords,
     log.trim()
   }
 
+  /**
+    * If not previously loaded,
+    * load the timestamp of the first message into memory.
+    */
+  private def mayLoadFirstBatchTimestamp(): Unit = {
+    if (rollingBasedTimestamp.isEmpty) {
+      val iter = log.batches.iterator()
+      if (iter.hasNext)
+        rollingBasedTimestamp = Some(iter.next().maxTimestamp)
+    }
+  }
+
   /**
    * The time this segment has waited to be rolled.
    * If the first message batch has a timestamp we use its timestamp to 
determine when to roll a segment. A segment
@@ -514,17 +526,24 @@ class LogSegment private[log] (val log: FileRecords,
    */
   def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = {
     // Load the timestamp of the first message into memory
-    if (rollingBasedTimestamp.isEmpty) {
-      val iter = log.batches.iterator()
-      if (iter.hasNext)
-        rollingBasedTimestamp = Some(iter.next().maxTimestamp)
-    }
+    mayLoadFirstBatchTimestamp()
     rollingBasedTimestamp match {
       case Some(t) if t >= 0 => messageTimestamp - t
       case _ => now - created
     }
   }
 
+  /**
+    * @return the first batch timestamp if the timestamp is available. 
Otherwise return 0
+    */
+  def getFirstBatchTimestamp() : Long = {
+    mayLoadFirstBatchTimestamp()
+    rollingBasedTimestamp match {
+      case Some(t) if t >= 0 => t
+      case _ => 0L
+    }
+  }
+
   /**
    * Search the message offset based on timestamp and offset.
    *
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 13f555a2059..f63d3521b36 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -102,6 +102,7 @@ object Defaults {
   val LogCleanerEnable = true
   val LogCleanerDeleteRetentionMs = 24 * 60 * 60 * 1000L
   val LogCleanerMinCompactionLagMs = 0L
+  val LogCleanerMaxCompactionLagMs = Long.MaxValue
   val LogIndexSizeMaxBytes = 10 * 1024 * 1024
   val LogIndexIntervalBytes = 4096
   val LogFlushIntervalMessages = Long.MaxValue
@@ -324,6 +325,7 @@ object KafkaConfig {
   val LogCleanerEnableProp = "log.cleaner.enable"
   val LogCleanerDeleteRetentionMsProp = "log.cleaner.delete.retention.ms"
   val LogCleanerMinCompactionLagMsProp = "log.cleaner.min.compaction.lag.ms"
+  val LogCleanerMaxCompactionLagMsProp = "log.cleaner.max.compaction.lag.ms"
   val LogIndexSizeMaxBytesProp = "log.index.size.max.bytes"
   val LogIndexIntervalBytesProp = "log.index.interval.bytes"
   val LogFlushIntervalMessagesProp = "log.flush.interval.messages"
@@ -587,6 +589,7 @@ object KafkaConfig {
   val LogCleanerEnableDoc = "Enable the log cleaner process to run on the 
server. Should be enabled if using any topics with a cleanup.policy=compact 
including the internal offsets topic. If disabled those topics will not be 
compacted and continually grow in size."
   val LogCleanerDeleteRetentionMsDoc = "How long are delete records retained?"
   val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will 
remain uncompacted in the log. Only applicable for logs that are being 
compacted."
+  val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will 
remain uncompacted in the log. Only applicable for logs that are being 
compacted."
   val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index"
   val LogIndexIntervalBytesDoc = "The interval with which we add an entry to 
the offset index"
   val LogFlushIntervalMessagesDoc = "The number of messages accumulated on a 
log partition before messages are flushed to disk "
@@ -886,6 +889,7 @@ object KafkaConfig {
       .define(LogCleanerEnableProp, BOOLEAN, Defaults.LogCleanerEnable, 
MEDIUM, LogCleanerEnableDoc)
       .define(LogCleanerDeleteRetentionMsProp, LONG, 
Defaults.LogCleanerDeleteRetentionMs, MEDIUM, LogCleanerDeleteRetentionMsDoc)
       .define(LogCleanerMinCompactionLagMsProp, LONG, 
Defaults.LogCleanerMinCompactionLagMs, MEDIUM, LogCleanerMinCompactionLagMsDoc)
+      .define(LogCleanerMaxCompactionLagMsProp, LONG, 
Defaults.LogCleanerMaxCompactionLagMs, MEDIUM, LogCleanerMaxCompactionLagMsDoc)
       .define(LogIndexSizeMaxBytesProp, INT, Defaults.LogIndexSizeMaxBytes, 
atLeast(4), MEDIUM, LogIndexSizeMaxBytesDoc)
       .define(LogIndexIntervalBytesProp, INT, Defaults.LogIndexIntervalBytes, 
atLeast(0), MEDIUM, LogIndexIntervalBytesDoc)
       .define(LogFlushIntervalMessagesProp, LONG, 
Defaults.LogFlushIntervalMessages, atLeast(1), HIGH, 
LogFlushIntervalMessagesDoc)
@@ -1169,6 +1173,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   val logCleanerIoMaxBytesPerSecond = 
getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp)
   def logCleanerDeleteRetentionMs = 
getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
   def logCleanerMinCompactionLagMs = 
getLong(KafkaConfig.LogCleanerMinCompactionLagMsProp)
+  def logCleanerMaxCompactionLagMs = 
getLong(KafkaConfig.LogCleanerMaxCompactionLagMsProp)
   val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp)
   def logCleanerMinCleanRatio = 
getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
   val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp)
@@ -1445,5 +1450,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
         
s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs 
should always be less than" +
           s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to 
prevent failed" +
           " authentication responses from timing out")
+
+    require(logCleanerMinCompactionLagMs >= 0 && logCleanerMaxCompactionLagMs 
> 0 &&
+      logCleanerMinCompactionLagMs <= logCleanerMaxCompactionLagMs,
+      s"${KafkaConfig.LogCleanerMinCompactionLagMsProp} must be less than or 
equal to ${KafkaConfig.LogCleanerMaxCompactionLagMsProp} and non-negative" )
   }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5f9b3612c4f..608907ccf60 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -69,6 +69,7 @@ object KafkaServer {
     logProps.put(LogConfig.IndexIntervalBytesProp, 
kafkaConfig.logIndexIntervalBytes)
     logProps.put(LogConfig.DeleteRetentionMsProp, 
kafkaConfig.logCleanerDeleteRetentionMs)
     logProps.put(LogConfig.MinCompactionLagMsProp, 
kafkaConfig.logCleanerMinCompactionLagMs)
+    logProps.put(LogConfig.MaxCompactionLagMsProp, 
kafkaConfig.logCleanerMaxCompactionLagMs)
     logProps.put(LogConfig.FileDeleteDelayMsProp, kafkaConfig.logDeleteDelayMs)
     logProps.put(LogConfig.MinCleanableDirtyRatioProp, 
kafkaConfig.logCleanerMinCleanRatio)
     logProps.put(LogConfig.CleanupPolicyProp, kafkaConfig.logCleanupPolicy)
diff --git 
a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index cc35f1d9ff6..9f99fbdb801 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -40,9 +40,10 @@ abstract class AbstractLogCleanerIntegrationTest {
   private val logs = ListBuffer.empty[Log]
   private val defaultMaxMessageSize = 128
   private val defaultMinCleanableDirtyRatio = 0.0F
-  private val defaultCompactionLag = 0L
+  private val defaultMinCompactionLagMS = 0L
   private val defaultDeleteDelay = 1000
   private val defaultSegmentSize = 2048
+  private val defaultMaxCompactionLagMs = Long.MaxValue
 
   def time: MockTime
 
@@ -58,9 +59,10 @@ abstract class AbstractLogCleanerIntegrationTest {
   def logConfigProperties(propertyOverrides: Properties = new Properties(),
                           maxMessageSize: Int,
                           minCleanableDirtyRatio: Float = 
defaultMinCleanableDirtyRatio,
-                          compactionLag: Long = defaultCompactionLag,
+                          minCompactionLagMs: Long = defaultMinCompactionLagMS,
                           deleteDelay: Int = defaultDeleteDelay,
-                          segmentSize: Int = defaultSegmentSize): Properties = 
{
+                          segmentSize: Int = defaultSegmentSize,
+                          maxCompactionLagMs: Long = 
defaultMaxCompactionLagMs): Properties = {
     val props = new Properties()
     props.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
     props.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
@@ -69,7 +71,8 @@ abstract class AbstractLogCleanerIntegrationTest {
     props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
     props.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: 
java.lang.Float)
     props.put(LogConfig.MessageTimestampDifferenceMaxMsProp, 
Long.MaxValue.toString)
-    props.put(LogConfig.MinCompactionLagMsProp, compactionLag: java.lang.Long)
+    props.put(LogConfig.MinCompactionLagMsProp, minCompactionLagMs: 
java.lang.Long)
+    props.put(LogConfig.MaxCompactionLagMsProp, maxCompactionLagMs: 
java.lang.Long)
     props ++= propertyOverrides
     props
   }
@@ -79,9 +82,10 @@ abstract class AbstractLogCleanerIntegrationTest {
                   numThreads: Int = 1,
                   backOffMs: Long = 15000L,
                   maxMessageSize: Int = defaultMaxMessageSize,
-                  compactionLag: Long = defaultCompactionLag,
+                  minCompactionLagMs: Long = defaultMinCompactionLagMS,
                   deleteDelay: Int = defaultDeleteDelay,
                   segmentSize: Int = defaultSegmentSize,
+                  maxCompactionLagMs: Long = defaultMaxCompactionLagMs,
                   cleanerIoBufferSize: Option[Int] = None,
                   propertyOverrides: Properties = new Properties()): 
LogCleaner = {
 
@@ -93,9 +97,10 @@ abstract class AbstractLogCleanerIntegrationTest {
       val logConfig = LogConfig(logConfigProperties(propertyOverrides,
         maxMessageSize = maxMessageSize,
         minCleanableDirtyRatio = minCleanableDirtyRatio,
-        compactionLag = compactionLag,
+        minCompactionLagMs = minCompactionLagMs,
         deleteDelay = deleteDelay,
-        segmentSize = segmentSize))
+        segmentSize = segmentSize,
+        maxCompactionLagMs = maxCompactionLagMs))
       val log = Log(dir,
         logConfig,
         logStartOffset = 0L,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index bfee811167c..2d342facf52 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -25,9 +25,10 @@ import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
 import org.apache.kafka.common.record.{CompressionType, RecordBatch}
-import org.junit.Assert.{assertFalse, assertTrue, fail}
+import org.junit.Assert._
 import org.junit.Test
 
+import scala.collection.{Iterable, JavaConverters, Seq}
 import scala.collection.JavaConverters.mapAsScalaMapConverter
 
 /**
@@ -93,4 +94,95 @@ class LogCleanerIntegrationTest extends 
AbstractLogCleanerIntegrationTest {
     assertTrue(uncleanablePartitions.contains(topicPartitions(1)))
     assertFalse(uncleanablePartitions.contains(topicPartitions(2)))
   }
+
+  @Test
+  def testMaxLogCompactionLag(): Unit = {
+    val msPerHour = 60 * 60 * 1000
+
+    val minCompactionLagMs = 1 * msPerHour
+    val maxCompactionLagMs = 6 * msPerHour
+
+    val cleanerBackOffMs = 200L
+    val segmentSize = 512
+    val topicPartitions = Array(new TopicPartition("log", 0), new 
TopicPartition("log", 1), new TopicPartition("log", 2))
+    val minCleanableDirtyRatio = 1.0F
+
+    cleaner = makeCleaner(partitions = topicPartitions,
+      backOffMs = cleanerBackOffMs,
+      minCompactionLagMs = minCompactionLagMs,
+      segmentSize = segmentSize,
+      maxCompactionLagMs= maxCompactionLagMs,
+      minCleanableDirtyRatio = minCleanableDirtyRatio)
+    val log = cleaner.logs.get(topicPartitions(0))
+
+    val T0 = time.milliseconds
+    writeKeyDups(numKeys = 100, numDups = 3, log, CompressionType.NONE, 
timestamp = T0, startValue = 0, step = 1)
+
+    val startSizeBlock0 = log.size
+
+    val activeSegAtT0 = log.activeSegment
+
+    cleaner.startup()
+
+    // advance to a time still less than maxCompactionLagMs from start
+    time.sleep(maxCompactionLagMs/2)
+    Thread.sleep(5 * cleanerBackOffMs) // give cleaning thread a chance to 
_not_ clean
+    assertEquals("There should be no cleaning until the max compaction lag has 
passed", startSizeBlock0, log.size)
+
+    // advance to time a bit more than one maxCompactionLagMs from start
+    time.sleep(maxCompactionLagMs/2 + 1)
+    val T1 = time.milliseconds
+
+    // write the second block of data: all zero keys
+    val appends1 = writeKeyDups(numKeys = 100, numDups = 1, log, 
CompressionType.NONE, timestamp = T1, startValue = 0, step = 0)
+
+    // roll the active segment
+    log.roll()
+    val activeSegAtT1 = log.activeSegment
+    val firstBlockCleanableSegmentOffset = activeSegAtT0.baseOffset
+
+    // the first block should get cleaned
+    cleaner.awaitCleaned(new TopicPartition("log", 0), 
firstBlockCleanableSegmentOffset)
+
+    val read1 = readFromLog(log)
+    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(new 
TopicPartition("log", 0))
+    assertTrue(s"log cleaner should have processed at least to offset 
$firstBlockCleanableSegmentOffset, " +
+      s"but lastCleaned=$lastCleaned", lastCleaned >= 
firstBlockCleanableSegmentOffset)
+
+    //minCleanableDirtyRatio  will prevent second block of data from compacting
+    assertNotEquals(s"log should still contain non-zero keys", appends1, read1)
+
+    time.sleep(maxCompactionLagMs + 1)
+    // the second block should get cleaned. only zero keys left
+    cleaner.awaitCleaned(new TopicPartition("log", 0), 
activeSegAtT1.baseOffset)
+
+    val read2 = readFromLog(log)
+
+    assertEquals(s"log should only contains zero keys now", appends1, read2)
+
+    val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints(new 
TopicPartition("log", 0))
+    val secondBlockCleanableSegmentOffset = activeSegAtT1.baseOffset
+    assertTrue(s"log cleaner should have processed at least to offset 
$secondBlockCleanableSegmentOffset, " +
+      s"but lastCleaned=$lastCleaned2", lastCleaned2 >= 
secondBlockCleanableSegmentOffset)
+  }
+
+  private def readFromLog(log: Log): Iterable[(Int, Int)] = {
+    import JavaConverters._
+    for (segment <- log.logSegments; record <- segment.log.records.asScala) 
yield {
+      val key = TestUtils.readString(record.key).toInt
+      val value = TestUtils.readString(record.value).toInt
+      key -> value
+    }
+  }
+
+  private def writeKeyDups(numKeys: Int, numDups: Int, log: Log, codec: 
CompressionType, timestamp: Long, startValue: Int, step: Int): Seq[(Int, Int)] 
= {
+    var valCounter = startValue
+    for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
+      val curValue = valCounter
+      log.appendAsLeader(TestUtils.singletonRecords(value = 
curValue.toString.getBytes, codec = codec,
+        key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0)
+      valCounter += step
+      (key, curValue)
+    }
+  }
 }
diff --git 
a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 6e8c9b9d336..0232e5772d6 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -35,8 +35,8 @@ import scala.collection._
 class LogCleanerLagIntegrationTest(compressionCodecName: String) extends 
AbstractLogCleanerIntegrationTest with Logging {
   val msPerHour = 60 * 60 * 1000
 
-  val compactionLag = 1 * msPerHour
-  assertTrue("compactionLag must be divisible by 2 for this test", 
compactionLag % 2 == 0)
+  val minCompactionLag = 1 * msPerHour
+  assertTrue("compactionLag must be divisible by 2 for this test", 
minCompactionLag % 2 == 0)
 
   val time = new MockTime(1400000000000L, 1000L)  // Tue May 13 16:53:20 UTC 
2014 for `currentTimeMs`
   val cleanerBackOffMs = 200L
@@ -50,7 +50,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: 
String) extends Abstrac
   def cleanerTest(): Unit = {
     cleaner = makeCleaner(partitions = topicPartitions,
       backOffMs = cleanerBackOffMs,
-      compactionLag = compactionLag,
+      minCompactionLagMs = minCompactionLag,
       segmentSize = segmentSize)
     val log = cleaner.logs.get(topicPartitions(0))
 
@@ -69,13 +69,13 @@ class LogCleanerLagIntegrationTest(compressionCodecName: 
String) extends Abstrac
 
     // T0 < t < T1
     // advance to a time still less than one compaction lag from start
-    time.sleep(compactionLag/2)
+    time.sleep(minCompactionLag/2)
     Thread.sleep(5 * cleanerBackOffMs) // give cleaning thread a chance to 
_not_ clean
     assertEquals("There should be no cleaning until the compaction lag has 
passed", startSizeBlock0, log.size)
 
     // t = T1 > T0 + compactionLag
     // advance to time a bit more than one compaction lag from start
-    time.sleep(compactionLag/2 + 1)
+    time.sleep(minCompactionLag/2 + 1)
     val T1 = time.milliseconds
 
     // write another block of data
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 10dba1a4793..1c7fbd97c40 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -610,6 +610,7 @@ class KafkaConfigTest {
         case KafkaConfig.LogCleanerEnableProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
         case KafkaConfig.LogCleanerDeleteRetentionMsProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LogCleanerMinCompactionLagMsProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.LogCleanerMaxCompactionLagMsProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LogCleanerMinCleanRatioProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LogIndexSizeMaxBytesProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "3")
         case KafkaConfig.LogFlushIntervalMessagesProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ensure timely processing of deletion requests in Kafka topic (Time-based log 
> compaction)
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7321
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7321
>             Project: Kafka
>          Issue Type: Improvement
>          Components: log
>            Reporter: xiongqi wu
>            Assignee: xiongqi wu
>            Priority: Major
>
> _Compaction enables Kafka to remove old messages that are flagged for 
> deletion while other messages can be retained for a relatively longer time.  
> Today, a log segment may remain un-compacted for a long time since the 
> eligibility for log compaction is determined based on compaction ratio 
> (“min.cleanable.dirty.ratio”) and min compaction lag 
> ("min.compaction.lag.ms") setting.  The ability to delete a log message through 
> compaction in a timely manner has become an important requirement in some use 
> cases (e.g., GDPR).  For example, one use case is to delete PII (Personally 
> Identifiable Information) data within 7 days while keeping non-PII 
> indefinitely in compacted format.  The goal of this change is to provide a 
> time-based compaction policy that ensures the cleanable section is compacted 
> after the specified time interval regardless of dirty ratio and “min 
> compaction lag”.  However, dirty ratio and “min compaction lag” are still 
> honored if the time-based compaction rule is not violated. In other words, if 
> Kafka receives a deletion request on a key (e.g., a key with a null value), the 
> corresponding log segment will be picked up for compaction after the 
> configured time interval to remove the key._
>  
> _This issue tracks the effort for KIP-354:_
> _https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy_



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to