This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 909efec5f65 [improve][meta] Add byte size limit to
AbstractMetadataStore's childrenCache (#24868)
909efec5f65 is described below
commit 909efec5f650a1466b63dd05b99ae292e705b7fb
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Dec 4 13:13:51 2025 +0200
[improve][meta] Add byte size limit to AbstractMetadataStore's
childrenCache (#24868)
---
.../metadata/impl/AbstractMetadataStore.java | 46 +++++++++++++++++++++-
1 file changed, 44 insertions(+), 2 deletions(-)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index b0e4b43f700..c6a376e45ee 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -25,7 +25,10 @@ import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBufUtil;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.OpenTelemetry;
import java.time.Instant;
@@ -102,10 +105,35 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
StringUtils.isNotBlank(metadataStoreName) ?
metadataStoreName : getClass().getSimpleName()));
registerListener(this);
- this.childrenCache = Caffeine.newBuilder()
+ long childrenCacheMaxSizeBytes = getChildrenCacheMaxSizeBytes();
+
+ Caffeine<Object, Object> childrenCacheBuilder = Caffeine.newBuilder()
.recordStats()
.refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS,
TimeUnit.MILLISECONDS)
- .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2,
TimeUnit.MILLISECONDS)
+ .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2,
TimeUnit.MILLISECONDS);
+ if (childrenCacheMaxSizeBytes > 0) {
+ childrenCacheBuilder.maximumWeight(childrenCacheMaxSizeBytes)
+ .weigher((String key, List<String> children) -> {
+ // calculate the total byte size of the key and
entries in the children list
+ // to get some estimation of the required heap memory
required for the entry.
+ // add 16 bytes overhead for Java object header and 16
bytes for java.lang.String fields.
+ int totalSize = ByteBufUtil.utf8Bytes(key) + 32;
+ for (String child : children) {
+ totalSize += ByteBufUtil.utf8Bytes(child) + 32;
+ }
+ return totalSize;
+ });
+ }
+ this.childrenCache = childrenCacheBuilder
+ .evictionListener(new RemovalListener<String, List<String>>() {
+ @Override
+ public void onRemoval(String key, List<String> value,
RemovalCause cause) {
+ if (cause == RemovalCause.SIZE) {
+ log.warn("[{}] Evicting path {} from children
cache because the size of the cache is too "
+ + "large. Consider increasing the maximum
heap size.", metadataStoreName, key);
+ }
+ }
+ })
.buildAsync(new AsyncCacheLoader<String, List<String>>() {
@Override
public CompletableFuture<List<String>> asyncLoad(String
key, Executor executor) {
@@ -152,6 +180,20 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
this.metadataStoreStats = new MetadataStoreStats(metadataStoreName,
openTelemetry);
}
+ /**
+ * Return the maximum size of the children cache in bytes.
+ * @return maximum size of the children cache in bytes.
+ */
+ protected long getChildrenCacheMaxSizeBytes() {
+ long heapMaxSizeBytes = Runtime.getRuntime().maxMemory();
+ // default 20% of max heap size, this should be sufficient to prevent
OOME in the use case
+ // when a lot of namespaces with lots of topics are listed in the
metadata store.
+ long defaultSizeBytes = heapMaxSizeBytes / 5;
+ // min size 20MB
+ int minSizeBytes = 1024 * 1024 * 20;
+ return Math.max(defaultSizeBytes, minSizeBytes);
+ }
+
@Override
public CompletableFuture<Void> handleMetadataEvent(MetadataEvent event) {
CompletableFuture<Void> result = new CompletableFuture<>();