This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new daebb01  MINOR: Improve logging around index files (#6385)
daebb01 is described below

commit daebb01622c6190da00f7a7e37be7d187989ac34
Author: Bob Barrett <bob.barr...@outlook.com>
AuthorDate: Tue Mar 19 02:00:01 2019 -0400

    MINOR: Improve logging around index files (#6385)
    
    This patch adds additional DEBUG statements in AbstractIndex.scala,
    OffsetIndex.scala, and TimeIndex.scala. It also changes the logging on
    append from DEBUG to TRACE to make DEBUG logging less disruptive, and it
    ensures that exceptions raised from index classes include file/offset
    information.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/log/AbstractIndex.scala    |  3 +++
 core/src/main/scala/kafka/log/OffsetIndex.scala      | 15 +++++++++------
 core/src/main/scala/kafka/log/TimeIndex.scala        | 17 +++++++++++------
 core/src/main/scala/kafka/log/TransactionIndex.scala | 13 +++++++------
 4 files changed, 30 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index 44083c1..ec91b4e 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -115,6 +115,7 @@ abstract class AbstractIndex[K, V](@volatile var file: 
File, val baseOffset: Lon
       val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
 
       if (_length == roundedNewSize) {
+        debug(s"Index ${file.getAbsolutePath} was not resized because it 
already has size $roundedNewSize")
         false
       } else {
         val raf = new RandomAccessFile(file, "rw")
@@ -129,6 +130,8 @@ abstract class AbstractIndex[K, V](@volatile var file: 
File, val baseOffset: Lon
           mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 
roundedNewSize)
           _maxEntries = mmap.limit() / entrySize
           mmap.position(position)
+          debug(s"Resized ${file.getAbsolutePath} to $roundedNewSize, position 
is ${mmap.position()} " +
+            s"and limit is ${mmap.limit()}")
           true
         } finally {
           CoreUtils.swallow(raf.close(), this)
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 523c88c..400388b 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -57,8 +57,8 @@ class OffsetIndex(_file: File, baseOffset: Long, 
maxIndexSize: Int = -1, writabl
   /* the last offset in the index */
   private[this] var _lastOffset = lastEntry.offset
 
-  debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries 
= %d, lastOffset = %d, file position = %d"
-    .format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, 
_lastOffset, mmap.position()))
+  debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = 
$maxEntries, " +
+    s"maxIndexSize = $maxIndexSize, entries = ${_entries}, lastOffset = 
${_lastOffset}, file position = ${mmap.position()}")
 
   /**
    * The last entry in the index
@@ -126,7 +126,8 @@ class OffsetIndex(_file: File, baseOffset: Long, 
maxIndexSize: Int = -1, writabl
   def entry(n: Int): OffsetPosition = {
     maybeLock(lock) {
       if(n >= _entries)
-        throw new IllegalArgumentException("Attempt to fetch the %dth entry 
from an index of size %d.".format(n, _entries))
+        throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry 
from index ${file.getAbsolutePath}, " +
+          s"which has size ${_entries}.")
       val idx = mmap.duplicate
       OffsetPosition(relativeOffset(idx, n), physical(idx, n))
     }
@@ -139,15 +140,15 @@ class OffsetIndex(_file: File, baseOffset: Long, 
maxIndexSize: Int = -1, writabl
     inLock(lock) {
       require(!isFull, "Attempt to append to a full index (size = " + _entries 
+ ").")
       if (_entries == 0 || offset > _lastOffset) {
-        debug("Adding index entry %d => %d to %s.".format(offset, position, 
file.getName))
+        trace(s"Adding index entry $offset => $position to 
${file.getAbsolutePath}")
         mmap.putInt((offset - baseOffset).toInt)
         mmap.putInt(position)
         _entries += 1
         _lastOffset = offset
         require(_entries * entrySize == mmap.position(), entries + " entries 
but file position in index is " + mmap.position() + ".")
       } else {
-        throw new InvalidOffsetException("Attempt to append an offset (%d) to 
position %d no larger than the last offset appended (%d) to %s."
-          .format(offset, entries, _lastOffset, file.getAbsolutePath))
+        throw new InvalidOffsetException(s"Attempt to append an offset 
($offset) to position $entries no larger than" +
+          s" the last offset appended (${_lastOffset}) to 
${file.getAbsolutePath}.")
       }
     }
   }
@@ -183,6 +184,8 @@ class OffsetIndex(_file: File, baseOffset: Long, 
maxIndexSize: Int = -1, writabl
       _entries = entries
       mmap.position(_entries * entrySize)
       _lastOffset = lastEntry.offset
+      debug(s"Truncated index ${file.getAbsolutePath} to $entries entries;" +
+        s" position is now ${mmap.position()} and last offset is now 
${_lastOffset}")
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index e505f36..e75e3e5 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -57,6 +57,9 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: 
Int = -1, writable:
 
   override def entrySize = 12
 
+  debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = 
$maxEntries, maxIndexSize = $maxIndexSize," +
+    s" entries = ${_entries}, lastOffset = ${_lastEntry}, file position = 
${mmap.position()}")
+
   // We override the full check to reserve the last time index entry slot for 
the on roll call.
   override def isFull: Boolean = entries >= maxEntries - 1
 
@@ -86,7 +89,8 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: 
Int = -1, writable:
   def entry(n: Int): TimestampOffset = {
     maybeLock(lock) {
       if(n >= _entries)
-        throw new IllegalArgumentException("Attempt to fetch the %dth entry 
from a time index of size %d.".format(n, _entries))
+        throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry 
from  time index ${file.getAbsolutePath} " +
+          s"which has size ${_entries}.")
       val idx = mmap.duplicate
       TimestampOffset(timestamp(idx, n), relativeOffset(idx, n))
     }
@@ -117,16 +121,16 @@ class TimeIndex(_file: File, baseOffset: Long, 
maxIndexSize: Int = -1, writable:
       // 1. A log segment is closed.
       // 2. LogSegment.onBecomeInactiveSegment() is called when an active log 
segment is rolled.
       if (_entries != 0 && offset < lastEntry.offset)
-        throw new InvalidOffsetException("Attempt to append an offset (%d) to 
slot %d no larger than the last offset appended (%d) to %s."
-          .format(offset, _entries, lastEntry.offset, file.getAbsolutePath))
+        throw new InvalidOffsetException(s"Attempt to append an offset 
($offset) to slot ${_entries} no larger than" +
+          s" the last offset appended (${lastEntry.offset}) to 
${file.getAbsolutePath}.")
       if (_entries != 0 && timestamp < lastEntry.timestamp)
-        throw new IllegalStateException("Attempt to append a timestamp (%d) to 
slot %d no larger than the last timestamp appended (%d) to %s."
-            .format(timestamp, _entries, lastEntry.timestamp, 
file.getAbsolutePath))
+        throw new IllegalStateException(s"Attempt to append a timestamp 
($timestamp) to slot ${_entries} no larger" +
+          s" than the last timestamp appended (${lastEntry.timestamp}) to 
${file.getAbsolutePath}.")
       // We only append to the time index when the timestamp is greater than 
the last inserted timestamp.
       // If all the messages are in message format v0, the timestamp will 
always be NoTimestamp. In that case, the time
       // index will be empty.
       if (timestamp > lastEntry.timestamp) {
-        debug("Adding index entry %d => %d to %s.".format(timestamp, offset, 
file.getName))
+        trace(s"Adding index entry $timestamp => $offset to 
${file.getAbsolutePath}.")
         mmap.putLong(timestamp)
         mmap.putInt((offset - baseOffset).toInt)
         _entries += 1
@@ -202,6 +206,7 @@ class TimeIndex(_file: File, baseOffset: Long, 
maxIndexSize: Int = -1, writable:
       _entries = entries
       mmap.position(_entries * entrySize)
       _lastEntry = lastEntryFromIndexFile
+      debug(s"Truncated index ${file.getAbsolutePath} to $entries entries; 
position is now ${mmap.position()} and last entry is now ${_lastEntry}")
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala
index 349f0ce..ec59225 100644
--- a/core/src/main/scala/kafka/log/TransactionIndex.scala
+++ b/core/src/main/scala/kafka/log/TransactionIndex.scala
@@ -53,7 +53,8 @@ class TransactionIndex(val startOffset: Long, @volatile var 
file: File) extends
   def append(abortedTxn: AbortedTxn): Unit = {
     lastOffset.foreach { offset =>
       if (offset >= abortedTxn.lastOffset)
-        throw new IllegalArgumentException("The last offset of appended 
transactions must increase sequentially")
+        throw new IllegalArgumentException(s"The last offset of appended 
transactions must increase sequentially, but " +
+          s"${abortedTxn.lastOffset} is not greater than current last offset 
$offset of index ${file.getAbsolutePath}")
     }
     lastOffset = Some(abortedTxn.lastOffset)
     Utils.writeFully(channel, abortedTxn.buffer.duplicate())
@@ -138,8 +139,8 @@ class TransactionIndex(val startOffset: Long, @volatile var 
file: File) extends
 
               val abortedTxn = new AbortedTxn(buffer)
               if (abortedTxn.version > AbortedTxn.CurrentVersion)
-                throw new KafkaException(s"Unexpected aborted transaction 
version ${abortedTxn.version}, " +
-                  s"current version is ${AbortedTxn.CurrentVersion}")
+                throw new KafkaException(s"Unexpected aborted transaction 
version ${abortedTxn.version} " +
+                  s"in transaction index ${file.getAbsolutePath}, current 
version is ${AbortedTxn.CurrentVersion}")
               val nextEntry = (abortedTxn, position)
               position += AbortedTxn.TotalSize
               nextEntry
@@ -147,7 +148,7 @@ class TransactionIndex(val startOffset: Long, @volatile var 
file: File) extends
               case e: IOException =>
                 // We received an unexpected error reading from the index 
file. We propagate this as an
                 // UNKNOWN error to the consumer, which will cause it to retry 
the fetch.
-                throw new KafkaException(s"Failed to read from the transaction 
index $file", e)
+                throw new KafkaException(s"Failed to read from the transaction 
index ${file.getAbsolutePath}", e)
             }
           }
         }
@@ -187,8 +188,8 @@ class TransactionIndex(val startOffset: Long, @volatile var 
file: File) extends
     val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize)
     for ((abortedTxn, _) <- iterator(() => buffer)) {
       if (abortedTxn.lastOffset < startOffset)
-        throw new CorruptIndexException(s"Last offset of aborted transaction 
$abortedTxn is less than start offset " +
-          s"$startOffset")
+        throw new CorruptIndexException(s"Last offset of aborted transaction 
$abortedTxn in index " +
+          s"${file.getAbsolutePath} is less than start offset $startOffset")
     }
   }
 

Reply via email to