This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 11a5c67e7 [server] force unmap mmap file on linux when call AbstractIndex.resize() (#1382)
11a5c67e7 is described below

commit 11a5c67e70cc0a7732ec42ff091b7ddb7cd17bdd
Author: yunhong <[email protected]>
AuthorDate: Fri Aug 22 16:41:09 2025 +0800

    [server] force unmap mmap file on linux when call AbstractIndex.resize() (#1382)
---
 .../alibaba/fluss/server/log/AbstractIndex.java    | 118 ++++++++++-----------
 .../com/alibaba/fluss/server/log/OffsetIndex.java  |  19 ++--
 .../com/alibaba/fluss/server/log/TimeIndex.java    |  37 ++++---
 .../fluss/server/log/AbstractIndexTest.java        |  92 ++++++++++++++++
 4 files changed, 181 insertions(+), 85 deletions(-)

diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java
index ee99e5272..850a66509 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java
@@ -21,7 +21,6 @@ import com.alibaba.fluss.exception.IndexOffsetOverflowException;
 import com.alibaba.fluss.server.exception.CorruptIndexException;
 import com.alibaba.fluss.utils.FileUtils;
 import com.alibaba.fluss.utils.IOUtils;
-import com.alibaba.fluss.utils.OperatingSystem;
 import com.alibaba.fluss.utils.log.ByteBufferUnmapper;
 
 import org.slf4j.Logger;
@@ -37,10 +36,11 @@ import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.util.Objects;
 import java.util.OptionalInt;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
+import static com.alibaba.fluss.utils.concurrent.LockUtils.inWriteLock;
 
 /* This file is based on source code of Apache Kafka Project 
(https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
@@ -50,7 +50,19 @@ import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
 public abstract class AbstractIndex implements Closeable {
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractIndex.class);
 
-    protected final Lock lock = new ReentrantLock();
+    // Serializes all index operations that mutate internal state.
+    // Readers do not need to acquire this lock because:
+    //  1) MappedByteBuffer provides direct access to the OS-level buffer 
cache,
+    //     which allows concurrent reads in practice.
+    //  2) Clients only read committed data and are not affected by concurrent 
appends/truncates.
+    //     In the rare case when the data is truncated, the follower could 
read inconsistent data.
+    //     The follower has the logic to ignore the inconsistent data through 
crc and leader epoch.
+    //  3) Read and remap operations are coordinated via remapLock to ensure 
visibility of the
+    //     underlying mmap.
+    protected final ReentrantLock lock = new ReentrantLock();
+    // Allows concurrent read operations while ensuring exclusive access if 
the underlying mmap is
+    // changed
+    protected final ReentrantReadWriteLock remapLock = new 
ReentrantReadWriteLock();
 
     /** The base offset of the segment that this index is corresponding to. */
     private final long baseOffset;
@@ -189,43 +201,48 @@ public abstract class AbstractIndex implements Closeable {
     public boolean resize(int newSize) throws IOException {
         return inLock(
                 lock,
-                () -> {
-                    int roundedNewSize = roundDownToExactMultiple(newSize, 
entrySize());
-
-                    if (length == roundedNewSize) {
-                        LOG.debug(
-                                "Index {} was not resized because it already 
has size {}",
-                                file.getAbsolutePath(),
-                                roundedNewSize);
-                        return false;
-                    } else {
-                        RandomAccessFile raf = new RandomAccessFile(file, 
"rw");
-                        try {
-                            int position = mmap.position();
-
-                            /* Windows won't let us modify the file length 
while the file is mmapped :-( */
-                            if (OperatingSystem.isWindows()) {
-                                safeForceUnmap();
-                            }
-                            raf.setLength(roundedNewSize);
-                            this.length = roundedNewSize;
-                            mmap =
-                                    raf.getChannel()
-                                            
.map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
-                            this.maxEntries = mmap.limit() / entrySize();
-                            mmap.position(position);
-                            LOG.debug(
-                                    "Resized {} to {}, position is {} and 
limit is {}",
-                                    file.getAbsolutePath(),
-                                    roundedNewSize,
-                                    mmap.position(),
-                                    mmap.limit());
-                            return true;
-                        } finally {
-                            IOUtils.closeQuietly(raf, "index file " + 
file.getName());
-                        }
-                    }
-                });
+                () ->
+                        inWriteLock(
+                                remapLock,
+                                () -> {
+                                    int roundedNewSize =
+                                            roundDownToExactMultiple(newSize, 
entrySize());
+
+                                    if (length == roundedNewSize) {
+                                        LOG.debug(
+                                                "Index {} was not resized 
because it already has size {}",
+                                                file.getAbsolutePath(),
+                                                roundedNewSize);
+                                        return false;
+                                    } else {
+                                        RandomAccessFile raf = new 
RandomAccessFile(file, "rw");
+                                        try {
+                                            int position = mmap.position();
+
+                                            safeForceUnmap();
+                                            raf.setLength(roundedNewSize);
+                                            this.length = roundedNewSize;
+                                            mmap =
+                                                    raf.getChannel()
+                                                            .map(
+                                                                    
FileChannel.MapMode.READ_WRITE,
+                                                                    0,
+                                                                    
roundedNewSize);
+                                            this.maxEntries = mmap.limit() / 
entrySize();
+                                            mmap.position(position);
+                                            LOG.debug(
+                                                    "Resized {} to {}, 
position is {} and limit is {}",
+                                                    file.getAbsolutePath(),
+                                                    roundedNewSize,
+                                                    mmap.position(),
+                                                    mmap.limit());
+                                            return true;
+                                        } finally {
+                                            IOUtils.closeQuietly(
+                                                    raf, "index file " + 
file.getName());
+                                        }
+                                    }
+                                }));
     }
 
     /**
@@ -283,7 +300,7 @@ public abstract class AbstractIndex implements Closeable {
         // metadata from a physical disk.
         // To prevent this, we forcefully cleanup memory mapping within proper 
execution which never
         // affects API responsiveness.
-        inLock(lock, this::safeForceUnmap);
+        inLock(lock, () -> inWriteLock(remapLock, this::safeForceUnmap));
     }
 
     /** Remove all the entries from the index and resize the index to the max 
index size. */
@@ -411,25 +428,6 @@ public abstract class AbstractIndex implements Closeable {
         mmap.position(entries * entrySize());
     }
 
-    /**
-     * Execute the given function in a lock only if we are running on windows. 
We do this because
-     * Windows won't let us resize a file while it is mmapped. As a result we 
have to force unmap it
-     * and this requires synchronizing reads.
-     */
-    protected final <T, E extends Exception> T maybeLock(Lock lock, 
StorageAction<T, E> action)
-            throws E {
-        if (OperatingSystem.isWindows()) {
-            lock.lock();
-        }
-        try {
-            return action.execute();
-        } finally {
-            if (OperatingSystem.isWindows()) {
-                lock.unlock();
-            }
-        }
-    }
-
     /**
      * Find the slot in which the largest entry less than or equal to the 
given target key or value
      * is stored. The comparison is made using the `IndexEntry.compareTo()` 
method.
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/OffsetIndex.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/OffsetIndex.java
index d3bbf4a42..2e203908d 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/OffsetIndex.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/OffsetIndex.java
@@ -30,6 +30,7 @@ import java.nio.ByteBuffer;
 import java.util.Optional;
 
 import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
+import static com.alibaba.fluss.utils.concurrent.LockUtils.inReadLock;
 
 /* This file is based on source code of Apache Kafka Project 
(https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
@@ -70,7 +71,7 @@ public final class OffsetIndex extends AbstractIndex {
     private static final int ENTRY_SIZE = 8;
 
     /* the last offset in the index */
-    private long lastOffset;
+    private volatile long lastOffset;
 
     public OffsetIndex(File file, long baseOffset) throws IOException {
         this(file, baseOffset, -1);
@@ -130,8 +131,8 @@ public final class OffsetIndex extends AbstractIndex {
      *     (baseOffset, 0) is returned.
      */
     public OffsetPosition lookup(long targetOffset) {
-        return maybeLock(
-                lock,
+        return inReadLock(
+                remapLock,
                 () -> {
                     ByteBuffer idx = mmap().duplicate();
                     int slot = largestLowerBoundSlotFor(idx, targetOffset, 
IndexSearchType.KEY);
@@ -150,8 +151,8 @@ public final class OffsetIndex extends AbstractIndex {
      * @return The offset/position pair at that entry
      */
     public OffsetPosition entry(int n) {
-        return maybeLock(
-                lock,
+        return inReadLock(
+                remapLock,
                 () -> {
                     if (n >= entries()) {
                         throw new IllegalArgumentException(
@@ -173,8 +174,8 @@ public final class OffsetIndex extends AbstractIndex {
      */
     public Optional<OffsetPosition> fetchUpperBoundOffset(
             OffsetPosition fetchOffset, int fetchSize) {
-        return maybeLock(
-                lock,
+        return inReadLock(
+                remapLock,
                 () -> {
                     ByteBuffer idx = mmap().duplicate();
                     int slot =
@@ -309,8 +310,8 @@ public final class OffsetIndex extends AbstractIndex {
 
     /** The last entry in the index. */
     private OffsetPosition lastEntry() {
-        return inLock(
-                lock,
+        return inReadLock(
+                remapLock,
                 () -> {
                     int entries = entries();
                     if (entries == 0) {
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/TimeIndex.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/TimeIndex.java
index 99542790d..465a4dabf 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/TimeIndex.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/TimeIndex.java
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 
 import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
+import static com.alibaba.fluss.utils.concurrent.LockUtils.inReadLock;
 
 /* This file is based on source code of Apache Kafka Project 
(https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
@@ -90,16 +91,20 @@ public class TimeIndex extends AbstractIndex {
         TimestampOffset entry = lastEntry();
         long lastTimestamp = entry.timestamp;
         long lastOffset = entry.offset;
-        if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0)) {
-            throw new CorruptIndexException(
-                    "Corrupt time index found, time index file ("
-                            + file().getAbsolutePath()
-                            + ") has "
-                            + "non-zero size but the last timestamp is "
-                            + lastTimestamp
-                            + " which is less than the first timestamp "
-                            + timestamp(mmap(), 0));
-        }
+        inReadLock(
+                remapLock,
+                () -> {
+                    if (entries() != 0 && lastTimestamp < timestamp(mmap(), 
0)) {
+                        throw new CorruptIndexException(
+                                "Corrupt time index found, time index file ("
+                                        + file().getAbsolutePath()
+                                        + ") has "
+                                        + "non-zero size but the last 
timestamp is "
+                                        + lastTimestamp
+                                        + " which is less than the first 
timestamp "
+                                        + timestamp(mmap(), 0));
+                    }
+                });
         if (entries() != 0 && lastOffset < baseOffset()) {
             throw new CorruptIndexException(
                     "Corrupt time index found, time index file ("
@@ -170,8 +175,8 @@ public class TimeIndex extends AbstractIndex {
      * @return The timestamp/offset pair at that entry
      */
     public TimestampOffset entry(int n) {
-        return maybeLock(
-                lock,
+        return inReadLock(
+                remapLock,
                 () -> {
                     if (n >= entries()) {
                         throw new IllegalArgumentException(
@@ -195,8 +200,8 @@ public class TimeIndex extends AbstractIndex {
      * @return The time index entry found.
      */
     public TimestampOffset lookup(long targetTimestamp) {
-        return maybeLock(
-                lock,
+        return inReadLock(
+                remapLock,
                 () -> {
                     ByteBuffer idx = mmap().duplicate();
                     int slot = largestLowerBoundSlotFor(idx, targetTimestamp, 
IndexSearchType.KEY);
@@ -332,8 +337,8 @@ public class TimeIndex extends AbstractIndex {
 
     /** Read the last entry from the index file. This operation involves disk 
access. */
     private TimestampOffset lastEntryFromIndexFile() {
-        return inLock(
-                lock,
+        return inReadLock(
+                remapLock,
                 () -> {
                     int entries = entries();
                     if (entries == 0) {
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/log/AbstractIndexTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/log/AbstractIndexTest.java
new file mode 100644
index 000000000..51f5ddbc5
--- /dev/null
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/log/AbstractIndexTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server.log;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AbstractIndex}. */
+public class AbstractIndexTest {
+
+    private @TempDir File tempDir;
+
+    @Test
+    public void testResizeInvokeUnmap() throws IOException {
+        File testIndex = new File(tempDir, "test.index");
+        TestIndex idx = new TestIndex(testIndex, 0L, 100, true);
+        MappedByteBuffer oldMmap = idx.mmap();
+        assertThat(idx.mmap()).isNotNull();
+        assertThat(idx.unmapInvoked).isFalse();
+
+        boolean changed = idx.resize(80);
+        assertThat(changed).isTrue();
+        // Unmap should have been invoked after resize.
+        assertThat(idx.unmapInvoked).isTrue();
+        // old mmap should be unmapped.
+        assertThat(idx.unmappedBuffer).isEqualTo(oldMmap);
+        assertThat(idx.unmappedBuffer).isNotEqualTo(idx.mmap());
+    }
+
+    private static class TestIndex extends AbstractIndex {
+        private boolean unmapInvoked = false;
+        private MappedByteBuffer unmappedBuffer = null;
+
+        public TestIndex(File file, long baseOffset, int maxIndexSize, boolean 
writable)
+                throws IOException {
+            super(file, baseOffset, maxIndexSize, writable);
+        }
+
+        @Override
+        protected int entrySize() {
+            return 1;
+        }
+
+        @Override
+        protected IndexEntry parseEntry(ByteBuffer buffer, int n) {
+            return null;
+        }
+
+        @Override
+        public void sanityCheck() {
+            // unused
+        }
+
+        @Override
+        protected void truncate() {
+            // unused
+        }
+
+        @Override
+        public void truncateTo(long offset) {
+            // unused
+        }
+
+        @Override
+        public void forceUnmap() throws IOException {
+            unmapInvoked = true;
+            unmappedBuffer = mmap();
+        }
+    }
+}

Reply via email to