junrao commented on code in PR #19961:
URL: https://github.com/apache/kafka/pull/19961#discussion_r2164504535


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java:
##########
@@ -95,7 +99,7 @@ public void sanityCheck() {
      *         the pair (baseOffset, 0) is returned.
      */
     public OffsetPosition lookup(long targetOffset) {
-        return maybeLock(lock, () -> {
+        return inLock(lock, () -> {

Review Comment:
   Currently, in Linux, we don't acquire the lock for `lookup()`, which is used 
for fetch requests. Both fetch and produce requests are common. Acquiring the 
lock in the fetch path reduces read/write concurrency. The reader only truly 
needs to lock when the underlying mmap changes by resize(). Since resize() is 
an infrequent event, we could introduce a separate resize lock, which will be 
held by `resize()` and all readers (currently calling `maybeLock()`). This will 
help maintain the current level of concurrency. 



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java:
##########
@@ -67,19 +70,23 @@ public TimeIndex(File file, long baseOffset, int 
maxIndexSize, boolean writable)
 
         this.lastEntry = lastEntryFromIndexFile();
 
-        log.debug("Loaded index file {} with maxEntries = {}, maxIndexSize = 
{}, entries = {}, lastOffset = {}, file position = {}",
-            file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), 
lastEntry.offset, mmap().position());
+        inLock(lock, () -> {

Review Comment:
   Is lock needed here in the constructor? Ditto for TimeIndex.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java:
##########
@@ -67,19 +70,23 @@ public TimeIndex(File file, long baseOffset, int 
maxIndexSize, boolean writable)
 
         this.lastEntry = lastEntryFromIndexFile();
 
-        log.debug("Loaded index file {} with maxEntries = {}, maxIndexSize = 
{}, entries = {}, lastOffset = {}, file position = {}",
-            file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), 
lastEntry.offset, mmap().position());
+        inLock(lock, () -> {
+            log.debug("Loaded index file {} with maxEntries = {}, maxIndexSize 
= {}, entries = {}, lastOffset = {}, file position = {}",
+                file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), 
lastEntry.offset, mmap().position());
+        });
     }
 
     @Override
     public void sanityCheck() {
         TimestampOffset entry = lastEntry();
         long lastTimestamp = entry.timestamp;
         long lastOffset = entry.offset;
-        if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
-            throw new CorruptIndexException("Corrupt time index found, time 
index file (" + file().getAbsolutePath() + ") has "
-                + "non-zero size but the last timestamp is " + lastTimestamp + 
" which is less than the first timestamp "
-                + timestamp(mmap(), 0));
+        inLock(lock, () -> {

Review Comment:
   Good catch here!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to