This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 807866ca6d563892579bb93efd3e64061824c4fe
Author: Masahiro Mori <[email protected]>
AuthorDate: Wed Jul 9 01:15:32 2025 +0900

    KAFKA-19390: Call safeForceUnmap() in AbstractIndex.resize() on Linux to 
prevent stale mmap of index files (#19961)
    
    https://issues.apache.org/jira/browse/KAFKA-19390
    
    The AbstractIndex.resize() method does not release the old memory map
    for both index and time index files.  In some cases, Mixed GC may not
    run for a long time, which can cause the broker to crash when the
    vm.max_map_count limit is reached.
    
    The root cause is that safeForceUnmap() is not being called on Linux
    within resize(), so we have changed the code to unmap old mmap on all
    operating systems.
    
    The same problem was reported in
    [KAFKA-7442](https://issues.apache.org/jira/browse/KAFKA-7442), but the
    PR submitted at that time did not acquire all necessary locks around the
    mmap accesses and was closed without fixing the issue.
    
    Reviewers: Jun Rao <[email protected]>
---
 .../org/apache/kafka/server/util/LockUtils.java    |  77 ++++++++++++
 .../kafka/storage/internals/log/AbstractIndex.java | 134 +++++++++++----------
 .../kafka/storage/internals/log/OffsetIndex.java   |  38 ++----
 .../kafka/storage/internals/log/TimeIndex.java     |  49 +++-----
 .../storage/internals/log/AbstractIndexTest.java   |  89 ++++++++++++++
 5 files changed, 268 insertions(+), 119 deletions(-)

diff --git 
a/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java 
b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java
index dd465977bd0..568d109daf3 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java
@@ -26,6 +26,14 @@ import java.util.function.Supplier;
  * such as acquiring and releasing locks in a safe manner.
  */
 public class LockUtils {
+    @FunctionalInterface
+    public interface ThrowingSupplier<T, E extends Exception> {
+        T get() throws E;
+    }
+    @FunctionalInterface
+    public interface ThrowingRunnable<E extends Exception> {
+        void run() throws E;
+    }
 
     /**
      * Executes the given {@link Supplier} within the context of the specified 
{@link Lock}.
@@ -49,4 +57,73 @@ public class LockUtils {
             lock.unlock();
         }
     }
+
+    /**
+     * Executes the given {@link Runnable} within the context of the specified 
{@link Lock}.
+     * The lock is acquired before executing the runnable and released after 
the execution,
+     * ensuring that the lock is always released, even if an exception is 
thrown.
+     *
+     * @param lock     the lock to be acquired and released
+     * @param runnable the runnable to be executed within the lock context
+     * @throws NullPointerException if either {@code lock} or {@code runnable} 
is null
+     */
+    public static void inLock(Lock lock, Runnable runnable) {
+        Objects.requireNonNull(lock, "Lock must not be null");
+        Objects.requireNonNull(runnable, "Runnable must not be null");
+
+        lock.lock();
+        try {
+            runnable.run();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Executes the given {@link ThrowingSupplier} within the context of the 
specified {@link Lock}.
+     * The lock is acquired before executing the supplier and released after 
the execution,
+     * ensuring that the lock is always released, even if an exception is 
thrown.
+     *
+     * @param <T>      the type of the result returned by the supplier
+     * @param <E>      the type of exception that may be thrown by the supplier
+     * @param lock     the lock to be acquired and released
+     * @param supplier the supplier to be executed within the lock context
+     * @return the result of the supplier
+     * @throws E if an exception occurs during the execution of the supplier
+     * @throws NullPointerException if either {@code lock} or {@code supplier} 
is null
+     */
+    public static <T, E extends Exception> T inLockThrows(Lock lock, 
ThrowingSupplier<T, E> supplier) throws E {
+        Objects.requireNonNull(lock, "Lock must not be null");
+        Objects.requireNonNull(supplier, "Supplier must not be null");
+
+        lock.lock();
+        try {
+            return supplier.get();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Executes the given {@link ThrowingRunnable} within the context of the 
specified {@link Lock}.
+     * The lock is acquired before executing the runnable and released after 
the execution,
+     * ensuring that the lock is always released, even if an exception is 
thrown.
+     *
+     * @param <E>      the type of exception that may be thrown by the runnable
+     * @param lock     the lock to be acquired and released
+     * @param runnable the runnable to be executed within the lock context
+     * @throws E if an exception occurs during the execution of the runnable
+     * @throws NullPointerException if either {@code lock} or {@code runnable} 
is null
+     */
+    public static <E extends Exception> void inLockThrows(Lock lock, 
ThrowingRunnable<E> runnable) throws E {
+        Objects.requireNonNull(lock, "Lock must not be null");
+        Objects.requireNonNull(runnable, "Runnable must not be null");
+
+        lock.lock();
+        try {
+            runnable.run();
+        } finally {
+            lock.unlock();
+        }
+    }
 }
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
index f2e7c9830bd..eddd4ed8070 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.utils.ByteBufferUnmapper;
-import org.apache.kafka.common.utils.OperatingSystem;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.LockUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,8 +33,9 @@ import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.util.Objects;
 import java.util.OptionalInt;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 
 /**
  * The abstract index class which holds entry format agnostic methods.
@@ -47,7 +48,10 @@ public abstract class AbstractIndex implements Closeable {
 
     private static final Logger log = 
LoggerFactory.getLogger(AbstractIndex.class);
 
-    protected final ReentrantLock lock = new ReentrantLock();
+    // Serializes all index operations that mutate internal state
+    private final ReentrantLock lock = new ReentrantLock();
+    // Allows concurrent read operations while ensuring exclusive access if 
the underlying mmap is changed
+    private final ReentrantReadWriteLock remapLock = new 
ReentrantReadWriteLock();
 
     private final long baseOffset;
     private final int maxIndexSize;
@@ -187,36 +191,32 @@ public abstract class AbstractIndex implements Closeable {
      * @return a boolean indicating whether the size of the memory map and the 
underneath file is changed or not.
      */
     public boolean resize(int newSize) throws IOException {
-        lock.lock();
-        try {
-            int roundedNewSize = roundDownToExactMultiple(newSize, 
entrySize());
-
-            if (length == roundedNewSize) {
-                log.debug("Index {} was not resized because it already has 
size {}", file.getAbsolutePath(), roundedNewSize);
-                return false;
-            } else {
-                RandomAccessFile raf = new RandomAccessFile(file, "rw");
-                try {
-                    int position = mmap.position();
-
-                    /* Windows or z/OS won't let us modify the file length 
while the file is mmapped :-( */
-                    if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
-                        safeForceUnmap();
-                    raf.setLength(roundedNewSize);
-                    this.length = roundedNewSize;
-                    mmap = 
raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
-                    this.maxEntries = mmap.limit() / entrySize();
-                    mmap.position(position);
-                    log.debug("Resized {} to {}, position is {} and limit is 
{}", file.getAbsolutePath(), roundedNewSize,
-                            mmap.position(), mmap.limit());
-                    return true;
-                } finally {
-                    Utils.closeQuietly(raf, "index file " + file.getName());
-                }
-            }
-        } finally {
-            lock.unlock();
-        }
+        return inLockThrows(() ->
+                inRemapWriteLockThrows(() -> {
+                    int roundedNewSize = roundDownToExactMultiple(newSize, 
entrySize());
+
+                    if (length == roundedNewSize) {
+                        log.debug("Index {} was not resized because it already 
has size {}", file.getAbsolutePath(), roundedNewSize);
+                        return false;
+                    } else {
+                        RandomAccessFile raf = new RandomAccessFile(file, 
"rw");
+                        try {
+                            int position = mmap.position();
+
+                            safeForceUnmap();
+                            raf.setLength(roundedNewSize);
+                            this.length = roundedNewSize;
+                            mmap = 
raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
+                            this.maxEntries = mmap.limit() / entrySize();
+                            mmap.position(position);
+                            log.debug("Resized {} to {}, position is {} and 
limit is {}", file.getAbsolutePath(), roundedNewSize,
+                                    mmap.position(), mmap.limit());
+                            return true;
+                        } finally {
+                            Utils.closeQuietly(raf, "index file " + 
file.getName());
+                        }
+                    }
+                }));
     }
 
     /**
@@ -236,12 +236,9 @@ public abstract class AbstractIndex implements Closeable {
      * Flush the data in the index to disk
      */
     public void flush() {
-        lock.lock();
-        try {
+        inLock(() -> {
             mmap.force();
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 
     /**
@@ -261,14 +258,11 @@ public abstract class AbstractIndex implements Closeable {
      * the file.
      */
     public void trimToValidSize() throws IOException {
-        lock.lock();
-        try {
+        inLockThrows(() -> {
             if (mmap != null) {
                 resize(entrySize() * entries);
             }
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 
     /**
@@ -288,12 +282,10 @@ public abstract class AbstractIndex implements Closeable {
         // However, in some cases it can pause application threads(STW) for a 
long moment reading metadata from a physical disk.
         // To prevent this, we forcefully cleanup memory mapping within proper 
execution which never affects API responsiveness.
         // See https://issues.apache.org/jira/browse/KAFKA-4614 for the 
details.
-        lock.lock();
-        try {
-            safeForceUnmap();
-        } finally {
-            lock.unlock();
-        }
+        inLockThrows(() ->
+                inRemapWriteLockThrows(() -> {
+                    safeForceUnmap();
+                }));
     }
 
     /**
@@ -420,20 +412,36 @@ public abstract class AbstractIndex implements Closeable {
         mmap.position(entries * entrySize());
     }
 
-    /**
-     * Execute the given function in a lock only if we are running on windows 
or z/OS. We do this
-     * because Windows or z/OS won't let us resize a file while it is mmapped. 
As a result we have to force unmap it
-     * and this requires synchronizing reads.
-     */
-    protected final <T, E extends Exception> T maybeLock(Lock lock, 
StorageAction<T, E> action) throws E {
-        if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
-            lock.lock();
-        try {
-            return action.execute();
-        } finally {
-            if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
-                lock.unlock();
-        }
+    protected final <T> T inLock(Supplier<T> action) {
+        return LockUtils.inLock(lock, action);
+    }
+
+    protected final void inLock(Runnable action) {
+        LockUtils.inLock(lock, action);
+    }
+
+    protected final <T, E extends Exception> T 
inLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E {
+        return LockUtils.inLockThrows(lock, action);
+    }
+
+    protected final <E extends Exception> void 
inLockThrows(LockUtils.ThrowingRunnable<E> action) throws E {
+        LockUtils.inLockThrows(lock, action);
+    }
+
+    protected final <T> T inRemapReadLock(Supplier<T> action) {
+        return LockUtils.inLock(remapLock.readLock(), action);
+    }
+
+    protected final void inRemapReadLock(Runnable action) {
+        LockUtils.inLock(remapLock.readLock(), action);
+    }
+
+    protected final <T, E extends Exception> T 
inRemapWriteLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E {
+        return LockUtils.inLockThrows(remapLock.writeLock(), action);
+    }
+
+    protected final <E extends Exception> void 
inRemapWriteLockThrows(LockUtils.ThrowingRunnable<E> action) throws E {
+        LockUtils.inLockThrows(remapLock.writeLock(), action);
     }
 
     /**
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
index fe872a5cd7f..7d20edf37d3 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
@@ -56,7 +56,7 @@ public final class OffsetIndex extends AbstractIndex {
     private static final int ENTRY_SIZE = 8;
 
     /* the last offset in the index */
-    private long lastOffset;
+    private volatile long lastOffset;
 
     public OffsetIndex(File file, long baseOffset) throws IOException {
         this(file, baseOffset, -1);
@@ -95,7 +95,7 @@ public final class OffsetIndex extends AbstractIndex {
      *         the pair (baseOffset, 0) is returned.
      */
     public OffsetPosition lookup(long targetOffset) {
-        return maybeLock(lock, () -> {
+        return inRemapReadLock(() -> {
             ByteBuffer idx = mmap().duplicate();
             int slot = largestLowerBoundSlotFor(idx, targetOffset, 
IndexSearchType.KEY);
             if (slot == -1)
@@ -111,7 +111,7 @@ public final class OffsetIndex extends AbstractIndex {
      * @return The offset/position pair at that entry
      */
     public OffsetPosition entry(int n) {
-        return maybeLock(lock, () -> {
+        return inRemapReadLock(() -> {
             if (n >= entries())
                 throw new IllegalArgumentException("Attempt to fetch the " + n 
+ "th entry from index " +
                     file().getAbsolutePath() + ", which has size " + 
entries());
@@ -125,7 +125,7 @@ public final class OffsetIndex extends AbstractIndex {
      * such offset.
      */
     public Optional<OffsetPosition> fetchUpperBoundOffset(OffsetPosition 
fetchOffset, int fetchSize) {
-        return maybeLock(lock, () -> {
+        return inRemapReadLock(() -> {
             ByteBuffer idx = mmap().duplicate();
             int slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + 
fetchSize, IndexSearchType.VALUE);
             if (slot == -1)
@@ -141,8 +141,7 @@ public final class OffsetIndex extends AbstractIndex {
      * @throws InvalidOffsetException if provided offset is not larger than 
the last offset
      */
     public void append(long offset, int position) {
-        lock.lock();
-        try {
+        inLock(() -> {
             if (isFull())
                 throw new IllegalArgumentException("Attempt to append to a 
full index (size = " + entries() + ").");
 
@@ -157,15 +156,12 @@ public final class OffsetIndex extends AbstractIndex {
             } else
                 throw new InvalidOffsetException("Attempt to append an offset 
" + offset + " to position " + entries() +
                     " no larger than the last offset appended (" + lastOffset 
+ ") to " + file().getAbsolutePath());
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 
     @Override
     public void truncateTo(long offset) {
-        lock.lock();
-        try {
+        inLock(() -> {
             ByteBuffer idx = mmap().duplicate();
             int slot = largestLowerBoundSlotFor(idx, offset, 
IndexSearchType.KEY);
 
@@ -182,9 +178,7 @@ public final class OffsetIndex extends AbstractIndex {
             else
                 newEntries = slot + 1;
             truncateToEntries(newEntries);
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 
     public long lastOffset() {
@@ -218,30 +212,24 @@ public final class OffsetIndex extends AbstractIndex {
      * Truncates index to a known number of entries.
      */
     private void truncateToEntries(int entries) {
-        lock.lock();
-        try {
+        inLock(() -> {
             super.truncateToEntries0(entries);
             this.lastOffset = lastEntry().offset;
             log.debug("Truncated index {} to {} entries; position is now {} 
and last offset is now {}",
-                    file().getAbsolutePath(), entries, mmap().position(), 
lastOffset);
-        } finally {
-            lock.unlock();
-        }
+                file().getAbsolutePath(), entries, mmap().position(), 
lastOffset);
+        });
     }
 
     /**
      * The last entry in the index
      */
     private OffsetPosition lastEntry() {
-        lock.lock();
-        try {
+        return inRemapReadLock(() -> {
             int entries = entries();
             if (entries == 0)
                 return new OffsetPosition(baseOffset(), 0);
             else
                 return parseEntry(mmap(), entries - 1);
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 }
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
index 3c3fa887fc6..e6f50da24ee 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
@@ -76,10 +76,12 @@ public class TimeIndex extends AbstractIndex {
         TimestampOffset entry = lastEntry();
         long lastTimestamp = entry.timestamp;
         long lastOffset = entry.offset;
-        if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
-            throw new CorruptIndexException("Corrupt time index found, time 
index file (" + file().getAbsolutePath() + ") has "
-                + "non-zero size but the last timestamp is " + lastTimestamp + 
" which is less than the first timestamp "
-                + timestamp(mmap(), 0));
+        inRemapReadLock(() -> {
+            if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
+                throw new CorruptIndexException("Corrupt time index found, 
time index file (" + file().getAbsolutePath() + ") has "
+                    + "non-zero size but the last timestamp is " + 
lastTimestamp + " which is less than the first timestamp "
+                    + timestamp(mmap(), 0));
+        });
         if (entries() != 0 && lastOffset < baseOffset())
             throw new CorruptIndexException("Corrupt time index found, time 
index file (" + file().getAbsolutePath() + ") has "
                 + "non-zero size but the last offset is " + lastOffset + " 
which is less than the first offset " + baseOffset());
@@ -94,8 +96,7 @@ public class TimeIndex extends AbstractIndex {
      */
     @Override
     public void truncateTo(long offset) {
-        lock.lock();
-        try {
+        inLock(() -> {
             ByteBuffer idx = mmap().duplicate();
             int slot = largestLowerBoundSlotFor(idx, offset, 
IndexSearchType.VALUE);
 
@@ -113,9 +114,7 @@ public class TimeIndex extends AbstractIndex {
                 newEntries = slot + 1;
 
             truncateToEntries(newEntries);
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 
     // We override the full check to reserve the last time index entry slot 
for the on roll call.
@@ -134,7 +133,7 @@ public class TimeIndex extends AbstractIndex {
      * @return The timestamp/offset pair at that entry
      */
     public TimestampOffset entry(int n) {
-        return maybeLock(lock, () -> {
+        return inRemapReadLock(() -> {
             if (n >= entries())
                 throw new IllegalArgumentException("Attempt to fetch the " + n 
+ "th entry from time index "
                     + file().getAbsolutePath() + " which has size " + 
entries());
@@ -151,7 +150,7 @@ public class TimeIndex extends AbstractIndex {
      * @return The time index entry found.
      */
     public TimestampOffset lookup(long targetTimestamp) {
-        return maybeLock(lock, () -> {
+        return inRemapReadLock(() -> {
             ByteBuffer idx = mmap().duplicate();
             int slot = largestLowerBoundSlotFor(idx, targetTimestamp, 
IndexSearchType.KEY);
             if (slot == -1)
@@ -181,8 +180,7 @@ public class TimeIndex extends AbstractIndex {
      *                      gets rolled or the segment is closed.
      */
     public void maybeAppend(long timestamp, long offset, boolean 
skipFullCheck) {
-        lock.lock();
-        try {
+        inLock(() -> {
             if (!skipFullCheck && isFull())
                 throw new IllegalArgumentException("Attempt to append to a 
full time index (size = " + entries() + ").");
 
@@ -212,23 +210,18 @@ public class TimeIndex extends AbstractIndex {
                 if (entries() * ENTRY_SIZE != mmap.position())
                     throw new IllegalStateException(entries() + " entries but 
file position in index is " + mmap.position());
             }
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 
     @Override
     public boolean resize(int newSize) throws IOException {
-        lock.lock();
-        try {
+        return inLockThrows(() -> {
             if (super.resize(newSize)) {
                 this.lastEntry = lastEntryFromIndexFile();
                 return true;
             } else
                 return false;
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 
     // Visible for testing, we can make this protected once TimeIndexTest is 
in the same package as this class
@@ -259,30 +252,24 @@ public class TimeIndex extends AbstractIndex {
      * Read the last entry from the index file. This operation involves disk 
access.
      */
     private TimestampOffset lastEntryFromIndexFile() {
-        lock.lock();
-        try {
+        return inRemapReadLock(() -> {
             int entries = entries();
             if (entries == 0)
                 return new TimestampOffset(RecordBatch.NO_TIMESTAMP, 
baseOffset());
             else
                 return parseEntry(mmap(), entries - 1);
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 
     /**
      * Truncates index to a known number of entries.
      */
     private void truncateToEntries(int entries) {
-        lock.lock();
-        try {
+        inLock(() -> {
             super.truncateToEntries0(entries);
             this.lastEntry = lastEntryFromIndexFile();
             log.debug("Truncated index {} to {} entries; position is now {} 
and last entry is now {}",
                 file().getAbsolutePath(), entries, mmap().position(), 
lastEntry.offset);
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 }
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/AbstractIndexTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/AbstractIndexTest.java
new file mode 100644
index 00000000000..f6b28345b93
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/AbstractIndexTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AbstractIndexTest {
+    private static class TestIndex extends AbstractIndex {
+        private boolean unmapInvoked = false;
+        private MappedByteBuffer unmappedBuffer = null;
+        public TestIndex(File file, long baseOffset, int maxIndexSize, boolean 
writable) throws IOException {
+            super(file, baseOffset, maxIndexSize, writable);
+        }
+
+        @Override
+        protected int entrySize() {
+            return 1;
+        }
+
+        @Override
+        protected IndexEntry parseEntry(ByteBuffer buffer, int n) {
+            return null;
+        }
+
+        @Override
+        public void sanityCheck() {
+            // unused
+        }
+
+        @Override
+        protected void truncate() {
+            // unused
+        }
+
+        @Override
+        public void truncateTo(long offset) {
+            // unused
+        }
+
+        @Override
+        protected void forceUnmap() throws IOException {
+            unmapInvoked = true;
+            unmappedBuffer = mmap();
+        }
+    }
+
+    @Test
+    public void testResizeInvokeUnmap() throws IOException {
+        File f = new File(TestUtils.tempDirectory(), "test-index");
+        TestIndex idx = new TestIndex(f, 0L, 100, true);
+        MappedByteBuffer oldMmap = idx.mmap();
+        assertNotNull(idx.mmap(), "MappedByteBuffer should not be null");
+        assertFalse(idx.unmapInvoked, "Unmap should not have been invoked 
yet");
+
+        boolean changed = idx.resize(80);
+        assertTrue(changed);
+        assertTrue(idx.unmapInvoked, "Unmap should have been invoked after 
resize");
+        assertSame(oldMmap, idx.unmappedBuffer, "old mmap should be unmapped");
+        assertNotSame(idx.unmappedBuffer, idx.mmap());
+    }
+}

Reply via email to