[ https://issues.apache.org/jira/browse/KAFKA-7400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16624132#comment-16624132 ]

ASF GitHub Bot commented on KAFKA-7400:
---------------------------------------

junrao closed pull request #5646: KAFKA-7400: Compacted topic segments that precede the log start offse…
URL: https://github.com/apache/kafka/pull/5646
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index afe151d69b6..c9b877bdca9 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1353,12 +1353,17 @@ class Log(@volatile var dir: File,
   }
 
   /**
-   * Delete any log segments that have either expired due to time based retention
-   * or because the log size is > retentionSize
+   * If topic deletion is enabled, delete any log segments that have either expired due to time based retention
+   * or because the log size is > retentionSize.
+   *
+   * Whether or not deletion is enabled, delete any log segments that are before the log start offset
    */
   def deleteOldSegments(): Int = {
-    if (!config.delete) return 0
-    deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
+    if (config.delete) {
+      deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
+    } else {
+      deleteLogStartOffsetBreachedSegments()
+    }
   }
 
   private def deleteRetentionMsBreachedSegments(): Int = {
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 83d902f952a..680fa94e33e 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -171,12 +171,13 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /**
-    * Find any logs that have compact and delete enabled
+    * Find any logs that have compaction enabled. Include logs without delete enabled, as they may have segments
+    * that precede the start offset.
     */
   def deletableLogs(): Iterable[(TopicPartition, Log)] = {
     inLock(lock) {
       val toClean = logs.filter { case (topicPartition, log) =>
-        !inProgress.contains(topicPartition) && isCompactAndDelete(log)
+        !inProgress.contains(topicPartition) && log.config.compact
       }
      toClean.foreach { case (tp, _) => inProgress.put(tp, LogCleaningInProgress) }
       toClean
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 8cb2f9ec874..3653e282383 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -77,17 +77,17 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
   }
 
   /**
-    * When looking for logs with segments ready to be deleted we shouldn't consider
-    * logs with cleanup.policy=compact as they shouldn't have segments truncated.
+    * When looking for logs with segments ready to be deleted we should consider
+    * logs with cleanup.policy=compact because they may have segments from before the log start offset
     */
   @Test
-  def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyCompactLogs(): Unit = {
+  def testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactLogs(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
     val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
     val readyToDelete = cleanerManager.deletableLogs().size
-    assertEquals("should have 1 logs ready to be deleted", 0, readyToDelete)
+    assertEquals("should have 1 logs ready to be deleted", 1, readyToDelete)
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 38d6f71c7d0..ae8bc01fab2 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -190,13 +190,26 @@ class LogManagerTest {
   }
 
   /**
-    * Ensures that LogManager only runs on logs with cleanup.policy=delete
+    * Ensures that LogManager doesn't run on logs with cleanup.policy=compact,delete
     * LogCleaner.CleanerThread handles all logs where compaction is enabled.
     */
   @Test
   def testDoesntCleanLogsWithCompactDeletePolicy() {
+    testDoesntCleanLogs(LogConfig.Compact + "," + LogConfig.Delete)
+  }
+
+  /**
+    * Ensures that LogManager doesn't run on logs with cleanup.policy=compact
+    * LogCleaner.CleanerThread handles all logs where compaction is enabled.
+    */
+  @Test
+  def testDoesntCleanLogsWithCompactPolicy() {
+    testDoesntCleanLogs(LogConfig.Compact)
+  }
+
+  private def testDoesntCleanLogs(policy: String) {
     val logProps = new Properties()
-    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
+    logProps.put(LogConfig.CleanupPolicyProp, policy)
     val log = logManager.getOrCreateLog(new TopicPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps))
     var offset = 0L
     for (_ <- 0 until 200) {
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 9a9bc613585..1381bc66fe4 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2746,6 +2746,32 @@ class LogTest {
     assertEquals("There should be 1 segment remaining", 1, 
log.numberOfSegments)
   }
 
+  @Test
+  def shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete(): Unit = {
+    def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L)
+    val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact")
+
+    // Create log with start offset ahead of the first log segment
+    val log = createLog(logDir, logConfig, brokerTopicStats, logStartOffset = 5L)
+
+    // append some messages to create some segments
+    for (_ <- 0 until 15)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
+
+    // Three segments should be created, with the first one entirely preceding the log start offset
+    assertEquals(3, log.logSegments.count(_ => true))
+    assertTrue(log.logSegments.slice(1, 2).head.baseOffset <= log.logStartOffset)
+
+    // The first segment, which is entirely before the log start offset, should be deleted
+    // Of the remaining the segments, the first can overlap the log start offset and the rest must have a base offset
+    // greater than the start offset
+    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.deleteOldSegments()
+    assertEquals("There should be 2 segments remaining", 2, log.numberOfSegments)
+    assertTrue(log.logSegments.head.baseOffset <= log.logStartOffset)
+    assertTrue(log.logSegments.tail.forall(s => s.baseOffset > log.logStartOffset))
+  }
+
   @Test
   def shouldApplyEpochToMessageOnAppendIfLeader() {
     val records = (0 until 50).toArray.map(id => new SimpleRecord(id.toString.getBytes))
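
Condensed to its behavioral core, the non-test part of the patch above can be read as the following self-contained sketch (LogConfigLike and the stubbed deletion passes are illustrative stand-ins for this message only, not Kafka classes):

    object DeleteOldSegmentsSketch extends App {
      // Stand-in for the relevant bits of LogConfig: which cleanup policies are enabled.
      final case class LogConfigLike(delete: Boolean, compact: Boolean)

      // Stubs for the three deletion passes in Log.scala; each returns how many
      // segments it would delete. Fixed sample counts are used here for illustration.
      def deleteRetentionMsBreachedSegments(): Int = 1
      def deleteRetentionSizeBreachedSegments(): Int = 1
      def deleteLogStartOffsetBreachedSegments(): Int = 1

      // The new control flow from the diff: the start-offset pass always runs, while
      // the retention passes run only when the delete policy is enabled.
      def deleteOldSegments(config: LogConfigLike): Int =
        if (config.delete)
          deleteRetentionMsBreachedSegments() +
            deleteRetentionSizeBreachedSegments() +
            deleteLogStartOffsetBreachedSegments()
        else
          deleteLogStartOffsetBreachedSegments()

      // cleanup.policy=compact,delete: all three passes run (3 with the stubs above).
      println(deleteOldSegments(LogConfigLike(delete = true, compact = true)))
      // cleanup.policy=compact only: previously deleteOldSegments returned 0 without
      // doing anything; with the patch the start-offset pass still runs (1).
      println(deleteOldSegments(LogConfigLike(delete = false, compact = true)))
    }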


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Compacted topic segments that precede the log start offset are not cleaned up
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-7400
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7400
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 1.1.0, 1.1.1, 2.0.0
>            Reporter: Bob Barrett
>            Assignee: Bob Barrett
>            Priority: Minor
>
> LogManager.cleanupLogs currently checks if a topic is compacted, and skips 
> any deletion if it is. This means that if the log start offset increases, log 
> segments that precede the start offset will never be deleted. The log cleanup 
> logic should be improved to delete these segments even for compacted topics.
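
For illustration, the condition being fixed is the one exercised by the new LogTest case in the diff above: a segment is removable for the start-offset reason only when it lies entirely before the log start offset, which in practice means the next segment's base offset is at or below that start offset. A minimal self-contained sketch of that predicate (Segment and the sample offsets are illustrative, not Kafka classes):

    object StartOffsetBreachSketch extends App {
      // Illustrative stand-in for a log segment, identified by its base offset.
      final case class Segment(baseOffset: Long)

      // A segment lies entirely before logStartOffset when the *next* segment
      // starts at or before logStartOffset, so the segment itself is safe to delete.
      def startOffsetBreached(segments: Seq[Segment], logStartOffset: Long): Seq[Segment] =
        segments.zip(segments.drop(1)).collect {
          case (segment, next) if next.baseOffset <= logStartOffset => segment
        }

      // Mirrors the shape of the new test: segments based at 0, 5 and 10 with a log
      // start offset of 5. Only the first segment is eligible; the segment based at 5
      // may still overlap the start offset and has to be kept.
      println(startOffsetBreached(Seq(Segment(0), Segment(5), Segment(10)), logStartOffset = 5L))
      // prints: List(Segment(0))
    }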



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
