Repository: kafka
Updated Branches:
  refs/heads/trunk 1d2d0bac9 -> cd207dd3f


KAFKA-5431; cleanSegments should not set length for cleanable segment files

For a compacted topic with preallocate enabled, LogCleaner.cleanSegments does not need
to pre-allocate the destination segment file to the full segment size during cleaning,
since the file only needs to hold the cleaned data.

It's believed that this fix should also solve KAFKA-5582.
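
As background on why trimming matters: with preallocation enabled, a new segment file
is created at the full configured segment size up front, so after cleaning, the file
on disk stays much larger than the data it actually holds unless it is truncated back.
A minimal, self-contained Scala sketch of that effect using plain NIO (not Kafka code;
the temp-file name and the 1 MiB "segment size" are illustrative assumptions):

    import java.io.RandomAccessFile
    import java.nio.ByteBuffer
    import java.nio.file.Files

    object PreallocateTrimSketch extends App {
      val file = Files.createTempFile("cleaned-segment", ".log").toFile
      val raf = new RandomAccessFile(file, "rw")

      // Preallocate the file to a full "segment size" (1 MiB here), roughly what a
      // broker does for new segment files when log.preallocate=true.
      val segmentBytes = 1024 * 1024
      raf.setLength(segmentBytes)

      // Write the "cleaned" data; only these bytes are real content.
      val channel = raf.getChannel
      val cleanedBytes = channel.write(ByteBuffer.wrap("retained records".getBytes("UTF-8")))
      println(s"before trim: ${channel.size()} bytes")  // still 1 MiB on disk

      // Truncate the file down to the bytes actually written -- the same idea as the
      // cleaner's new cleaned.log.trim() call in the patch below.
      channel.truncate(cleanedBytes.toLong)
      println(s"after trim:  ${channel.size()} bytes")  // just the cleaned bytes

      raf.close()
      file.delete()
    }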

Author: huxihx <huxi...@hotmail.com>

Reviewers: Jun Rao <jun...@gmail.com>

Closes #3525 from huxihx/log_compact_test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cd207dd3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cd207dd3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cd207dd3

Branch: refs/heads/trunk
Commit: cd207dd3feb08052a82db90d2e5da956bad5ac33
Parents: 1d2d0ba
Author: huxihx <huxi...@hotmail.com>
Authored: Thu Jul 20 21:49:27 2017 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Thu Jul 20 21:49:27 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  |  3 +++
 .../scala/unit/kafka/log/LogCleanerTest.scala   | 25 ++++++++++++++++++++
 2 files changed, 28 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cd207dd3/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index d8a86db..4898d11 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -446,6 +446,9 @@ private[log] class Cleaner(val id: Int,
         currentSegmentOpt = nextSegmentOpt
       }
 
+      // trim log segment
+      cleaned.log.trim()
+
       // trim excess index
       index.trimToValidSize()
 

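For context on the one-line fix above: cleaned.log is the cleaned segment's FileRecords,
and trim() shrinks the backing file to the bytes actually written, mirroring
index.trimToValidSize() for the offset index. A rough, standalone sketch of that
behavior against the public FileRecords API (the temp file, the 1 MiB initial size,
and the sample record are assumptions for illustration; the four-argument open(...)
overload is taken here to mean (file, fileAlreadyExists, initFileSize, preallocate)):

    import java.io.File
    import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecords, SimpleRecord}

    object FileRecordsTrimSketch extends App {
      // A fresh, preallocated 1 MiB file-backed record set -- roughly what a cleaned
      // segment looks like when preallocation is enabled (sketch only, not the
      // cleaner's actual code path).
      val file = new File(System.getProperty("java.io.tmpdir"), "trim-sketch.log")
      file.delete() // ensure the "file already exists" flag below is accurate
      val records = FileRecords.open(file, false, 1024 * 1024, true)

      // Append a small batch of "cleaned" data.
      records.append(MemoryRecords.withRecords(CompressionType.NONE,
        new SimpleRecord("key".getBytes, "value".getBytes)))

      println(s"on-disk size before trim: ${records.channel().size()}") // ~1 MiB
      records.trim() // truncate the channel down to the appended bytes
      println(s"on-disk size after trim:  ${records.channel().size()}")

      records.close()
      file.delete()
    }
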
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd207dd3/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 19ea699..3e58c4d 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -89,6 +89,31 @@ class LogCleanerTest extends JUnitSuite {
   }
 
   @Test
+  def testSizeTrimmedForPreallocatedAndCompactedTopic(): Unit = {
+    val originalMaxFileSize = 1024;
+    val cleaner = makeCleaner(2)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, originalMaxFileSize: java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, "compact": java.lang.String)
+    logProps.put(LogConfig.PreAllocateEnableProp, "true": java.lang.String)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 0
+    log.appendAsLeader(record(1,1), leaderEpoch = 0) // offset 1
+    log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 2
+    log.appendAsLeader(record(1,1), leaderEpoch = 0) // offset 3
+    log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 4
+    // roll the segment, so we can clean the messages already appended
+    log.roll()
+
+    // clean the log with only one message removed
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
+
+    assertTrue("Cleaned segment file should be trimmed to its real size.",
+      log.logSegments.iterator.next.log.channel().size() < originalMaxFileSize)
+  }
+
+  @Test
   def testDuplicateCheckAfterCleaning(): Unit = {
     val cleaner = makeCleaner(Int.MaxValue)
     val logProps = new Properties()
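
As a usage note, the situation this test reproduces corresponds to a topic running
with cleanup.policy=compact and preallocate=true (the latter is commonly set
broker-wide via log.preallocate). A hedged sketch of creating such a topic with the
Java AdminClient; the bootstrap address, topic name, and partition/replication counts
are placeholder assumptions:

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
    import org.apache.kafka.common.config.TopicConfig

    object CompactedPreallocatedTopicSketch extends App {
      val props = new Properties()
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder

      val admin = AdminClient.create(props)
      try {
        // Topic-level overrides that put a partition on the code path this fix touches:
        // compaction enabled and segment files preallocated to their full size.
        val configs = new java.util.HashMap[String, String]()
        configs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
        configs.put(TopicConfig.PREALLOCATE_CONFIG, "true")

        val topic = new NewTopic("compacted-preallocated", 1, 1.toShort).configs(configs)
        admin.createTopics(Collections.singleton(topic)).all().get()
      } finally {
        admin.close()
      }
    }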
