This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 7adf72f0ac9 [fix][broker]Fix memory leak when using a customized 
ManagedLedger implementation (#25016)
7adf72f0ac9 is described below

commit 7adf72f0ac9bd45b4522af8e0c7603c5da3aac64
Author: fengyubiao <[email protected]>
AuthorDate: Thu Nov 27 14:39:32 2025 +0800

    [fix][broker]Fix memory leak when using a customized ManagedLedger 
implementation (#25016)
    
    (cherry picked from commit 3937788d9bb4c7f1f5c60b927a1499c6dd3dcb2e)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |    3 +-
 .../pulsar/broker/ManagedLedgerClientFactory.java  |    3 +-
 .../broker/service/persistent/PersistentTopic.java |    4 +
 .../CustomizedManagedLedgerStorageForTest.java     | 1113 ++++++++++++++++++++
 .../mledger/impl/CustomizedManagedLedgerTest.java  |  116 ++
 5 files changed, 1237 insertions(+), 2 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index b7fc294229c..ccda99cc497 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -337,7 +337,8 @@ public class ManagedCursorImpl implements ManagedCursor {
         void operationFailed(ManagedLedgerException exception);
     }
 
-    ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger, String 
cursorName) {
+    @VisibleForTesting
+    protected ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl 
ledger, String cursorName) {
         this.bookkeeper = bookkeeper;
         this.cursorProperties = Collections.emptyMap();
         this.ledger = ledger;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index e1561bda303..ee3b0a6d1af 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -54,7 +54,8 @@ public class ManagedLedgerClientFactory implements 
ManagedLedgerStorage {
     private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerClientFactory.class);
     private static final String DEFAULT_STORAGE_CLASS_NAME = "bookkeeper";
     private BookkeeperManagedLedgerStorageClass defaultStorageClass;
-    private ManagedLedgerFactory managedLedgerFactory;
+    @VisibleForTesting
+    protected ManagedLedgerFactory managedLedgerFactory;
     private BookKeeper defaultBkClient;
     private final AsyncCache<EnsemblePlacementPolicyConfig, BookKeeper>
             bkEnsemblePolicyToBkClientMap = 
Caffeine.newBuilder().recordStats().buildAsync();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index b79b2973f2e..b3992984ba1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -4067,6 +4067,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 } catch (IOException e) {
                     log.warn("[{}] [{}] Error while getting the oldest 
message", topic, cursor.toString(), e);
                     res.complete(false);
+                } finally {
+                    if (entry != null) {
+                        entry.release();
+                    }
                 }
 
             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/CustomizedManagedLedgerStorageForTest.java
 
b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/CustomizedManagedLedgerStorageForTest.java
new file mode 100644
index 00000000000..5d88ed1521c
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/CustomizedManagedLedgerStorageForTest.java
@@ -0,0 +1,1113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.collect.Range;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.EventLoopGroup;
+import io.opentelemetry.api.OpenTelemetry;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import lombok.AllArgsConstructor;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
+import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionBound;
+import org.apache.bookkeeper.mledger.ReadOnlyCursor;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
+import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.ManagedLedgerClientFactory;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
+import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.jspecify.annotations.Nullable;
+
+public class CustomizedManagedLedgerStorageForTest extends 
ManagedLedgerClientFactory
+        implements ManagedLedgerStorage  {
+
+    @Override
+    public void initialize(ServiceConfiguration conf, MetadataStoreExtended 
metadataStore,
+                           BookKeeperClientFactory bookkeeperProvider,
+                           EventLoopGroup eventLoopGroup,
+                           OpenTelemetry openTelemetry) throws Exception {
+        super.initialize(conf, metadataStore, bookkeeperProvider, 
eventLoopGroup, openTelemetry);
+        managedLedgerFactory = new 
CustomizedManagedLedgerFactory(managedLedgerFactory);
+    }
+
+    @AllArgsConstructor
+    public static class CustomizedManagedLedgerFactory implements 
ManagedLedgerFactory {
+        private ManagedLedgerFactory delegate;
+        @Override
+        public ManagedLedger open(String name) throws InterruptedException, 
ManagedLedgerException {
+            return new CustomizedManagedLedger((ManagedLedgerImpl) 
delegate.open(name));
+        }
+
+        @Override
+        public ManagedLedger open(String name, ManagedLedgerConfig config) 
throws InterruptedException,
+                ManagedLedgerException {
+            return new CustomizedManagedLedger((ManagedLedgerImpl) 
delegate.open(name, config));
+        }
+        @Override
+        public void asyncOpen(String name, AsyncCallbacks.OpenLedgerCallback 
callback, Object ctx) {
+            delegate.asyncOpen(name, new AsyncCallbacks.OpenLedgerCallback() {
+
+                @Override
+                public void openLedgerComplete(ManagedLedger ledger, Object 
ctx) {
+                    callback.openLedgerComplete(new 
CustomizedManagedLedger((ManagedLedgerImpl) ledger), ctx);
+                }
+
+                @Override
+                public void openLedgerFailed(ManagedLedgerException exception, 
Object ctx) {
+                    callback.openLedgerFailed(exception, ctx);
+                }
+            }, ctx);
+        }
+
+        @Override
+        public void asyncOpen(String name, ManagedLedgerConfig config, 
AsyncCallbacks.OpenLedgerCallback callback,
+                              Supplier<CompletableFuture<Boolean>> 
mlOwnershipChecker, Object ctx) {
+            delegate.asyncOpen(name, config, new 
AsyncCallbacks.OpenLedgerCallback() {
+
+                @Override
+                public void openLedgerComplete(ManagedLedger ledger, Object 
ctx) {
+                    callback.openLedgerComplete(new 
CustomizedManagedLedger((ManagedLedgerImpl) ledger), ctx);
+                }
+
+                @Override
+                public void openLedgerFailed(ManagedLedgerException exception, 
Object ctx) {
+                    callback.openLedgerFailed(exception, ctx);
+                }
+            }, mlOwnershipChecker, ctx);
+        }
+
+        @Override
+        public ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, 
Position startPosition,
+                                                 ManagedLedgerConfig config)
+                throws InterruptedException, ManagedLedgerException {
+            return delegate.openReadOnlyCursor(managedLedgerName, 
startPosition, config);
+        }
+
+        @Override
+        public void asyncOpenReadOnlyCursor(String managedLedgerName, Position 
startPosition,
+                                            ManagedLedgerConfig config,
+                                            
AsyncCallbacks.OpenReadOnlyCursorCallback callback, Object ctx) {
+            delegate.asyncOpenReadOnlyCursor(managedLedgerName, startPosition, 
config, callback, ctx);
+        }
+
+        @Override
+        public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
+                                                   
AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback,
+                                                   ManagedLedgerConfig config, 
Object ctx) {
+            delegate.asyncOpenReadOnlyManagedLedger(managedLedgerName, 
callback, config, ctx);
+        }
+
+        @Override
+        public ManagedLedgerInfo getManagedLedgerInfo(String name) throws 
InterruptedException, ManagedLedgerException {
+            return delegate.getManagedLedgerInfo(name);
+        }
+
+        @Override
+        public void asyncGetManagedLedgerInfo(String name, 
AsyncCallbacks.ManagedLedgerInfoCallback callback,
+                                              Object ctx) {
+            delegate.asyncGetManagedLedgerInfo(name, callback, ctx);
+        }
+
+        @Override
+        public void delete(String name) throws InterruptedException, 
ManagedLedgerException {
+            delegate.delete(name);
+        }
+
+        @Override
+        public void delete(String name, CompletableFuture<ManagedLedgerConfig> 
mlConfigFuture)
+                throws InterruptedException, ManagedLedgerException {
+            delegate.delete(name, mlConfigFuture);
+        }
+
+        @Override
+        public void asyncDelete(String name, 
AsyncCallbacks.DeleteLedgerCallback callback, Object ctx) {
+            delegate.asyncDelete(name, callback, ctx);
+        }
+
+        @Override
+        public void asyncDelete(String name, 
CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
+                                AsyncCallbacks.DeleteLedgerCallback callback, 
Object ctx) {
+            delegate.asyncDelete(name, mlConfigFuture, callback, ctx);
+        }
+
+        @Override
+        public void shutdown() throws InterruptedException, 
ManagedLedgerException {
+            delegate.shutdown();
+        }
+
+        @Override
+        public CompletableFuture<Void> shutdownAsync() throws 
ManagedLedgerException, InterruptedException {
+            return delegate.shutdownAsync();
+        }
+
+        @Override
+        public CompletableFuture<Boolean> asyncExists(String ledgerName) {
+            return delegate.asyncExists(ledgerName);
+        }
+
+        @Override
+        public EntryCacheManager getEntryCacheManager() {
+            return delegate.getEntryCacheManager();
+        }
+
+        @Override
+        public void updateCacheEvictionTimeThreshold(long 
cacheEvictionTimeThresholdNanos) {
+            
delegate.updateCacheEvictionTimeThreshold(cacheEvictionTimeThresholdNanos);
+        }
+
+        @Override
+        public long getCacheEvictionTimeThreshold() {
+            return delegate.getCacheEvictionTimeThreshold();
+        }
+
+        @Override
+        public CompletableFuture<Map<String, String>> 
getManagedLedgerPropertiesAsync(String name) {
+            return delegate.getManagedLedgerPropertiesAsync(name);
+        }
+
+        @Override
+        public Map<String, ManagedLedger> getManagedLedgers() {
+            return delegate.getManagedLedgers();
+        }
+
+        @Override
+        public ManagedLedgerFactoryMXBean getCacheStats() {
+            return delegate.getCacheStats();
+        }
+
+        @Override
+        public void estimateUnloadedTopicBacklog(PersistentOfflineTopicStats 
offlineTopicStats, TopicName topicName,
+                                                 boolean accurate, Object ctx) 
throws Exception {
+            delegate.estimateUnloadedTopicBacklog(offlineTopicStats, 
topicName, accurate, ctx);
+        }
+
+        @Override
+        public ManagedLedgerFactoryConfig getConfig() {
+            return delegate.getConfig();
+        }
+
+    }
+
+    @AllArgsConstructor
+    public static class CustomizedManagedLedger implements ManagedLedger {
+
+        public ManagedLedgerImpl delegate;
+
+        @Override
+        public String getName() {
+            return delegate.getName();
+        }
+
+        @Override
+        public Position addEntry(byte[] data) throws InterruptedException, 
ManagedLedgerException {
+            return delegate.addEntry(data);
+        }
+
+        @Override
+        public Position addEntry(byte[] data, int numberOfMessages)
+                throws InterruptedException, ManagedLedgerException {
+            return delegate.addEntry(data, numberOfMessages);
+        }
+
+        @Override
+        public void asyncAddEntry(byte[] data, AsyncCallbacks.AddEntryCallback 
callback, Object ctx) {
+            delegate.asyncAddEntry(data, callback, ctx);
+        }
+
+        @Override
+        public Position addEntry(byte[] data, int offset, int length)
+                throws InterruptedException, ManagedLedgerException {
+            return delegate.addEntry(data, offset, length);
+        }
+
+        @Override
+        public Position addEntry(byte[] data, int numberOfMessages, int 
offset, int length)
+                throws InterruptedException, ManagedLedgerException {
+            return delegate.addEntry(data, numberOfMessages, offset, length);
+        }
+
+        @Override
+        public void asyncAddEntry(byte[] data, int offset, int length, 
AsyncCallbacks.AddEntryCallback callback,
+                                  Object ctx) {
+            delegate.asyncAddEntry(data, offset, length, callback, ctx);
+        }
+
+        @Override
+        public void asyncAddEntry(byte[] data, int numberOfMessages, int 
offset, int length,
+                                  AsyncCallbacks.AddEntryCallback callback, 
Object ctx) {
+            delegate.asyncAddEntry(data, numberOfMessages, offset, length, 
callback, ctx);
+        }
+
+        @Override
+        public void asyncAddEntry(ByteBuf buffer, 
AsyncCallbacks.AddEntryCallback callback, Object ctx) {
+            delegate.asyncAddEntry(buffer, callback, ctx);
+        }
+
+        @Override
+        public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, 
AsyncCallbacks.AddEntryCallback callback,
+                                  Object ctx) {
+            delegate.asyncAddEntry(buffer, numberOfMessages, callback, ctx);
+        }
+
+        @Override
+        public ManagedCursor openCursor(String name) throws 
InterruptedException, ManagedLedgerException {
+            ManagedCursor managedCursor = delegate.openCursor(name);
+            if (managedCursor instanceof ManagedCursorImpl) {
+                return new ManagedCursorDecorator((ManagedCursorImpl) 
managedCursor);
+            }
+            return  managedCursor;
+        }
+
+        @Override
+        public ManagedCursor openCursor(String name, 
CommandSubscribe.InitialPosition initialPosition)
+                throws InterruptedException, ManagedLedgerException {
+            ManagedCursor managedCursor = delegate.openCursor(name, 
initialPosition);
+            if (managedCursor instanceof ManagedCursorImpl) {
+                return new ManagedCursorDecorator((ManagedCursorImpl) 
managedCursor);
+            }
+            return  managedCursor;
+        }
+
+        @Override
+        public ManagedCursor openCursor(String name, 
CommandSubscribe.InitialPosition initialPosition,
+                                        Map<String, Long> properties, 
Map<String, String> cursorProperties)
+                throws InterruptedException, ManagedLedgerException {
+            ManagedCursor managedCursor = delegate.openCursor(name, 
initialPosition, properties, cursorProperties);
+            if (managedCursor instanceof ManagedCursorImpl) {
+                return new ManagedCursorDecorator((ManagedCursorImpl) 
managedCursor);
+            }
+            return  managedCursor;
+        }
+
+        @Override
+        public ManagedCursor newNonDurableCursor(Position startCursorPosition) 
throws ManagedLedgerException {
+            return delegate.newNonDurableCursor(startCursorPosition);
+        }
+
+        @Override
+        public ManagedCursor newNonDurableCursor(Position startPosition, 
String subscriptionName)
+                throws ManagedLedgerException {
+            return delegate.newNonDurableCursor(startPosition, 
subscriptionName);
+        }
+
+        @Override
+        public ManagedCursor newNonDurableCursor(Position startPosition, 
String subscriptionName,
+                                                 
CommandSubscribe.InitialPosition initialPosition,
+                                                 boolean isReadCompacted) 
throws ManagedLedgerException {
+            return delegate.newNonDurableCursor(startPosition, 
subscriptionName, initialPosition, isReadCompacted);
+        }
+
+        @Override
+        public void asyncDeleteCursor(String name, 
AsyncCallbacks.DeleteCursorCallback callback, Object ctx) {
+            delegate.asyncDeleteCursor(name, callback, ctx);
+        }
+
+        @Override
+        public void deleteCursor(String name) throws InterruptedException, 
ManagedLedgerException {
+            delegate.deleteCursor(name);
+        }
+
+        @Override
+        public void removeWaitingCursor(ManagedCursor cursor) {
+            if (cursor instanceof ManagedCursorDecorator decorator) {
+                delegate.removeWaitingCursor(decorator.delegate);
+                return;
+            }
+            delegate.removeWaitingCursor(cursor);
+        }
+
+        @Override
+        public void asyncOpenCursor(String name, 
AsyncCallbacks.OpenCursorCallback callback, Object ctx) {
+            delegate.asyncOpenCursor(name, callback, ctx);
+        }
+
+        @Override
+        public void asyncOpenCursor(String name, 
CommandSubscribe.InitialPosition initialPosition,
+                                    AsyncCallbacks.OpenCursorCallback 
callback, Object ctx) {
+            delegate.asyncOpenCursor(name, initialPosition, new 
AsyncCallbacks.OpenCursorCallback() {
+                @Override
+                public void openCursorComplete(ManagedCursor cursor, Object 
ctx) {
+                    callback.openCursorComplete(new 
ManagedCursorDecorator((ManagedCursorImpl) cursor), ctx);
+                }
+
+                @Override
+                public void openCursorFailed(ManagedLedgerException exception, 
Object ctx) {
+                    callback.openCursorComplete(null, exception);
+                }
+            }, ctx);
+        }
+
+        @Override
+        public void asyncOpenCursor(String name, 
CommandSubscribe.InitialPosition initialPosition,
+                                    Map<String, Long> properties, Map<String, 
String> cursorProperties,
+                                    AsyncCallbacks.OpenCursorCallback 
callback, Object ctx) {
+            delegate.asyncOpenCursor(name, initialPosition, properties, 
cursorProperties,
+                new AsyncCallbacks.OpenCursorCallback() {
+                    @Override
+                    public void openCursorComplete(ManagedCursor cursor, 
Object ctx) {
+                        callback.openCursorComplete(new 
ManagedCursorDecorator((ManagedCursorImpl) cursor), ctx);
+                    }
+
+                    @Override
+                    public void openCursorFailed(ManagedLedgerException 
exception, Object ctx) {
+                        callback.openCursorComplete(null, exception);
+                    }
+                }, ctx);
+        }
+
+        @Override
+        public Iterable<ManagedCursor> getCursors() {
+            return delegate.getCursors();
+        }
+
+        @Override
+        public Iterable<ManagedCursor> getActiveCursors() {
+            return delegate.getActiveCursors();
+        }
+
+        @Override
+        public long getNumberOfEntries() {
+            return delegate.getNumberOfEntries();
+        }
+
+        @Override
+        public long getNumberOfEntries(Range<Position> range) {
+            return delegate.getNumberOfEntries(range);
+        }
+
+        @Override
+        public long getNumberOfActiveEntries() {
+            return delegate.getNumberOfActiveEntries();
+        }
+
+        @Override
+        public long getTotalSize() {
+            return delegate.getTotalSize();
+        }
+
+        @Override
+        public long getEstimatedBacklogSize() {
+            return delegate.getEstimatedBacklogSize();
+        }
+
+        @Override
+        public CompletableFuture<Long> 
getEarliestMessagePublishTimeInBacklog() {
+            return delegate.getEarliestMessagePublishTimeInBacklog();
+        }
+
+        @Override
+        public long getOffloadedSize() {
+            return delegate.getOffloadedSize();
+        }
+
+        @Override
+        public long getLastOffloadedLedgerId() {
+            return delegate.getLastOffloadedLedgerId();
+        }
+
+        @Override
+        public long getLastOffloadedSuccessTimestamp() {
+            return delegate.getLastOffloadedSuccessTimestamp();
+        }
+
+        @Override
+        public long getLastOffloadedFailureTimestamp() {
+            return delegate.getLastOffloadedFailureTimestamp();
+        }
+
+        @Override
+        public void asyncTerminate(AsyncCallbacks.TerminateCallback callback, 
Object ctx) {
+            delegate.asyncTerminate(callback, ctx);
+        }
+
+        @Override
+        public CompletableFuture<Position> asyncMigrate() {
+            return delegate.asyncMigrate();
+        }
+
+        @Override
+        public Position terminate() throws InterruptedException, 
ManagedLedgerException {
+            return delegate.terminate();
+        }
+
+        @Override
+        public void close() throws InterruptedException, 
ManagedLedgerException {
+            delegate.close();
+        }
+
+        @Override
+        public void asyncClose(AsyncCallbacks.CloseCallback callback, Object 
ctx) {
+            delegate.asyncClose(callback, ctx);
+        }
+
+        @Override
+        public ManagedLedgerMXBean getStats() {
+            return delegate.getStats();
+        }
+
+        @Override
+        public void delete() throws InterruptedException, 
ManagedLedgerException {
+            delegate.delete();
+        }
+
+        @Override
+        public void asyncDelete(AsyncCallbacks.DeleteLedgerCallback callback, 
Object ctx) {
+            delegate.asyncDelete(callback, ctx);
+        }
+
+        @Override
+        public Position offloadPrefix(Position pos) throws 
InterruptedException, ManagedLedgerException {
+            return delegate.offloadPrefix(pos);
+        }
+
+        @Override
+        public void asyncOffloadPrefix(Position pos, 
AsyncCallbacks.OffloadCallback callback, Object ctx) {
+            delegate.asyncOffloadPrefix(pos, callback, ctx);
+        }
+
+        @Override
+        public @Nullable ManagedCursor getSlowestConsumer() {
+            return delegate.getSlowestConsumer();
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return delegate.isTerminated();
+        }
+
+        @Override
+        public boolean isMigrated() {
+            return delegate.isMigrated();
+        }
+
+        @Override
+        public ManagedLedgerConfig getConfig() {
+            return delegate.getConfig();
+        }
+
+        @Override
+        public void setConfig(ManagedLedgerConfig config) {
+            delegate.setConfig(config);
+        }
+
+        @Override
+        public Position getLastConfirmedEntry() {
+            return delegate.getLastConfirmedEntry();
+        }
+
+        @Override
+        public void readyToCreateNewLedger() {
+            delegate.readyToCreateNewLedger();
+        }
+
+        @Override
+        public Map<String, String> getProperties() {
+            return delegate.getProperties();
+        }
+
+        @Override
+        public void setProperty(String key, String value) throws 
InterruptedException, ManagedLedgerException {
+            delegate.setProperty(key, value);
+        }
+
+        @Override
+        public void asyncSetProperty(String key, String value, 
AsyncCallbacks.UpdatePropertiesCallback callback,
+                                     Object ctx) {
+            delegate.asyncSetProperty(key, value, callback, ctx);
+        }
+
+        @Override
+        public void deleteProperty(String key) throws InterruptedException, 
ManagedLedgerException {
+            delegate.deleteProperty(key);
+        }
+
+        @Override
+        public void asyncDeleteProperty(String key, 
AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx) {
+            delegate.asyncDeleteProperty(key, callback, ctx);
+        }
+
+        @Override
+        public void setProperties(Map<String, String> properties) throws 
InterruptedException, ManagedLedgerException {
+            delegate.setProperties(properties);
+        }
+
+        @Override
+        public void asyncSetProperties(Map<String, String> properties, 
AsyncCallbacks.UpdatePropertiesCallback callback,
+                                       Object ctx) {
+            delegate.asyncSetProperties(properties, callback, ctx);
+        }
+
+        @Override
+        public void trimConsumedLedgersInBackground(CompletableFuture<?> 
promise) {
+            delegate.trimConsumedLedgersInBackground(promise);
+        }
+
+        @Override
+        public void rollCurrentLedgerIfFull() {
+            delegate.rollCurrentLedgerIfFull();
+        }
+
+        @Override
+        public CompletableFuture<Position> asyncFindPosition(Predicate<Entry> 
predicate) {
+            return delegate.asyncFindPosition(predicate);
+        }
+
+        @Override
+        public ManagedLedgerInterceptor getManagedLedgerInterceptor() {
+            return delegate.getManagedLedgerInterceptor();
+        }
+
+        @Override
+        public CompletableFuture<MLDataFormats.ManagedLedgerInfo.LedgerInfo> 
getLedgerInfo(long ledgerId) {
+            return delegate.getLedgerInfo(ledgerId);
+        }
+
+        @Override
+        public Optional<MLDataFormats.ManagedLedgerInfo.LedgerInfo> 
getOptionalLedgerInfo(long ledgerId) {
+            return delegate.getOptionalLedgerInfo(ledgerId);
+        }
+
+        @Override
+        public CompletableFuture<Void> asyncTruncate() {
+            return delegate.asyncTruncate();
+        }
+
+        @Override
+        public CompletableFuture<ManagedLedgerInternalStats> 
getManagedLedgerInternalStats(
+                boolean includeLedgerMetadata) {
+            return 
delegate.getManagedLedgerInternalStats(includeLedgerMetadata);
+        }
+
+        @Override
+        public boolean checkInactiveLedgerAndRollOver() {
+            return delegate.checkInactiveLedgerAndRollOver();
+        }
+
+        @Override
+        public void checkCursorsToCacheEntries() {
+            delegate.checkCursorsToCacheEntries();
+        }
+
+        @Override
+        public void asyncReadEntry(Position position, 
AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
+            delegate.asyncReadEntry(position, callback, ctx);
+        }
+
+        @Override
+        public NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> 
getLedgersInfo() {
+            return delegate.getLedgersInfo();
+        }
+
+        @Override
+        public Position getNextValidPosition(Position position) {
+            return delegate.getNextValidPosition(position);
+        }
+
+        @Override
+        public Position getPreviousPosition(Position position) {
+            return delegate.getPreviousPosition(position);
+        }
+
+        @Override
+        public long getEstimatedBacklogSize(Position position) {
+            return delegate.getEstimatedBacklogSize(position);
+        }
+
+        @Override
+        public Position getPositionAfterN(Position startPosition, long n, 
PositionBound startRange) {
+            return delegate.getPositionAfterN(startPosition, n, startRange);
+        }
+
+        @Override
+        public int getPendingAddEntriesCount() {
+            return delegate.getPendingAddEntriesCount();
+        }
+
+        @Override
+        public long getCacheSize() {
+            return delegate.getCacheSize();
+        }
+
+        @Override
+        public Position getFirstPosition() {
+            return delegate.getFirstPosition();
+        }
+    }
+
+    public static class ManagedCursorDecorator implements ManagedCursor {
+
+        private final ManagedCursorImpl delegate;
+        public Map<EntryImpl, Boolean> entryReleasedStatusMap = new 
ConcurrentHashMap<>();
+
+        public ManagedCursorDecorator(ManagedCursorImpl delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public String getName() {
+            return delegate.getName();
+        }
+
+        @Override
+        public long getLastActive() {
+            return delegate.getLastActive();
+        }
+
+        @Override
+        public void updateLastActive() {
+            delegate.updateLastActive();
+        }
+
+        @Override
+        public Map<String, Long> getProperties() {
+            return delegate.getProperties();
+        }
+
+        @Override
+        public Map<String, String> getCursorProperties() {
+            return delegate.getCursorProperties();
+        }
+
+        @Override
+        public CompletableFuture<Void> putCursorProperty(String key, String 
value) {
+            return delegate.putCursorProperty(key, value);
+        }
+
+        @Override
+        public CompletableFuture<Void> setCursorProperties(Map<String, String> 
cursorProperties) {
+            return delegate.setCursorProperties(cursorProperties);
+        }
+
+        @Override
+        public CompletableFuture<Void> removeCursorProperty(String key) {
+            return delegate.removeCursorProperty(key);
+        }
+
+        @Override
+        public boolean putProperty(String key, Long value) {
+            return delegate.putProperty(key, value);
+        }
+
+        @Override
+        public boolean removeProperty(String key) {
+            return delegate.removeProperty(key);
+        }
+
+        @Override
+        public List<Entry> readEntries(int numberOfEntriesToRead) throws 
InterruptedException, ManagedLedgerException {
+            return delegate.readEntries(numberOfEntriesToRead);
+        }
+
+        @Override
+        public void asyncReadEntries(int numberOfEntriesToRead, 
AsyncCallbacks.ReadEntriesCallback callback, Object ctx,
+                                     Position maxPosition) {
+            delegate.asyncReadEntries(numberOfEntriesToRead, callback, ctx, 
maxPosition);
+        }
+
+        @Override
+        public void asyncReadEntries(int numberOfEntriesToRead, long 
maxSizeBytes,
+                                     AsyncCallbacks.ReadEntriesCallback 
callback, Object ctx, Position maxPosition) {
+            delegate.asyncReadEntries(numberOfEntriesToRead, maxSizeBytes, 
callback, ctx, maxPosition);
+        }
+
+        @Override
+        public Entry getNthEntry(int n, IndividualDeletedEntries 
deletedEntries)
+                throws InterruptedException, ManagedLedgerException {
+            EntryImpl entry = (EntryImpl) delegate.getNthEntry(n, 
deletedEntries);
+            entryReleasedStatusMap.put(entry, Boolean.FALSE);
+            entry.onDeallocate(() -> {
+                entryReleasedStatusMap.put(entry, Boolean.TRUE);
+            });
+            return entry;
+        }
+
+        @Override
+        public void asyncGetNthEntry(int n, IndividualDeletedEntries 
deletedEntries,
+                                     AsyncCallbacks.ReadEntryCallback 
callback, Object ctx) {
+            delegate.asyncGetNthEntry(n, deletedEntries, new 
AsyncCallbacks.ReadEntryCallback() {
+
+                @Override
+                public void readEntryComplete(Entry entry, Object ctx) {
+                    EntryImpl e = (EntryImpl) entry;
+                    entryReleasedStatusMap.put(e, Boolean.FALSE);
+                    e.onDeallocate(() -> {
+                        entryReleasedStatusMap.put(e, Boolean.TRUE);
+                    });
+                    callback.readEntryComplete(e, ctx);
+                }
+
+                @Override
+                public void readEntryFailed(ManagedLedgerException exception, 
Object ctx) {
+                    callback.readEntryFailed(exception, ctx);
+                }
+            }, ctx);
+        }
+
+        @Override
+        public List<Entry> readEntriesOrWait(int numberOfEntriesToRead)
+                throws InterruptedException, ManagedLedgerException {
+            return delegate.readEntriesOrWait(numberOfEntriesToRead);
+        }
+
+        @Override
+        public List<Entry> readEntriesOrWait(int maxEntries, long maxSizeBytes)
+                throws InterruptedException, ManagedLedgerException {
+            return delegate.readEntriesOrWait(maxEntries, maxSizeBytes);
+        }
+
+        @Override
+        public void asyncReadEntriesOrWait(int numberOfEntriesToRead, 
AsyncCallbacks.ReadEntriesCallback callback,
+                                           Object ctx, Position maxPosition) {
+            delegate.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, 
ctx, maxPosition);
+        }
+
+        @Override
+        public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes,
+                                           AsyncCallbacks.ReadEntriesCallback 
callback, Object ctx,
+                                           Position maxPosition) {
+            delegate.asyncReadEntriesOrWait(maxEntries, maxSizeBytes, 
callback, ctx, maxPosition);
+        }
+
+        @Override
+        public boolean cancelPendingReadRequest() {
+            return delegate.cancelPendingReadRequest();
+        }
+
+        @Override
+        public boolean hasMoreEntries() {
+            return delegate.hasMoreEntries();
+        }
+
+        @Override
+        public long getNumberOfEntries() {
+            return delegate.getNumberOfEntries();
+        }
+
+        @Override
+        public long getNumberOfEntriesInBacklog(boolean isPrecise) {
+            return delegate.getNumberOfEntriesInBacklog(isPrecise);
+        }
+
+        @Override
+        public void markDelete(Position position) throws InterruptedException, 
ManagedLedgerException {
+            delegate.markDelete(position);
+        }
+
+        @Override
+        public void markDelete(Position position, Map<String, Long> properties)
+                throws InterruptedException, ManagedLedgerException {
+            delegate.markDelete(position, properties);
+        }
+
+        @Override
+        public void asyncMarkDelete(Position position, 
AsyncCallbacks.MarkDeleteCallback callback, Object ctx) {
+            delegate.asyncMarkDelete(position, callback, ctx);
+        }
+
+        @Override
+        public void asyncMarkDelete(Position position, Map<String, Long> 
properties,
+                                    AsyncCallbacks.MarkDeleteCallback 
callback, Object ctx) {
+            delegate.asyncMarkDelete(position, properties, callback, ctx);
+        }
+
+        @Override
+        public void delete(Position position) throws InterruptedException, 
ManagedLedgerException {
+            delegate.delete(position);
+        }
+
+        @Override
+        public void asyncDelete(Position position, 
AsyncCallbacks.DeleteCallback callback, Object ctx) {
+            delegate.asyncDelete(position, callback, ctx);
+        }
+
+        @Override
+        public void delete(Iterable<Position> positions) throws 
InterruptedException, ManagedLedgerException {
+            delegate.delete(positions);
+        }
+
+        @Override
+        public void asyncDelete(Iterable<Position> position, 
AsyncCallbacks.DeleteCallback callback, Object ctx) {
+            delegate.asyncDelete(position, callback, ctx);
+        }
+
+        @Override
+        public Position getReadPosition() {
+            return delegate.getReadPosition();
+        }
+
+        @Override
+        public Position getMarkDeletedPosition() {
+            return delegate.getMarkDeletedPosition();
+        }
+
+        @Override
+        public Position getPersistentMarkDeletedPosition() {
+            return delegate.getPersistentMarkDeletedPosition();
+        }
+
+        @Override
+        public void rewind() {
+            delegate.rewind();
+        }
+
+        @Override
+        public void seek(Position newReadPosition, boolean force) {
+            delegate.seek(newReadPosition, force);
+        }
+
+        @Override
+        public void clearBacklog() throws InterruptedException, 
ManagedLedgerException {
+            delegate.clearBacklog();
+        }
+
+        @Override
+        public void asyncClearBacklog(AsyncCallbacks.ClearBacklogCallback 
callback, Object ctx) {
+            delegate.asyncClearBacklog(callback, ctx);
+        }
+
+        @Override
+        public void skipEntries(int numEntriesToSkip, IndividualDeletedEntries 
deletedEntries)
+                throws InterruptedException, ManagedLedgerException {
+            delegate.skipEntries(numEntriesToSkip, deletedEntries);
+        }
+
+        @Override
+        public void asyncSkipEntries(int numEntriesToSkip, 
IndividualDeletedEntries deletedEntries,
+                                     AsyncCallbacks.SkipEntriesCallback 
callback, Object ctx) {
+            delegate.asyncSkipEntries(numEntriesToSkip, deletedEntries, 
callback, ctx);
+        }
+
+        @Override
+        public Position findNewestMatching(Predicate<Entry> condition)
+                throws InterruptedException, ManagedLedgerException {
+            return delegate.findNewestMatching(condition);
+        }
+
+        @Override
+        public Position findNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition)
+                throws InterruptedException, ManagedLedgerException {
+            return delegate.findNewestMatching(constraint, condition);
+        }
+
+        @Override
+        public void asyncFindNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition,
+                                            AsyncCallbacks.FindEntryCallback 
callback, Object ctx) {
+            delegate.asyncFindNewestMatching(constraint, condition, callback, 
ctx);
+        }
+
+        @Override
+        public void asyncFindNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition,
+                                            AsyncCallbacks.FindEntryCallback 
callback, Object ctx,
+                                            boolean isFindFromLedger) {
+            delegate.asyncFindNewestMatching(constraint, condition, callback, 
ctx, isFindFromLedger);
+        }
+
+        @Override
+        public void resetCursor(Position position) throws 
InterruptedException, ManagedLedgerException {
+            delegate.resetCursor(position);
+        }
+
+        @Override
+        public void asyncResetCursor(Position position, boolean forceReset,
+                                     AsyncCallbacks.ResetCursorCallback 
callback) {
+            delegate.asyncResetCursor(position, forceReset, callback);
+        }
+
+        @Override
+        public List<Entry> replayEntries(Set<? extends Position> positions)
+                throws InterruptedException, ManagedLedgerException {
+            return delegate.replayEntries(positions);
+        }
+
+        @Override
+        public Set<? extends Position> asyncReplayEntries(Set<? extends 
Position> positions,
+                                                          
AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
+            return delegate.asyncReplayEntries(positions, callback, ctx);
+        }
+
+        @Override
+        public Set<? extends Position> asyncReplayEntries(Set<? extends 
Position> positions,
+                                                          
AsyncCallbacks.ReadEntriesCallback callback, Object ctx,
+                                                          boolean sortEntries) 
{
+            return delegate.asyncReplayEntries(positions, callback, ctx, 
sortEntries);
+        }
+
+        @Override
+        public void close() throws InterruptedException, 
ManagedLedgerException {
+            delegate.close();
+        }
+
+        @Override
+        public void asyncClose(AsyncCallbacks.CloseCallback callback, Object 
ctx) {
+            delegate.asyncClose(callback, ctx);
+        }
+
+        @Override
+        public Position getFirstPosition() {
+            return delegate.getFirstPosition();
+        }
+
+        @Override
+        public void setActive() {
+            delegate.setActive();
+        }
+
+        @Override
+        public void setInactive() {
+            delegate.setInactive();
+        }
+
+        @Override
+        public void setAlwaysInactive() {
+            delegate.setAlwaysInactive();
+        }
+
+        @Override
+        public boolean isActive() {
+            return delegate.isActive();
+        }
+
+        @Override
+        public boolean isDurable() {
+            return delegate.isDurable();
+        }
+
+        @Override
+        public long getNumberOfEntriesSinceFirstNotAckedMessage() {
+            return delegate.getNumberOfEntriesSinceFirstNotAckedMessage();
+        }
+
+        @Override
+        public int getTotalNonContiguousDeletedMessagesRange() {
+            return delegate.getTotalNonContiguousDeletedMessagesRange();
+        }
+
+        @Override
+        public int getNonContiguousDeletedMessagesRangeSerializedSize() {
+            return 
delegate.getNonContiguousDeletedMessagesRangeSerializedSize();
+        }
+
+        @Override
+        public long getEstimatedSizeSinceMarkDeletePosition() {
+            return delegate.getEstimatedSizeSinceMarkDeletePosition();
+        }
+
+        @Override
+        public double getThrottleMarkDelete() {
+            return delegate.getThrottleMarkDelete();
+        }
+
+        @Override
+        public void setThrottleMarkDelete(double throttleMarkDelete) {
+            delegate.setThrottleMarkDelete(throttleMarkDelete);
+        }
+
+        @Override
+        public ManagedLedger getManagedLedger() {
+            return delegate.getManagedLedger();
+        }
+
+        @Override
+        public Range<Position> getLastIndividualDeletedRange() {
+            return delegate.getLastIndividualDeletedRange();
+        }
+
+        @Override
+        public void trimDeletedEntries(List<Entry> entries) {
+            delegate.trimDeletedEntries(entries);
+        }
+
+        @Override
+        public long[] getDeletedBatchIndexesAsLongArray(Position position) {
+            return delegate.getDeletedBatchIndexesAsLongArray(position);
+        }
+
+        @Override
+        public ManagedCursorMXBean getStats() {
+            return delegate.getStats();
+        }
+
+        @Override
+        public boolean checkAndUpdateReadPositionChanged() {
+            return delegate.checkAndUpdateReadPositionChanged();
+        }
+
+        @Override
+        public boolean isClosed() {
+            return delegate.isClosed();
+        }
+
+        @Override
+        public ManagedLedgerInternalStats.CursorStats getCursorStats() {
+            return delegate.getCursorStats();
+        }
+
+        @Override
+        public boolean isMessageDeleted(Position position) {
+            return delegate.isMessageDeleted(position);
+        }
+
+        @Override
+        public ManagedCursor duplicateNonDurableCursor(String 
nonDurableCursorName) throws ManagedLedgerException {
+            return delegate.duplicateNonDurableCursor(nonDurableCursorName);
+        }
+
+        @Override
+        public long[] getBatchPositionAckSet(Position position) {
+            return delegate.getBatchPositionAckSet(position);
+        }
+
+        @Override
+        public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
+            return delegate.applyMaxSizeCap(maxEntries, maxSizeBytes);
+        }
+
+        @Override
+        public void updateReadStats(int readEntriesCount, long 
readEntriesSize) {
+            delegate.updateReadStats(readEntriesCount, readEntriesSize);
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/CustomizedManagedLedgerTest.java
 
b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/CustomizedManagedLedgerTest.java
new file mode 100644
index 00000000000..5ad1ee618c0
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/CustomizedManagedLedgerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class CustomizedManagedLedgerTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setManagedLedgerStorageClassName(
+            CustomizedManagedLedgerStorageForTest.class.getName());
+        conf.setManagedLedgerCacheSizeMB(10);
+        conf.setManagedLedgerCacheEvictionFrequency(1000);
+        conf.setManagedLedgerCacheEvictionTimeThresholdMillis(1000);
+    }
+
+    @Test
+    public void testNoMemoryLeakWhenExpireMessages() throws Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subscription = "s1";
+
+        // Create topic with "CustomizedManagedLedger", which will
+        // call "PersistentTopic.checkMessageExpiryWithoutSharedPosition" when 
expiring messages.
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        int messageTTLInSecond = 1;
+        ManagedLedger ml = pulsar.getDefaultManagedLedgerFactory()
+            .open(TopicName.get(topic).getPersistenceNamingEncoding(), config);
+        CustomizedManagedLedgerStorageForTest.CustomizedManagedLedger 
customizedManagedLedger =
+                
(CustomizedManagedLedgerStorageForTest.CustomizedManagedLedger) ml;
+        assertTrue(ml instanceof 
CustomizedManagedLedgerStorageForTest.CustomizedManagedLedger);
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionName(subscription).subscribe();
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).join().get();
+        admin.topicPolicies().setMessageTTL(topic, messageTTLInSecond);
+        for (int i = 0; i < 50; i++) {
+            producer.send("msg-" + i);
+        }
+
+        // Trigger a messages expiring.
+        ManagedCursor managedCursor = 
persistentTopic.getSubscription(subscription).getCursor();
+        assertTrue(managedCursor instanceof 
CustomizedManagedLedgerStorageForTest.ManagedCursorDecorator);
+        CustomizedManagedLedgerStorageForTest.ManagedCursorDecorator 
cursorDecorator =
+                (CustomizedManagedLedgerStorageForTest.ManagedCursorDecorator) 
managedCursor;
+        Thread.sleep(messageTTLInSecond * 3);
+        Awaitility.await().untilAsserted(() -> {
+            persistentTopic.checkMessageExpiry();
+            assertEquals(cursorDecorator.getNumberOfEntriesInBacklog(true), 0);
+        });
+
+        // Verify: no memory leak.
+        Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertFalse(cursorDecorator.entryReleasedStatusMap.isEmpty());
+            for (Boolean released : 
cursorDecorator.entryReleasedStatusMap.values()) {
+                assertTrue(released);
+            }
+        });
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topic);
+    }
+}

Reply via email to