junrao commented on code in PR #19961:
URL: https://github.com/apache/kafka/pull/19961#discussion_r2181079929


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java:
##########
@@ -218,30 +212,24 @@ private int physical(ByteBuffer buffer, int n) {
      * Truncates index to a known number of entries.
      */
     private void truncateToEntries(int entries) {
-        lock.lock();
-        try {
+        inLock(() -> {
             super.truncateToEntries0(entries);
             this.lastOffset = lastEntry().offset;
             log.debug("Truncated index {} to {} entries; position is now {} 
and last offset is now {}",
-                    file().getAbsolutePath(), entries, mmap().position(), 
lastOffset);
-        } finally {
-            lock.unlock();
-        }
+                file().getAbsolutePath(), entries, mmap().position(), 
lastOffset);
+        });
     }
 
     /**
      * The last entry in the index
      */
     private OffsetPosition lastEntry() {
-        lock.lock();
-        try {
+        return inLock(() -> {

Review Comment:
   lastEntry() is a reader. So it should only take remap read lock.
   
   Also, there is an existing issue. It seems there is no memory barrier for 
the instance level field lastOffset. So a reader may not see the latest value. 
We need to make it volatile.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java:
##########
@@ -259,30 +252,24 @@ private int relativeOffset(ByteBuffer buffer, int n) {
      * Read the last entry from the index file. This operation involves disk 
access.
      */
     private TimestampOffset lastEntryFromIndexFile() {
-        lock.lock();
-        try {
+        return inLock(() -> {

Review Comment:
   lastEntryFromIndexFile() is a reader. So, it needs the remap read lock.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java:
##########
@@ -76,10 +76,12 @@ public void sanityCheck() {
         TimestampOffset entry = lastEntry();
         long lastTimestamp = entry.timestamp;
         long lastOffset = entry.offset;
-        if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
-            throw new CorruptIndexException("Corrupt time index found, time 
index file (" + file().getAbsolutePath() + ") has "
-                + "non-zero size but the last timestamp is " + lastTimestamp + 
" which is less than the first timestamp "
-                + timestamp(mmap(), 0));
+        inLock(() -> {

Review Comment:
   This needs the remap read lock.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to