Repository: kafka Updated Branches: refs/heads/trunk de982ba3f -> 76f6e14b0
KAFKA-5413; Log cleaner fails due to large offset in segment file. The contribution is my original work and I license the work to the project under the project's open source license. junrao, I had already made the code change before your last comment. I've done pretty much what you said, except that I've not used the current segment because I wasn't sure if it will always be available. I'm happy to change it if you prefer. I've run all the unit and integration tests which all passed. Author: Kelvin Rutt <[email protected]> Author: Kelvin Rutt <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #3357 from kelvinrutt/kafka_5413_bugfix Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/76f6e14b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/76f6e14b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/76f6e14b Branch: refs/heads/trunk Commit: 76f6e14b07bd97d17f9275968a047d68b4658704 Parents: de982ba Author: Kelvin Rutt <[email protected]> Authored: Tue Jun 20 18:00:41 2017 -0700 Committer: Jun Rao <[email protected]> Committed: Tue Jun 20 18:00:41 2017 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/LogCleaner.scala | 29 +++++++++- .../scala/unit/kafka/log/LogCleanerTest.scala | 58 +++++++++++++++++--- 2 files changed, 76 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/76f6e14b/core/src/main/scala/kafka/log/LogCleaner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 623586f..5222487 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -362,9 +362,10 @@ private[log] class Cleaner(val id: Int, // this is the lower
of the last active segment and the compaction lag val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L) + // group the segments and clean the groups info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs))) - for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) + for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize, cleanable.firstUncleanableOffset)) cleanSegments(log, group, offsetMap, deleteHorizonMs, stats) // record buffer utilization @@ -616,7 +617,7 @@ private[log] class Cleaner(val id: Int, * * @return A list of grouped segments */ - private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]] = { + private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int, firstUncleanableOffset: Long): List[Seq[LogSegment]] = { var grouped = List[List[LogSegment]]() var segs = segments.toList while(segs.nonEmpty) { @@ -629,7 +630,7 @@ private[log] class Cleaner(val id: Int, logSize + segs.head.size <= maxSize && indexSize + segs.head.index.sizeInBytes <= maxIndexSize && timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize && - segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) { + lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) { group = segs.head :: group logSize += segs.head.size indexSize += segs.head.index.sizeInBytes @@ -642,6 +643,28 @@ private[log] class Cleaner(val id: Int, } /** + * We want to get the last offset in the first log segment in segs. 
+ * LogSegment.nextOffset() gives the exact last offset in a segment, but can be expensive since it requires + * scanning the segment from the last index entry. + * Therefore, we estimate the last offset of the first log segment by using + * the base offset of the next segment in the list. + * If the next segment doesn't exist, first Uncleanable Offset will be used. + * + * @param segs - remaining segments to group. + * @return The estimated last offset for the first segment in segs + */ + private def lastOffsetForFirstSegment(segs: List[LogSegment], firstUncleanableOffset: Long): Long = { + if (segs.size > 1) { + /* if there is a next segment, use its base offset as the bounding offset to guarantee we know + * the worst case offset */ + segs(1).baseOffset - 1 + } else { + //for the last segment in the list, use the first uncleanable offset. + firstUncleanableOffset - 1 + } + } + + /** * Build a map of key_hash => offset for the keys in the cleanable dirty portion of the log to use in cleaning. 
* @param log The log to use * @param start The offset at which dirty messages begin http://git-wip-us.apache.org/repos/asf/kafka/blob/76f6e14b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 5e95dc2..dabd2d6 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -642,17 +642,17 @@ class LogCleanerTest extends JUnitSuite { } // grouping by very large values should result in a single group with all the segments in it - var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue) + var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset) assertEquals(1, groups.size) assertEquals(log.numberOfSegments, groups.head.size) checkSegmentOrder(groups) // grouping by very small values should result in all groups having one entry - groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue) + groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue, log.logEndOffset) assertEquals(log.numberOfSegments, groups.size) assertTrue("All groups should be singletons.", groups.forall(_.size == 1)) checkSegmentOrder(groups) - groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = 1) + groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = 1, log.logEndOffset) assertEquals(log.numberOfSegments, groups.size) assertTrue("All groups should be singletons.", groups.forall(_.size == 1)) checkSegmentOrder(groups) @@ -661,13 +661,13 @@ class LogCleanerTest extends JUnitSuite { // check grouping by log size val logSize = 
log.logSegments.take(groupSize).map(_.size).sum.toInt + 1 - groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue) + groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue, log.logEndOffset) checkSegmentOrder(groups) assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize)) // check grouping by index size val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes).sum + 1 - groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize) + groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize, log.logEndOffset) checkSegmentOrder(groups) assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize)) } @@ -699,14 +699,14 @@ class LogCleanerTest extends JUnitSuite { assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset) // grouping should result in a single group with maximum relative offset of Int.MaxValue - var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue) + var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset) assertEquals(1, groups.size) // append another message, making last offset of second segment > Int.MaxValue log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0) // grouping should not group the two segments to ensure that maximum relative offset in each group <= Int.MaxValue - groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue) + groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset) assertEquals(2, groups.size) checkSegmentOrder(groups) @@ 
-714,13 +714,55 @@ class LogCleanerTest extends JUnitSuite { while (log.numberOfSegments < 4) log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0) - groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue) + groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset) assertEquals(log.numberOfSegments - 1, groups.size) for (group <- groups) assertTrue("Relative offset greater than Int.MaxValue", group.last.index.lastOffset - group.head.index.baseOffset <= Int.MaxValue) checkSegmentOrder(groups) } + /** + * Following the loading of a log segment where the index file is zero sized, + * the index returned would be the base offset. Sometimes the log file would + * contain data with offsets in excess of the baseOffset which would cause + * the log cleaner to group together segments with a range of > Int.MaxValue + * this test replicates that scenario to ensure that the segments are grouped + * correctly. 
+ */ + @Test + def testSegmentGroupingFollowingLoadOfZeroIndex(): Unit = { + val cleaner = makeCleaner(Int.MaxValue) + + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 400: java.lang.Integer) + + //mimic the effect of loading an empty index file + logProps.put(LogConfig.IndexIntervalBytesProp, 400: java.lang.Integer) + + val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) + + val record1 = messageWithOffset("hello".getBytes, "hello".getBytes, 0) + log.appendAsFollower(record1) + val record2 = messageWithOffset("hello".getBytes, "hello".getBytes, 1) + log.appendAsFollower(record2) + log.roll(Int.MaxValue/2) // starting a new log segment at offset Int.MaxValue/2 + val record3 = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue/2) + log.appendAsFollower(record3) + val record4 = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue.toLong + 1) + log.appendAsFollower(record4) + + assertTrue("Actual offset range should be > Int.MaxValue", log.logEndOffset - 1 - log.logStartOffset > Int.MaxValue) + assertTrue("index.lastOffset is reporting the wrong last offset", log.logSegments.last.index.lastOffset - log.logStartOffset <= Int.MaxValue) + + // grouping should result in two groups because the second segment takes the offset range > MaxInt + val groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset) + assertEquals(2, groups.size) + + for (group <- groups) + assertTrue("Relative offset greater than Int.MaxValue", group.last.nextOffset() - 1 - group.head.baseOffset <= Int.MaxValue) + checkSegmentOrder(groups) + } + private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]): Unit = { val offsets = groups.flatMap(_.map(_.baseOffset)) assertEquals("Offsets should be in increasing order.", offsets.sorted, offsets)
