This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 70ddd8a MINOR: Improve logging around index files (#6385) 70ddd8a is described below commit 70ddd8af71938b4f5f6d1bb3df6243ef13359bcf Author: Bob Barrett <bob.barr...@outlook.com> AuthorDate: Tue Mar 19 02:00:01 2019 -0400 MINOR: Improve logging around index files (#6385) This patch adds additional DEBUG statements in AbstractIndex.scala, OffsetIndex.scala, and TimeIndex.scala. It also changes the logging on append from DEBUG to TRACE to make DEBUG logging less disruptive, and it ensures that exceptions raised from index classes include file/offset information. Reviewers: Jason Gustafson <ja...@confluent.io> --- core/src/main/scala/kafka/log/AbstractIndex.scala | 3 +++ core/src/main/scala/kafka/log/OffsetIndex.scala | 15 +++++++++------ core/src/main/scala/kafka/log/TimeIndex.scala | 17 +++++++++++------ core/src/main/scala/kafka/log/TransactionIndex.scala | 13 +++++++------ 4 files changed, 30 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index c69e783..05237c4 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -175,6 +175,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon val roundedNewSize = roundDownToExactMultiple(newSize, entrySize) if (_length == roundedNewSize) { + debug(s"Index ${file.getAbsolutePath} was not resized because it already has size $roundedNewSize") false } else { val raf = new RandomAccessFile(file, "rw") @@ -189,6 +190,8 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) _maxEntries = mmap.limit() / entrySize mmap.position(position) + debug(s"Resized ${file.getAbsolutePath} to $roundedNewSize, position is ${mmap.position()} " + + s"and limit is ${mmap.limit()}") true } finally { CoreUtils.swallow(raf.close(), AbstractIndex) diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 876895c..d043d0a 100755 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -59,8 +59,8 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl /* the last offset in the index */ private[this] var _lastOffset = lastEntry.offset - debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d" - .format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position())) + debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = $maxEntries, " + + s"maxIndexSize = $maxIndexSize, entries = ${_entries}, lastOffset = ${_lastOffset}, file position = ${mmap.position()}") /** * The last entry in the index @@ -128,7 +128,8 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl def entry(n: Int): OffsetPosition = { maybeLock(lock) { if (n >= _entries) - throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from an index of size ${_entries}.") + throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from index ${file.getAbsolutePath}, " + + s"which has size ${_entries}.") parseEntry(mmap, n) } } @@ -141,15 +142,15 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl inLock(lock) { require(!isFull, "Attempt to append to a full index (size = " + _entries + ").") if (_entries == 0 || offset > _lastOffset) { - debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName)) + trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}") mmap.putInt(relativeOffset(offset)) mmap.putInt(position) _entries += 1 _lastOffset = offset require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".") } else { - throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s." - .format(offset, entries, _lastOffset, file.getAbsolutePath)) + throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" + + s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.") } } } @@ -185,6 +186,8 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl _entries = entries mmap.position(_entries * entrySize) _lastOffset = lastEntry.offset + debug(s"Truncated index ${file.getAbsolutePath} to $entries entries;" + + s" position is now ${mmap.position()} and last offset is now ${_lastOffset}") } } diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala index f5c4d90..f9bc929 100644 --- a/core/src/main/scala/kafka/log/TimeIndex.scala +++ b/core/src/main/scala/kafka/log/TimeIndex.scala @@ -58,6 +58,9 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: override def entrySize = 12 + debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = $maxEntries, maxIndexSize = $maxIndexSize," + + s" entries = ${_entries}, lastOffset = ${_lastEntry}, file position = ${mmap.position()}") + // We override the full check to reserve the last time index entry slot for the on roll call. override def isFull: Boolean = entries >= maxEntries - 1 @@ -87,7 +90,8 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: def entry(n: Int): TimestampOffset = { maybeLock(lock) { if(n >= _entries) - throw new IllegalArgumentException("Attempt to fetch the %dth entry from a time index of size %d.".format(n, _entries)) + throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from time index ${file.getAbsolutePath} " + + s"which has size ${_entries}.") parseEntry(mmap, n) } } @@ -117,16 +121,16 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: // 1. A log segment is closed. // 2. LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled. if (_entries != 0 && offset < lastEntry.offset) - throw new InvalidOffsetException("Attempt to append an offset (%d) to slot %d no larger than the last offset appended (%d) to %s." - .format(offset, _entries, lastEntry.offset, file.getAbsolutePath)) + throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot ${_entries} no larger than" + + s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.") if (_entries != 0 && timestamp < lastEntry.timestamp) - throw new IllegalStateException("Attempt to append a timestamp (%d) to slot %d no larger than the last timestamp appended (%d) to %s." - .format(timestamp, _entries, lastEntry.timestamp, file.getAbsolutePath)) + throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger" + + s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.") // We only append to the time index when the timestamp is greater than the last inserted timestamp. // If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time // index will be empty. if (timestamp > lastEntry.timestamp) { - debug("Adding index entry %d => %d to %s.".format(timestamp, offset, file.getName)) + trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.") mmap.putLong(timestamp) mmap.putInt(relativeOffset(offset)) _entries += 1 @@ -200,6 +204,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: _entries = entries mmap.position(_entries * entrySize) _lastEntry = lastEntryFromIndexFile + debug(s"Truncated index ${file.getAbsolutePath} to $entries entries; position is now ${mmap.position()} and last entry is now ${_lastEntry}") } } diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala index da7fce8..e730fdb 100644 --- a/core/src/main/scala/kafka/log/TransactionIndex.scala +++ b/core/src/main/scala/kafka/log/TransactionIndex.scala @@ -53,7 +53,8 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends def append(abortedTxn: AbortedTxn): Unit = { lastOffset.foreach { offset => if (offset >= abortedTxn.lastOffset) - throw new IllegalArgumentException("The last offset of appended transactions must increase sequentially") + throw new IllegalArgumentException(s"The last offset of appended transactions must increase sequentially, but " + + s"${abortedTxn.lastOffset} is not greater than current last offset $offset of index ${file.getAbsolutePath}") } lastOffset = Some(abortedTxn.lastOffset) Utils.writeFully(channel, abortedTxn.buffer.duplicate()) @@ -138,8 +139,8 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends val abortedTxn = new AbortedTxn(buffer) if (abortedTxn.version > AbortedTxn.CurrentVersion) - throw new KafkaException(s"Unexpected aborted transaction version ${abortedTxn.version}, " + - s"current version is ${AbortedTxn.CurrentVersion}") + throw new KafkaException(s"Unexpected aborted transaction version ${abortedTxn.version} " + + s"in transaction index ${file.getAbsolutePath}, current version is ${AbortedTxn.CurrentVersion}") val nextEntry = (abortedTxn, position) position += AbortedTxn.TotalSize nextEntry @@ -147,7 +148,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends case e: IOException => // We received an unexpected error reading from the index file. We propagate this as an // UNKNOWN error to the consumer, which will cause it to retry the fetch. - throw new KafkaException(s"Failed to read from the transaction index $file", e) + throw new KafkaException(s"Failed to read from the transaction index ${file.getAbsolutePath}", e) } } } @@ -187,8 +188,8 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize) for ((abortedTxn, _) <- iterator(() => buffer)) { if (abortedTxn.lastOffset < startOffset) - throw new CorruptIndexException(s"Last offset of aborted transaction $abortedTxn is less than start offset " + - s"$startOffset") + throw new CorruptIndexException(s"Last offset of aborted transaction $abortedTxn in index " + + s"${file.getAbsolutePath} is less than start offset $startOffset") } }