This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f1974aa6af5bd32d0f14f7c402d8e2c6f1baae25
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Jul 19 12:26:22 2025 +0800

    [fix][ml] Fix asyncReadEntries might never complete if empty entries are 
read from BK (#24515)
    
    (cherry picked from commit eea7c13fb75e7acea012fbe7b46be0f4f7558a90)
---
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  31 ++++--
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   5 +-
 .../bookkeeper/mledger/impl/OpReadEntry.java       | 104 +++++++++++++++------
 .../mledger/impl/cache/RangeEntryCacheImpl.java    |   5 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  38 ++++++++
 .../bookkeeper/test/MockedBookKeeperTestCase.java  |  22 ++++-
 6 files changed, 168 insertions(+), 37 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 627e3225519..ac924258556 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.impl;
 import static 
org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
 import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.NULL_OFFLOAD_PROMISE;
 import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicates;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.Maps;
@@ -47,6 +48,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.Getter;
@@ -108,6 +110,7 @@ import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
+import org.jspecify.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -170,7 +173,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                                     ManagedLedgerFactoryConfig config)
             throws Exception {
         this(metadataStore, new DefaultBkFactory(bkClientConfiguration),
-                true /* isBookkeeperManaged */, config, 
NullStatsLogger.INSTANCE, OpenTelemetry.noop());
+                true /* isBookkeeperManaged */, config, 
NullStatsLogger.INSTANCE, OpenTelemetry.noop(), null);
     }
 
     public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, 
BookKeeper bookKeeper)
@@ -184,12 +187,21 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         this(metadataStore, (policyConfig) -> 
CompletableFuture.completedFuture(bookKeeper), config);
     }
 
+    @VisibleForTesting
+    public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, 
BookKeeper bookKeeper,
+                                    @Nullable 
Function<ManagedLedgerFactoryImpl, EntryCacheManager>
+                                            entryCacheManagerCreator)
+            throws Exception {
+        this(metadataStore, __ -> 
CompletableFuture.completedFuture(bookKeeper), false,
+                new ManagedLedgerFactoryConfig(), NullStatsLogger.INSTANCE, 
OpenTelemetry.noop(),
+                entryCacheManagerCreator);
+    }
+
     public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
                                     
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
                                     ManagedLedgerFactoryConfig config)
             throws Exception {
-        this(metadataStore, bookKeeperGroupFactory, false /* 
isBookkeeperManaged */,
-                config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
+        this(metadataStore, bookKeeperGroupFactory, config, 
NullStatsLogger.INSTANCE, OpenTelemetry.noop());
     }
 
     public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
@@ -198,7 +210,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                                     OpenTelemetry openTelemetry)
             throws Exception {
         this(metadataStore, bookKeeperGroupFactory, false /* 
isBookkeeperManaged */,
-                config, statsLogger, openTelemetry);
+                config, statsLogger, openTelemetry, null);
     }
 
     private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
@@ -206,7 +218,10 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                                      boolean isBookkeeperManaged,
                                      ManagedLedgerFactoryConfig config,
                                      StatsLogger statsLogger,
-                                     OpenTelemetry openTelemetry) throws 
Exception {
+                                     OpenTelemetry openTelemetry,
+                                     @Nullable 
Function<ManagedLedgerFactoryImpl, EntryCacheManager>
+                                             entryCacheManagerCreator)
+            throws Exception {
         MetadataCompressionConfig compressionConfigForManagedLedgerInfo =
                 config.getCompressionConfigForManagedLedgerInfo();
         MetadataCompressionConfig compressionConfigForManagedCursorInfo =
@@ -228,7 +243,11 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                 compressionConfigForManagedCursorInfo);
         this.config = config;
         this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
-        this.entryCacheManager = new RangeEntryCacheManagerImpl(this, 
scheduledExecutor, openTelemetry);
+        if (entryCacheManagerCreator == null) {
+            this.entryCacheManager = new RangeEntryCacheManagerImpl(this, 
scheduledExecutor, openTelemetry);
+        } else {
+            this.entryCacheManager = entryCacheManagerCreator.apply(this);
+        }
         this.statsTask = 
scheduledExecutor.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::refreshStats),
                 0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
         this.flushCursorsTask = 
scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 2022fd395e1..f98fc8b425d 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2155,8 +2155,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             // If all messages in [firstEntry...lastEntry] are filter out,
             // then manual call internalReadEntriesComplete to advance read 
position.
             if (firstValidEntry == -1L) {
-                
opReadEntry.internalReadEntriesComplete(Collections.emptyList(), 
opReadEntry.ctx,
-                        PositionFactory.create(ledger.getId(), lastEntry));
+                final var nextReadPosition = 
PositionFactory.create(ledger.getId(), lastEntry).getNext();
+                opReadEntry.updateReadPosition(nextReadPosition);
+                opReadEntry.checkReadCompletion();
                 return;
             }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index a4928b44bd9..2dcbe50a62c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -21,7 +21,6 @@ package org.apache.bookkeeper.mledger.impl;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.function.Predicate;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
@@ -67,7 +66,12 @@ class OpReadEntry implements ReadEntriesCallback {
         return op;
     }
 
-    void internalReadEntriesComplete(List<Entry> returnedEntries, Object ctx, 
Position lastPosition) {
+    private void internalReadEntriesComplete(List<Entry> returnedEntries) {
+        if (returnedEntries.isEmpty()) {
+            log.warn("[{}] Read no entries unexpectedly", this);
+            checkReadCompletion();
+            return;
+        }
         // Filter the returned entries for individual deleted messages
         int entriesCount = returnedEntries.size();
         long entriesSize = 0;
@@ -76,19 +80,15 @@ class OpReadEntry implements ReadEntriesCallback {
         }
         cursor.updateReadStats(entriesCount, entriesSize);
 
-        if (entriesCount != 0) {
-            lastPosition = returnedEntries.get(entriesCount - 1).getPosition();
-        }
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Read entries succeeded batch_size={} 
cumulative_size={} requested_count={}",
                     cursor.ledger.getName(), cursor.getName(), 
returnedEntries.size(), entries.size(), count);
         }
 
-        List<Entry> filteredEntries = Collections.emptyList();
-        if (entriesCount != 0) {
-            filteredEntries = cursor.filterReadEntries(returnedEntries);
-            entries.addAll(filteredEntries);
-        }
+        // Entries might be released after `filterReadEntries`, so retrieve 
the last position before that
+        final var lastPosition = returnedEntries.get(entriesCount - 
1).getPosition();
+        final var filteredEntries = cursor.filterReadEntries(returnedEntries);
+        entries.addAll(filteredEntries);
 
         // if entries have been filtered out then try to skip reading of 
already deletedMessages in that range
         final Position nexReadPosition = entriesCount != filteredEntries.size()
@@ -99,19 +99,30 @@ class OpReadEntry implements ReadEntriesCallback {
 
     @Override
     public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
-        internalReadEntriesComplete(returnedEntries, ctx, null);
+        try {
+            internalReadEntriesComplete(returnedEntries);
+        } catch (Throwable throwable) {
+            log.error("[{}] Fallback to readEntriesFailed for exception in 
readEntriesComplete", this, throwable);
+            
readEntriesFailed(ManagedLedgerException.getManagedLedgerException(throwable), 
ctx);
+        }
     }
 
     @Override
     public void readEntriesFailed(ManagedLedgerException exception, Object 
ctx) {
+        try {
+            internalReadEntriesFailed(exception, ctx);
+        } catch (Throwable throwable) {
+            // At least we should complete the callback
+            fail(ManagedLedgerException.getManagedLedgerException(throwable), 
ctx);
+        }
+    }
+
+    private void internalReadEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
         cursor.readOperationCompleted();
 
         if (!entries.isEmpty()) {
             // There were already some entries that were read before, we can 
return them
-            cursor.ledger.getExecutor().execute(() -> {
-                callback.readEntriesComplete(entries, ctx);
-                recycle();
-            });
+            complete(ctx);
         } else if (cursor.getConfig().isAutoSkipNonRecoverableData()
                 && exception instanceof NonRecoverableLedgerException) {
             log.warn("[{}][{}] read failed from ledger at position:{} : {}", 
cursor.ledger.getName(), cursor.getName(),
@@ -129,9 +140,7 @@ class OpReadEntry implements ReadEntriesCallback {
             }
             // fail callback if it couldn't find next valid ledger
             if (nexReadPosition == null) {
-                callback.readEntriesFailed(exception, ctx);
-                cursor.ledger.mbean.recordReadEntriesError();
-                recycle();
+                fail(exception, ctx);
                 return;
             }
             updateReadPosition(nexReadPosition);
@@ -152,9 +161,7 @@ class OpReadEntry implements ReadEntriesCallback {
                 }
             }
 
-            callback.readEntriesFailed(exception, ctx);
-            cursor.ledger.mbean.recordReadEntriesError();
-            recycle();
+            fail(exception, ctx);
         }
     }
 
@@ -177,12 +184,8 @@ class OpReadEntry implements ReadEntriesCallback {
             // The reading was already completed, release resources and 
trigger callback
             try {
                 cursor.readOperationCompleted();
-
             } finally {
-                cursor.ledger.getExecutor().execute(() -> {
-                    callback.readEntriesComplete(entries, ctx);
-                    recycle();
-                });
+                complete(ctx);
             }
         }
     }
@@ -217,5 +220,54 @@ class OpReadEntry implements ReadEntriesCallback {
         recyclerHandle.recycle(this);
     }
 
+    private void complete(Object ctx) {
+        cursor.ledger.getExecutor().execute(() -> {
+            try {
+                callback.readEntriesComplete(entries, ctx);
+                recycle();
+            } catch (Throwable throwable) {
+                log.error("[{}] readEntriesComplete failed (last position: 
{})", this, lastEntryPosition(), throwable);
+            }
+        });
+    }
+
+    private void fail(ManagedLedgerException e, Object ctx) {
+        try {
+            callback.readEntriesFailed(e, ctx);
+            cursor.ledger.mbean.recordReadEntriesError();
+            recycle();
+        } catch (Throwable throwable) {
+            log.error("[{}] readEntriesFailed failed (exception: {})", this, 
e.getMessage(), throwable);
+        }
+    }
+
+    @Override
+    public String toString() {
+        final var cursor = this.cursor;
+        final var readPosition = this.readPosition;
+        final var nextReadPosition = this.nextReadPosition;
+        final var entries = this.entries;
+        final var maxPosition = this.maxPosition;
+        final var count = this.count;
+        if (cursor != null) {
+            return cursor.ledger.getName() + " " + cursor.getName() + "{ 
readPosition: "
+                    + (readPosition != null ? readPosition : "(null)") + ", 
nextReadPosition: "
+                    + (nextReadPosition != null ? nextReadPosition : "(null)") 
+ ", maxPosition: "
+                    + (maxPosition != null ? maxPosition : "(null)") + ", 
entries count: "
+                    + (entries != null ? entries.size() : "(null)") + ", 
count: " + count + " }";
+        } else {
+            return "(null)";
+        }
+    }
+
+    private String lastEntryPosition() {
+        final var entries = this.entries;
+        if (entries != null) {
+            return entries.isEmpty() ? "(empty)" : entries.get(entries.size() 
- 1).getPosition().toString();
+        } else {
+            return "(null)";
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(OpReadEntry.class);
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index ad5630c078a..d4fbeec9617 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -440,8 +440,9 @@ public class RangeEntryCacheImpl implements EntryCache {
      * @param shouldCacheEntry if we should put the entry into the cache
      * @return a handle to the operation
      */
-    CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,
-                                                       long firstEntry, long 
lastEntry, boolean shouldCacheEntry) {
+    @VisibleForTesting
+    protected CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle 
lh, long firstEntry, long lastEntry,
+                                                                 boolean 
shouldCacheEntry) {
         final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
         CompletableFuture<List<EntryImpl>> readResult = 
ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry)
                 .thenApply(
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 34b67c5d020..b900ed1ecf8 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -71,6 +71,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.BKException;
@@ -99,6 +100,8 @@ import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.ScanOutcome;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
@@ -121,6 +124,7 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -133,6 +137,10 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
     }
 
+    @AfterMethod
+    public void afterMethod() {
+        setEntryCacheCreator(null);
+    }
 
     @Test
     public void testCloseCursor() throws Exception {
@@ -5402,6 +5410,36 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         assertTrue(recovered.booleanValue());
     }
 
+    @Test(timeOut = 10000)
+    public void testReadNoEntries() throws Exception {
+        final var firstRead = new AtomicBoolean(true);
+        setEntryCacheCreator(ml -> new 
RangeEntryCacheImpl((RangeEntryCacheManagerImpl) factory.getEntryCacheManager(),
+                ml, factory.getConfig().isCopyEntriesInCache()) {
+
+            @Override
+            protected CompletableFuture<List<EntryImpl>> 
readFromStorage(ReadHandle lh, long firstEntry, long lastEntry,
+                                                                         
boolean shouldCacheEntry) {
+                if (firstRead.compareAndSet(true, false)) {
+                    return CompletableFuture.completedFuture(List.of());
+                }
+                return super.readFromStorage(lh, firstEntry, lastEntry, 
shouldCacheEntry);
+            }
+        });
+        final var ml = factory.open("testReadNoEntries");
+        final var cursor = ml.openCursor("cursor");
+        cursor.setInactive(); // disable caching when adding entries
+        for (int i = 0; i < 10; i++) {
+            ml.addEntry(("msg-" + i).getBytes(StandardCharsets.UTF_8));
+        }
+        final var entries = cursor.readEntries(10);
+        assertEquals(entries.stream().map(e -> {
+            final var buffer = e.getDataBuffer();
+            final var bytes = new byte[buffer.readableBytes()];
+            buffer.readBytes(bytes);
+            return new String(bytes, StandardCharsets.UTF_8);
+        }).toList(), IntStream.range(0, 10).mapToObj(i -> "msg-" + 
i).toList());
+    }
+
     class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
         Map<Long, Integer> ledgerErrors = new HashMap<>();
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
index 178a9cf984d..a23fbea96a6 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -18,17 +18,22 @@
  */
 package org.apache.bookkeeper.test;
 
+import io.opentelemetry.api.OpenTelemetry;
 import java.lang.reflect.Method;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import lombok.SneakyThrows;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -57,6 +62,7 @@ public abstract class MockedBookKeeperTestCase {
     protected ExecutorService cachedExecutor;
 
     protected FaultInjectionMetadataStore metadataStore;
+    private Function<ManagedLedgerImpl, EntryCache> entryCacheCreator = null;
 
     public MockedBookKeeperTestCase() {
         // By default start a 3 bookies cluster
@@ -84,7 +90,17 @@ public abstract class MockedBookKeeperTestCase {
 
         ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new 
ManagedLedgerFactoryConfig();
         initManagedLedgerFactoryConfig(managedLedgerFactoryConfig);
-        factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+        factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, __ -> new 
RangeEntryCacheManagerImpl(__,
+                __.getScheduledExecutor(), OpenTelemetry.noop()) {
+
+            @Override
+            public EntryCache getEntryCache(ManagedLedgerImpl ml) {
+                if (entryCacheCreator != null) {
+                    return entryCacheCreator.apply(ml);
+                }
+                return super.getEntryCache(ml);
+            }
+        });
 
         setUpTestCase();
     }
@@ -164,4 +180,8 @@ public abstract class MockedBookKeeperTestCase {
     protected void stopMetadataStore() {
         metadataStore.setAlwaysFail(new MetadataStoreException("failed"));
     }
+
+    protected void setEntryCacheCreator(Function<ManagedLedgerImpl, 
EntryCache> entryCacheCreator) {
+        this.entryCacheCreator = entryCacheCreator;
+    }
 }

Reply via email to