Repository: kafka Updated Branches: refs/heads/trunk 2ac70c0b7 -> 302a246c7
KAFKA-4296; Fix LogCleaner statistics rolling Author: Jason Gustafson <ja...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk> Closes #2016 from hachikuji/KAFKA-4296 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/302a246c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/302a246c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/302a246c Branch: refs/heads/trunk Commit: 302a246c72677b1045696f4d6dda51da8c6a1da0 Parents: 2ac70c0 Author: Jason Gustafson <ja...@confluent.io> Authored: Fri Oct 21 14:41:04 2016 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Fri Oct 21 14:41:04 2016 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/LogCleaner.scala | 102 ++- .../test/scala/unit/kafka/log/CleanerTest.scala | 801 ------------------ .../scala/unit/kafka/log/LogCleanerTest.scala | 839 +++++++++++++++++++ 3 files changed, 896 insertions(+), 846 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/302a246c/core/src/main/scala/kafka/log/LogCleaner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 219957f..bb8a89a 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -236,8 +236,9 @@ class LogCleaner(val config: CleanerConfig, // there's a log, clean it var endOffset = cleanable.firstDirtyOffset try { - endOffset = cleaner.clean(cleanable) - recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleaner.stats) + val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable) + recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats) + endOffset = nextDirtyOffset } catch { 
case pe: LogCleaningAbortedException => // task can be aborted, let it go. } finally { @@ -263,7 +264,6 @@ class LogCleaner(val config: CleanerConfig, */ def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) { this.lastStats = stats - cleaner.statsUnderlying.swap def mb(bytes: Double) = bytes / (1024*1024) val message = "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) + @@ -314,10 +314,6 @@ private[log] class Cleaner(val id: Int, override val loggerName = classOf[LogCleaner].getName this.logIdent = "Cleaner " + id + ": " - - /* cleaning stats - one instance for the current (or next) cleaning cycle and one for the last completed cycle */ - val statsUnderlying = (new CleanerStats(time), new CleanerStats(time)) - def stats = statsUnderlying._1 /* buffer used for read i/o */ private var readBuffer = ByteBuffer.allocate(ioBufferSize) @@ -332,17 +328,18 @@ private[log] class Cleaner(val id: Int, * * @param cleanable The log to be cleaned * - * @return The first offset not cleaned + * @return The first offset not cleaned and the statistics for this round of cleaning */ - private[log] def clean(cleanable: LogToClean): Long = { - stats.clear() + private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = { + val stats = new CleanerStats() + info("Beginning cleaning of log %s.".format(cleanable.log.name)) val log = cleanable.log // build the offset map info("Building offset map for %s...".format(cleanable.log.name)) val upperBoundOffset = cleanable.firstUncleanableOffset - buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap) + buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats) val endOffset = offsetMap.latestOffset + 1 stats.indexDone() @@ -361,14 +358,14 @@ private[log] class Cleaner(val id: Int, // group the segments and clean the groups info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to 
%s)...".format(log.name, new Date(cleanableHorizionMs), new Date(deleteHorizonMs))) for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) - cleanSegments(log, group, offsetMap, deleteHorizonMs) + cleanSegments(log, group, offsetMap, deleteHorizonMs, stats) // record buffer utilization stats.bufferUtilization = offsetMap.utilization stats.allDone() - endOffset + (endOffset, stats) } /** @@ -378,11 +375,13 @@ private[log] class Cleaner(val id: Int, * @param segments The group of segments being cleaned * @param map The offset map to use for cleaning segments * @param deleteHorizonMs The time to retain delete tombstones + * @param stats Collector for cleaning statistics */ private[log] def cleanSegments(log: Log, segments: Seq[LogSegment], map: OffsetMap, - deleteHorizonMs: Long) { + deleteHorizonMs: Long, + stats: CleanerStats) { // create a new segment with the suffix .cleaned appended to both the log and index name val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix) logFile.delete() @@ -401,7 +400,7 @@ private[log] class Cleaner(val id: Int, val retainDeletes = old.largestTimestamp > deleteHorizonMs info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes." 
.format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding")) - cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize) + cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats) } // trim excess index @@ -440,13 +439,15 @@ private[log] class Cleaner(val id: Int, * @param map The key=>offset mapping * @param retainDeletes Should delete tombstones be retained while cleaning this segment * @param maxLogMessageSize The maximum message size of the corresponding topic + * @param stats Collector for cleaning statistics */ private[log] def cleanInto(topicAndPartition: TopicAndPartition, source: LogSegment, dest: LogSegment, map: OffsetMap, retainDeletes: Boolean, - maxLogMessageSize: Int) { + maxLogMessageSize: Int, + stats: CleanerStats) { var position = 0 while (position < source.log.sizeInBytes) { checkDone(topicAndPartition) @@ -466,7 +467,7 @@ private[log] class Cleaner(val id: Int, stats.readMessage(size) if (shallowMessage.compressionCodec == NoCompressionCodec) { - if (shouldRetainMessage(source, map, retainDeletes, shallowMessageAndOffset)) { + if (shouldRetainMessage(source, map, retainDeletes, shallowMessageAndOffset, stats)) { ByteBufferMessageSet.writeMessage(writeBuffer, shallowMessage, shallowOffset) stats.recopyMessage(size) if (shallowMessage.timestamp > maxTimestamp) { @@ -487,7 +488,7 @@ private[log] class Cleaner(val id: Int, for (deepMessageAndOffset <- ByteBufferMessageSet.deepIterator(shallowMessageAndOffset)) { messagesRead += 1 - if (shouldRetainMessage(source, map, retainDeletes, deepMessageAndOffset)) { + if (shouldRetainMessage(source, map, retainDeletes, deepMessageAndOffset, stats)) { // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite // the corrupted entry with correct data. 
if (shallowMagic != deepMessageAndOffset.message.magic) @@ -505,8 +506,10 @@ private[log] class Cleaner(val id: Int, // There are no messages compacted out and no message format conversion, write the original message set back if (writeOriginalMessageSet) ByteBufferMessageSet.writeMessage(writeBuffer, shallowMessage, shallowOffset) - else - compressMessages(writeBuffer, shallowMessage.compressionCodec, retainedMessages) + else if (retainedMessages.nonEmpty) { + val retainedSize = compressMessages(writeBuffer, shallowMessage.compressionCodec, retainedMessages) + stats.recopyMessage(retainedSize) + } } } @@ -529,9 +532,12 @@ private[log] class Cleaner(val id: Int, private def compressMessages(buffer: ByteBuffer, compressionCodec: CompressionCodec, - messageAndOffsets: Seq[MessageAndOffset]) { + messageAndOffsets: Seq[MessageAndOffset]): Int = { require(compressionCodec != NoCompressionCodec, s"compressionCodec must not be $NoCompressionCodec") - if (messageAndOffsets.nonEmpty) { + + if (messageAndOffsets.isEmpty) { + 0 + } else { val messages = messageAndOffsets.map(_.message) val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages) @@ -565,14 +571,15 @@ private[log] class Cleaner(val id: Int, } } ByteBufferMessageSet.writeMessage(buffer, messageWriter, offset) - stats.recopyMessage(messageWriter.size + MessageSet.LogOverhead) + messageWriter.size + MessageSet.LogOverhead } } private def shouldRetainMessage(source: kafka.log.LogSegment, map: kafka.log.OffsetMap, retainDeletes: Boolean, - entry: kafka.message.MessageAndOffset): Boolean = { + entry: kafka.message.MessageAndOffset, + stats: CleanerStats): Boolean = { val pastLatestOffset = entry.offset > map.latestOffset if (pastLatestOffset) return true @@ -658,8 +665,13 @@ private[log] class Cleaner(val id: Int, * @param start The offset at which dirty messages begin * @param end The ending offset for the map that is being built * @param map The map in which to store the mappings + * @param stats Collector 
for cleaning statistics */ - private[log] def buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap) { + private[log] def buildOffsetMap(log: Log, + start: Long, + end: Long, + map: OffsetMap, + stats: CleanerStats) { map.clear() val dirty = log.logSegments(start, end).toBuffer info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end)) @@ -670,7 +682,7 @@ private[log] class Cleaner(val id: Int, for (segment <- dirty if !full) { checkDone(log.topicAndPartition) - full = buildOffsetMapForSegment(log.topicAndPartition, segment, map, start, log.config.maxMessageSize) + full = buildOffsetMapForSegment(log.topicAndPartition, segment, map, start, log.config.maxMessageSize, stats) if (full) debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset)) } @@ -682,10 +694,16 @@ private[log] class Cleaner(val id: Int, * * @param segment The segment to index * @param map The map in which to store the key=>offset mapping + * @param stats Collector for cleaning statistics * * @return If the map was filled whilst loading from this segment */ - private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap, start: Long, maxLogMessageSize: Int): Boolean = { + private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, + segment: LogSegment, + map: OffsetMap, + start: Long, + maxLogMessageSize: Int, + stats: CleanerStats): Boolean = { var position = segment.index.lookup(start).position val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt while (position < segment.log.sizeInBytes) { @@ -719,12 +737,19 @@ private[log] class Cleaner(val id: Int, /** * A simple struct for collecting stats about log cleaning */ -private case class CleanerStats(time: Time = SystemTime) { - var startTime, mapCompleteTime, endTime, bytesRead, bytesWritten, 
mapBytesRead, mapMessagesRead, messagesRead, - messagesWritten, invalidMessagesRead = 0L +private class CleanerStats(time: Time = SystemTime) { + val startTime = time.milliseconds + var mapCompleteTime = -1L + var endTime = -1L + var bytesRead = 0L + var bytesWritten = 0L + var mapBytesRead = 0L + var mapMessagesRead = 0L + var messagesRead = 0L + var invalidMessagesRead = 0L + var messagesWritten = 0L var bufferUtilization = 0.0d - clear() - + def readMessage(size: Int) { messagesRead += 1 bytesRead += size @@ -759,19 +784,6 @@ private case class CleanerStats(time: Time = SystemTime) { def elapsedIndexSecs = (mapCompleteTime - startTime)/1000.0 - def clear() { - startTime = time.milliseconds - mapCompleteTime = -1L - endTime = -1L - bytesRead = 0L - bytesWritten = 0L - mapBytesRead = 0L - mapMessagesRead = 0L - messagesRead = 0L - invalidMessagesRead = 0L - messagesWritten = 0L - bufferUtilization = 0.0d - } } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/302a246c/core/src/test/scala/unit/kafka/log/CleanerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala deleted file mode 100755 index 536f10d..0000000 --- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala +++ /dev/null @@ -1,801 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log - -import java.io.{DataOutputStream, File} -import java.nio._ -import java.nio.file.Paths -import java.util.Properties - -import kafka.common._ -import kafka.message._ -import kafka.utils._ -import org.apache.kafka.common.record.{MemoryRecords, TimestampType} -import org.apache.kafka.common.utils.Utils -import org.junit.Assert._ -import org.junit.{After, Test} -import org.scalatest.junit.JUnitSuite - -import scala.collection._ - -/** - * Unit tests for the log cleaning logic - */ -class CleanerTest extends JUnitSuite { - - val tmpdir = TestUtils.tempDir() - val dir = TestUtils.randomPartitionLogDir(tmpdir) - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) - logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer) - logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) - val logConfig = LogConfig(logProps) - val time = new MockTime() - val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time) - - @After - def teardown(): Unit = { - Utils.delete(tmpdir) - } - - /** - * Test simple log cleaning - */ - @Test - def testCleanSegments(): Unit = { - val cleaner = makeCleaner(Int.MaxValue) - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) - - val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) - - // append messages to the log until we have four segments - while(log.numberOfSegments < 4) - log.append(message(log.logEndOffset.toInt, 
log.logEndOffset.toInt)) - val keysFound = keysInLog(log) - assertEquals(0L until log.logEndOffset, keysFound) - - // pretend we have the following keys - val keys = immutable.ListSet(1, 3, 5, 7, 9) - val map = new FakeOffsetMap(Int.MaxValue) - keys.foreach(k => map.put(key(k), Long.MaxValue)) - - // clean the log - cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L) - val shouldRemain = keysInLog(log).filter(!keys.contains(_)) - assertEquals(shouldRemain, keysInLog(log)) - } - - /** - * Test log cleaning with logs containing messages larger than default message size - */ - @Test - def testLargeMessage() { - val largeMessageSize = 1024 * 1024 - // Create cleaner with very small default max message size - val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024) - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, largeMessageSize * 16: java.lang.Integer) - logProps.put(LogConfig.MaxMessageBytesProp, largeMessageSize * 2: java.lang.Integer) - - val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) - - while(log.numberOfSegments < 2) - log.append(message(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte))) - val keysFound = keysInLog(log) - assertEquals(0L until log.logEndOffset, keysFound) - - // pretend we have the following keys - val keys = immutable.ListSet(1, 3, 5, 7, 9) - val map = new FakeOffsetMap(Int.MaxValue) - keys.foreach(k => map.put(key(k), Long.MaxValue)) - - // clean the log - cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L) - val shouldRemain = keysInLog(log).filter(!keys.contains(_)) - assertEquals(shouldRemain, keysInLog(log)) - } - - @Test - def testCleaningWithDeletes(): Unit = { - val cleaner = makeCleaner(Int.MaxValue) - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) - - val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) - - // append messages with the keys 0 through N - 
while(log.numberOfSegments < 2) - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) - - // delete all even keys between 0 and N - val leo = log.logEndOffset - for(key <- 0 until leo.toInt by 2) - log.append(deleteMessage(key)) - - // append some new unique keys to pad out to a new active segment - while(log.numberOfSegments < 4) - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) - - cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset)) - val keys = keysInLog(log).toSet - assertTrue("None of the keys we deleted should still exist.", - (0 until leo.toInt by 2).forall(!keys.contains(_))) - } - - @Test - def testPartialSegmentClean(): Unit = { - // because loadFactor is 0.75, this means we can fit 2 messages in the map - val cleaner = makeCleaner(2) - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) - - val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) - - log.append(message(0,0)) // offset 0 - log.append(message(1,1)) // offset 1 - log.append(message(0,0)) // offset 2 - log.append(message(1,1)) // offset 3 - log.append(message(0,0)) // offset 4 - // roll the segment, so we can clean the messages already appended - log.roll() - - // clean the log with only one message removed - cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset)) - assertEquals(immutable.List(1,0,1,0), keysInLog(log)) - assertEquals(immutable.List(1,2,3,4), offsetsInLog(log)) - - // continue to make progress, even though we can only clean one message at a time - cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 3, log.activeSegment.baseOffset)) - assertEquals(immutable.List(0,1,0), keysInLog(log)) - assertEquals(immutable.List(2,3,4), offsetsInLog(log)) - - cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 4, log.activeSegment.baseOffset)) - assertEquals(immutable.List(1,0), 
keysInLog(log)) - assertEquals(immutable.List(3,4), offsetsInLog(log)) - } - - @Test - def testCleaningWithUncleanableSection(): Unit = { - val cleaner = makeCleaner(Int.MaxValue) - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) - - val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) - - // Number of distinct keys. For an effective test this should be small enough such that each log segment contains some duplicates. - val N = 10 - val numCleanableSegments = 2 - val numTotalSegments = 7 - - // append messages with the keys 0 through N-1, values equal offset - while(log.numberOfSegments <= numCleanableSegments) - log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt)) - - // at this point one message past the cleanable segments has been added - // the entire segment containing the first uncleanable offset should not be cleaned. - val firstUncleanableOffset = log.logEndOffset + 1 // +1 so it is past the baseOffset - - while(log.numberOfSegments < numTotalSegments - 1) - log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt)) - - // the last (active) segment has just one message - - def distinctValuesBySegment = log.logSegments.map(s => s.log.map(m => TestUtils.readString(m.message.payload)).toSet.size).toSeq - - val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment - assertTrue("Test is not effective unless each segment contains duplicates. 
Increase segment size or decrease number of keys.", - distinctValuesBySegment.reverse.tail.forall(_ > N)) - - cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, firstUncleanableOffset)) - - val distinctValuesBySegmentAfterClean = distinctValuesBySegment - - assertTrue("The cleanable segments should have fewer number of values after cleaning", - disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean).take(numCleanableSegments).forall { case (before, after) => after < before }) - assertTrue("The uncleanable segments should have the same number of values after cleaning", disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean) - .slice(numCleanableSegments, numTotalSegments).forall { x => x._1 == x._2 }) - } - - @Test - def testLogToClean(): Unit = { - // create a log with small segment size - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer) - val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) - - // create 6 segments with only one message in each segment - val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes) - for (i <- 0 until 6) - log.append(messageSet, assignOffsets = true) - - val logToClean = LogToClean(TopicAndPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset) - - assertEquals("Total bytes of LogToClean should equal size of all segments excluding the active segment", - logToClean.totalBytes, log.size - log.activeSegment.size) - } - - @Test - def testLogToCleanWithUncleanableSection(): Unit = { - // create a log with small segment size - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer) - val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) - - // create 6 segments with only one message in each segment - val messageSet = TestUtils.singleMessageSet(payload = 
Array.fill[Byte](50)(0), key = 1.toString.getBytes) - for (i <- 0 until 6) - log.append(messageSet, assignOffsets = true) - - // segments [0,1] are clean; segments [2, 3] are cleanable; segments [4,5] are uncleanable - val segs = log.logSegments.toSeq - val logToClean = LogToClean(TopicAndPartition("test", 0), log, segs(2).baseOffset, segs(4).baseOffset) - - val expectedCleanSize = segs.take(2).map(_.size).sum - val expectedCleanableSize = segs.slice(2, 4).map(_.size).sum - val expectedUncleanableSize = segs.drop(4).map(_.size).sum - - assertEquals("Uncleanable bytes of LogToClean should equal size of all segments prior the one containing first dirty", - logToClean.cleanBytes, expectedCleanSize) - assertEquals("Cleanable bytes of LogToClean should equal size of all segments from the one containing first dirty offset" + - " to the segment prior to the one with the first uncleanable offset", - logToClean.cleanableBytes, expectedCleanableSize) - assertEquals("Total bytes should be the sum of the clean and cleanable segments", logToClean.totalBytes, expectedCleanSize + expectedCleanableSize) - assertEquals("Total cleanable ratio should be the ratio of cleanable size to clean plus cleanable", logToClean.cleanableRatio, - expectedCleanableSize / (expectedCleanSize + expectedCleanableSize).toDouble, 1.0e-6d) - } - - @Test - def testCleaningWithUnkeyedMessages(): Unit = { - val cleaner = makeCleaner(Int.MaxValue) - - // create a log with compaction turned off so we can append unkeyed messages - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) - logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Delete) - - val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) - - // append unkeyed messages - while(log.numberOfSegments < 2) - log.append(unkeyedMessage(log.logEndOffset.toInt)) - val numInvalidMessages = unkeyedMessageCountInLog(log) - - val sizeWithUnkeyedMessages = log.size - - // append keyed 
messages - while(log.numberOfSegments < 3) - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) - - val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages - cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset)) - - assertEquals("Log should only contain keyed messages after cleaning.", 0, unkeyedMessageCountInLog(log)) - assertEquals("Log should only contain keyed messages after cleaning.", expectedSizeAfterCleaning, log.size) - assertEquals("Cleaner should have seen %d invalid messages.", numInvalidMessages, cleaner.stats.invalidMessagesRead) - } - - /* extract all the keys from a log */ - def keysInLog(log: Log): Iterable[Int] = - log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => TestUtils.readString(m.message.key).toInt)) - - /* extract all the offsets from a log */ - def offsetsInLog(log: Log): Iterable[Long] = - log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => m.offset)) - - def unkeyedMessageCountInLog(log: Log) = - log.logSegments.map(s => s.log.filter(!_.message.isNull).count(m => !m.message.hasKey)).sum - - def abortCheckDone(topicAndPartition: TopicAndPartition): Unit = { - throw new LogCleaningAbortedException() - } - - /** - * Test that abortion during cleaning throws a LogCleaningAbortedException - */ - @Test - def testCleanSegmentsWithAbort(): Unit = { - val cleaner = makeCleaner(Int.MaxValue, abortCheckDone) - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) - - val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) - - // append messages to the log until we have four segments - while(log.numberOfSegments < 4) - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) - - val keys = keysInLog(log) - val map = new FakeOffsetMap(Int.MaxValue) - keys.foreach(k => map.put(key(k), Long.MaxValue)) - 
intercept[LogCleaningAbortedException] { - cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L) - } - } - - /** - * Validate the logic for grouping log segments together for cleaning - */ - @Test - def testSegmentGrouping(): Unit = { - val cleaner = makeCleaner(Int.MaxValue) - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer) - logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) - - val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) - - // append some messages to the log - var i = 0 - while(log.numberOfSegments < 10) { - log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes)) - i += 1 - } - - // grouping by very large values should result in a single group with all the segments in it - var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue) - assertEquals(1, groups.size) - assertEquals(log.numberOfSegments, groups.head.size) - checkSegmentOrder(groups) - - // grouping by very small values should result in all groups having one entry - groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue) - assertEquals(log.numberOfSegments, groups.size) - assertTrue("All groups should be singletons.", groups.forall(_.size == 1)) - checkSegmentOrder(groups) - groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = 1) - assertEquals(log.numberOfSegments, groups.size) - assertTrue("All groups should be singletons.", groups.forall(_.size == 1)) - checkSegmentOrder(groups) - - val groupSize = 3 - - // check grouping by log size - val logSize = log.logSegments.take(groupSize).map(_.size).sum.toInt + 1 - groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue) - checkSegmentOrder(groups) - assertTrue("All but the last group should be the target size.", 
groups.dropRight(1).forall(_.size == groupSize)) - - // check grouping by index size - val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes).sum + 1 - groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize) - checkSegmentOrder(groups) - assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize)) - } - - /** - * Validate the logic for grouping log segments together for cleaning when only a small number of - * messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not - * contain a range of offsets greater than Int.MaxValue to ensure that relative offsets can be - * stored in 4 bytes. - */ - @Test - def testSegmentGroupingWithSparseOffsets(): Unit = { - val cleaner = makeCleaner(Int.MaxValue) - - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer) - logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) - - val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) - - // fill up first segment - while (log.numberOfSegments == 1) - log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes)) - - // forward offset and append message to next segment at offset Int.MaxValue - val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new LongRef(Int.MaxValue - 1), - new Message("hello".getBytes, "hello".getBytes, Message.NoTimestamp, Message.MagicValue_V1)) - log.append(messageSet, assignOffsets = false) - log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes)) - assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset) - - // grouping should result in a single group with maximum relative offset of Int.MaxValue - var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue) - assertEquals(1, groups.size) - - // append 
another message, making last offset of second segment > Int.MaxValue - log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes)) - - // grouping should not group the two segments to ensure that maximum relative offset in each group <= Int.MaxValue - groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue) - assertEquals(2, groups.size) - checkSegmentOrder(groups) - - // append more messages, creating new segments, further grouping should still occur - while (log.numberOfSegments < 4) - log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes)) - - groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue) - assertEquals(log.numberOfSegments - 1, groups.size) - for (group <- groups) - assertTrue("Relative offset greater than Int.MaxValue", group.last.index.lastOffset - group.head.index.baseOffset <= Int.MaxValue) - checkSegmentOrder(groups) - - } - - private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]): Unit = { - val offsets = groups.flatMap(_.map(_.baseOffset)) - assertEquals("Offsets should be in increasing order.", offsets.sorted, offsets) - } - - /** - * Test building an offset map off the log - */ - @Test - def testBuildOffsetMap(): Unit = { - val map = new FakeOffsetMap(1000) - val log = makeLog() - val cleaner = makeCleaner(Int.MaxValue) - val start = 0 - val end = 500 - val offsets = writeToLog(log, (start until end) zip (start until end)) - def checkRange(map: FakeOffsetMap, start: Int, end: Int) { - cleaner.buildOffsetMap(log, start, end, map) - val endOffset = map.latestOffset + 1 - assertEquals("Last offset should be the end offset.", end, endOffset) - assertEquals("Should have the expected number of messages in the map.", end-start, map.size) - for(i <- start until end) - assertEquals("Should find all the keys", i.toLong, map.get(key(i))) - assertEquals("Should not find a value too 
small", -1L, map.get(key(start - 1))) - assertEquals("Should not find a value too large", -1L, map.get(key(end))) - } - val segments = log.logSegments.toSeq - checkRange(map, 0, segments(1).baseOffset.toInt) - checkRange(map, segments(1).baseOffset.toInt, segments(3).baseOffset.toInt) - checkRange(map, segments(3).baseOffset.toInt, log.logEndOffset.toInt) - } - - - /** - * Tests recovery if broker crashes at the following stages during the cleaning sequence - * <ol> - * <li> Cleaner has created .cleaned log containing multiple segments, swap sequence not yet started - * <li> .cleaned log renamed to .swap, old segment files not yet renamed to .deleted - * <li> .cleaned log renamed to .swap, old segment files renamed to .deleted, but not yet deleted - * <li> .swap suffix removed, completing the swap, but async delete of .deleted files not yet complete - * </ol> - */ - @Test - def testRecoveryAfterCrash(): Unit = { - val cleaner = makeCleaner(Int.MaxValue) - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer) - logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) - logProps.put(LogConfig.FileDeleteDelayMsProp, 10: java.lang.Integer) - - val config = LogConfig.fromProps(logConfig.originals, logProps) - - def recoverAndCheck(config: LogConfig, expectedKeys : Iterable[Int]) : Log = { - // Recover log file and check that after recovery, keys are as expected - // and all temporary files have been deleted - val recoveredLog = makeLog(config = config) - time.sleep(config.fileDeleteDelayMs + 1) - for (file <- dir.listFiles) { - assertFalse("Unexpected .deleted file after recovery", file.getName.endsWith(Log.DeletedFileSuffix)) - assertFalse("Unexpected .cleaned file after recovery", file.getName.endsWith(Log.CleanedFileSuffix)) - assertFalse("Unexpected .swap file after recovery", file.getName.endsWith(Log.SwapFileSuffix)) - } - assertEquals(expectedKeys, keysInLog(recoveredLog)) - recoveredLog - } - - // create a 
log and append some messages - var log = makeLog(config = config) - var messageCount = 0 - while(log.numberOfSegments < 10) { - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) - messageCount += 1 - } - val allKeys = keysInLog(log) - - // pretend we have odd-numbered keys - val offsetMap = new FakeOffsetMap(Int.MaxValue) - for (k <- 1 until messageCount by 2) - offsetMap.put(key(k), Long.MaxValue) - - // clean the log - cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L) - var cleanedKeys = keysInLog(log) - - // 1) Simulate recovery just after .cleaned file is created, before rename to .swap - // On recovery, clean operation is aborted. All messages should be present in the log - log.logSegments.head.changeFileSuffixes("", Log.CleanedFileSuffix) - for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) { - Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) - } - log = recoverAndCheck(config, allKeys) - - // clean again - cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L) - cleanedKeys = keysInLog(log) - - // 2) Simulate recovery just after swap file is created, before old segment files are - // renamed to .deleted. Clean operation is resumed during recovery. 
- log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix) - for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) { - Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) - } - log = recoverAndCheck(config, cleanedKeys) - - // add some more messages and clean the log again - while(log.numberOfSegments < 10) { - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) - messageCount += 1 - } - for (k <- 1 until messageCount by 2) - offsetMap.put(key(k), Long.MaxValue) - cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L) - cleanedKeys = keysInLog(log) - - // 3) Simulate recovery after swap file is created and old segments files are renamed - // to .deleted. Clean operation is resumed during recovery. - log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix) - log = recoverAndCheck(config, cleanedKeys) - - // add some more messages and clean the log again - while(log.numberOfSegments < 10) { - log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) - messageCount += 1 - } - for (k <- 1 until messageCount by 2) - offsetMap.put(key(k), Long.MaxValue) - cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L) - cleanedKeys = keysInLog(log) - - // 4) Simulate recovery after swap is complete, but async deletion - // is not yet complete. Clean operation is resumed during recovery. 
- recoverAndCheck(config, cleanedKeys) - - } - - @Test - def testBuildOffsetMapFakeLarge(): Unit = { - val map = new FakeOffsetMap(1000) - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer) - logProps.put(LogConfig.SegmentIndexBytesProp, 72: java.lang.Integer) - logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) - val logConfig = LogConfig(logProps) - val log = makeLog(config = logConfig) - val cleaner = makeCleaner(Int.MaxValue) - val start = 0 - val end = 2 - val offsetSeq = Seq(0L, 7206178L) - val offsets = writeToLog(log, (start until end) zip (start until end), offsetSeq) - cleaner.buildOffsetMap(log, start, end, map) - val endOffset = map.latestOffset - assertEquals("Last offset should be the end offset.", 7206178L, endOffset) - assertEquals("Should have the expected number of messages in the map.", end - start, map.size) - assertEquals("Map should contain first value", 0L, map.get(key(0))) - assertEquals("Map should contain second value", 7206178L, map.get(key(1))) - } - - /** - * Test building a partial offset map of part of a log segment - */ - @Test - def testBuildPartialOffsetMap(): Unit = { - // because loadFactor is 0.75, this means we can fit 2 messages in the map - val map = new FakeOffsetMap(3) - val log = makeLog() - val cleaner = makeCleaner(2) - - log.append(message(0,0)) - log.append(message(1,1)) - log.append(message(2,2)) - log.append(message(3,3)) - log.append(message(4,4)) - log.roll() - - cleaner.buildOffsetMap(log, 2, Int.MaxValue, map) - assertEquals(2, map.size) - assertEquals(-1, map.get(key(0))) - assertEquals(2, map.get(key(2))) - assertEquals(3, map.get(key(3))) - assertEquals(-1, map.get(key(4))) - } - - /** - * This test verifies that messages corrupted by KAFKA-4298 are fixed by the cleaner - */ - @Test - def testCleanCorruptMessageSet() { - val codec = SnappyCompressionCodec - - val logProps = new Properties() - logProps.put(LogConfig.CompressionTypeProp, codec.name) - val 
logConfig = LogConfig(logProps) - - val log = makeLog(config = logConfig) - val cleaner = makeCleaner(10) - - // messages are constructed so that the payload matches the expecting offset to - // make offset validation easier after cleaning - - // one compressed log entry with duplicates - val dupSetKeys = (0 until 2) ++ (0 until 2) - val dupSetOffset = 25 - val dupSet = dupSetKeys zip (dupSetOffset until dupSetOffset + dupSetKeys.size) - - // and one without (should still be fixed by the cleaner) - val noDupSetKeys = 3 until 5 - val noDupSetOffset = 50 - val noDupSet = noDupSetKeys zip (noDupSetOffset until noDupSetOffset + noDupSetKeys.size) - - log.append(invalidCleanedMessage(dupSetOffset, dupSet, codec), assignOffsets = false) - log.append(invalidCleanedMessage(noDupSetOffset, noDupSet, codec), assignOffsets = false) - - log.roll() - - cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset)) - - for (segment <- log.logSegments; shallowMessage <- segment.log.iterator; deepMessage <- ByteBufferMessageSet.deepIterator(shallowMessage)) { - assertEquals(shallowMessage.message.magic, deepMessage.message.magic) - val value = TestUtils.readString(deepMessage.message.payload).toLong - assertEquals(deepMessage.offset, value) - } - } - - /** - * Verify that the client can handle corrupted messages. Located here for now since the client - * does not support writing messages with the old magic. 
- */ - @Test - def testClientHandlingOfCorruptMessageSet(): Unit = { - import JavaConverters._ - - val keys = 1 until 10 - val offset = 50 - val set = keys zip (offset until offset + keys.size) - - val corruptedMessage = invalidCleanedMessage(offset, set) - val records = MemoryRecords.readableRecords(corruptedMessage.buffer) - - for (logEntry <- records.iterator.asScala) { - val offset = logEntry.offset - val value = TestUtils.readString(logEntry.record.value).toLong - assertEquals(offset, value) - } - } - - private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = { - for(((key, value), offset) <- keysAndValues.zip(offsetSeq)) - yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset - } - - private def invalidCleanedMessage(initialOffset: Long, - keysAndValues: Iterable[(Int, Int)], - codec: CompressionCodec = SnappyCompressionCodec): ByteBufferMessageSet = { - // this function replicates the old versions of the cleaner which under some circumstances - // would write invalid compressed message sets with the outer magic set to 1 and the inner - // magic set to 0 - - val messages = keysAndValues.map(kv => - new Message(key = kv._1.toString.getBytes, - bytes = kv._2.toString.getBytes, - timestamp = Message.NoTimestamp, - magicValue = Message.MagicValue_V0)) - - val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16)) - var lastOffset = initialOffset - - messageWriter.write( - codec = codec, - timestamp = Message.NoTimestamp, - timestampType = TimestampType.CREATE_TIME, - magicValue = Message.MagicValue_V1) { outputStream => - - val output = new DataOutputStream(CompressionFactory(codec, Message.MagicValue_V1, outputStream)) - try { - for (message <- messages) { - val innerOffset = lastOffset - initialOffset - output.writeLong(innerOffset) - output.writeInt(message.size) - output.write(message.buffer.array, 
message.buffer.arrayOffset, message.buffer.limit) - lastOffset += 1 - } - } finally { - output.close() - } - } - val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead) - ByteBufferMessageSet.writeMessage(buffer, messageWriter, lastOffset - 1) - buffer.rewind() - - new ByteBufferMessageSet(buffer) - } - - private def messageWithOffset(key: Int, value: Int, offset: Long) = - new ByteBufferMessageSet(NoCompressionCodec, Seq(offset), - new Message(key = key.toString.getBytes, - bytes = value.toString.getBytes, - timestamp = Message.NoTimestamp, - magicValue = Message.MagicValue_V1)) - - - def makeLog(dir: File = dir, config: LogConfig = logConfig) = - new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time) - - def noOpCheckDone(topicAndPartition: TopicAndPartition) { /* do nothing */ } - - def makeCleaner(capacity: Int, checkDone: (TopicAndPartition) => Unit = noOpCheckDone, maxMessageSize: Int = 64*1024) = - new Cleaner(id = 0, - offsetMap = new FakeOffsetMap(capacity), - ioBufferSize = maxMessageSize, - maxIoBufferSize = maxMessageSize, - dupBufferLoadFactor = 0.75, - throttler = throttler, - time = time, - checkDone = checkDone ) - - def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = { - for((key, value) <- seq) - yield log.append(message(key, value)).firstOffset - } - - def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes) - - def message(key: Int, value: Int): ByteBufferMessageSet = - message(key, value.toString.getBytes) - - def message(key: Int, value: Array[Byte]) = - new ByteBufferMessageSet(new Message(key = key.toString.getBytes, - bytes = value, - timestamp = Message.NoTimestamp, - magicValue = Message.MagicValue_V1)) - - def unkeyedMessage(value: Int) = - new ByteBufferMessageSet(new Message(bytes = value.toString.getBytes)) - - def deleteMessage(key: Int) = - new ByteBufferMessageSet(new Message(key = key.toString.getBytes, - bytes = null, - timestamp = 
Message.NoTimestamp, - magicValue = Message.MagicValue_V1)) - -} - -class FakeOffsetMap(val slots: Int) extends OffsetMap { - val map = new java.util.HashMap[String, Long]() - var lastOffset = -1L - - private def keyFor(key: ByteBuffer) = - new String(Utils.readBytes(key.duplicate), "UTF-8") - - def put(key: ByteBuffer, offset: Long): Unit = { - lastOffset = offset - map.put(keyFor(key), offset) - } - - def get(key: ByteBuffer): Long = { - val k = keyFor(key) - if(map.containsKey(k)) - map.get(k) - else - -1L - } - - def clear(): Unit = map.clear() - - def size: Int = map.size - - def latestOffset: Long = lastOffset - - override def toString: String = map.toString() -} http://git-wip-us.apache.org/repos/asf/kafka/blob/302a246c/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala new file mode 100755 index 0000000..24a6366 --- /dev/null +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -0,0 +1,839 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package kafka.log
+
+import java.io.{DataOutputStream, File}
+import java.nio._
+import java.nio.file.Paths
+import java.util.Properties
+
+import kafka.common._
+import kafka.message._
+import kafka.utils._
+import org.apache.kafka.common.record.{MemoryRecords, TimestampType}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Assert._
+import org.junit.{After, Test}
+import org.scalatest.junit.JUnitSuite
+
+import scala.collection._
+
+/**
+ * Unit tests for the log cleaning logic
+ */
+class LogCleanerTest extends JUnitSuite {
+
+  val tmpdir = TestUtils.tempDir()
+  val dir = TestUtils.randomPartitionLogDir(tmpdir)
+  val logProps = new Properties()
+  logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+  logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
+  logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+  val logConfig = LogConfig(logProps)
+  val time = new MockTime()
+  val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
+
+  @After
+  def teardown(): Unit = {
+    Utils.delete(tmpdir)
+  }
+
+  /**
+   * Test simple log cleaning
+   */
+  @Test
+  def testCleanSegments(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // append messages to the log until we have four segments
+    while(log.numberOfSegments < 4)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+    val keysFound = keysInLog(log)
+    assertEquals(0L until log.logEndOffset, keysFound)
+
+    // pretend we have the following keys
+    val keys = immutable.ListSet(1, 3, 5, 7, 9)
+    val map = new FakeOffsetMap(Int.MaxValue)
+    keys.foreach(k => map.put(key(k), Long.MaxValue))
+
+    // clean the log
+    val segments = log.logSegments.take(3).toSeq
+    val stats = new CleanerStats()
+    val expectedBytesRead = segments.map(_.size).sum
+    cleaner.cleanSegments(log, segments, map, 0L, stats)
+    val shouldRemain = keysInLog(log).filter(!keys.contains(_))
+    assertEquals(shouldRemain, keysInLog(log))
+    assertEquals(expectedBytesRead, stats.bytesRead)
+  }
+
+  /**
+   * Test log cleaning with logs containing messages larger than default message size
+   */
+  @Test
+  def testLargeMessage(): Unit = {
+    val largeMessageSize = 1024 * 1024
+    // Create cleaner with very small default max message size
+    val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, largeMessageSize * 16: java.lang.Integer)
+    logProps.put(LogConfig.MaxMessageBytesProp, largeMessageSize * 2: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    while(log.numberOfSegments < 2)
+      log.append(message(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte)))
+    val keysFound = keysInLog(log)
+    assertEquals(0L until log.logEndOffset, keysFound)
+
+    // pretend we have the following keys
+    val keys = immutable.ListSet(1, 3, 5, 7, 9)
+    val map = new FakeOffsetMap(Int.MaxValue)
+    keys.foreach(k => map.put(key(k), Long.MaxValue))
+
+    // clean the log
+    val stats = new CleanerStats()
+    cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats)
+    val shouldRemain = keysInLog(log).filter(!keys.contains(_))
+    assertEquals(shouldRemain, keysInLog(log))
+  }
+
+  @Test
+  def testCleaningWithDeletes(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // append messages with the keys 0 through N
+    while(log.numberOfSegments < 2)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+
+    // delete all even keys between 0 and N
+    val leo = log.logEndOffset
+    for(key <- 0 until leo.toInt by 2)
+      log.append(deleteMessage(key))
+
+    // append some new unique keys to pad out to a new active segment
+    while(log.numberOfSegments < 4)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
+    val keys = keysInLog(log).toSet
+    assertTrue("None of the keys we deleted should still exist.",
+               (0 until leo.toInt by 2).forall(!keys.contains(_)))
+  }
+
+  /**
+   * Verify the statistics returned by a single clean pass. Without @Test JUnit never ran this
+   * method; the map capacity is also sized so that every key from the first dirty offset (2)
+   * through offset 4 fits, which is what allows the clean to advance to offset 5.
+   */
+  @Test
+  def testLogCleanerStats(): Unit = {
+    // because loadFactor is 0.75, this means we can fit 3 messages in the map
+    val cleaner = makeCleaner(4)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    log.append(message(0,0)) // offset 0
+    log.append(message(1,1)) // offset 1
+    log.append(message(0,0)) // offset 2
+    log.append(message(1,1)) // offset 3
+    log.append(message(0,0)) // offset 4
+    // roll the segment, so we can clean the messages already appended
+    log.roll()
+
+    val initialLogSize = log.size
+
+    val (endOffset, stats) = cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset))
+    assertEquals(5, endOffset)
+    assertEquals(5, stats.messagesRead)
+    assertEquals(initialLogSize, stats.bytesRead)
+    // only the latest occurrence of each key (offsets 3 and 4) survives
+    assertEquals(2, stats.messagesWritten)
+    assertEquals(log.size, stats.bytesWritten)
+    assertEquals(0, stats.invalidMessagesRead)
+    assertTrue(stats.endTime >= stats.startTime)
+  }
+
+  @Test
+  def testPartialSegmentClean(): Unit = {
+    // because loadFactor is 0.75, this means we can fit only 1 message in the map
+    val cleaner = makeCleaner(2)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    log.append(message(0,0)) // offset 0
+    log.append(message(1,1)) // offset 1
+    log.append(message(0,0)) // offset 2
+    log.append(message(1,1)) // offset 3
+    log.append(message(0,0)) // offset 4
+    // roll the segment, so we can clean the messages already appended
+    log.roll()
+
+    // clean the log with only one message removed
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset))
+    assertEquals(immutable.List(1,0,1,0), keysInLog(log))
+    assertEquals(immutable.List(1,2,3,4), offsetsInLog(log))
+
+    // continue to make progress, even though we can only clean one message at a time
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 3, log.activeSegment.baseOffset))
+    assertEquals(immutable.List(0,1,0), keysInLog(log))
+    assertEquals(immutable.List(2,3,4), offsetsInLog(log))
+
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 4, log.activeSegment.baseOffset))
+    assertEquals(immutable.List(1,0), keysInLog(log))
+    assertEquals(immutable.List(3,4), offsetsInLog(log))
+  }
+
+  @Test
+  def testCleaningWithUncleanableSection(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // Number of distinct keys. For an effective test this should be small enough such that each log segment contains some duplicates.
+    val N = 10
+    val numCleanableSegments = 2
+    val numTotalSegments = 7
+
+    // append messages with the keys 0 through N-1, values equal offset
+    while(log.numberOfSegments <= numCleanableSegments)
+      log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
+
+    // at this point one message past the cleanable segments has been added
+    // the entire segment containing the first uncleanable offset should not be cleaned.
+    val firstUncleanableOffset = log.logEndOffset + 1  // +1  so it is past the baseOffset
+
+    while(log.numberOfSegments < numTotalSegments - 1)
+      log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
+
+    // the last (active) segment has just one message
+
+    // values are the offsets, so the distinct-value count equals the message count per segment
+    def distinctValuesBySegment = log.logSegments.map(s => s.log.map(m => TestUtils.readString(m.message.payload)).toSet.size).toSeq
+
+    val distinctValuesBySegmentBeforeClean = distinctValuesBySegment
+    assertTrue("Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.",
+      distinctValuesBySegment.reverse.tail.forall(_ > N))
+
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, firstUncleanableOffset))
+
+    val distinctValuesBySegmentAfterClean = distinctValuesBySegment
+
+    assertTrue("The cleanable segments should have fewer number of values after cleaning",
+      distinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean).take(numCleanableSegments).forall { case (before, after) => after < before })
+    assertTrue("The uncleanable segments should have the same number of values after cleaning", distinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
+      .slice(numCleanableSegments, numTotalSegments).forall { case (before, after) => before == after })
+  }
+
+  @Test
+  def testLogToClean(): Unit = {
+    // create a log with small segment size
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // create 6 segments with only one message in each segment
+    val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
+    for (_ <- 0 until 6)
+      log.append(messageSet, assignOffsets = true)
+
+    val logToClean = LogToClean(TopicAndPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)
+
+    assertEquals("Total bytes of LogToClean should equal size of all segments excluding the active segment",
+      log.size - log.activeSegment.size, logToClean.totalBytes)
+  }
+
+  @Test
+  def testLogToCleanWithUncleanableSection(): Unit = {
+    // create a log with small segment size
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // create 6 segments with only one message in each segment
+    val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
+    for (_ <- 0 until 6)
+      log.append(messageSet, assignOffsets = true)
+
+    // segments [0,1] are clean; segments [2, 3] are cleanable; segments [4,5] are uncleanable
+    val segs = log.logSegments.toSeq
+    val logToClean = LogToClean(TopicAndPartition("test", 0), log, segs(2).baseOffset, segs(4).baseOffset)
+
+    val expectedCleanSize = segs.take(2).map(_.size).sum
+    val expectedCleanableSize = segs.slice(2, 4).map(_.size).sum
+    val expectedUncleanableSize = segs.drop(4).map(_.size).sum
+
+    assertEquals("Clean bytes of LogToClean should equal size of all segments prior to the one containing the first dirty offset",
+      expectedCleanSize, logToClean.cleanBytes)
+    assertEquals("Cleanable bytes of LogToClean should equal size of all segments from the one containing first dirty offset" +
+      " to the segment prior to the one with the first uncleanable offset",
+      expectedCleanableSize, logToClean.cleanableBytes)
+    assertEquals("Total bytes should be the sum of the clean and cleanable segments", expectedCleanSize + expectedCleanableSize, logToClean.totalBytes)
+    assertEquals("Total cleanable ratio should be the ratio of cleanable size to clean plus cleanable",
+      expectedCleanableSize / (expectedCleanSize + expectedCleanableSize).toDouble, logToClean.cleanableRatio, 1.0e-6d)
+  }
+
+  @Test
+  def testCleaningWithUnkeyedMessages(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+
+    // create a log with compaction turned off so we can append unkeyed messages
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Delete)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // append unkeyed messages
+    while(log.numberOfSegments < 2)
+      log.append(unkeyedMessage(log.logEndOffset.toInt))
+    val numInvalidMessages = unkeyedMessageCountInLog(log)
+
+    val sizeWithUnkeyedMessages = log.size
+
+    // append keyed messages
+    while(log.numberOfSegments < 3)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+
+    val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
+    val (_, stats) = cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
+
+    assertEquals("Log should only contain keyed messages after cleaning.", 0, unkeyedMessageCountInLog(log))
+    assertEquals("Log should only contain keyed messages after cleaning.", expectedSizeAfterCleaning, log.size)
+    assertEquals(s"Cleaner should have seen $numInvalidMessages invalid messages.", numInvalidMessages, stats.invalidMessagesRead)
+  }
+
+  /* extract all the keys from a log */
+  def keysInLog(log: Log): Iterable[Int] =
+    log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => TestUtils.readString(m.message.key).toInt))
+
+  /* extract all the offsets from a log */
+  def offsetsInLog(log: Log): Iterable[Long] =
+    log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => m.offset))
+
+  /* count the messages without a key across all segments */
+  def unkeyedMessageCountInLog(log: Log) =
+    log.logSegments.map(s => s.log.filter(!_.message.isNull).count(m => !m.message.hasKey)).sum
+
+  /* checkDone callback that always aborts, used to simulate a cancelled clean */
+  def abortCheckDone(topicAndPartition: TopicAndPartition): Unit = {
+    throw new LogCleaningAbortedException()
+  }
+
+  /**
+   * Test that abortion during cleaning throws a LogCleaningAbortedException
+   */
+  @Test
+  def testCleanSegmentsWithAbort(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue, abortCheckDone)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // append messages to the log until we have four segments
+    while(log.numberOfSegments < 4)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+
+    val keys = keysInLog(log)
+    val map = new FakeOffsetMap(Int.MaxValue)
+    keys.foreach(k => map.put(key(k), Long.MaxValue))
+    intercept[LogCleaningAbortedException] {
+      cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L, new CleanerStats())
+    }
+  }
+
+  /**
+   * Validate the logic for grouping log segments together for cleaning
+   */
+  @Test
+  def testSegmentGrouping(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // append some messages to the log
+    while(log.numberOfSegments < 10)
+      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+
+    // grouping by very large values should result in a single group with all the segments in it
+    var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    assertEquals(1, groups.size)
+    assertEquals(log.numberOfSegments, groups.head.size)
+    checkSegmentOrder(groups)
+
+    // grouping by very small values should result in all groups having one entry
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue)
+    assertEquals(log.numberOfSegments, groups.size)
+    assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
+    checkSegmentOrder(groups)
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = 1)
+    assertEquals(log.numberOfSegments, groups.size)
+    assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
+    checkSegmentOrder(groups)
+
+    val groupSize = 3
+
+    // check grouping by log size
+    val logSize = log.logSegments.take(groupSize).map(_.size).sum.toInt + 1
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue)
+    checkSegmentOrder(groups)
+    assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
+
+    // check grouping by index size
+    val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes).sum + 1
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize)
+    checkSegmentOrder(groups)
+    assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
+  }
+
+  /**
+   * Validate the logic for grouping log segments together for cleaning when only a small number of
+   * messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not
+   * contain a range of offsets greater than Int.MaxValue to ensure that relative offsets can be
+   * stored in 4 bytes.
+   */
+  @Test
+  def testSegmentGroupingWithSparseOffsets(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // fill up first segment
+    while (log.numberOfSegments == 1)
+      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+
+    // forward offset and append message to next segment at offset Int.MaxValue
+    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new LongRef(Int.MaxValue - 1),
+      new Message("hello".getBytes, "hello".getBytes, Message.NoTimestamp, Message.MagicValue_V1))
+    log.append(messageSet, assignOffsets = false)
+    log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+    assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset)
+
+    // grouping should result in a single group with maximum relative offset of Int.MaxValue
+    var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    assertEquals(1, groups.size)
+
+    // append another message, making last offset of second segment > Int.MaxValue
+    log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+
+    // grouping should not group the two segments to ensure that maximum relative offset in each group <= Int.MaxValue
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    assertEquals(2, groups.size)
+    checkSegmentOrder(groups)
+
+    // append more messages, creating new segments, further grouping should still occur
+    while (log.numberOfSegments < 4)
+      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    assertEquals(log.numberOfSegments - 1, groups.size)
+    for (group <- groups)
+      assertTrue("Relative offset greater than Int.MaxValue", group.last.index.lastOffset - group.head.index.baseOffset <= Int.MaxValue)
+    checkSegmentOrder(groups)
+  }
+
+  /**
+   * Assert that flattening the groups yields segment base offsets in ascending order,
+   * i.e. grouping neither reorders nor interleaves segments.
+   */
+  private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]): Unit = {
+    val offsets = groups.flatMap(_.map(_.baseOffset))
+    assertEquals("Offsets should be in increasing order.", offsets.sorted, offsets)
+  }
+
+  /**
+   * Test building an offset map off the log
+   */
+  @Test
+  def testBuildOffsetMap(): Unit = {
+    val map = new FakeOffsetMap(1000)
+    val log = makeLog()
+    val cleaner = makeCleaner(Int.MaxValue)
+    val start = 0
+    val end = 500
+    writeToLog(log, (start until end) zip (start until end))
+
+    // builds the map over [rangeStart, rangeEnd) and checks it holds exactly the keys in that range
+    def checkRange(map: FakeOffsetMap, rangeStart: Int, rangeEnd: Int): Unit = {
+      val stats = new CleanerStats()
+      cleaner.buildOffsetMap(log, rangeStart, rangeEnd, map, stats)
+      val endOffset = map.latestOffset + 1
+      assertEquals("Last offset should be the end offset.", rangeEnd, endOffset)
+      assertEquals("Should have the expected number of messages in the map.", rangeEnd - rangeStart, map.size)
+      for (i <- rangeStart until rangeEnd)
+        assertEquals("Should find all the keys", i.toLong, map.get(key(i)))
+      assertEquals("Should not find a value too small", -1L, map.get(key(rangeStart - 1)))
+      assertEquals("Should not find a value too large", -1L, map.get(key(rangeEnd)))
+      assertEquals(rangeEnd - rangeStart, stats.mapMessagesRead)
+    }
+
+    val segments = log.logSegments.toSeq
+    checkRange(map, 0, segments(1).baseOffset.toInt)
+    checkRange(map, segments(1).baseOffset.toInt, segments(3).baseOffset.toInt)
+    checkRange(map, segments(3).baseOffset.toInt, log.logEndOffset.toInt)
+  }
+
+  /**
+   * Tests recovery if broker crashes at the following stages during the cleaning sequence
+   * <ol>
+   *   <li> Cleaner has created .cleaned log containing multiple segments, swap sequence not yet started
+   *   <li> .cleaned log renamed to .swap, old segment files not yet renamed to .deleted
+   *   <li>
.cleaned log renamed to .swap, old segment files renamed to .deleted, but not yet deleted + * <li> .swap suffix removed, completing the swap, but async delete of .deleted files not yet complete + * </ol> + */ + @Test + def testRecoveryAfterCrash(): Unit = { + val cleaner = makeCleaner(Int.MaxValue) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer) + logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) + logProps.put(LogConfig.FileDeleteDelayMsProp, 10: java.lang.Integer) + + val config = LogConfig.fromProps(logConfig.originals, logProps) + + def recoverAndCheck(config: LogConfig, expectedKeys : Iterable[Int]) : Log = { + // Recover log file and check that after recovery, keys are as expected + // and all temporary files have been deleted + val recoveredLog = makeLog(config = config) + time.sleep(config.fileDeleteDelayMs + 1) + for (file <- dir.listFiles) { + assertFalse("Unexpected .deleted file after recovery", file.getName.endsWith(Log.DeletedFileSuffix)) + assertFalse("Unexpected .cleaned file after recovery", file.getName.endsWith(Log.CleanedFileSuffix)) + assertFalse("Unexpected .swap file after recovery", file.getName.endsWith(Log.SwapFileSuffix)) + } + assertEquals(expectedKeys, keysInLog(recoveredLog)) + recoveredLog + } + + // create a log and append some messages + var log = makeLog(config = config) + var messageCount = 0 + while(log.numberOfSegments < 10) { + log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) + messageCount += 1 + } + val allKeys = keysInLog(log) + + // pretend we have odd-numbered keys + val offsetMap = new FakeOffsetMap(Int.MaxValue) + for (k <- 1 until messageCount by 2) + offsetMap.put(key(k), Long.MaxValue) + + // clean the log + cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats()) + var cleanedKeys = keysInLog(log) + + // 1) Simulate recovery just after .cleaned file is created, before rename to .swap + // On 
recovery, clean operation is aborted. All messages should be present in the log + log.logSegments.head.changeFileSuffixes("", Log.CleanedFileSuffix) + for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) { + Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) + } + log = recoverAndCheck(config, allKeys) + + // clean again + cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats()) + cleanedKeys = keysInLog(log) + + // 2) Simulate recovery just after swap file is created, before old segment files are + // renamed to .deleted. Clean operation is resumed during recovery. + log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix) + for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) { + Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) + } + log = recoverAndCheck(config, cleanedKeys) + + // add some more messages and clean the log again + while(log.numberOfSegments < 10) { + log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) + messageCount += 1 + } + for (k <- 1 until messageCount by 2) + offsetMap.put(key(k), Long.MaxValue) + cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats()) + cleanedKeys = keysInLog(log) + + // 3) Simulate recovery after swap file is created and old segments files are renamed + // to .deleted. Clean operation is resumed during recovery. 
+ log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix) + log = recoverAndCheck(config, cleanedKeys) + + // add some more messages and clean the log again + while(log.numberOfSegments < 10) { + log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) + messageCount += 1 + } + for (k <- 1 until messageCount by 2) + offsetMap.put(key(k), Long.MaxValue) + cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats()) + cleanedKeys = keysInLog(log) + + // 4) Simulate recovery after swap is complete, but async deletion + // is not yet complete. Clean operation is resumed during recovery. + recoverAndCheck(config, cleanedKeys) + } + + @Test + def testBuildOffsetMapFakeLarge(): Unit = { + val map = new FakeOffsetMap(1000) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer) + logProps.put(LogConfig.SegmentIndexBytesProp, 72: java.lang.Integer) + logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) + val logConfig = LogConfig(logProps) + val log = makeLog(config = logConfig) + val cleaner = makeCleaner(Int.MaxValue) + val start = 0 + val end = 2 + val offsetSeq = Seq(0L, 7206178L) + writeToLog(log, (start until end) zip (start until end), offsetSeq) + cleaner.buildOffsetMap(log, start, end, map, new CleanerStats()) + val endOffset = map.latestOffset + assertEquals("Last offset should be the end offset.", 7206178L, endOffset) + assertEquals("Should have the expected number of messages in the map.", end - start, map.size) + assertEquals("Map should contain first value", 0L, map.get(key(0))) + assertEquals("Map should contain second value", 7206178L, map.get(key(1))) + } + + /** + * Test building a partial offset map of part of a log segment + */ + @Test + def testBuildPartialOffsetMap(): Unit = { + // because loadFactor is 0.75, this means we can fit 2 messages in the map + val map = new FakeOffsetMap(3) + val log = makeLog() + val cleaner = makeCleaner(2) + + 
log.append(message(0,0)) + log.append(message(1,1)) + log.append(message(2,2)) + log.append(message(3,3)) + log.append(message(4,4)) + log.roll() + + val stats = new CleanerStats() + cleaner.buildOffsetMap(log, 2, Int.MaxValue, map, stats) + assertEquals(2, map.size) + assertEquals(-1, map.get(key(0))) + assertEquals(2, map.get(key(2))) + assertEquals(3, map.get(key(3))) + assertEquals(-1, map.get(key(4))) + assertEquals(4, stats.mapMessagesRead) + } + + /** + * This test verifies that messages corrupted by KAFKA-4298 are fixed by the cleaner + */ + @Test + def testCleanCorruptMessageSet() { + val codec = SnappyCompressionCodec + + val logProps = new Properties() + logProps.put(LogConfig.CompressionTypeProp, codec.name) + val logConfig = LogConfig(logProps) + + val log = makeLog(config = logConfig) + val cleaner = makeCleaner(10) + + // messages are constructed so that the payload matches the expecting offset to + // make offset validation easier after cleaning + + // one compressed log entry with duplicates + val dupSetKeys = (0 until 2) ++ (0 until 2) + val dupSetOffset = 25 + val dupSet = dupSetKeys zip (dupSetOffset until dupSetOffset + dupSetKeys.size) + + // and one without (should still be fixed by the cleaner) + val noDupSetKeys = 3 until 5 + val noDupSetOffset = 50 + val noDupSet = noDupSetKeys zip (noDupSetOffset until noDupSetOffset + noDupSetKeys.size) + + log.append(invalidCleanedMessage(dupSetOffset, dupSet, codec), assignOffsets = false) + log.append(invalidCleanedMessage(noDupSetOffset, noDupSet, codec), assignOffsets = false) + + log.roll() + + cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset)) + + for (segment <- log.logSegments; shallowMessage <- segment.log.iterator; deepMessage <- ByteBufferMessageSet.deepIterator(shallowMessage)) { + assertEquals(shallowMessage.message.magic, deepMessage.message.magic) + val value = TestUtils.readString(deepMessage.message.payload).toLong + 
assertEquals(deepMessage.offset, value) + } + } + + /** + * Verify that the client can handle corrupted messages. Located here for now since the client + * does not support writing messages with the old magic. + */ + @Test + def testClientHandlingOfCorruptMessageSet(): Unit = { + import JavaConverters._ + + val keys = 1 until 10 + val offset = 50 + val set = keys zip (offset until offset + keys.size) + + val corruptedMessage = invalidCleanedMessage(offset, set) + val records = MemoryRecords.readableRecords(corruptedMessage.buffer) + + for (logEntry <- records.iterator.asScala) { + val offset = logEntry.offset + val value = TestUtils.readString(logEntry.record.value).toLong + assertEquals(offset, value) + } + } + + private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = { + for(((key, value), offset) <- keysAndValues.zip(offsetSeq)) + yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset + } + + private def invalidCleanedMessage(initialOffset: Long, + keysAndValues: Iterable[(Int, Int)], + codec: CompressionCodec = SnappyCompressionCodec): ByteBufferMessageSet = { + // this function replicates the old versions of the cleaner which under some circumstances + // would write invalid compressed message sets with the outer magic set to 1 and the inner + // magic set to 0 + + val messages = keysAndValues.map(kv => + new Message(key = kv._1.toString.getBytes, + bytes = kv._2.toString.getBytes, + timestamp = Message.NoTimestamp, + magicValue = Message.MagicValue_V0)) + + val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16)) + var lastOffset = initialOffset + + messageWriter.write( + codec = codec, + timestamp = Message.NoTimestamp, + timestampType = TimestampType.CREATE_TIME, + magicValue = Message.MagicValue_V1) { outputStream => + + val output = new DataOutputStream(CompressionFactory(codec, Message.MagicValue_V1, 
outputStream)) + try { + for (message <- messages) { + val innerOffset = lastOffset - initialOffset + output.writeLong(innerOffset) + output.writeInt(message.size) + output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) + lastOffset += 1 + } + } finally { + output.close() + } + } + val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead) + ByteBufferMessageSet.writeMessage(buffer, messageWriter, lastOffset - 1) + buffer.rewind() + + new ByteBufferMessageSet(buffer) + } + + private def messageWithOffset(key: Int, value: Int, offset: Long) = + new ByteBufferMessageSet(NoCompressionCodec, Seq(offset), + new Message(key = key.toString.getBytes, + bytes = value.toString.getBytes, + timestamp = Message.NoTimestamp, + magicValue = Message.MagicValue_V1)) + + + def makeLog(dir: File = dir, config: LogConfig = logConfig) = + new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time) + + def noOpCheckDone(topicAndPartition: TopicAndPartition) { /* do nothing */ } + + def makeCleaner(capacity: Int, checkDone: (TopicAndPartition) => Unit = noOpCheckDone, maxMessageSize: Int = 64*1024) = + new Cleaner(id = 0, + offsetMap = new FakeOffsetMap(capacity), + ioBufferSize = maxMessageSize, + maxIoBufferSize = maxMessageSize, + dupBufferLoadFactor = 0.75, + throttler = throttler, + time = time, + checkDone = checkDone ) + + def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = { + for((key, value) <- seq) + yield log.append(message(key, value)).firstOffset + } + + def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes) + + def message(key: Int, value: Int): ByteBufferMessageSet = + message(key, value.toString.getBytes) + + def message(key: Int, value: Array[Byte]) = + new ByteBufferMessageSet(new Message(key = key.toString.getBytes, + bytes = value, + timestamp = Message.NoTimestamp, + magicValue = Message.MagicValue_V1)) + + def unkeyedMessage(value: Int) = + new 
ByteBufferMessageSet(new Message(bytes = value.toString.getBytes)) + + def deleteMessage(key: Int) = + new ByteBufferMessageSet(new Message(key = key.toString.getBytes, + bytes = null, + timestamp = Message.NoTimestamp, + magicValue = Message.MagicValue_V1)) + +} + +class FakeOffsetMap(val slots: Int) extends OffsetMap { + val map = new java.util.HashMap[String, Long]() + var lastOffset = -1L + + private def keyFor(key: ByteBuffer) = + new String(Utils.readBytes(key.duplicate), "UTF-8") + + def put(key: ByteBuffer, offset: Long): Unit = { + lastOffset = offset + map.put(keyFor(key), offset) + } + + def get(key: ByteBuffer): Long = { + val k = keyFor(key) + if(map.containsKey(k)) + map.get(k) + else + -1L + } + + def clear(): Unit = map.clear() + + def size: Int = map.size + + def latestOffset: Long = lastOffset + + override def toString: String = map.toString() +}