This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 59e4af3f2cb KAFKA-19970: Add configurable TTL for tiered storage index
cache eviction (#21089)
59e4af3f2cb is described below
commit 59e4af3f2cb084cf1e8fb2946eca15de1fee1e10
Author: Nandini Singhal <[email protected]>
AuthorDate: Tue Dec 30 00:54:01 2025 +0100
KAFKA-19970: Add configurable TTL for tiered storage index cache eviction
(#21089)
The Caffeine cache currently uses only size-based eviction with frequency
buckets. Within the same frequency bucket, larger entries are always
evicted first. This causes old (smaller) index files to stay in the cache
indefinitely while newer indexes thrash, resulting in:
- Poor cache utilization for backfill workloads
- Higher fetch errors
- Suboptimal memory usage
This problem is exacerbated when a broker is no longer the leader for a
partition or when read-from-replica is enabled, as stale indexes can
accumulate without being cleared.
Solution:
---------
Add time-based eviction (expireAfterAccess) via a new internal config
remote.log.index.file.cache.ttl.ms (default: 15 minutes, i.e. 900000 ms;
set to -1 to disable time-based eviction and rely on size-based eviction
only). This ensures entries are evicted once the time since their last
access exceeds the TTL, preventing indefinite accumulation.
Evicted index files are automatically re-fetched from remote storage
when needed.
Reviewers: Luke Chen <[email protected]>, Kamal Chandraprakash
<[email protected]>
---
.../scala/unit/kafka/server/KafkaConfigTest.scala | 1 +
.../log/remote/storage/RemoteLogManager.java | 7 +-
.../log/remote/storage/RemoteLogManagerConfig.java | 19 +++
.../storage/internals/log/RemoteIndexCache.java | 52 ++++++++-
.../internals/log/RemoteIndexCacheTest.java | 128 +++++++++++++++++++++
5 files changed, 202 insertions(+), 5 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index a780ddda271..2e366a97117 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1016,6 +1016,7 @@ class KafkaConfigTest {
case
RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP => //
ignore string
case
RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP => //
ignore string
case
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+ case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TTL_MS_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -2)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1, -2)
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index 3542c444b0d..ae57e0e29b6 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -241,7 +241,12 @@ public class RemoteLogManager implements Closeable,
AsyncOffsetReader {
copyQuotaMetrics = new RLMQuotaMetrics(metrics,
"remote-copy-throttle-time", RemoteLogManager.class.getSimpleName(),
"The %s time in millis remote copies was throttled by a broker",
INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS);
- indexCache = new
RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(),
remoteStorageManagerPlugin.get(), logDir);
+ indexCache = new RemoteIndexCache(
+ rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(),
+ rlmConfig.remoteLogIndexFileCacheTtlMs(),
+ false,
+ remoteStorageManagerPlugin.get(),
+ logDir);
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
rlmCopyThreadPool = new
RLMScheduledThreadPool(rlmConfig.remoteLogManagerCopierThreadPoolSize(),
"RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-%d");
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 0b1b049d766..dcdec1d68f7 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -92,6 +92,15 @@ public final class RemoteLogManagerConfig {
"from remote storage in the local storage.";
public static final long
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L;
+ public static final String REMOTE_LOG_INDEX_FILE_CACHE_TTL_MS_PROP =
"remote.log.index.file.cache.ttl.ms";
+ public static final String REMOTE_LOG_INDEX_FILE_CACHE_TTL_MS_DOC = "The
maximum time in milliseconds an index file entry can remain in the cache " +
+ "after its last access. After this duration, the entry will be
evicted even if there is available space. " +
+ "This helps prevent stale index files from remaining in cache
indefinitely, particularly when a broker is no longer the leader " +
+ "for a partition or when read-from-replica is enabled. Evicted
index files are automatically re-fetched from remote storage when needed. " +
+ "Default is 15 minutes (900000 ms), which provides sufficient time
for clients to read a partition/segment while ensuring stale entries " +
+ "don't accumulate. Set to -1 to disable time-based eviction and
use only size-based eviction.";
+ public static final long DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TTL_MS =
900000L; // 15 minutes
+
public static final String
REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP =
"remote.log.manager.follower.thread.pool.size";
public static final String
REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_DOC = "Size of the thread pool
used in scheduling follower tasks to read " +
"the highest-uploaded remote-offset for follower partitions.";
@@ -261,6 +270,12 @@ public final class RemoteLogManagerConfig {
atLeast(1),
LOW,
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC)
+ .defineInternal(REMOTE_LOG_INDEX_FILE_CACHE_TTL_MS_PROP,
+ LONG,
+ DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TTL_MS,
+ atLeast(-1),
+ LOW,
+ REMOTE_LOG_INDEX_FILE_CACHE_TTL_MS_DOC)
.define(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP,
INT,
DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE,
@@ -525,6 +540,10 @@ public final class RemoteLogManagerConfig {
return
config.getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP);
}
+ public long remoteLogIndexFileCacheTtlMs() {
+ return config.getLong(REMOTE_LOG_INDEX_FILE_CACHE_TTL_MS_PROP);
+ }
+
public long remoteLogManagerCopyMaxBytesPerSecond() {
return
config.getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP);
}
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index 325080fb0a8..49b34b54397 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -30,6 +30,7 @@ import org.apache.kafka.server.util.KafkaScheduler;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.Ticker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -120,10 +122,38 @@ public class RemoteIndexCache implements Closeable {
* @param logDir log directory
*/
public RemoteIndexCache(long maxSize, RemoteStorageManager
remoteStorageManager, String logDir) throws IOException {
+ this(maxSize, -1L, false, remoteStorageManager, logDir);
+ }
+
+ /**
+ * Creates RemoteIndexCache with the given configs.
+ *
+ * @param maxSize maximum bytes size of segment index entries
to be cached.
+ * @param ttlMs maximum time in milliseconds an entry can
remain in cache after last access. -1 to disable.
+ * @param recordStats whether to record cache statistics.
Recording statistics requires bookkeeping with each operation.
+ * @param remoteStorageManager RemoteStorageManager instance, to be used
in fetching indexes.
+ * @param logDir log directory
+ */
+ public RemoteIndexCache(long maxSize, long ttlMs, boolean recordStats,
RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
+ this(maxSize, ttlMs, recordStats, remoteStorageManager, logDir, null);
+ }
+
+ /**
+ * Creates RemoteIndexCache with the given configs and custom ticker (for
testing).
+ *
+ * @param maxSize maximum bytes size of segment index entries
to be cached.
+ * @param ttlMs maximum time in milliseconds an entry can
remain in cache after last access. -1 to disable.
+ * @param recordStats whether to record cache statistics.
Recording statistics requires bookkeeping with each operation.
+ * @param remoteStorageManager RemoteStorageManager instance, to be used
in fetching indexes.
+ * @param logDir log directory
+ * @param ticker custom ticker for testing time-based
eviction (null for system ticker)
+ */
+ public RemoteIndexCache(long maxSize, long ttlMs, boolean recordStats,
RemoteStorageManager remoteStorageManager, String logDir,
+ Ticker ticker) throws IOException {
this.remoteStorageManager = remoteStorageManager;
cacheDir = new File(logDir, DIR_NAME);
- internalCache = initEmptyCache(maxSize);
+ internalCache = initEmptyCache(maxSize, ttlMs, recordStats, ticker);
init();
cleanerScheduler.startup();
}
@@ -138,10 +168,24 @@ public class RemoteIndexCache implements Closeable {
}
}
- private Cache<Uuid, Entry> initEmptyCache(long maxSize) {
- return Caffeine.newBuilder()
+ private Cache<Uuid, Entry> initEmptyCache(long maxSize, long ttlMs,
boolean recordStats, Ticker ticker) {
+ Caffeine<Uuid, Entry> builder = Caffeine.newBuilder()
.maximumWeight(maxSize)
- .weigher((Uuid key, Entry entry) -> (int) entry.entrySizeBytes)
+ .weigher((Uuid key, Entry entry) -> (int)
entry.entrySizeBytes);
+
+ if (recordStats) {
+ builder.recordStats();
+ }
+
+ if (ticker != null) {
+ builder.ticker(ticker);
+ }
+
+ if (ttlMs > 0) {
+ builder.expireAfterAccess(ttlMs, TimeUnit.MILLISECONDS);
+ }
+
+ return builder
// This listener is invoked each time an entry is being
automatically removed due to eviction. The cache will invoke this listener
// during the atomic operation to remove the entry (refer:
https://github.com/ben-manes/caffeine/wiki/Removal),
// hence, care must be taken to ensure that this operation is
not expensive. Note that this listener is not invoked when
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java
index a47cc4ef987..d8d3a7f5822 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java
@@ -1309,4 +1309,132 @@ public class RemoteIndexCacheTest {
.filter(t -> t.isAlive() &&
t.getName().startsWith(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD))
.collect(Collectors.toSet());
}
+
+ @Test
+ public void testCacheTtlCanBeDisabled() throws IOException {
+ // Test that TTL can be disabled by setting it to -1
+ long ttlMs = -1L;
+ FakeTicker fakeTicker = new FakeTicker();
+ RemoteIndexCache ttlCache = new RemoteIndexCache(1024 * 1024L, ttlMs,
false, rsm, logDir.toString(), fakeTicker);
+ try {
+ RemoteIndexCache.Entry entry = ttlCache.getIndexEntry(rlsMetadata);
+ assertNotNull(entry);
+
+ fakeTicker.advance(TimeUnit.MINUTES.toNanos(15));
+ ttlCache.internalCache().cleanUp();
+
+ RemoteIndexCache.Entry cachedEntry =
ttlCache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id());
+ assertNotNull(cachedEntry, "Entry should remain cached when TTL
disabled");
+ } finally {
+ Utils.closeQuietly(ttlCache, "RemoteIndexCache");
+ }
+ }
+
+ @Test
+ public void testCacheTtlEviction() throws IOException {
+ long ttlMs = TimeUnit.SECONDS.toMillis(10);
+ FakeTicker fakeTicker = new FakeTicker();
+ RemoteIndexCache ttlCache = new RemoteIndexCache(1024 * 1024L, ttlMs,
false, rsm, logDir.toString(), fakeTicker);
+ try {
+ RemoteIndexCache.Entry entry = ttlCache.getIndexEntry(rlsMetadata);
+ assertNotNull(entry);
+
+ fakeTicker.advance(TimeUnit.MILLISECONDS.toNanos(ttlMs - 1000));
+ ttlCache.internalCache().cleanUp();
+ assertEquals(1, ttlCache.internalCache().estimatedSize());
+
+ fakeTicker.advance(TimeUnit.MILLISECONDS.toNanos(2000));
+ ttlCache.internalCache().cleanUp();
+ assertEquals(0, ttlCache.internalCache().estimatedSize());
+ } finally {
+ Utils.closeQuietly(ttlCache, "RemoteIndexCache");
+ }
+ }
+
+ @Test
+ public void testCacheTtlRefreshOnAccess() throws IOException {
+ long ttlMs = TimeUnit.SECONDS.toMillis(10);
+ FakeTicker fakeTicker = new FakeTicker();
+ RemoteIndexCache ttlCache = new RemoteIndexCache(1024 * 1024L, ttlMs,
false, rsm, logDir.toString(), fakeTicker);
+ try {
+ ttlCache.getIndexEntry(rlsMetadata);
+
+ fakeTicker.advance(TimeUnit.SECONDS.toNanos(8));
+ ttlCache.getIndexEntry(rlsMetadata);
+
+ fakeTicker.advance(TimeUnit.SECONDS.toNanos(8));
+ ttlCache.internalCache().cleanUp();
+
+ RemoteIndexCache.Entry cachedEntry =
ttlCache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id());
+ assertNotNull(cachedEntry, "Entry should remain after access
refreshes TTL");
+ } finally {
+ Utils.closeQuietly(ttlCache, "RemoteIndexCache");
+ }
+ }
+
+ @Test
+ public void testCacheTtlWithMultipleEntries() throws IOException,
RemoteStorageException {
+ long ttlMs = TimeUnit.SECONDS.toMillis(10);
+ FakeTicker fakeTicker = new FakeTicker();
+ RemoteIndexCache ttlCache = new RemoteIndexCache(1024 * 1024L, ttlMs,
false, rsm, logDir.toString(), fakeTicker);
+ try {
+ RemoteLogSegmentId remoteLogSegmentId2 =
RemoteLogSegmentId.generateNew(idPartition);
+ RemoteLogSegmentMetadata rlsMetadata2 = new
RemoteLogSegmentMetadata(remoteLogSegmentId2, baseOffset + 100, lastOffset +
100,
+ time.milliseconds(), brokerId, time.milliseconds(),
segmentSize, Collections.singletonMap(0, 0L));
+
+ ttlCache.getIndexEntry(rlsMetadata);
+ fakeTicker.advance(TimeUnit.SECONDS.toNanos(5));
+
+ ttlCache.getIndexEntry(rlsMetadata2);
+ fakeTicker.advance(TimeUnit.SECONDS.toNanos(6));
+ ttlCache.internalCache().cleanUp();
+
+
assertNull(ttlCache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()));
+
assertNotNull(ttlCache.internalCache().getIfPresent(rlsMetadata2.remoteLogSegmentId().id()));
+ } finally {
+ Utils.closeQuietly(ttlCache, "RemoteIndexCache");
+ }
+ }
+
+ @Test
+ public void testSizeAndTimeBasedEviction() throws IOException,
RemoteStorageException, InterruptedException {
+ long estimateEntryBytesSize = estimateOneEntryBytesSize();
+ long ttlMs = TimeUnit.SECONDS.toMillis(10);
+ FakeTicker fakeTicker = new FakeTicker();
+ RemoteIndexCache ttlCache = new RemoteIndexCache(2 *
estimateEntryBytesSize, ttlMs, false, rsm, logDir.toString(), fakeTicker);
+
+ try {
+ TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(),
new TopicPartition("foo", 0));
+ List<RemoteLogSegmentMetadata> metadataList =
generateRemoteLogSegmentMetadata(3, tpId);
+
+ ttlCache.getIndexEntry(metadataList.get(0));
+ ttlCache.getIndexEntry(metadataList.get(1));
+ TestUtils.waitForCondition(() ->
ttlCache.internalCache().estimatedSize() == 2,
+ "Cache size should be 2 after adding 2 entries");
+
+ ttlCache.getIndexEntry(metadataList.get(2));
+ TestUtils.waitForCondition(() ->
ttlCache.internalCache().estimatedSize() == 2,
+ "Size-based eviction should keep cache at 2 entries");
+
+ fakeTicker.advance(TimeUnit.MILLISECONDS.toNanos(ttlMs + 1000));
+ ttlCache.internalCache().cleanUp();
+ assertEquals(0, ttlCache.internalCache().estimatedSize(),
+ "Time-based eviction should remove all expired entries");
+ } finally {
+ Utils.closeQuietly(ttlCache, "RemoteIndexCache");
+ }
+ }
+
+ static class FakeTicker implements
com.github.benmanes.caffeine.cache.Ticker {
+ private long nanos = 0;
+
+ public void advance(long nanoseconds) {
+ nanos += nanoseconds;
+ }
+
+ @Override
+ public long read() {
+ return nanos;
+ }
+ }
}