This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 70ddd8a  MINOR: Improve logging around index files (#6385)
70ddd8a is described below

commit 70ddd8af71938b4f5f6d1bb3df6243ef13359bcf
Author: Bob Barrett <bob.barr...@outlook.com>
AuthorDate: Tue Mar 19 02:00:01 2019 -0400

    MINOR: Improve logging around index files (#6385)
    
    This patch adds additional DEBUG statements in AbstractIndex.scala,
    OffsetIndex.scala, and TimeIndex.scala. It also changes the logging on
    append from DEBUG to TRACE to make DEBUG logging less disruptive, and it
    ensures that exceptions raised from index classes include file/offset
    information.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/log/AbstractIndex.scala    |  3 +++
 core/src/main/scala/kafka/log/OffsetIndex.scala      | 15 +++++++++------
 core/src/main/scala/kafka/log/TimeIndex.scala        | 17 +++++++++++------
 core/src/main/scala/kafka/log/TransactionIndex.scala | 13 +++++++------
 4 files changed, 30 insertions(+), 18 deletions(-)
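
The pattern the patch applies, in brief: per-entry append logging moves to
TRACE so that DEBUG stays quiet on busy brokers, while rare events (load,
resize, truncate) log at DEBUG, and every exception names the backing file.
A minimal sketch of that pattern (the SketchIndex class below is
hypothetical, invented for illustration; it assumes only java.io.File and
the repository's kafka.utils.Logging trait, not the real index internals):

    import java.io.File
    import kafka.utils.Logging

    // Illustrative stand-in for an index class; not part of this patch.
    class SketchIndex(val file: File) extends Logging {
      private var entries = 0

      def append(offset: Long, position: Int): Unit = {
        // Hot path: one line per appended entry, so TRACE, not DEBUG.
        trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
        entries += 1
      }

      def entry(n: Int): Int = {
        if (n >= entries)
          // Exceptions carry the absolute path so a bad index file can be
          // located on disk without extra correlation work.
          throw new IllegalArgumentException(
            s"Attempt to fetch the ${n}th entry from index ${file.getAbsolutePath}, " +
              s"which has size $entries.")
        n
      }

      def resize(newSize: Int): Unit = {
        // Rare maintenance operation: DEBUG is the appropriate level.
        debug(s"Resized ${file.getAbsolutePath} to $newSize")
      }
    }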

diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index c69e783..05237c4 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -175,6 +175,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
       val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
 
       if (_length == roundedNewSize) {
+        debug(s"Index ${file.getAbsolutePath} was not resized because it already has size $roundedNewSize")
         false
       } else {
         val raf = new RandomAccessFile(file, "rw")
@@ -189,6 +190,8 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
           mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
           _maxEntries = mmap.limit() / entrySize
           mmap.position(position)
+          debug(s"Resized ${file.getAbsolutePath} to $roundedNewSize, position is ${mmap.position()} " +
+            s"and limit is ${mmap.limit()}")
           true
         } finally {
           CoreUtils.swallow(raf.close(), AbstractIndex)
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 876895c..d043d0a 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -59,8 +59,8 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
   /* the last offset in the index */
   private[this] var _lastOffset = lastEntry.offset
 
-  debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
-    .format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position()))
+  debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = $maxEntries, " +
+    s"maxIndexSize = $maxIndexSize, entries = ${_entries}, lastOffset = ${_lastOffset}, file position = ${mmap.position()}")
 
   /**
    * The last entry in the index
@@ -128,7 +128,8 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
   def entry(n: Int): OffsetPosition = {
     maybeLock(lock) {
       if (n >= _entries)
-        throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from an index of size ${_entries}.")
+        throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from index ${file.getAbsolutePath}, " +
+          s"which has size ${_entries}.")
       parseEntry(mmap, n)
     }
   }
@@ -141,15 +142,15 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
     inLock(lock) {
       require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
       if (_entries == 0 || offset > _lastOffset) {
-        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
+        trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
         mmap.putInt(relativeOffset(offset))
         mmap.putInt(position)
         _entries += 1
         _lastOffset = offset
         require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".")
       } else {
-        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
-          .format(offset, entries, _lastOffset, file.getAbsolutePath))
+        throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
+          s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
       }
     }
   }
@@ -185,6 +186,8 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
       _entries = entries
       mmap.position(_entries * entrySize)
       _lastOffset = lastEntry.offset
+      debug(s"Truncated index ${file.getAbsolutePath} to $entries entries;" +
+        s" position is now ${mmap.position()} and last offset is now ${_lastOffset}")
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index f5c4d90..f9bc929 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -58,6 +58,9 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
 
   override def entrySize = 12
 
+  debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = $maxEntries, maxIndexSize = $maxIndexSize," +
+    s" entries = ${_entries}, lastOffset = ${_lastEntry}, file position = ${mmap.position()}")
+
   // We override the full check to reserve the last time index entry slot for the on roll call.
   override def isFull: Boolean = entries >= maxEntries - 1
 
@@ -87,7 +90,8 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
   def entry(n: Int): TimestampOffset = {
     maybeLock(lock) {
       if(n >= _entries)
-        throw new IllegalArgumentException("Attempt to fetch the %dth entry from a time index of size %d.".format(n, _entries))
+        throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from time index ${file.getAbsolutePath}, " +
+          s"which has size ${_entries}.")
       parseEntry(mmap, n)
     }
   }
@@ -117,16 +121,16 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
       // 1. A log segment is closed.
       // 2. LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled.
       if (_entries != 0 && offset < lastEntry.offset)
-        throw new InvalidOffsetException("Attempt to append an offset (%d) to slot %d no larger than the last offset appended (%d) to %s."
-          .format(offset, _entries, lastEntry.offset, file.getAbsolutePath))
+        throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot ${_entries} no larger than" +
+          s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.")
       if (_entries != 0 && timestamp < lastEntry.timestamp)
-        throw new IllegalStateException("Attempt to append a timestamp (%d) to slot %d no larger than the last timestamp appended (%d) to %s."
-            .format(timestamp, _entries, lastEntry.timestamp, file.getAbsolutePath))
+        throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger" +
+          s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.")
       // We only append to the time index when the timestamp is greater than the last inserted timestamp.
       // If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time
       // index will be empty.
       if (timestamp > lastEntry.timestamp) {
-        debug("Adding index entry %d => %d to %s.".format(timestamp, offset, file.getName))
+        trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.")
         mmap.putLong(timestamp)
         mmap.putInt(relativeOffset(offset))
         _entries += 1
@@ -200,6 +204,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
       _entries = entries
       mmap.position(_entries * entrySize)
       _lastEntry = lastEntryFromIndexFile
+      debug(s"Truncated index ${file.getAbsolutePath} to $entries entries; position is now ${mmap.position()} and last entry is now ${_lastEntry}")
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala
index da7fce8..e730fdb 100644
--- a/core/src/main/scala/kafka/log/TransactionIndex.scala
+++ b/core/src/main/scala/kafka/log/TransactionIndex.scala
@@ -53,7 +53,8 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
   def append(abortedTxn: AbortedTxn): Unit = {
     lastOffset.foreach { offset =>
       if (offset >= abortedTxn.lastOffset)
-        throw new IllegalArgumentException("The last offset of appended transactions must increase sequentially")
+        throw new IllegalArgumentException(s"The last offset of appended transactions must increase sequentially, but " +
+          s"${abortedTxn.lastOffset} is not greater than current last offset $offset of index ${file.getAbsolutePath}")
     }
     lastOffset = Some(abortedTxn.lastOffset)
     Utils.writeFully(channel, abortedTxn.buffer.duplicate())
@@ -138,8 +139,8 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
 
               val abortedTxn = new AbortedTxn(buffer)
               if (abortedTxn.version > AbortedTxn.CurrentVersion)
-                throw new KafkaException(s"Unexpected aborted transaction version ${abortedTxn.version}, " +
-                  s"current version is ${AbortedTxn.CurrentVersion}")
+                throw new KafkaException(s"Unexpected aborted transaction version ${abortedTxn.version} " +
+                  s"in transaction index ${file.getAbsolutePath}, current version is ${AbortedTxn.CurrentVersion}")
               val nextEntry = (abortedTxn, position)
               position += AbortedTxn.TotalSize
               nextEntry
@@ -147,7 +148,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
               case e: IOException =>
                 // We received an unexpected error reading from the index file. We propagate this as an
                 // UNKNOWN error to the consumer, which will cause it to retry the fetch.
-                throw new KafkaException(s"Failed to read from the transaction index $file", e)
+                throw new KafkaException(s"Failed to read from the transaction index ${file.getAbsolutePath}", e)
             }
           }
         }
@@ -187,8 +188,8 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
     val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize)
     for ((abortedTxn, _) <- iterator(() => buffer)) {
       if (abortedTxn.lastOffset < startOffset)
-        throw new CorruptIndexException(s"Last offset of aborted transaction $abortedTxn is less than start offset " +
-          s"$startOffset")
+        throw new CorruptIndexException(s"Last offset of aborted transaction $abortedTxn in index " +
+          s"${file.getAbsolutePath} is less than start offset $startOffset")
     }
   }
 
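
Because append logging now sits at TRACE, brokers that previously surfaced
per-entry appends at DEBUG will need their log level raised to see them
again. A hedged example of what that might look like in a broker's
log4j.properties (Kafka's logger names follow the class names above, since
these classes mix in the Logging trait; verify against your own config):

    # Assumed log4j.properties fragment; adjust to your appender setup.
    log4j.logger.kafka.log.OffsetIndex=TRACE
    log4j.logger.kafka.log.TimeIndex=TRACE
    # DEBUG suffices for the new load/resize/truncate messages.
    log4j.logger.kafka.log.AbstractIndex=DEBUG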
