Repository: kafka
Updated Branches:
  refs/heads/0.10.0 2a1b3b93b -> 9181ecf24
KAFKA-3587; LogCleaner fails due to incorrect offset map computation

Removed the overly pessimistic require and instead attempt to fill the
dedup buffer. Use the (only) map until it is full; this may allow all
dirty segments to be processed (the optimistic case), or the map may
fill up in the middle of a dirty segment. In either case, compaction
proceeds using the map loaded that way. (A standalone sketch of this
strategy appears after the diff below.)

This patch was developed with edoardocomar

Author: Mickael Maison <mickael.mai...@gmail.com>

Reviewers: Jun Rao <jun...@gmail.com>, Guozhang Wang <wangg...@gmail.com>

Closes #1332 from mimaison/KAFKA-3587

(cherry picked from commit 2caf872c2e51d689c6ac20240c4a306e36d98b15)
Signed-off-by: Ismael Juma <ism...@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9181ecf2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9181ecf2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9181ecf2

Branch: refs/heads/0.10.0
Commit: 9181ecf247297ea7f7cdd2b69a578a487c74fdf5
Parents: 2a1b3b9
Author: Mickael Maison <mickael.mai...@gmail.com>
Authored: Mon May 9 18:10:40 2016 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Mon May 9 18:11:08 2016 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  | 27 ++++++++++------
 .../test/scala/unit/kafka/log/CleanerTest.scala | 33 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9181ecf2/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 0f742f9..c6636be 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -615,17 +615,19 @@ private[log] class Cleaner(val id: Int,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
     var offset = dirty.head.baseOffset
     require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
-    val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     var full = false
     for (segment <- dirty if !full) {
       checkDone(log.topicAndPartition)
-      val segmentSize = segment.nextOffset() - segment.baseOffset
-      require(segmentSize <= maxDesiredMapSize, "%d messages in segment %s/%s but offset map can fit only %d. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(segmentSize, log.name, segment.log.file.getName, maxDesiredMapSize))
-      if (map.size + segmentSize <= maxDesiredMapSize)
-        offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
-      else
+      val newOffset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
+      if (newOffset > -1L)
+        offset = newOffset
+      else {
+        // If not even one segment can fit in the map, compaction cannot happen
+        require(offset > start, "Unable to build the offset map for segment %s/%s. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(log.name, segment.log.file.getName))
+        debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
         full = true
+      }
     }
     info("Offset map for log %s complete.".format(log.name))
     offset
@@ -637,11 +639,12 @@ private[log] class Cleaner(val id: Int,
    * @param segment The segment to index
    * @param map The map in which to store the key=>offset mapping
    *
-   * @return The final offset covered by the map
+   * @return The final offset covered by the map or -1 if the map is full
    */
   private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long = {
     var position = 0
     var offset = segment.baseOffset
+    val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     while (position < segment.log.sizeInBytes) {
       checkDone(topicAndPartition)
       readBuffer.clear()
@@ -650,8 +653,14 @@ private[log] class Cleaner(val id: Int,
       val startPosition = position
       for (entry <- messages) {
         val message = entry.message
-        if (message.hasKey)
-          map.put(message.key, entry.offset)
+        if (message.hasKey) {
+          if (map.size < maxDesiredMapSize)
+            map.put(message.key, entry.offset)
+          else {
+            // The map is full, stop looping and return
+            return -1L
+          }
+        }
         offset = entry.offset
         stats.indexMessagesRead(1)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9181ecf2/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index b6849f0..752a260 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -423,6 +423,39 @@ class CleanerTest extends JUnitSuite {
     recoverAndCheck(config, cleanedKeys)
   }
+
+  @Test
+  def testBuildOffsetMapFakeLarge() {
+    val map = new FakeOffsetMap(1000)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 72: java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    val logConfig = LogConfig(logProps)
+    val log = makeLog(config = logConfig)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val start = 0
+    val end = 2
+    val offsetSeq = Seq(0L, 7206178L)
+    val offsets = writeToLog(log, (start until end) zip (start until end), offsetSeq)
+    val endOffset = cleaner.buildOffsetMap(log, start, end, map)
+    assertEquals("Last offset should be the end offset.", 7206178L, endOffset)
+    assertEquals("Should have the expected number of messages in the map.", end - start, map.size)
+    assertEquals("Map should contain first value", 0L, map.get(key(0)))
+    assertEquals("Map should contain second value", 7206178L, map.get(key(1)))
+  }
+
+  private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
+    for (((key, value), offset) <- keysAndValues.zip(offsetSeq))
+      yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset
+  }
+
+  private def messageWithOffset(key: Int, value: Int, offset: Long) =
+    new ByteBufferMessageSet(NoCompressionCodec, Seq(offset),
+                             new Message(key = key.toString.getBytes,
+                                         bytes = value.toString.getBytes,
+                                         timestamp = Message.NoTimestamp,
+                                         magicValue = Message.MagicValue_V1))
 
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =
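

----------------------------------------------------------------------

For readers following along, here is a minimal standalone sketch of the
"fill the map until full" strategy this patch introduces. The types
below (SimpleOffsetMap, Segment) are simplified, hypothetical stand-ins
for illustration, not Kafka's real OffsetMap and LogSegment; only the
control flow mirrors the patch.

import scala.collection.mutable

object OffsetMapSketch {

  // Hypothetical stand-in for kafka.log.OffsetMap: a bounded key -> offset map.
  final class SimpleOffsetMap(val slots: Int) {
    private val entries = mutable.Map.empty[String, Long]
    def size: Int = entries.size
    def put(key: String, offset: Long): Unit = entries(key) = offset
  }

  // Hypothetical stand-in for a log segment: a base offset plus keyed records.
  final case class Segment(baseOffset: Long, records: Seq[(String, Long)])

  // Mirrors buildOffsetMapForSegment after the patch: returns the last offset
  // mapped, or -1L if the map filled up partway through the segment.
  def buildOffsetMapForSegment(segment: Segment,
                               map: SimpleOffsetMap,
                               loadFactor: Double): Long = {
    val maxDesiredMapSize = (map.slots * loadFactor).toInt
    var offset = segment.baseOffset
    val it = segment.records.iterator
    while (it.hasNext) {
      val (key, recordOffset) = it.next()
      if (map.size < maxDesiredMapSize) {
        map.put(key, recordOffset)
        offset = recordOffset
      } else {
        return -1L // map is full; the caller compacts with what was loaded
      }
    }
    offset
  }

  // Mirrors the new buildOffsetMap loop: map whole segments while they fit,
  // and stop (rather than fail) once a segment can only be partially mapped.
  def buildOffsetMap(dirty: Seq[Segment],
                     map: SimpleOffsetMap,
                     loadFactor: Double): Long = {
    val start = dirty.head.baseOffset
    var offset = start
    var full = false
    for (segment <- dirty if !full) {
      val newOffset = buildOffsetMapForSegment(segment, map, loadFactor)
      if (newOffset > -1L)
        offset = newOffset
      else {
        // If not even one segment fits in the map, compaction cannot happen
        require(offset > start, "Unable to fit even one segment in the offset map")
        full = true
      }
    }
    offset
  }

  def main(args: Array[String]): Unit = {
    val dirty = Seq(
      Segment(0L, Seq(("a", 0L), ("b", 1L), ("a", 2L))),
      Segment(3L, Seq(("c", 3L), ("d", 4L))))
    val map = new SimpleOffsetMap(slots = 4) // maxDesiredMapSize = 3 at 0.75
    val end = buildOffsetMap(dirty, map, loadFactor = 0.75)
    // The first segment maps fully; the second fills the map partway through,
    // so cleaning proceeds only up to offset 2.
    println(s"map covers offsets up to $end with ${map.size} keys")
  }
}

The key design change visible in the diff is that buildOffsetMapForSegment
now bounds its own writes against maxDesiredMapSize and signals fullness
with -1, instead of the caller rejecting whole segments up front with a
require.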