This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 909efec5f65 [improve][meta] Add byte size limit to 
AbstractMetadataStore's childrenCache (#24868)
909efec5f65 is described below

commit 909efec5f650a1466b63dd05b99ae292e705b7fb
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Dec 4 13:13:51 2025 +0200

    [improve][meta] Add byte size limit to AbstractMetadataStore's 
childrenCache (#24868)
---
 .../metadata/impl/AbstractMetadataStore.java       | 46 +++++++++++++++++++++-
 1 file changed, 44 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index b0e4b43f700..c6a376e45ee 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -25,7 +25,10 @@ import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
 import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBufUtil;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.opentelemetry.api.OpenTelemetry;
 import java.time.Instant;
@@ -102,10 +105,35 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                         StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName : getClass().getSimpleName()));
         registerListener(this);
 
-        this.childrenCache = Caffeine.newBuilder()
+        long childrenCacheMaxSizeBytes = getChildrenCacheMaxSizeBytes();
+
+        Caffeine<Object, Object> childrenCacheBuilder = Caffeine.newBuilder()
                 .recordStats()
                 .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, 
TimeUnit.MILLISECONDS)
-                .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2, 
TimeUnit.MILLISECONDS)
+                .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2, 
TimeUnit.MILLISECONDS);
+        if (childrenCacheMaxSizeBytes > 0) {
+            childrenCacheBuilder.maximumWeight(childrenCacheMaxSizeBytes)
+                    .weigher((String key, List<String> children) -> {
+                        // calculate the total byte size of the key and 
entries in the children list
+                        // to get some estimation of the required heap memory 
required for the entry.
+                        // add 16 bytes overhead for Java object header and 16 
bytes for java.lang.String fields.
+                        int totalSize = ByteBufUtil.utf8Bytes(key) + 32;
+                        for (String child : children) {
+                            totalSize += ByteBufUtil.utf8Bytes(child) + 32;
+                        }
+                        return totalSize;
+                    });
+        }
+        this.childrenCache = childrenCacheBuilder
+                .evictionListener(new RemovalListener<String, List<String>>() {
+                    @Override
+                    public void onRemoval(String key, List<String> value, 
RemovalCause cause) {
+                        if (cause == RemovalCause.SIZE) {
+                            log.warn("[{}] Evicting path {} from children 
cache because the size of the cache is too "
+                                    + "large. Consider increasing the maximum 
heap size.", metadataStoreName, key);
+                        }
+                    }
+                })
                 .buildAsync(new AsyncCacheLoader<String, List<String>>() {
                     @Override
                     public CompletableFuture<List<String>> asyncLoad(String 
key, Executor executor) {
@@ -152,6 +180,20 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         this.metadataStoreStats = new MetadataStoreStats(metadataStoreName, 
openTelemetry);
     }
 
+    /**
+     * Return the maximum size of the children cache in bytes: the larger of 20% of Runtime.maxMemory() and 20MB.
+     * @return maximum size of the children cache in bytes; the cache weight limit is only applied when positive.
+     */
+    protected long getChildrenCacheMaxSizeBytes() {
+        long heapMaxSizeBytes = Runtime.getRuntime().maxMemory();
+        // default 20% of max heap size, this should be sufficient to prevent 
OOME in the use case
+        // when a lot of namespaces with lots of topics are listed in the 
metadata store.
+        long defaultSizeBytes = heapMaxSizeBytes / 5;
+        // lower bound of 20MB, so the returned limit never drops below that on small heaps
+        int minSizeBytes = 1024 * 1024 * 20;
+        return Math.max(defaultSizeBytes, minSizeBytes);
+    }
+
     @Override
     public CompletableFuture<Void> handleMetadataEvent(MetadataEvent event) {
         CompletableFuture<Void> result = new CompletableFuture<>();

Reply via email to