This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch 4.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 807866ca6d563892579bb93efd3e64061824c4fe Author: Masahiro Mori <[email protected]> AuthorDate: Wed Jul 9 01:15:32 2025 +0900 KAFKA-19390: Call safeForceUnmap() in AbstractIndex.resize() on Linux to prevent stale mmap of index files (#19961) https://issues.apache.org/jira/browse/KAFKA-19390 The AbstractIndex.resize() method does not release the old memory map of either the index file or the time index file. In some cases a mixed GC may not run for a long time, so these stale mappings accumulate and can crash the broker once the vm.max_map_count limit is reached. The root cause is that safeForceUnmap() is not called within resize() on Linux, so this change unmaps the old mmap on all operating systems. The same problem was reported in [KAFKA-7442](https://issues.apache.org/jira/browse/KAFKA-7442), but the PR submitted at that time did not acquire all the necessary locks around the mmap accesses and was closed without fixing the issue. Reviewers: Jun Rao <[email protected]> --- .../org/apache/kafka/server/util/LockUtils.java | 77 ++++++++++++ .../kafka/storage/internals/log/AbstractIndex.java | 134 +++++++++++---------- .../kafka/storage/internals/log/OffsetIndex.java | 38 ++---- .../kafka/storage/internals/log/TimeIndex.java | 49 +++----- .../storage/internals/log/AbstractIndexTest.java | 89 ++++++++++++++ 5 files changed, 268 insertions(+), 119 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java index dd465977bd0..568d109daf3 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java @@ -26,6 +26,14 @@ import java.util.function.Supplier; * such as acquiring and releasing locks in a safe manner.
*/ public class LockUtils { + @FunctionalInterface + public interface ThrowingSupplier<T, E extends Exception> { + T get() throws E; + } + @FunctionalInterface + public interface ThrowingRunnable<E extends Exception> { + void run() throws E; + } /** * Executes the given {@link Supplier} within the context of the specified {@link Lock}. @@ -49,4 +57,73 @@ public class LockUtils { lock.unlock(); } } + + /** + * Executes the given {@link Runnable} within the context of the specified {@link Lock}. + * The lock is acquired before executing the runnable and released after the execution, + * ensuring that the lock is always released, even if an exception is thrown. + * + * @param lock the lock to be acquired and released + * @param runnable the runnable to be executed within the lock context + * @throws NullPointerException if either {@code lock} or {@code runnable} is null + */ + public static void inLock(Lock lock, Runnable runnable) { + Objects.requireNonNull(lock, "Lock must not be null"); + Objects.requireNonNull(runnable, "Runnable must not be null"); + + lock.lock(); + try { + runnable.run(); + } finally { + lock.unlock(); + } + } + + /** + * Executes the given {@link ThrowingSupplier} within the context of the specified {@link Lock}. + * The lock is acquired before executing the supplier and released after the execution, + * ensuring that the lock is always released, even if an exception is thrown. 
+ * + * @param <T> the type of the result returned by the supplier + * @param <E> the type of exception that may be thrown by the supplier + * @param lock the lock to be acquired and released + * @param supplier the supplier to be executed within the lock context + * @return the result of the supplier + * @throws E if an exception occurs during the execution of the supplier + * @throws NullPointerException if either {@code lock} or {@code supplier} is null + */ + public static <T, E extends Exception> T inLockThrows(Lock lock, ThrowingSupplier<T, E> supplier) throws E { + Objects.requireNonNull(lock, "Lock must not be null"); + Objects.requireNonNull(supplier, "Supplier must not be null"); + + lock.lock(); + try { + return supplier.get(); + } finally { + lock.unlock(); + } + } + + /** + * Executes the given {@link ThrowingRunnable} within the context of the specified {@link Lock}. + * The lock is acquired before executing the runnable and released after the execution, + * ensuring that the lock is always released, even if an exception is thrown. 
+ * + * @param <E> the type of exception that may be thrown by the runnable + * @param lock the lock to be acquired and released + * @param runnable the runnable to be executed within the lock context + * @throws E if an exception occurs during the execution of the runnable + * @throws NullPointerException if either {@code lock} or {@code runnable} is null + */ + public static <E extends Exception> void inLockThrows(Lock lock, ThrowingRunnable<E> runnable) throws E { + Objects.requireNonNull(lock, "Lock must not be null"); + Objects.requireNonNull(runnable, "Runnable must not be null"); + + lock.lock(); + try { + runnable.run(); + } finally { + lock.unlock(); + } + } } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java index f2e7c9830bd..eddd4ed8070 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java @@ -17,8 +17,8 @@ package org.apache.kafka.storage.internals.log; import org.apache.kafka.common.utils.ByteBufferUnmapper; -import org.apache.kafka.common.utils.OperatingSystem; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.LockUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,8 +33,9 @@ import java.nio.channels.FileChannel; import java.nio.file.Files; import java.util.Objects; import java.util.OptionalInt; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; /** * The abstract index class which holds entry format agnostic methods. 
@@ -47,7 +48,10 @@ public abstract class AbstractIndex implements Closeable { private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class); - protected final ReentrantLock lock = new ReentrantLock(); + // Serializes all index operations that mutate internal state + private final ReentrantLock lock = new ReentrantLock(); + // Allows concurrent read operations while ensuring exclusive access if the underlying mmap is changed + private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock(); private final long baseOffset; private final int maxIndexSize; @@ -187,36 +191,32 @@ public abstract class AbstractIndex implements Closeable { * @return a boolean indicating whether the size of the memory map and the underneath file is changed or not. */ public boolean resize(int newSize) throws IOException { - lock.lock(); - try { - int roundedNewSize = roundDownToExactMultiple(newSize, entrySize()); - - if (length == roundedNewSize) { - log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize); - return false; - } else { - RandomAccessFile raf = new RandomAccessFile(file, "rw"); - try { - int position = mmap.position(); - - /* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */ - if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) - safeForceUnmap(); - raf.setLength(roundedNewSize); - this.length = roundedNewSize; - mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize); - this.maxEntries = mmap.limit() / entrySize(); - mmap.position(position); - log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize, - mmap.position(), mmap.limit()); - return true; - } finally { - Utils.closeQuietly(raf, "index file " + file.getName()); - } - } - } finally { - lock.unlock(); - } + return inLockThrows(() -> + inRemapWriteLockThrows(() -> { + int roundedNewSize = roundDownToExactMultiple(newSize, 
entrySize()); + + if (length == roundedNewSize) { + log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize); + return false; + } else { + RandomAccessFile raf = new RandomAccessFile(file, "rw"); + try { + int position = mmap.position(); + + safeForceUnmap(); + raf.setLength(roundedNewSize); + this.length = roundedNewSize; + mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize); + this.maxEntries = mmap.limit() / entrySize(); + mmap.position(position); + log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize, + mmap.position(), mmap.limit()); + return true; + } finally { + Utils.closeQuietly(raf, "index file " + file.getName()); + } + } + })); } /** @@ -236,12 +236,9 @@ public abstract class AbstractIndex implements Closeable { * Flush the data in the index to disk */ public void flush() { - lock.lock(); - try { + inLock(() -> { mmap.force(); - } finally { - lock.unlock(); - } + }); } /** @@ -261,14 +258,11 @@ public abstract class AbstractIndex implements Closeable { * the file. */ public void trimToValidSize() throws IOException { - lock.lock(); - try { + inLockThrows(() -> { if (mmap != null) { resize(entrySize() * entries); } - } finally { - lock.unlock(); - } + }); } /** @@ -288,12 +282,10 @@ public abstract class AbstractIndex implements Closeable { // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk. // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness. // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details. 
- lock.lock(); - try { - safeForceUnmap(); - } finally { - lock.unlock(); - } + inLockThrows(() -> + inRemapWriteLockThrows(() -> { + safeForceUnmap(); + })); } /** @@ -420,20 +412,36 @@ public abstract class AbstractIndex implements Closeable { mmap.position(entries * entrySize()); } - /** - * Execute the given function in a lock only if we are running on windows or z/OS. We do this - * because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it - * and this requires synchronizing reads. - */ - protected final <T, E extends Exception> T maybeLock(Lock lock, StorageAction<T, E> action) throws E { - if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) - lock.lock(); - try { - return action.execute(); - } finally { - if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) - lock.unlock(); - } + protected final <T> T inLock(Supplier<T> action) { + return LockUtils.inLock(lock, action); + } + + protected final void inLock(Runnable action) { + LockUtils.inLock(lock, action); + } + + protected final <T, E extends Exception> T inLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E { + return LockUtils.inLockThrows(lock, action); + } + + protected final <E extends Exception> void inLockThrows(LockUtils.ThrowingRunnable<E> action) throws E { + LockUtils.inLockThrows(lock, action); + } + + protected final <T> T inRemapReadLock(Supplier<T> action) { + return LockUtils.inLock(remapLock.readLock(), action); + } + + protected final void inRemapReadLock(Runnable action) { + LockUtils.inLock(remapLock.readLock(), action); + } + + protected final <T, E extends Exception> T inRemapWriteLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E { + return LockUtils.inLockThrows(remapLock.writeLock(), action); + } + + protected final <E extends Exception> void inRemapWriteLockThrows(LockUtils.ThrowingRunnable<E> action) throws E { + LockUtils.inLockThrows(remapLock.writeLock(), action); } /** diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java index fe872a5cd7f..7d20edf37d3 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java @@ -56,7 +56,7 @@ public final class OffsetIndex extends AbstractIndex { private static final int ENTRY_SIZE = 8; /* the last offset in the index */ - private long lastOffset; + private volatile long lastOffset; public OffsetIndex(File file, long baseOffset) throws IOException { this(file, baseOffset, -1); @@ -95,7 +95,7 @@ public final class OffsetIndex extends AbstractIndex { * the pair (baseOffset, 0) is returned. */ public OffsetPosition lookup(long targetOffset) { - return maybeLock(lock, () -> { + return inRemapReadLock(() -> { ByteBuffer idx = mmap().duplicate(); int slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY); if (slot == -1) @@ -111,7 +111,7 @@ public final class OffsetIndex extends AbstractIndex { * @return The offset/position pair at that entry */ public OffsetPosition entry(int n) { - return maybeLock(lock, () -> { + return inRemapReadLock(() -> { if (n >= entries()) throw new IllegalArgumentException("Attempt to fetch the " + n + "th entry from index " + file().getAbsolutePath() + ", which has size " + entries()); @@ -125,7 +125,7 @@ public final class OffsetIndex extends AbstractIndex { * such offset. 
*/ public Optional<OffsetPosition> fetchUpperBoundOffset(OffsetPosition fetchOffset, int fetchSize) { - return maybeLock(lock, () -> { + return inRemapReadLock(() -> { ByteBuffer idx = mmap().duplicate(); int slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE); if (slot == -1) @@ -141,8 +141,7 @@ public final class OffsetIndex extends AbstractIndex { * @throws InvalidOffsetException if provided offset is not larger than the last offset */ public void append(long offset, int position) { - lock.lock(); - try { + inLock(() -> { if (isFull()) throw new IllegalArgumentException("Attempt to append to a full index (size = " + entries() + ")."); @@ -157,15 +156,12 @@ public final class OffsetIndex extends AbstractIndex { } else throw new InvalidOffsetException("Attempt to append an offset " + offset + " to position " + entries() + " no larger than the last offset appended (" + lastOffset + ") to " + file().getAbsolutePath()); - } finally { - lock.unlock(); - } + }); } @Override public void truncateTo(long offset) { - lock.lock(); - try { + inLock(() -> { ByteBuffer idx = mmap().duplicate(); int slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.KEY); @@ -182,9 +178,7 @@ public final class OffsetIndex extends AbstractIndex { else newEntries = slot + 1; truncateToEntries(newEntries); - } finally { - lock.unlock(); - } + }); } public long lastOffset() { @@ -218,30 +212,24 @@ public final class OffsetIndex extends AbstractIndex { * Truncates index to a known number of entries. 
*/ private void truncateToEntries(int entries) { - lock.lock(); - try { + inLock(() -> { super.truncateToEntries0(entries); this.lastOffset = lastEntry().offset; log.debug("Truncated index {} to {} entries; position is now {} and last offset is now {}", - file().getAbsolutePath(), entries, mmap().position(), lastOffset); - } finally { - lock.unlock(); - } + file().getAbsolutePath(), entries, mmap().position(), lastOffset); + }); } /** * The last entry in the index */ private OffsetPosition lastEntry() { - lock.lock(); - try { + return inRemapReadLock(() -> { int entries = entries(); if (entries == 0) return new OffsetPosition(baseOffset(), 0); else return parseEntry(mmap(), entries - 1); - } finally { - lock.unlock(); - } + }); } } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java index 3c3fa887fc6..e6f50da24ee 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java @@ -76,10 +76,12 @@ public class TimeIndex extends AbstractIndex { TimestampOffset entry = lastEntry(); long lastTimestamp = entry.timestamp; long lastOffset = entry.offset; - if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0)) - throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has " - + "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp " - + timestamp(mmap(), 0)); + inRemapReadLock(() -> { + if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0)) + throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has " + + "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp " + + timestamp(mmap(), 0)); + }); if (entries() != 0 && lastOffset < baseOffset()) 
throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has " + "non-zero size but the last offset is " + lastOffset + " which is less than the first offset " + baseOffset()); @@ -94,8 +96,7 @@ public class TimeIndex extends AbstractIndex { */ @Override public void truncateTo(long offset) { - lock.lock(); - try { + inLock(() -> { ByteBuffer idx = mmap().duplicate(); int slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.VALUE); @@ -113,9 +114,7 @@ public class TimeIndex extends AbstractIndex { newEntries = slot + 1; truncateToEntries(newEntries); - } finally { - lock.unlock(); - } + }); } // We override the full check to reserve the last time index entry slot for the on roll call. @@ -134,7 +133,7 @@ public class TimeIndex extends AbstractIndex { * @return The timestamp/offset pair at that entry */ public TimestampOffset entry(int n) { - return maybeLock(lock, () -> { + return inRemapReadLock(() -> { if (n >= entries()) throw new IllegalArgumentException("Attempt to fetch the " + n + "th entry from time index " + file().getAbsolutePath() + " which has size " + entries()); @@ -151,7 +150,7 @@ public class TimeIndex extends AbstractIndex { * @return The time index entry found. */ public TimestampOffset lookup(long targetTimestamp) { - return maybeLock(lock, () -> { + return inRemapReadLock(() -> { ByteBuffer idx = mmap().duplicate(); int slot = largestLowerBoundSlotFor(idx, targetTimestamp, IndexSearchType.KEY); if (slot == -1) @@ -181,8 +180,7 @@ public class TimeIndex extends AbstractIndex { * gets rolled or the segment is closed. 
*/ public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) { - lock.lock(); - try { + inLock(() -> { if (!skipFullCheck && isFull()) throw new IllegalArgumentException("Attempt to append to a full time index (size = " + entries() + ")."); @@ -212,23 +210,18 @@ public class TimeIndex extends AbstractIndex { if (entries() * ENTRY_SIZE != mmap.position()) throw new IllegalStateException(entries() + " entries but file position in index is " + mmap.position()); } - } finally { - lock.unlock(); - } + }); } @Override public boolean resize(int newSize) throws IOException { - lock.lock(); - try { + return inLockThrows(() -> { if (super.resize(newSize)) { this.lastEntry = lastEntryFromIndexFile(); return true; } else return false; - } finally { - lock.unlock(); - } + }); } // Visible for testing, we can make this protected once TimeIndexTest is in the same package as this class @@ -259,30 +252,24 @@ public class TimeIndex extends AbstractIndex { * Read the last entry from the index file. This operation involves disk access. */ private TimestampOffset lastEntryFromIndexFile() { - lock.lock(); - try { + return inRemapReadLock(() -> { int entries = entries(); if (entries == 0) return new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset()); else return parseEntry(mmap(), entries - 1); - } finally { - lock.unlock(); - } + }); } /** * Truncates index to a known number of entries. 
*/ private void truncateToEntries(int entries) { - lock.lock(); - try { + inLock(() -> { super.truncateToEntries0(entries); this.lastEntry = lastEntryFromIndexFile(); log.debug("Truncated index {} to {} entries; position is now {} and last entry is now {}", file().getAbsolutePath(), entries, mmap().position(), lastEntry.offset); - } finally { - lock.unlock(); - } + }); } } diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/AbstractIndexTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/AbstractIndexTest.java new file mode 100644 index 00000000000..f6b28345b93 --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/AbstractIndexTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class AbstractIndexTest { + private static class TestIndex extends AbstractIndex { + private boolean unmapInvoked = false; + private MappedByteBuffer unmappedBuffer = null; + public TestIndex(File file, long baseOffset, int maxIndexSize, boolean writable) throws IOException { + super(file, baseOffset, maxIndexSize, writable); + } + + @Override + protected int entrySize() { + return 1; + } + + @Override + protected IndexEntry parseEntry(ByteBuffer buffer, int n) { + return null; + } + + @Override + public void sanityCheck() { + // unused + } + + @Override + protected void truncate() { + // unused + } + + @Override + public void truncateTo(long offset) { + // unused + } + + @Override + protected void forceUnmap() throws IOException { + unmapInvoked = true; + unmappedBuffer = mmap(); + } + } + + @Test + public void testResizeInvokeUnmap() throws IOException { + File f = new File(TestUtils.tempDirectory(), "test-index"); + TestIndex idx = new TestIndex(f, 0L, 100, true); + MappedByteBuffer oldMmap = idx.mmap(); + assertNotNull(idx.mmap(), "MappedByteBuffer should not be null"); + assertFalse(idx.unmapInvoked, "Unmap should not have been invoked yet"); + + boolean changed = idx.resize(80); + assertTrue(changed); + assertTrue(idx.unmapInvoked, "Unmap should have been invoked after resize"); + assertSame(oldMmap, idx.unmappedBuffer, "old mmap should be unmapped"); + assertNotSame(idx.unmappedBuffer, 
idx.mmap()); + } +}
