Updated Branches: refs/heads/trunk eedbea652 -> 7c54e39bd
KAFKA-1112; broker can not start itself after kafka is killed with -9; patched by Jay Kreps and Jun Rao; reviewed by Jay Kreps, Neha Narkhede and Guozhang Wang Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7c54e39b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7c54e39b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7c54e39b Branch: refs/heads/trunk Commit: 7c54e39bd48c9908c220ee68cee608a0d0cf5d9d Parents: eedbea6 Author: Jun Rao <[email protected]> Authored: Mon Nov 18 18:31:32 2013 -0800 Committer: Jun Rao <[email protected]> Committed: Mon Nov 18 18:31:32 2013 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/log/FileMessageSet.scala | 2 + core/src/main/scala/kafka/log/Log.scala | 28 +++++++------- core/src/main/scala/kafka/log/LogManager.scala | 2 + core/src/main/scala/kafka/log/OffsetIndex.scala | 40 ++++++++++++-------- .../scala/unit/kafka/log/LogSegmentTest.scala | 5 +-- .../src/test/scala/unit/kafka/log/LogTest.scala | 22 +++++------ .../test/scala/unit/kafka/utils/TestUtils.scala | 10 ++++- 7 files changed, 62 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/main/scala/kafka/log/FileMessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index 6c099da..e1f8b97 100644 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -123,6 +123,8 @@ class FileMessageSet private[kafka](@volatile var file: File, if(offset >= targetOffset) return OffsetPosition(offset, position) val messageSize = buffer.getInt() + if(messageSize < Message.MessageOverhead) + throw new IllegalStateException("Invalid message size: " + messageSize) position += MessageSet.LogOverhead + messageSize } null http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 9205128..1883a53 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -155,26 +155,19 @@ class Log(val dir: File, activeSegment.index.resize(config.maxIndexSize) } - // sanity check the index file of every segment, if it's empty or its last offset is greater than its base offset. - for (s <- logSegments) { - require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset, - "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d" - .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset)) - } + // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment + for (s <- logSegments) + s.index.sanityCheck() } private def recoverLog() { - val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L} - val needsRecovery = !(new File(dir.getParentFile, CleanShutdownFile)).exists() - if(!needsRecovery) { - this.recoveryPoint = lastOffset - return - } - if(lastOffset <= this.recoveryPoint) { - info("Log '%s' is fully intact, skipping recovery".format(name)) - this.recoveryPoint = lastOffset + // if we have the clean shutdown marker, skip recovery + if(hasCleanShutdownFile) { + this.recoveryPoint = activeSegment.nextOffset return } + + // okay we need to actually recover this log val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator while(unflushed.hasNext) { val curr = unflushed.next @@ -196,6 +189,11 @@ class Log(val dir: File, } } } + + /** + * Check if we have the "clean shutdown" file + */ + private def hasCleanShutdownFile() = new File(dir.getParentFile, CleanShutdownFile).exists() /** * The number of segments in the log. http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 390b759..81be88a 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -175,6 +175,8 @@ class LogManager(val logDirs: Array[File], allLogs.foreach(_.close()) // update the last flush point checkpointRecoveryPointOffsets() + // mark that the shutdown was clean by creating the clean shutdown marker file + logDirs.foreach(dir => Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile())) } finally { // regardless of whether the close succeeded, we need to unlock the data directories dirLocks.foreach(_.destroy()) http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/main/scala/kafka/log/OffsetIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 2f4e303..96571b3 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -69,12 +69,8 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi raf.setLength(roundToExactMultiple(maxIndexSize, 8)) } - val len = raf.length() - if(len < 0 || len % 8 != 0) - throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len + - " bytes which is not positive or not a multiple of 8.") - /* memory-map the file */ + val len = raf.length() val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len) /* set the position in the index for the next entry */ @@ -99,22 +95,20 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi var maxEntries = mmap.limit / 8 /* the last offset in the index */ - var lastOffset = readLastOffset() + var lastOffset = readLastEntry.offset debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d" .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position)) /** - * The last offset written to the index + * The last entry in the index */ - private def readLastOffset(): Long = { + def readLastEntry(): OffsetPosition = { inLock(lock) { - val offset = - size.get match { - case 0 => 0 - case s => relativeOffset(this.mmap, s-1) - } - baseOffset + offset + size.get match { + case 0 => OffsetPosition(baseOffset, 0) + case s => OffsetPosition(baseOffset + relativeOffset(this.mmap, s-1), physical(this.mmap, s-1)) + } } } @@ -179,7 +173,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi /* return the nth offset relative to the base offset */ private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8) - /* return the nth physical offset */ + /* return the nth physical position */ private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4) /** @@ -258,7 +252,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi inLock(lock) { this.size.set(entries) mmap.position(this.size.get * 8) - this.lastOffset = readLastOffset + this.lastOffset = readLastEntry.offset } } @@ -351,6 +345,20 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi } /** + * Do a basic sanity check on this index to detect obvious problems + * @throw IllegalArgumentException if any problems are found + */ + def sanityCheck() { + require(entries == 0 || lastOffset > baseOffset, + "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d" + .format(file.getAbsolutePath, lastOffset, baseOffset)) + val len = file.length() + require(len % 8 == 0, + "Index file " + file.getAbsolutePath + " is corrupt, found " + len + + " bytes which is not positive or not a multiple of 8.") + } + + /** * Round a number to the greatest exact multiple of the given factor less than the given number. * E.g. roundToExactMultiple(67, 8) == 64 */ http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 5f2c2e8..6b76037 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -212,15 +212,14 @@ class LogSegmentTest extends JUnit3Suite { */ @Test def testRecoveryWithCorruptMessage() { - val rand = new Random(1) val messagesAppended = 20 for(iteration <- 0 until 10) { val seg = createSegment(0) for(i <- 0 until messagesAppended) seg.append(i, messages(i, i.toString)) - val offsetToBeginCorruption = rand.nextInt(messagesAppended) + val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended) // start corrupting somewhere in the middle of the chosen record all the way to the end - val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + rand.nextInt(15) + val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + TestUtils.random.nextInt(15) TestUtils.writeNonsenseToFile(seg.log.file, position, seg.log.file.length.toInt - position) seg.recover(64*1024) assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList) http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 1571f1e..1da1393 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -592,29 +592,29 @@ class LogTest extends JUnitSuite { val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000) val set = TestUtils.singleMessageSet("test".getBytes()) val recoveryPoint = 50L - for(iteration <- 0 until 10) { + for(iteration <- 0 until 50) { // create a log and write some messages to it + logDir.mkdirs() var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) - for(i <- 0 until 100) + val numMessages = 50 + TestUtils.random.nextInt(50) + for(i <- 0 until numMessages) log.append(set) - val seg = log.logSegments(0, recoveryPoint).last - val index = seg.index - val messages = seg.log - val filePosition = messages.searchFor(recoveryPoint, 0).position - val indexPosition = index.lookup(recoveryPoint).position + val messages = log.logSegments.flatMap(_.log.iterator.toList) log.close() - // corrupt file - TestUtils.writeNonsenseToFile(index.file, indexPosition, index.file.length.toInt - indexPosition) - TestUtils.writeNonsenseToFile(messages.file, filePosition, messages.file.length().toInt - filePosition) + // corrupt index and log by appending random bytes + TestUtils.appendNonsenseToFile(log.activeSegment.index.file, TestUtils.random.nextInt(1024) + 1) + TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1) // attempt recovery log = new Log(logDir, config, recoveryPoint, time.scheduler, time) - assertEquals(recoveryPoint, log.logEndOffset) + assertEquals(numMessages, log.logEndOffset) + assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList)) + Utils.rm(logDir) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 777b315..d88b6c3 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -518,9 +518,15 @@ object TestUtils extends Logging { def writeNonsenseToFile(fileName: File, position: Long, size: Int) { val file = new RandomAccessFile(fileName, "rw") file.seek(position) - val rand = new Random for(i <- 0 until size) - file.writeByte(rand.nextInt(255)) + file.writeByte(random.nextInt(255)) + file.close() + } + + def appendNonsenseToFile(fileName: File, size: Int) { + val file = new FileOutputStream(fileName, true) + for(i <- 0 until size) + file.write(random.nextInt(255)) file.close() }
