This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 11a5c67e7 [server] force unmap mmap file on linux when call AbstractIndex.resize() (#1382)
11a5c67e7 is described below
commit 11a5c67e70cc0a7732ec42ff091b7ddb7cd17bdd
Author: yunhong <[email protected]>
AuthorDate: Fri Aug 22 16:41:09 2025 +0800
[server] force unmap mmap file on linux when call AbstractIndex.resize() (#1382)
---
.../alibaba/fluss/server/log/AbstractIndex.java | 118 ++++++++++-----------
.../com/alibaba/fluss/server/log/OffsetIndex.java | 19 ++--
.../com/alibaba/fluss/server/log/TimeIndex.java | 37 ++++---
.../fluss/server/log/AbstractIndexTest.java | 92 ++++++++++++++++
4 files changed, 181 insertions(+), 85 deletions(-)
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java
index ee99e5272..850a66509 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java
@@ -21,7 +21,6 @@ import
com.alibaba.fluss.exception.IndexOffsetOverflowException;
import com.alibaba.fluss.server.exception.CorruptIndexException;
import com.alibaba.fluss.utils.FileUtils;
import com.alibaba.fluss.utils.IOUtils;
-import com.alibaba.fluss.utils.OperatingSystem;
import com.alibaba.fluss.utils.log.ByteBufferUnmapper;
import org.slf4j.Logger;
@@ -37,10 +36,11 @@ import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.util.Objects;
import java.util.OptionalInt;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
+import static com.alibaba.fluss.utils.concurrent.LockUtils.inWriteLock;
/* This file is based on source code of Apache Kafka Project
(https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
@@ -50,7 +50,19 @@ import static
com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
public abstract class AbstractIndex implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractIndex.class);
- protected final Lock lock = new ReentrantLock();
+ // Serializes all index operations that mutate internal state.
+ // Readers do not need to acquire this lock because:
+ // 1) MappedByteBuffer provides direct access to the OS-level buffer
cache,
+ // which allows concurrent reads in practice.
+ // 2) Clients only read committed data and are not affected by concurrent
appends/truncates.
+ // In the rare case when the data is truncated, the follower could
read inconsistent data.
+ // The follower has the logic to ignore the inconsistent data through
crc and leader epoch.
+ // 3) Read and remap operations are coordinated via remapLock to ensure
visibility of the
+ // underlying mmap.
+ protected final ReentrantLock lock = new ReentrantLock();
+ // Allows concurrent read operations while ensuring exclusive access if
the underlying mmap is
+ // changed
+ protected final ReentrantReadWriteLock remapLock = new
ReentrantReadWriteLock();
/** The base offset of the segment that this index is corresponding to. */
private final long baseOffset;
@@ -189,43 +201,48 @@ public abstract class AbstractIndex implements Closeable {
public boolean resize(int newSize) throws IOException {
return inLock(
lock,
- () -> {
- int roundedNewSize = roundDownToExactMultiple(newSize,
entrySize());
-
- if (length == roundedNewSize) {
- LOG.debug(
- "Index {} was not resized because it already
has size {}",
- file.getAbsolutePath(),
- roundedNewSize);
- return false;
- } else {
- RandomAccessFile raf = new RandomAccessFile(file,
"rw");
- try {
- int position = mmap.position();
-
- /* Windows won't let us modify the file length
while the file is mmapped :-( */
- if (OperatingSystem.isWindows()) {
- safeForceUnmap();
- }
- raf.setLength(roundedNewSize);
- this.length = roundedNewSize;
- mmap =
- raf.getChannel()
-
.map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
- this.maxEntries = mmap.limit() / entrySize();
- mmap.position(position);
- LOG.debug(
- "Resized {} to {}, position is {} and
limit is {}",
- file.getAbsolutePath(),
- roundedNewSize,
- mmap.position(),
- mmap.limit());
- return true;
- } finally {
- IOUtils.closeQuietly(raf, "index file " +
file.getName());
- }
- }
- });
+ () ->
+ inWriteLock(
+ remapLock,
+ () -> {
+ int roundedNewSize =
+ roundDownToExactMultiple(newSize,
entrySize());
+
+ if (length == roundedNewSize) {
+ LOG.debug(
+ "Index {} was not resized
because it already has size {}",
+ file.getAbsolutePath(),
+ roundedNewSize);
+ return false;
+ } else {
+ RandomAccessFile raf = new
RandomAccessFile(file, "rw");
+ try {
+ int position = mmap.position();
+
+ safeForceUnmap();
+ raf.setLength(roundedNewSize);
+ this.length = roundedNewSize;
+ mmap =
+ raf.getChannel()
+ .map(
+
FileChannel.MapMode.READ_WRITE,
+ 0,
+
roundedNewSize);
+ this.maxEntries = mmap.limit() /
entrySize();
+ mmap.position(position);
+ LOG.debug(
+ "Resized {} to {},
position is {} and limit is {}",
+ file.getAbsolutePath(),
+ roundedNewSize,
+ mmap.position(),
+ mmap.limit());
+ return true;
+ } finally {
+ IOUtils.closeQuietly(
+ raf, "index file " +
file.getName());
+ }
+ }
+ }));
}
/**
@@ -283,7 +300,7 @@ public abstract class AbstractIndex implements Closeable {
// metadata from a physical disk.
// To prevent this, we forcefully cleanup memory mapping within proper
execution which never
// affects API responsiveness.
- inLock(lock, this::safeForceUnmap);
+ inLock(lock, () -> inWriteLock(remapLock, this::safeForceUnmap));
}
/** Remove all the entries from the index and resize the index to the max
index size. */
@@ -411,25 +428,6 @@ public abstract class AbstractIndex implements Closeable {
mmap.position(entries * entrySize());
}
- /**
- * Execute the given function in a lock only if we are running on windows.
We do this because
- * Windows won't let us resize a file while it is mmapped. As a result we
have to force unmap it
- * and this requires synchronizing reads.
- */
- protected final <T, E extends Exception> T maybeLock(Lock lock,
StorageAction<T, E> action)
- throws E {
- if (OperatingSystem.isWindows()) {
- lock.lock();
- }
- try {
- return action.execute();
- } finally {
- if (OperatingSystem.isWindows()) {
- lock.unlock();
- }
- }
- }
-
/**
* Find the slot in which the largest entry less than or equal to the
given target key or value
* is stored. The comparison is made using the `IndexEntry.compareTo()`
method.
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/OffsetIndex.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/OffsetIndex.java
index d3bbf4a42..2e203908d 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/OffsetIndex.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/OffsetIndex.java
@@ -30,6 +30,7 @@ import java.nio.ByteBuffer;
import java.util.Optional;
import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
+import static com.alibaba.fluss.utils.concurrent.LockUtils.inReadLock;
/* This file is based on source code of Apache Kafka Project
(https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
@@ -70,7 +71,7 @@ public final class OffsetIndex extends AbstractIndex {
private static final int ENTRY_SIZE = 8;
/* the last offset in the index */
- private long lastOffset;
+ private volatile long lastOffset;
public OffsetIndex(File file, long baseOffset) throws IOException {
this(file, baseOffset, -1);
@@ -130,8 +131,8 @@ public final class OffsetIndex extends AbstractIndex {
* (baseOffset, 0) is returned.
*/
public OffsetPosition lookup(long targetOffset) {
- return maybeLock(
- lock,
+ return inReadLock(
+ remapLock,
() -> {
ByteBuffer idx = mmap().duplicate();
int slot = largestLowerBoundSlotFor(idx, targetOffset,
IndexSearchType.KEY);
@@ -150,8 +151,8 @@ public final class OffsetIndex extends AbstractIndex {
* @return The offset/position pair at that entry
*/
public OffsetPosition entry(int n) {
- return maybeLock(
- lock,
+ return inReadLock(
+ remapLock,
() -> {
if (n >= entries()) {
throw new IllegalArgumentException(
@@ -173,8 +174,8 @@ public final class OffsetIndex extends AbstractIndex {
*/
public Optional<OffsetPosition> fetchUpperBoundOffset(
OffsetPosition fetchOffset, int fetchSize) {
- return maybeLock(
- lock,
+ return inReadLock(
+ remapLock,
() -> {
ByteBuffer idx = mmap().duplicate();
int slot =
@@ -309,8 +310,8 @@ public final class OffsetIndex extends AbstractIndex {
/** The last entry in the index. */
private OffsetPosition lastEntry() {
- return inLock(
- lock,
+ return inReadLock(
+ remapLock,
() -> {
int entries = entries();
if (entries == 0) {
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/TimeIndex.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/TimeIndex.java
index 99542790d..465a4dabf 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/TimeIndex.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/TimeIndex.java
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
+import static com.alibaba.fluss.utils.concurrent.LockUtils.inReadLock;
/* This file is based on source code of Apache Kafka Project
(https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
@@ -90,16 +91,20 @@ public class TimeIndex extends AbstractIndex {
TimestampOffset entry = lastEntry();
long lastTimestamp = entry.timestamp;
long lastOffset = entry.offset;
- if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0)) {
- throw new CorruptIndexException(
- "Corrupt time index found, time index file ("
- + file().getAbsolutePath()
- + ") has "
- + "non-zero size but the last timestamp is "
- + lastTimestamp
- + " which is less than the first timestamp "
- + timestamp(mmap(), 0));
- }
+ inReadLock(
+ remapLock,
+ () -> {
+ if (entries() != 0 && lastTimestamp < timestamp(mmap(),
0)) {
+ throw new CorruptIndexException(
+ "Corrupt time index found, time index file ("
+ + file().getAbsolutePath()
+ + ") has "
+ + "non-zero size but the last
timestamp is "
+ + lastTimestamp
+ + " which is less than the first
timestamp "
+ + timestamp(mmap(), 0));
+ }
+ });
if (entries() != 0 && lastOffset < baseOffset()) {
throw new CorruptIndexException(
"Corrupt time index found, time index file ("
@@ -170,8 +175,8 @@ public class TimeIndex extends AbstractIndex {
* @return The timestamp/offset pair at that entry
*/
public TimestampOffset entry(int n) {
- return maybeLock(
- lock,
+ return inReadLock(
+ remapLock,
() -> {
if (n >= entries()) {
throw new IllegalArgumentException(
@@ -195,8 +200,8 @@ public class TimeIndex extends AbstractIndex {
* @return The time index entry found.
*/
public TimestampOffset lookup(long targetTimestamp) {
- return maybeLock(
- lock,
+ return inReadLock(
+ remapLock,
() -> {
ByteBuffer idx = mmap().duplicate();
int slot = largestLowerBoundSlotFor(idx, targetTimestamp,
IndexSearchType.KEY);
@@ -332,8 +337,8 @@ public class TimeIndex extends AbstractIndex {
/** Read the last entry from the index file. This operation involves disk
access. */
private TimestampOffset lastEntryFromIndexFile() {
- return inLock(
- lock,
+ return inReadLock(
+ remapLock,
() -> {
int entries = entries();
if (entries == 0) {
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/log/AbstractIndexTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/log/AbstractIndexTest.java
new file mode 100644
index 000000000..51f5ddbc5
--- /dev/null
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/log/AbstractIndexTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server.log;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AbstractIndex}, focusing on mmap handling in {@code resize()}. */
+public class AbstractIndexTest {
+
+    private @TempDir File tempDir;
+
+    @Test
+    public void testResizeInvokeUnmap() throws IOException {
+        File testIndex = new File(tempDir, "test.index");
+        TestIndex idx = new TestIndex(testIndex, 0L, 100, true);
+        MappedByteBuffer oldMmap = idx.mmap();
+        assertThat(idx.mmap()).isNotNull();
+        assertThat(idx.unmapInvoked).isFalse();
+
+        boolean changed = idx.resize(80);
+        assertThat(changed).isTrue();
+        // resize() must unmap the current mapping before remapping, on every OS.
+        assertThat(idx.unmapInvoked).isTrue();
+        // Old mapping was unmapped; compare identity since ByteBuffer.equals compares bytes.
+        assertThat(idx.unmappedBuffer).isSameAs(oldMmap);
+        assertThat(idx.unmappedBuffer).isNotSameAs(idx.mmap());
+    }
+
+    private static class TestIndex extends AbstractIndex {
+        private boolean unmapInvoked = false;
+        private MappedByteBuffer unmappedBuffer = null;
+
+        public TestIndex(File file, long baseOffset, int maxIndexSize, boolean writable)
+                throws IOException {
+            super(file, baseOffset, maxIndexSize, writable);
+        }
+
+        @Override
+        protected int entrySize() {
+            return 1;
+        }
+
+        @Override
+        protected IndexEntry parseEntry(ByteBuffer buffer, int n) {
+            return null;
+        }
+
+        @Override
+        public void sanityCheck() {
+            // unused
+        }
+
+        @Override
+        protected void truncate() {
+            // unused
+        }
+
+        @Override
+        public void truncateTo(long offset) {
+            // unused
+        }
+
+        @Override
+        public void forceUnmap() throws IOException {
+            unmapInvoked = true;
+            unmappedBuffer = mmap();
+        }
+    }
+}