Repository: kafka Updated Branches: refs/heads/trunk 1d2d0bac9 -> cd207dd3f
KAFKA-5431; cleanSegments should not set length for cleanable segment files. For a compacted topic with preallocation enabled, LogCleaner.cleanSegments does not need to keep the pre-allocated file size during log cleaning, since the file only has to hold the cleaned data; the cleaned segment's log file is therefore trimmed to its actual size (cleaned.log.trim()) once cleaning finishes. This fix is also expected to resolve KAFKA-5582. Author: huxihx <huxi...@hotmail.com> Reviewers: Jun Rao <jun...@gmail.com> Closes #3525 from huxihx/log_compact_test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cd207dd3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cd207dd3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cd207dd3 Branch: refs/heads/trunk Commit: cd207dd3feb08052a82db90d2e5da956bad5ac33 Parents: 1d2d0ba Author: huxihx <huxi...@hotmail.com> Authored: Thu Jul 20 21:49:27 2017 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Thu Jul 20 21:49:27 2017 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/LogCleaner.scala | 3 +++ .../scala/unit/kafka/log/LogCleanerTest.scala | 25 ++++++++++++++++++++ 2 files changed, 28 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/cd207dd3/core/src/main/scala/kafka/log/LogCleaner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index d8a86db..4898d11 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -446,6 +446,9 @@ private[log] class Cleaner(val id: Int, currentSegmentOpt = nextSegmentOpt } + // trim log segment + cleaned.log.trim() + // trim excess index index.trimToValidSize() http://git-wip-us.apache.org/repos/asf/kafka/blob/cd207dd3/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 19ea699..3e58c4d 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -89,6 +89,31 @@ class LogCleanerTest extends JUnitSuite { } @Test + def testSizeTrimmedForPreallocatedAndCompactedTopic(): Unit = { + val originalMaxFileSize = 1024; + val cleaner = makeCleaner(2) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, originalMaxFileSize: java.lang.Integer) + logProps.put(LogConfig.CleanupPolicyProp, "compact": java.lang.String) + logProps.put(LogConfig.PreAllocateEnableProp, "true": java.lang.String) + val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) + + log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 0 + log.appendAsLeader(record(1,1), leaderEpoch = 0) // offset 1 + log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 2 + log.appendAsLeader(record(1,1), leaderEpoch = 0) // offset 3 + log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 4 + // roll the segment, so we can clean the messages already appended + log.roll() + + // clean the log with only one message removed + cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset)) + + assertTrue("Cleaned segment file should be trimmed to its real size.", + log.logSegments.iterator.next.log.channel().size() < originalMaxFileSize) + } + + @Test def testDuplicateCheckAfterCleaning(): Unit = { val cleaner = makeCleaner(Int.MaxValue) val logProps = new Properties()