KAFKA-3510; OffsetIndex thread safety * Make all fields that are accessed outside of a lock `volatile` * Only allow mutation from within the class * Remove the unnecessary `AtomicInteger`, since mutation always happens inside a lock
Author: Ismael Juma <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1188 from ijuma/kafka-3510-offset-index-thread-safety Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/70301482 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/70301482 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/70301482 Branch: refs/heads/0.10.0 Commit: 703014824aeb0690dfb30a98fcd0f11e9d1e68fc Parents: d1a5883 Author: Ismael Juma <[email protected]> Authored: Tue Apr 5 11:46:04 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Tue Apr 5 17:08:53 2016 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/Log.scala | 2 +- core/src/main/scala/kafka/log/LogSegment.scala | 2 +- core/src/main/scala/kafka/log/OffsetIndex.scala | 173 ++++++++++--------- .../scala/kafka/tools/DumpLogSegments.scala | 2 +- .../scala/unit/kafka/log/OffsetIndexTest.scala | 10 +- 5 files changed, 98 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/70301482/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 81c19fa..8465b64 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -215,7 +215,7 @@ class Log(val dir: File, val fileName = logFile.getName val startOffset = fileName.substring(0, fileName.length - LogFileSuffix.length).toLong val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix) - val index = new OffsetIndex(file = indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize) + val index = new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = 
config.maxIndexSize) val swapSegment = new LogSegment(new FileMessageSet(file = swapFile), index = index, baseOffset = startOffset, http://git-wip-us.apache.org/repos/asf/kafka/blob/70301482/core/src/main/scala/kafka/log/LogSegment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 9fc68a4..3a4bbc8 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -55,7 +55,7 @@ class LogSegment(val log: FileMessageSet, def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) = this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate), - new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), + new OffsetIndex(Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), startOffset, indexIntervalBytes, rollJitterMs, http://git-wip-us.apache.org/repos/asf/kafka/blob/70301482/core/src/main/scala/kafka/log/OffsetIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index e95c9d1..ce35d68 100755 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -24,7 +24,6 @@ import java.io._ import java.nio._ import java.nio.channels._ import java.util.concurrent.locks._ -import java.util.concurrent.atomic._ import kafka.utils._ import kafka.utils.CoreUtils.inLock import kafka.common.InvalidOffsetException @@ -54,62 +53,70 @@ import 
kafka.common.InvalidOffsetException * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal * storage format. */ -class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging { +class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging { private val lock = new ReentrantLock /* initialize the memory mapping for this index */ - private var mmap: MappedByteBuffer = - { - val newlyCreated = file.createNewFile() - val raf = new RandomAccessFile(file, "rw") - try { - /* pre-allocate the file if necessary */ - if(newlyCreated) { - if(maxIndexSize < 8) - throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize) - raf.setLength(roundToExactMultiple(maxIndexSize, 8)) - } - - /* memory-map the file */ - val len = raf.length() - val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len) - - /* set the position in the index for the next entry */ - if(newlyCreated) - idx.position(0) - else - // if this is a pre-existing index, assume it is all valid and set position to last entry - idx.position(roundToExactMultiple(idx.limit, 8)) - idx - } finally { - CoreUtils.swallow(raf.close()) + @volatile + private[this] var mmap: MappedByteBuffer = { + val newlyCreated = _file.createNewFile() + val raf = new RandomAccessFile(_file, "rw") + try { + /* pre-allocate the file if necessary */ + if (newlyCreated) { + if (maxIndexSize < 8) + throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize) + raf.setLength(roundToExactMultiple(maxIndexSize, 8)) } + + /* memory-map the file */ + val len = raf.length() + val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len) + + /* set the position in the index for the next entry */ + if (newlyCreated) + idx.position(0) + else + // if this is a pre-existing index, assume it is all valid and set position to last 
entry + idx.position(roundToExactMultiple(idx.limit, 8)) + idx + } finally { + CoreUtils.swallow(raf.close()) } - + } + /* the number of eight-byte entries currently in the index */ - private var size = new AtomicInteger(mmap.position / 8) - - /** - * The maximum number of eight-byte entries this index can hold - */ @volatile - var maxEntries = mmap.limit / 8 - - /* the last offset in the index */ - var lastOffset = readLastEntry.offset + private[this] var _entries = mmap.position / 8 + + /* The maximum number of eight-byte entries this index can hold */ + @volatile + private[this] var _maxEntries = mmap.limit / 8 + + @volatile + private[this] var _lastOffset = readLastEntry.offset debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d" - .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position)) + .format(_file.getAbsolutePath, _maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position)) + + /** The maximum number of entries this index can hold */ + def maxEntries: Int = _maxEntries + + /** The last offset in the index */ + def lastOffset: Long = _lastOffset + + /** The index file */ + def file: File = _file /** * The last entry in the index */ def readLastEntry(): OffsetPosition = { inLock(lock) { - size.get match { + _entries match { case 0 => OffsetPosition(baseOffset, 0) - case s => OffsetPosition(baseOffset + relativeOffset(this.mmap, s-1), physical(this.mmap, s-1)) + case s => OffsetPosition(baseOffset + relativeOffset(mmap, s - 1), physical(mmap, s - 1)) } } } @@ -149,22 +156,22 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi val relOffset = targetOffset - baseOffset // check if the index is empty - if(entries == 0) + if (_entries == 0) return -1 // check if the target offset is smaller than the least offset - if(relativeOffset(idx, 0) > relOffset) + if (relativeOffset(idx, 0) > relOffset) return -1 // binary search 
for the entry var lo = 0 - var hi = entries-1 - while(lo < hi) { + var hi = _entries - 1 + while (lo < hi) { val mid = ceil(hi/2.0 + lo/2.0).toInt val found = relativeOffset(idx, mid) - if(found == relOffset) + if (found == relOffset) return mid - else if(found < relOffset) + else if (found < relOffset) lo = mid else hi = mid - 1 @@ -185,8 +192,8 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi */ def entry(n: Int): OffsetPosition = { maybeLock(lock) { - if(n >= entries) - throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries)) + if(n >= _entries) + throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, _entries)) val idx = mmap.duplicate OffsetPosition(relativeOffset(idx, n), physical(idx, n)) } @@ -197,17 +204,17 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi */ def append(offset: Long, position: Int) { inLock(lock) { - require(!isFull, "Attempt to append to a full index (size = " + size + ").") - if (size.get == 0 || offset > lastOffset) { - debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName)) - this.mmap.putInt((offset - baseOffset).toInt) - this.mmap.putInt(position) - this.size.incrementAndGet() - this.lastOffset = offset - require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".") + require(!isFull, "Attempt to append to a full index (size = " + _entries + ").") + if (_entries == 0 || offset > _lastOffset) { + debug("Adding index entry %d => %d to %s.".format(offset, position, _file.getName)) + mmap.putInt((offset - baseOffset).toInt) + mmap.putInt(position) + _entries += 1 + _lastOffset = offset + require(_entries * 8 == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".") } else { throw new InvalidOffsetException("Attempt to append an offset (%d) to position 
%d no larger than the last offset appended (%d) to %s." - .format(offset, entries, lastOffset, file.getAbsolutePath)) + .format(offset, _entries, _lastOffset, _file.getAbsolutePath)) } } } @@ -215,7 +222,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi /** * True iff there are no more slots available in this index */ - def isFull: Boolean = entries >= this.maxEntries + def isFull: Boolean = _entries >= _maxEntries /** * Truncate the entire index, deleting all entries @@ -252,9 +259,9 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi */ private def truncateToEntries(entries: Int) { inLock(lock) { - this.size.set(entries) - mmap.position(this.size.get * 8) - this.lastOffset = readLastEntry.offset + _entries = entries + mmap.position(_entries * 8) + _lastOffset = readLastEntry.offset } } @@ -264,7 +271,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi */ def trimToValidSize() { inLock(lock) { - resize(entries * 8) + resize(_entries * 8) } } @@ -276,18 +283,18 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi */ def resize(newSize: Int) { inLock(lock) { - val raf = new RandomAccessFile(file, "rw") + val raf = new RandomAccessFile(_file, "rw") val roundedNewSize = roundToExactMultiple(newSize, 8) - val position = this.mmap.position + val position = mmap.position /* Windows won't let us modify the file length while the file is mmapped :-( */ - if(Os.isWindows) - forceUnmap(this.mmap) + if (Os.isWindows) + forceUnmap(mmap) try { raf.setLength(roundedNewSize) - this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) - this.maxEntries = this.mmap.limit / 8 - this.mmap.position(position) + mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) + _maxEntries = mmap.limit / 8 + mmap.position(position) } finally { CoreUtils.swallow(raf.close()) } @@ -319,19 +326,19 @@ class 
OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * Delete this index file */ def delete(): Boolean = { - info("Deleting index " + this.file.getAbsolutePath) - if(Os.isWindows) - CoreUtils.swallow(forceUnmap(this.mmap)) - this.file.delete() + info("Deleting index " + _file.getAbsolutePath) + if (Os.isWindows) + CoreUtils.swallow(forceUnmap(mmap)) + _file.delete() } /** The number of entries in this index */ - def entries() = size.get + def entries = _entries /** * The number of bytes actually used by this index */ - def sizeInBytes() = 8 * entries + def sizeInBytes() = 8 * _entries /** Close the index */ def close() { @@ -343,8 +350,8 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * @throws IOException if rename fails */ def renameTo(f: File) { - try Utils.atomicMoveWithFallback(file.toPath, f.toPath) - finally this.file = f + try Utils.atomicMoveWithFallback(_file.toPath, f.toPath) + finally _file = f } /** @@ -352,13 +359,13 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * @throws IllegalArgumentException if any problems are found */ def sanityCheck() { - require(entries == 0 || lastOffset > baseOffset, + require(_entries == 0 || lastOffset > baseOffset, "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d" - .format(file.getAbsolutePath, lastOffset, baseOffset)) - val len = file.length() - require(len % 8 == 0, - "Index file " + file.getName + " is corrupt, found " + len + - " bytes which is not positive or not a multiple of 8.") + .format(_file.getAbsolutePath, lastOffset, baseOffset)) + val len = _file.length() + require(len % 8 == 0, + "Index file " + _file.getName + " is corrupt, found " + len + + " bytes which is not positive or not a multiple of 8.") } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/70301482/core/src/main/scala/kafka/tools/DumpLogSegments.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index e882a30..dc99672 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -124,7 +124,7 @@ object DumpLogSegments { val startOffset = file.getName().split("\\.")(0).toLong val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix) val messageSet = new FileMessageSet(logFile, false) - val index = new OffsetIndex(file = file, baseOffset = startOffset) + val index = new OffsetIndex(file, baseOffset = startOffset) //Check that index passes sanityCheck, this is the check that determines if indexes will be rebuilt on startup or not. if (indexSanityOnly) { http://git-wip-us.apache.org/repos/asf/kafka/blob/70301482/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala index dfd7b54..869e618 100644 --- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala @@ -34,7 +34,7 @@ class OffsetIndexTest extends JUnitSuite { @Before def setup() { - this.idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8) + this.idx = new OffsetIndex(nonExistantTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8) } @After @@ -103,7 +103,7 @@ class OffsetIndexTest extends JUnitSuite { idx.append(first.offset, first.position) idx.append(sec.offset, sec.position) idx.close() - val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset) + val idxRo = new OffsetIndex(idx.file, baseOffset = idx.baseOffset) assertEquals(first, idxRo.lookup(first.offset)) assertEquals(sec, 
idxRo.lookup(sec.offset)) assertEquals(sec.offset, idxRo.lastOffset) @@ -113,7 +113,7 @@ class OffsetIndexTest extends JUnitSuite { @Test def truncate() { - val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8) + val idx = new OffsetIndex(nonExistantTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8) idx.truncate() for(i <- 1 until 10) idx.append(i, i) @@ -140,7 +140,7 @@ class OffsetIndexTest extends JUnitSuite { idx.append(5, 5) idx.truncate() - assertEquals("Full truncation should leave no entries", 0, idx.entries()) + assertEquals("Full truncation should leave no entries", 0, idx.entries) idx.append(0, 0) } @@ -169,4 +169,4 @@ class OffsetIndexTest extends JUnitSuite { file.delete() file } -} \ No newline at end of file +}
