Repository: kafka
Updated Branches:
  refs/heads/trunk 113d23e8f -> 6ed3e6b1c
KAFKA-3894; log cleaner can partially clean a segment

As discussed in https://issues.apache.org/jira/browse/KAFKA-3894, this PR makes
the log cleaner do a "partial" clean of a segment, whereby it builds a partial
offset map up to a particular offset within the segment. Once cleaning resumes,
it continues from the next dirty offset, which can now be located in the middle
of a segment. Prior to this PR, a segment with too many unique keys could crash
the log cleaner thread, because the cleaner was required to fit at least one
entire segment in the offset map.

Author: Tom Crayford <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes #1725 from tcrayford/dont_crash_log_cleaner_thread_if_segment_overflows_buffer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6ed3e6b1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6ed3e6b1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6ed3e6b1

Branch: refs/heads/trunk
Commit: 6ed3e6b1cb8a73b1f5f78926ccb247a8953a554c
Parents: 113d23e
Author: Tom Crayford <[email protected]>
Authored: Mon Aug 22 10:58:40 2016 -0700
Committer: Jun Rao <[email protected]>
Committed: Mon Aug 22 10:58:40 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  | 46 +++++------
 core/src/main/scala/kafka/log/OffsetMap.scala   | 19 ++++-
 .../test/scala/unit/kafka/log/CleanerTest.scala | 85 ++++++++++++++++++--
 3 files changed, 111 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6ed3e6b1/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index d4bb1f2..ef880e6 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -311,6 +311,8 @@ private[log] class Cleaner(val id: Int,
   /* buffer used for write i/o */
   private var writeBuffer = ByteBuffer.allocate(ioBufferSize)
 
+  require(offsetMap.slots * dupBufferLoadFactor > 1, "offset map is too small to fit in even a single message, so log cleaning will never make progress. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads")
+
   /**
    * Clean the given log
    *
@@ -326,7 +328,8 @@ private[log] class Cleaner(val id: Int,
     // build the offset map
     info("Building offset map for %s...".format(cleanable.log.name))
     val upperBoundOffset = log.activeSegment.baseOffset
-    val endOffset = buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap) + 1
+    buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap)
+    val endOffset = offsetMap.latestOffset + 1
     stats.indexDone()
 
     // figure out the timestamp below which it is safe to remove delete tombstones
@@ -341,7 +344,7 @@ private[log] class Cleaner(val id: Int,
     info("Cleaning log %s (discarding tombstones prior to %s)...".format(log.name, new Date(deleteHorizonMs)))
     for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
       cleanSegments(log, group, offsetMap, deleteHorizonMs)
-    
+
     // record buffer utilization
     stats.bufferUtilization = offsetMap.utilization
 
@@ -533,6 +536,10 @@ private[log] class Cleaner(val id: Int,
                              map: kafka.log.OffsetMap,
                              retainDeletes: Boolean,
                              entry: kafka.message.MessageAndOffset): Boolean = {
+    val pastLatestOffset = entry.offset > map.latestOffset
+    if (pastLatestOffset)
+      return true
+
     val key = entry.message.key
     if (key != null) {
       val foundOffset = map.get(key)
@@ -613,34 +620,23 @@ private[log] class Cleaner(val id: Int,
    * @param start The offset at which dirty messages begin
    * @param end The ending offset for the map that is being built
    * @param map The map in which to store the mappings
-   *
-   * @return The final offset the map covers
    */
-  private[log] def buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap): Long = {
+  private[log] def buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap) {
     map.clear()
     val dirty = log.logSegments(start, end).toBuffer
    info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))
 
     // Add all the dirty segments. We must take at least map.slots * load_factor,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
-    var offset = dirty.head.baseOffset
-    require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
 
     var full = false
     for (segment <- dirty if !full) {
       checkDone(log.topicAndPartition)
-      val newOffset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
-      if (newOffset > -1L)
-        offset = newOffset
-      else {
-        // If not even one segment can fit in the map, compaction cannot happen
-        require(offset > start, "Unable to build the offset map for segment %s/%s. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(log.name, segment.log.file.getName))
+      full = buildOffsetMapForSegment(log.topicAndPartition, segment, map, start)
+      if (full)
         debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
-        full = true
-      }
     }
     info("Offset map for log %s complete.".format(log.name))
-    offset
   }
 
   /**
@@ -649,11 +645,10 @@ private[log] class Cleaner(val id: Int,
    * @param segment The segment to index
    * @param map The map in which to store the key=>offset mapping
    *
-   * @return The final offset covered by the map or -1 if the map is full
+   * @return If the map was filled whilst loading from this segment
    */
-  private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long = {
-    var position = 0
-    var offset = segment.baseOffset
+  private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap, start: Long): Boolean = {
+    var position = segment.index.lookup(start).position
     val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     while (position < segment.log.sizeInBytes) {
       checkDone(topicAndPartition)
@@ -663,15 +658,12 @@ private[log] class Cleaner(val id: Int,
       val startPosition = position
       for (entry <- messages) {
         val message = entry.message
-        if (message.hasKey) {
+        if (message.hasKey && entry.offset >= start) {
           if (map.size < maxDesiredMapSize)
             map.put(message.key, entry.offset)
-          else {
-            // The map is full, stop looping and return
-            return -1L
-          }
+          else
+            return true
         }
-        offset = entry.offset
         stats.indexMessagesRead(1)
       }
       position += messages.validBytes
@@ -682,7 +674,7 @@ private[log] class Cleaner(val id: Int,
 
         growBuffers()
     }
     restoreBuffers()
-    offset
+    return false
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ed3e6b1/core/src/main/scala/kafka/log/OffsetMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala
index f453030..1df0615 100755
--- a/core/src/main/scala/kafka/log/OffsetMap.scala
+++ b/core/src/main/scala/kafka/log/OffsetMap.scala
@@ -30,6 +30,7 @@ trait OffsetMap {
   def clear()
   def size: Int
   def utilization: Double = size.toDouble / slots
+  def latestOffset: Long
 }
 
 /**
@@ -60,7 +61,10 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
 
   /* the number of probes for all lookups */
   private var probes = 0L
-  
+
+  /* the latest offset written into the map */
+  private var lastOffset = -1L
+
   /**
    * The number of bytes of space each entry uses (the number of bytes in the hash plus an 8 byte offset)
    */
@@ -89,6 +93,7 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
       if(Arrays.equals(hash1, hash2)) {
         // we found an existing entry, overwrite it and return (size does not change)
         bytes.putLong(offset)
+        lastOffset = offset
         return
       }
       attempt += 1
@@ -98,6 +103,7 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
     bytes.position(pos)
     bytes.put(hash1)
     bytes.putLong(offset)
+    lastOffset = offset
     entries += 1
   }
 
@@ -106,7 +112,7 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
    */
   private def isEmpty(position: Int): Boolean = 
     bytes.getLong(position) == 0 && bytes.getLong(position + 8) == 0 && bytes.getLong(position + 16) == 0
-  
+
   /**
    * Get the offset associated with this key.
    * @param key The key
@@ -136,12 +142,12 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
 
   /**
    * Change the salt used for key hashing making all existing keys unfindable.
-   * Doesn't actually zero out the array.
    */
   override def clear() {
     this.entries = 0
     this.lookups = 0L
     this.probes = 0L
+    this.lastOffset = -1L
     Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit, 0.toByte)
   }
 
@@ -155,7 +161,12 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
    */
   def collisionRate: Double = 
     (this.probes - this.lookups) / this.lookups.toDouble
-  
+
+  /**
+   * The latest offset put into the map
+   */
+  override def latestOffset: Long = lastOffset
+
   /**
    * Calculate the ith probe position. We first try reading successive integers from the hash itself
    * then if all of those fail we degrade to linear probing.

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ed3e6b1/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 15920ad..5b0ce9a 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -108,6 +108,38 @@ class CleanerTest extends JUnitSuite {
   }
 
   @Test
+  def testPartialSegmentClean() {
+    // because loadFactor is 0.75, this means we can fit 2 messages in the map
+    var cleaner = makeCleaner(2)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    log.append(message(0,0)) // offset 0
+    log.append(message(1,1)) // offset 1
+    log.append(message(0,0)) // offset 2
+    log.append(message(1,1)) // offset 3
+    log.append(message(0,0)) // offset 4
+    // roll the segment, so we can clean the messages already appended
+    log.roll()
+
+    // clean the log with only one message removed
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2))
+    assertEquals(immutable.List(1,0,1,0), keysInLog(log))
+    assertEquals(immutable.List(1,2,3,4), offsetsInLog(log))
+
+    // continue to make progress, even though we can only clean one message at a time
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 3))
+    assertEquals(immutable.List(0,1,0), keysInLog(log))
+    assertEquals(immutable.List(2,3,4), offsetsInLog(log))
+
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 4))
+    assertEquals(immutable.List(1,0), keysInLog(log))
+    assertEquals(immutable.List(3,4), offsetsInLog(log))
+  }
+
+  @Test
   def testLogToClean: Unit = {
     // create a log with small segment size
     val logProps = new Properties()
@@ -159,6 +191,10 @@ class CleanerTest extends JUnitSuite {
   def keysInLog(log: Log): Iterable[Int] =
     log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => TestUtils.readString(m.message.key).toInt))
 
+  /* extract all the offsets from a log */
+  def offsetsInLog(log: Log): Iterable[Long] =
+    log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => m.offset))
+
   def unkeyedMessageCountInLog(log: Log) =
     log.logSegments.map(s => s.log.filter(!_.message.isNull).count(m => !m.message.hasKey)).sum
 
@@ -307,7 +343,8 @@ class CleanerTest extends JUnitSuite {
     val end = 500
     val offsets = writeToLog(log, (start until end) zip (start until end))
     def checkRange(map: FakeOffsetMap, start: Int, end: Int) {
-      val endOffset = cleaner.buildOffsetMap(log, start, end, map) + 1
+      cleaner.buildOffsetMap(log, start, end, map)
+      val endOffset = map.latestOffset + 1
       assertEquals("Last offset should be the end offset.", end, endOffset)
       assertEquals("Should have the expected number of messages in the map.", end-start, map.size)
       for(i <- start until end)
@@ -439,13 +476,39 @@ class CleanerTest extends JUnitSuite {
     val end = 2
     val offsetSeq = Seq(0L, 7206178L)
     val offsets = writeToLog(log, (start until end) zip (start until end), offsetSeq)
-    val endOffset = cleaner.buildOffsetMap(log, start, end, map)
+    cleaner.buildOffsetMap(log, start, end, map)
+    val endOffset = map.latestOffset
     assertEquals("Last offset should be the end offset.", 7206178L, endOffset)
     assertEquals("Should have the expected number of messages in the map.", end - start, map.size)
     assertEquals("Map should contain first value", 0L, map.get(key(0)))
     assertEquals("Map should contain second value", 7206178L, map.get(key(1)))
   }
 
+  /**
+   * Test building a partial offset map of part of a log segment
+   */
+  @Test
+  def testBuildPartialOffsetMap() {
+    // because loadFactor is 0.75, this means we can fit 2 messages in the map
+    val map = new FakeOffsetMap(3)
+    val log = makeLog()
+    val cleaner = makeCleaner(2)
+
+    log.append(message(0,0))
+    log.append(message(1,1))
+    log.append(message(2,2))
+    log.append(message(3,3))
+    log.append(message(4,4))
+    log.roll()
+
+    cleaner.buildOffsetMap(log, 2, Int.MaxValue, map)
+    assertEquals(2, map.size)
+    assertEquals(-1, map.get(key(0)))
+    assertEquals(2, map.get(key(2)))
+    assertEquals(3, map.get(key(3)))
+    assertEquals(-1, map.get(key(4)))
+  }
+
   private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
     for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
       yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset
 
@@ -469,7 +532,7 @@ class CleanerTest extends JUnitSuite {
                   offsetMap = new FakeOffsetMap(capacity),
                   ioBufferSize = 64*1024,
                   maxIoBufferSize = 64*1024,
-                  dupBufferLoadFactor = 0.75, 
+                  dupBufferLoadFactor = 0.75,
                   throttler = throttler,
                   time = time,
                   checkDone = checkDone )
 
@@ -500,12 +563,15 @@ class CleanerTest extends JUnitSuite {
 
 class FakeOffsetMap(val slots: Int) extends OffsetMap {
   val map = new java.util.HashMap[String, Long]()
-  
-  private def keyFor(key: ByteBuffer) = 
+  var lastOffset = -1L
+
+  private def keyFor(key: ByteBuffer) =
     new String(Utils.readBytes(key.duplicate), "UTF-8")
-  
-  def put(key: ByteBuffer, offset: Long): Unit = 
+
+  def put(key: ByteBuffer, offset: Long): Unit = {
+    lastOffset = offset
     map.put(keyFor(key), offset)
+  }
 
   def get(key: ByteBuffer): Long = {
     val k = keyFor(key)
@@ -518,5 +584,8 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap {
   def clear() = map.clear()
 
   def size: Int = map.size
-  
+
+  def latestOffset: Long = lastOffset
+
+  override def toString: String = map.toString()
 }
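----------------------------------------------------------------------

To make the resume-from-mid-segment behaviour easier to follow, here is a
minimal, runnable sketch of the control flow this commit introduces. Record,
BoundedOffsetMap and cleanOnce are hypothetical stand-ins for Kafka's
MessageAndOffset, SkimpyOffsetMap and the Cleaner; they model only the
bookkeeping, not the real I/O, hashing, or segment grouping.

import scala.collection.mutable

object PartialCleanSketch extends App {

  case class Record(offset: Long, key: Int)

  // Like SkimpyOffsetMap after this commit: it remembers the latest offset
  // that was put, so a partially built map tells the cleaner where to resume.
  class BoundedOffsetMap(maxDesiredSize: Int) {
    private val entries = mutable.Map[Int, Long]()
    var latestOffset: Long = -1L
    def size: Int = entries.size
    // Mirrors buildOffsetMapForSegment: refuse the put once the map is full.
    def tryPut(key: Int, offset: Long): Boolean =
      if (size < maxDesiredSize) { entries(key) = offset; latestOffset = offset; true }
      else false
    def get(key: Int): Option[Long] = entries.get(key)
  }

  // One cleaning pass: build a (possibly partial) offset map starting at
  // firstDirtyOffset, then drop every record shadowed by a later copy of its
  // key at or below map.latestOffset. Records past latestOffset are always
  // retained, mirroring the new pastLatestOffset check in the cleaner's
  // retention filter. Returns the cleaned log and the next dirty offset
  // (latestOffset + 1), which may now fall in the middle of a segment.
  def cleanOnce(log: List[Record], firstDirtyOffset: Long, mapSize: Int): (List[Record], Long) = {
    val map = new BoundedOffsetMap(mapSize)
    log.iterator
      .filter(_.offset >= firstDirtyOffset)
      .takeWhile(r => map.tryPut(r.key, r.offset))
      .foreach(_ => ()) // force the lazy iterator
    val retained = log.filter { r =>
      r.offset > map.latestOffset ||          // past the mapped range: keep
        map.get(r.key).forall(_ <= r.offset)  // not shadowed by a later copy: keep
    }
    (retained, map.latestOffset + 1)
  }

  // Same shape as testPartialSegmentClean: keys 0,1,0,1,0 and a map that holds
  // a single entry per pass, yet every pass still advances the dirty offset.
  var log = List(Record(0, 0), Record(1, 1), Record(2, 0), Record(3, 1), Record(4, 0))
  var dirty = 0L
  while (dirty <= 4L) {
    val (cleaned, nextDirty) = cleanOnce(log, dirty, mapSize = 1)
    println(s"cleaned from offset $dirty, kept offsets ${cleaned.map(_.offset)}")
    log = cleaned
    dirty = nextDirty
  }
  // The final passes leave offsets List(3, 4), matching the test in this commit.
}

Note that a pass whose map fills before reaching any duplicate removes nothing,
but it still advances the dirty offset, so cleaning makes progress as long as
the map can hold at least one entry; the new require in the Cleaner constructor
enforces exactly that precondition.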
