junrao commented on code in PR #19961:
URL: https://github.com/apache/kafka/pull/19961#discussion_r2164504535
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java:
##########
@@ -95,7 +99,7 @@ public void sanityCheck() {
      * the pair (baseOffset, 0) is returned.
      */
     public OffsetPosition lookup(long targetOffset) {
-        return maybeLock(lock, () -> {
+        return inLock(lock, () -> {

Review Comment:
   Currently, on Linux, we don't acquire the lock in `lookup()`, which is used by fetch requests. Since both fetch and produce requests are common, acquiring the lock in the fetch path reduces read/write concurrency. A reader only truly needs the lock when `resize()` changes the underlying mmap. Because `resize()` is infrequent, we could introduce a separate resize lock, held by `resize()` and by all readers (those currently calling `maybeLock()`). This would preserve the current level of concurrency.

##########
storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java:
##########
@@ -67,19 +70,23 @@ public TimeIndex(File file, long baseOffset, int maxIndexSize, boolean writable)
         this.lastEntry = lastEntryFromIndexFile();
-        log.debug("Loaded index file {} with maxEntries = {}, maxIndexSize = {}, entries = {}, lastOffset = {}, file position = {}",
-            file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), lastEntry.offset, mmap().position());
+        inLock(lock, () -> {

Review Comment:
   Is the lock needed here in the constructor? Ditto for TimeIndex.
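A minimal sketch of the separate resize lock suggested in the `lookup()` comment, under stated assumptions: the class `ResizeLockSketch` and its methods are hypothetical and are not Kafka's `AbstractIndex`/`OffsetIndex` API; a heap `ByteBuffer` of 8-byte entries stands in for the mmap; and the resize lock is assumed to be a `ReentrantReadWriteLock`, so readers share the read side and do not block one another, while `resize()` takes the write side. Appends keep using only the existing writer lock.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch, not Kafka's AbstractIndex/OffsetIndex: a fixed-width
// index of 8-byte long entries, with a heap ByteBuffer standing in for the mmap.
final class ResizeLockSketch {
    // Existing-style lock: serializes writers (append, resize).
    private final ReentrantLock lock = new ReentrantLock();
    // Assumed separate resize lock: readers share the read side and never block
    // each other; resize() takes the write side while it swaps the buffer.
    private final ReentrantReadWriteLock resizeLock = new ReentrantReadWriteLock();

    private volatile ByteBuffer mmap;
    // Modified only under `lock`; volatile so lock-free readers see the latest count.
    private volatile int entries = 0;

    ResizeLockSketch(int maxEntries) {
        this.mmap = ByteBuffer.allocate(maxEntries * Long.BYTES);
    }

    int entries() {
        return entries;
    }

    // Reader path (in the spirit of lookup() on fetch): takes only the resize read lock.
    long entryAt(int n) {
        resizeLock.readLock().lock();
        try {
            if (n < 0 || n >= entries) {
                throw new IndexOutOfBoundsException("no entry " + n);
            }
            // Absolute get leaves the buffer position untouched, so concurrent readers don't interfere.
            return mmap.getLong(n * Long.BYTES);
        } finally {
            resizeLock.readLock().unlock();
        }
    }

    // Writer path (in the spirit of append on produce): takes only the existing lock,
    // so it never waits for readers.
    void append(long value) {
        lock.lock();
        try {
            // Write the entry before publishing the new count: a reader that sees the
            // incremented volatile `entries` is guaranteed to see this write too.
            mmap.putLong(entries * Long.BYTES, value);
            entries++;
        } finally {
            lock.unlock();
        }
    }

    // Rare path: replace the buffer. Holding the write side excludes all readers.
    void resize(int newMaxEntries) {
        lock.lock();
        try {
            resizeLock.writeLock().lock();
            try {
                if (newMaxEntries < entries) {
                    throw new IllegalArgumentException("cannot shrink below " + entries + " entries");
                }
                ByteBuffer resized = ByteBuffer.allocate(newMaxEntries * Long.BYTES);
                ByteBuffer old = mmap.duplicate();
                old.limit(entries * Long.BYTES);
                old.position(0);
                resized.put(old); // copy the existing entries into the new buffer
                mmap = resized;
            } finally {
                resizeLock.writeLock().unlock();
            }
        } finally {
            lock.unlock();
        }
    }
}
```

For example, `new ResizeLockSketch(2)`, two appends, `resize(4)`, then a third append leaves three entries, and `entryAt(2)` returns the third value. With this split, a lookup waits only while `resize()` holds the write lock; appends and other lookups never block it, which is the concurrency the comment aims to keep.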
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java:
##########
@@ -67,19 +70,23 @@ public TimeIndex(File file, long baseOffset, int maxIndexSize, boolean writable)
         this.lastEntry = lastEntryFromIndexFile();
-        log.debug("Loaded index file {} with maxEntries = {}, maxIndexSize = {}, entries = {}, lastOffset = {}, file position = {}",
-            file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), lastEntry.offset, mmap().position());
+        inLock(lock, () -> {
+            log.debug("Loaded index file {} with maxEntries = {}, maxIndexSize = {}, entries = {}, lastOffset = {}, file position = {}",
+                file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), lastEntry.offset, mmap().position());
+        });
     }
     @Override
     public void sanityCheck() {
         TimestampOffset entry = lastEntry();
         long lastTimestamp = entry.timestamp;
         long lastOffset = entry.offset;
-        if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
-            throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has "
-                + "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp "
-                + timestamp(mmap(), 0));
+        inLock(lock, () -> {

Review Comment:
   Good catch here!

-- 
This is an automated message from the Apache Git Service.