This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 2e857605f8b [fix][broker] Make InflightReadsLimiter asynchronous and apply it for replay queue reads (#23901)
2e857605f8b is described below

commit 2e857605f8b538d28b67851cd42c6304d76fdfd7
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jan 29 22:11:56 2025 +0200

    [fix][broker] Make InflightReadsLimiter asynchronous and apply it for replay queue reads (#23901)
    
    (cherry picked from commit c5173d5e15efade90afb9b0b1c19f3ba5b3aab37)
---
 .../mledger/ManagedLedgerFactoryConfig.java        |  12 +
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |   2 +-
 .../mledger/impl/cache/InflightReadsLimiter.java   | 252 +++++++---
 .../mledger/impl/cache/PendingReadsManager.java    | 156 +++---
 .../mledger/impl/cache/RangeEntryCacheImpl.java    | 298 +++++------
 .../impl/cache/RangeEntryCacheManagerImpl.java     |  16 +-
 .../impl/InflightReadsLimiterIntegrationTest.java  |  13 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  92 ++--
 .../impl/cache/InflightReadsLimiterTest.java       | 560 ++++++++++++++++-----
 .../impl/cache/PendingReadsManagerTest.java        |   4 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |   9 +
 .../pulsar/broker/ManagedLedgerClientFactory.java  |  17 +-
 12 files changed, 984 insertions(+), 447 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index 386310b3ccb..af538262ed4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -61,6 +61,18 @@ public class ManagedLedgerFactoryConfig {
      */
     private long managedLedgerMaxReadsInFlightSize = 0;
 
+    /**
+     * Maximum time to wait for acquiring permits for max reads in flight when 
managedLedgerMaxReadsInFlightSizeInMB is
+     * set (>0) and the limit is reached.
+     */
+    private long managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis = 
60000;
+
+    /**
+     * Maximum number of reads that can be queued for acquiring permits for 
max reads in flight when
+     * managedLedgerMaxReadsInFlightSizeInMB is set (>0) and the limit is 
reached.
+     */
+    private int managedLedgerMaxReadsInFlightPermitsAcquireQueueSize = 10000;
+
     /**
      * Whether trace managed ledger task execution time.
      */
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 12c3ea12df5..225f4dba493 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -228,7 +228,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
                 compressionConfigForManagedCursorInfo);
         this.config = config;
         this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
-        this.entryCacheManager = new RangeEntryCacheManagerImpl(this, 
openTelemetry);
+        this.entryCacheManager = new RangeEntryCacheManagerImpl(this, 
scheduledExecutor, openTelemetry);
         this.statsTask = 
scheduledExecutor.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::refreshStats),
                 0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
         this.flushCursorsTask = 
scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
index c87807b8663..1f4d2c26797 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
@@ -22,12 +22,16 @@ import com.google.common.annotations.VisibleForTesting;
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.api.metrics.ObservableLongCounter;
 import io.prometheus.client.Gauge;
-import lombok.AllArgsConstructor;
-import lombok.ToString;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.opentelemetry.Constants;
 import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.InflightReadLimiterUtilization;
 import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
+import org.jctools.queues.SpscArrayQueue;
 
 @Slf4j
 public class InflightReadsLimiter implements AutoCloseable {
@@ -58,16 +62,36 @@ public class InflightReadsLimiter implements AutoCloseable {
 
     private final long maxReadsInFlightSize;
     private long remainingBytes;
+    private final long acquireTimeoutMillis;
+    private final ScheduledExecutorService timeOutExecutor;
+    private final boolean enabled;
 
-    public InflightReadsLimiter(long maxReadsInFlightSize, OpenTelemetry 
openTelemetry) {
-        if (maxReadsInFlightSize <= 0) {
+    record Handle(long permits, long creationTime, boolean success) {
+    }
+
+    record QueuedHandle(Handle handle, Consumer<Handle> callback) {
+    }
+
+    private final Queue<QueuedHandle> queuedHandles;
+    private boolean timeoutCheckRunning = false;
+
+    public InflightReadsLimiter(long maxReadsInFlightSize, int 
maxReadsInFlightAcquireQueueSize,
+                                long acquireTimeoutMillis, 
ScheduledExecutorService timeOutExecutor,
+                                OpenTelemetry openTelemetry) {
+        this.maxReadsInFlightSize = maxReadsInFlightSize;
+        this.remainingBytes = maxReadsInFlightSize;
+        this.acquireTimeoutMillis = acquireTimeoutMillis;
+        this.timeOutExecutor = timeOutExecutor;
+        if (maxReadsInFlightSize > 0) {
+            enabled = true;
+            this.queuedHandles = new 
SpscArrayQueue<>(maxReadsInFlightAcquireQueueSize);
+        } else {
+            enabled = false;
+            this.queuedHandles = null;
             // set it to -1 in order to show in the metrics that the metric is 
not available
             PULSAR_ML_READS_BUFFER_SIZE.set(-1);
             PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE.set(-1);
         }
-        this.maxReadsInFlightSize = maxReadsInFlightSize;
-        this.remainingBytes = maxReadsInFlightSize;
-
         var meter = 
openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
         inflightReadsLimitCounter = 
meter.counterBuilder(INFLIGHT_READS_LIMITER_LIMIT_METRIC_NAME)
                 .setDescription("Maximum number of bytes that can be retained 
by managed ledger data read from storage "
@@ -102,70 +126,178 @@ public class InflightReadsLimiter implements AutoCloseable {
         inflightReadsUsageCounter.close();
     }
 
-    @AllArgsConstructor
-    @ToString
-    static class Handle {
-        final long acquiredPermits;
-        final boolean success;
-        final int trials;
+    private static final Handle DISABLED = new Handle(0, 0, true);
+    private static final Optional<Handle> DISABLED_OPTIONAL = 
Optional.of(DISABLED);
 
-        final long creationTime;
+    /**
+     * Acquires permits from the limiter. If the limiter is disabled, it will 
immediately return a successful handle.
+     * If permits are available, it will return a handle with the acquired 
permits. If no permits are available,
+     * it will return an empty optional and the callback will be called when 
permits become available or when the
+     * acquire timeout is reached. The success field in the handle passed to 
the callback will be false if the acquire
+     * operation times out. The callback should be non-blocking and run on a 
desired executor handled within the
+     * callback itself.
+     *
+     * A successful handle will have the success field set to true, and the 
caller must call release with the handle
+     * when the permits are no longer needed.
+     *
+     * If an unsuccessful handle is returned immediately, it means that the 
queue limit has been reached and the
+     * callback will not be called. The caller should fail the read operation 
in this case to apply backpressure.
+     *
+     * @param permits  the number of permits to acquire
+     * @param callback the callback to be called when the permits are acquired 
or timed out
+     * @return an optional handle that contains the permits if acquired, 
otherwise an empty optional
+     */
+    public Optional<Handle> acquire(long permits, Consumer<Handle> callback) {
+        if (isDisabled()) {
+            return DISABLED_OPTIONAL;
+        }
+        return internalAcquire(permits, callback);
     }
 
-    private static final Handle DISABLED = new Handle(0, true, 0, -1);
+    private synchronized Optional<Handle> internalAcquire(long permits, 
Consumer<Handle> callback) {
+        Handle handle = new Handle(permits, System.currentTimeMillis(), true);
+        if (remainingBytes >= permits) {
+            remainingBytes -= permits;
+            if (log.isDebugEnabled()) {
+                log.debug("acquired permits: {}, creationTime: {}, 
remainingBytes:{}", permits, handle.creationTime,
+                        remainingBytes);
+            }
+            updateMetrics();
+            return Optional.of(handle);
+        } else if (permits > maxReadsInFlightSize && remainingBytes == 
maxReadsInFlightSize) {
+            remainingBytes = 0;
+            if (log.isInfoEnabled()) {
+                log.info("Requested permits {} exceeded maxReadsInFlightSize 
{}, creationTime: {}, remainingBytes:{}. "
+                                + "Allowing request with permits set to 
maxReadsInFlightSize.",
+                        permits, maxReadsInFlightSize, handle.creationTime, 
remainingBytes);
+            }
+            updateMetrics();
+            return Optional.of(new Handle(maxReadsInFlightSize, 
handle.creationTime, true));
+        } else {
+            if (queuedHandles.offer(new QueuedHandle(handle, callback))) {
+                scheduleTimeOutCheck(acquireTimeoutMillis);
+                return Optional.empty();
+            } else {
+                log.warn("Failed to queue handle for acquiring permits: {}, 
creationTime: {}, remainingBytes:{}",
+                        permits, handle.creationTime, remainingBytes);
+                return Optional.of(new Handle(0, handle.creationTime, false));
+            }
+        }
+    }
 
-    Handle acquire(long permits, Handle current) {
-        if (maxReadsInFlightSize <= 0) {
-            // feature is disabled
-            return DISABLED;
+    private synchronized void scheduleTimeOutCheck(long delayMillis) {
+        if (acquireTimeoutMillis <= 0) {
+            return;
         }
-        synchronized (this) {
-            try {
-                if (current == null) {
-                    if (remainingBytes == 0) {
-                        return new Handle(0, false, 1, 
System.currentTimeMillis());
-                    }
-                    if (remainingBytes >= permits) {
-                        remainingBytes -= permits;
-                        return new Handle(permits, true, 1, 
System.currentTimeMillis());
-                    } else {
-                        long possible = remainingBytes;
-                        remainingBytes = 0;
-                        return new Handle(possible, false, 1, 
System.currentTimeMillis());
-                    }
+        if (!timeoutCheckRunning) {
+            timeoutCheckRunning = true;
+            timeOutExecutor.schedule(this::timeoutCheck, delayMillis, 
TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private synchronized void timeoutCheck() {
+        timeoutCheckRunning = false;
+        long delay = 0;
+        while (true) {
+            QueuedHandle queuedHandle = queuedHandles.peek();
+            if (queuedHandle != null) {
+                long age = System.currentTimeMillis() - 
queuedHandle.handle.creationTime;
+                if (age >= acquireTimeoutMillis) {
+                    // remove the peeked handle from the queue
+                    queuedHandles.poll();
+                    handleTimeout(queuedHandle);
                 } else {
-                    if (current.trials >= 4 && current.acquiredPermits > 0) {
-                        remainingBytes += current.acquiredPermits;
-                        return new Handle(0, false, 1, current.creationTime);
-                    }
-                    if (remainingBytes == 0) {
-                        return new Handle(current.acquiredPermits, false, 
current.trials + 1,
-                                current.creationTime);
-                    }
-                    long needed = permits - current.acquiredPermits;
-                    if (remainingBytes >= needed) {
-                        remainingBytes -= needed;
-                        return new Handle(permits, true, current.trials + 1, 
current.creationTime);
-                    } else {
-                        long possible = remainingBytes;
-                        remainingBytes = 0;
-                        return new Handle(current.acquiredPermits + possible, 
false,
-                                current.trials + 1, current.creationTime);
-                    }
+                    delay = acquireTimeoutMillis - age;
+                    break;
                 }
-            } finally {
-                updateMetrics();
+            } else {
+                break;
             }
         }
+        if (delay > 0) {
+            scheduleTimeOutCheck(delay);
+        }
+    }
+
+    private void handleTimeout(QueuedHandle queuedHandle) {
+        if (log.isDebugEnabled()) {
+            log.debug("timed out queued permits: {}, creationTime: {}, 
remainingBytes:{}",
+                    queuedHandle.handle.permits, 
queuedHandle.handle.creationTime, remainingBytes);
+        }
+        try {
+            queuedHandle.callback.accept(new Handle(0, 
queuedHandle.handle.creationTime, false));
+        } catch (Exception e) {
+            log.error("Error in callback of timed out queued permits: {}, 
creationTime: {}, remainingBytes:{}",
+                    queuedHandle.handle.permits, 
queuedHandle.handle.creationTime, remainingBytes, e);
+        }
     }
 
-    void release(Handle handle) {
+    /**
+     * Releases permits back to the limiter. If the handle is disabled, this 
method will be a no-op.
+     *
+     * @param handle the handle containing the permits to release
+     */
+    public void release(Handle handle) {
         if (handle == DISABLED) {
             return;
         }
-        synchronized (this) {
-            remainingBytes += handle.acquiredPermits;
-            updateMetrics();
+        internalRelease(handle);
+    }
+
+    private synchronized void internalRelease(Handle handle) {
+        if (log.isDebugEnabled()) {
+            log.debug("release permits: {}, creationTime: {}, 
remainingBytes:{}", handle.permits,
+                    handle.creationTime, getRemainingBytes());
+        }
+        remainingBytes += handle.permits;
+        while (true) {
+            QueuedHandle queuedHandle = queuedHandles.peek();
+            if (queuedHandle != null) {
+                boolean timedOut = acquireTimeoutMillis > 0
+                        && System.currentTimeMillis() - 
queuedHandle.handle.creationTime > acquireTimeoutMillis;
+                if (timedOut) {
+                    // remove the peeked handle from the queue
+                    queuedHandles.poll();
+                    handleTimeout(queuedHandle);
+                } else if (remainingBytes >= queuedHandle.handle.permits
+                        || queuedHandle.handle.permits > maxReadsInFlightSize
+                        && remainingBytes == maxReadsInFlightSize) {
+                    // remove the peeked handle from the queue
+                    queuedHandles.poll();
+                    handleQueuedHandle(queuedHandle);
+                } else {
+                    break;
+                }
+            } else {
+                break;
+            }
+        }
+        updateMetrics();
+    }
+
+    private void handleQueuedHandle(QueuedHandle queuedHandle) {
+        long permits = queuedHandle.handle.permits;
+        Handle handleForCallback = queuedHandle.handle;
+        if (permits > maxReadsInFlightSize && remainingBytes == 
maxReadsInFlightSize) {
+            remainingBytes = 0;
+            if (log.isInfoEnabled()) {
+                log.info("Requested permits {} exceeded maxReadsInFlightSize 
{}, creationTime: {}, remainingBytes:{}. "
+                                + "Allowing request with permits set to 
maxReadsInFlightSize.",
+                        permits, maxReadsInFlightSize, 
queuedHandle.handle.creationTime, remainingBytes);
+            }
+            handleForCallback = new Handle(maxReadsInFlightSize, 
queuedHandle.handle.creationTime, true);
+        } else {
+            remainingBytes -= permits;
+            if (log.isDebugEnabled()) {
+                log.debug("acquired queued permits: {}, creationTime: {}, 
remainingBytes:{}",
+                        permits, queuedHandle.handle.creationTime, 
remainingBytes);
+            }
+        }
+        try {
+            queuedHandle.callback.accept(handleForCallback);
+        } catch (Exception e) {
+            log.error("Error in callback of acquired queued permits: {}, 
creationTime: {}, remainingBytes:{}",
+                    handleForCallback.permits, handleForCallback.creationTime, 
remainingBytes, e);
         }
     }
 
@@ -175,8 +307,6 @@ public class InflightReadsLimiter implements AutoCloseable {
     }
 
     public boolean isDisabled() {
-        return maxReadsInFlightSize <= 0;
+        return !enabled;
     }
-
-
-}
+}
\ No newline at end of file
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
index d733b54dd13..5944199287e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
@@ -25,9 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import lombok.AllArgsConstructor;
-import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -95,15 +94,11 @@ public class PendingReadsManager {
         this.rangeEntryCache = rangeEntryCache;
     }
 
-    @Value
-    private static class PendingReadKey {
-        private final long startEntry;
-        private final long endEntry;
+    private record PendingReadKey(long startEntry, long endEntry) {
         long size() {
             return endEntry - startEntry + 1;
         }
 
-
         boolean includes(PendingReadKey other) {
             return startEntry <= other.startEntry && other.endEntry <= 
endEntry;
         }
@@ -135,25 +130,18 @@ public class PendingReadsManager {
 
     }
 
-    @AllArgsConstructor
-    private static final class ReadEntriesCallbackWithContext {
-        final AsyncCallbacks.ReadEntriesCallback callback;
-        final Object ctx;
-        final long startEntry;
-        final long endEntry;
+    private record 
ReadEntriesCallbackWithContext(AsyncCallbacks.ReadEntriesCallback callback, 
Object ctx,
+                                                  long startEntry, long 
endEntry) {
     }
 
-    @AllArgsConstructor
-    private static final class FindPendingReadOutcome {
-        final PendingRead pendingRead;
-        final PendingReadKey missingOnLeft;
-        final PendingReadKey missingOnRight;
+    private record FindPendingReadOutcome(PendingRead pendingRead,
+                                          PendingReadKey missingOnLeft, 
PendingReadKey missingOnRight) {
         boolean needsAdditionalReads() {
             return missingOnLeft != null || missingOnRight != null;
         }
     }
 
-    private FindPendingReadOutcome findPendingRead(PendingReadKey key, 
Map<PendingReadKey,
+    private FindPendingReadOutcome findPendingRead(PendingReadKey key, 
ConcurrentMap<PendingReadKey,
             PendingRead> ledgerCache, AtomicBoolean created) {
         synchronized (ledgerCache) {
             PendingRead existing = ledgerCache.get(key);
@@ -222,18 +210,74 @@ public class PendingReadsManager {
 
     private class PendingRead {
         final PendingReadKey key;
-        final Map<PendingReadKey, PendingRead> ledgerCache;
+        final ConcurrentMap<PendingReadKey, PendingRead> ledgerCache;
         final List<ReadEntriesCallbackWithContext> callbacks = new 
ArrayList<>(1);
         boolean completed = false;
 
         public PendingRead(PendingReadKey key,
-                           Map<PendingReadKey, PendingRead> ledgerCache) {
+                           ConcurrentMap<PendingReadKey, PendingRead> 
ledgerCache) {
             this.key = key;
             this.ledgerCache = ledgerCache;
         }
 
-        private List<EntryImpl> keepEntries(List<EntryImpl> list, long 
startEntry, long endEntry) {
-            List<EntryImpl> result = new ArrayList<>((int) (endEntry - 
startEntry));
+        public void attach(CompletableFuture<List<EntryImpl>> handle) {
+            handle.whenComplete((entriesToReturn, error) -> {
+                // execute in the completing thread
+                completeAndRemoveFromCache();
+                // execute the callbacks in the managed ledger executor
+                rangeEntryCache.getManagedLedger().getExecutor().execute(() -> 
{
+                    if (error != null) {
+                        readEntriesFailed(error);
+                    } else {
+                        readEntriesComplete(entriesToReturn);
+                    }
+                });
+            });
+        }
+
+        private synchronized void completeAndRemoveFromCache() {
+            completed = true;
+            // When the read has completed, remove the instance from the 
ledgerCache map
+            // so that new reads will go to a new instance.
+            // this is required because we are going to do refcount management
+            // on the results of the callback
+            ledgerCache.remove(key, this);
+        }
+
+        private synchronized void readEntriesComplete(List<EntryImpl> 
entriesToReturn) {
+            if (callbacks.size() == 1) {
+                ReadEntriesCallbackWithContext first = callbacks.get(0);
+                if (first.startEntry == key.startEntry
+                        && first.endEntry == key.endEntry) {
+                    // perfect match, no copy, this is the most common case
+                    first.callback.readEntriesComplete((List) entriesToReturn,
+                            first.ctx);
+                } else {
+                    first.callback.readEntriesComplete(
+                            keepEntries(entriesToReturn, first.startEntry, 
first.endEntry),
+                            first.ctx);
+                }
+            } else {
+                for (ReadEntriesCallbackWithContext callback : callbacks) {
+                    callback.callback.readEntriesComplete(
+                            copyEntries(entriesToReturn, callback.startEntry, 
callback.endEntry),
+                            callback.ctx);
+                }
+                for (EntryImpl entry : entriesToReturn) {
+                    entry.release();
+                }
+            }
+        }
+
+        private synchronized void readEntriesFailed(Throwable error) {
+            for (ReadEntriesCallbackWithContext callback : callbacks) {
+                ManagedLedgerException mlException = 
createManagedLedgerException(error);
+                callback.callback.readEntriesFailed(mlException, callback.ctx);
+            }
+        }
+
+        private List<Entry> keepEntries(List<EntryImpl> list, long startEntry, 
long endEntry) {
+            List<Entry> result = new ArrayList<>((int) (endEntry - 
startEntry));
             for (EntryImpl entry : list) {
                 long entryId = entry.getEntryId();
                 if (startEntry <= entryId && entryId <= endEntry) {
@@ -245,62 +289,16 @@ public class PendingReadsManager {
             return result;
         }
 
-        public void attach(CompletableFuture<List<EntryImpl>> handle) {
-            // when the future is done remove this from the map
-            // new reads will go to a new instance
-            // this is required because we are going to do refcount management
-            // on the results of the callback
-            handle.whenComplete((___, error) -> {
-                synchronized (PendingRead.this) {
-                    completed = true;
-                    synchronized (ledgerCache) {
-                        ledgerCache.remove(key, this);
-                    }
-                }
-            });
-
-            handle.thenAcceptAsync(entriesToReturn -> {
-                synchronized (PendingRead.this) {
-                    if (callbacks.size() == 1) {
-                        ReadEntriesCallbackWithContext first = 
callbacks.get(0);
-                        if (first.startEntry == key.startEntry
-                                && first.endEntry == key.endEntry) {
-                            // perfect match, no copy, this is the most common 
case
-                            first.callback.readEntriesComplete((List) 
entriesToReturn,
-                                    first.ctx);
-                        } else {
-                            first.callback.readEntriesComplete(
-                                    (List) keepEntries(entriesToReturn, 
first.startEntry, first.endEntry),
-                                    first.ctx);
-                        }
-                    } else {
-                        for (ReadEntriesCallbackWithContext callback : 
callbacks) {
-                            long callbackStartEntry = callback.startEntry;
-                            long callbackEndEntry = callback.endEntry;
-                            List<EntryImpl> copy = new ArrayList<>((int) 
(callbackEndEntry - callbackStartEntry + 1));
-                            for (EntryImpl entry : entriesToReturn) {
-                                long entryId = entry.getEntryId();
-                                if (callbackStartEntry <= entryId && entryId 
<= callbackEndEntry) {
-                                    EntryImpl entryCopy = 
EntryImpl.create(entry);
-                                    copy.add(entryCopy);
-                                }
-                            }
-                            callback.callback.readEntriesComplete((List) copy, 
callback.ctx);
-                        }
-                        for (EntryImpl entry : entriesToReturn) {
-                            entry.release();
-                        }
-                    }
-                }
-            }, 
rangeEntryCache.getManagedLedger().getExecutor()).exceptionally(exception -> {
-                synchronized (PendingRead.this) {
-                    for (ReadEntriesCallbackWithContext callback : callbacks) {
-                        ManagedLedgerException mlException = 
createManagedLedgerException(exception);
-                        callback.callback.readEntriesFailed(mlException, 
callback.ctx);
-                    }
+        private List<Entry> copyEntries(List<EntryImpl> entriesToReturn, long 
startEntry, long endEntry) {
+            List<Entry> result = new ArrayList<>((int) (endEntry - startEntry 
+ 1));
+            for (EntryImpl entry : entriesToReturn) {
+                long entryId = entry.getEntryId();
+                if (startEntry <= entryId && entryId <= endEntry) {
+                    EntryImpl entryCopy = EntryImpl.create(entry);
+                    result.add(entryCopy);
                 }
-                return null;
-            });
+            }
+            return result;
         }
 
         synchronized boolean addListener(AsyncCallbacks.ReadEntriesCallback 
callback,
@@ -318,7 +316,7 @@ public class PendingReadsManager {
                      final AsyncCallbacks.ReadEntriesCallback callback, Object 
ctx) {
         final PendingReadKey key = new PendingReadKey(firstEntry, lastEntry);
 
-        Map<PendingReadKey, PendingRead> pendingReadsForLedger =
+        ConcurrentMap<PendingReadKey, PendingRead> pendingReadsForLedger =
                 cachedPendingReads.computeIfAbsent(lh.getId(), (l) -> new 
ConcurrentHashMap<>());
 
         boolean listenerAdded = false;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index d52fc8535b5..b81015ea639 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -22,18 +22,19 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.client.api.BKException;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
-import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -58,6 +59,8 @@ public class RangeEntryCacheImpl implements EntryCache {
      * Overhead per-entry to take into account the envelope.
      */
     public static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64;
+    private static final int DEFAULT_ESTIMATED_ENTRY_SIZE = 10 * 1024;
+    private static final boolean DEFAULT_CACHE_INDIVIDUAL_READ_ENTRY = false;
 
     private final RangeEntryCacheManagerImpl manager;
     final ManagedLedgerImpl ml;
@@ -66,18 +69,16 @@ public class RangeEntryCacheImpl implements EntryCache {
     private final boolean copyEntries;
     private final PendingReadsManager pendingReadsManager;
 
-    private volatile long estimatedEntrySize = 10 * 1024;
-
-    private final long readEntryTimeoutMillis;
-
     private static final double MB = 1024 * 1024;
 
+    private final LongAdder totalAddedEntriesSize = new LongAdder();
+    private final LongAdder totalAddedEntriesCount = new LongAdder();
+
     public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, 
ManagedLedgerImpl ml, boolean copyEntries) {
         this.manager = manager;
         this.ml = ml;
         this.pendingReadsManager = new PendingReadsManager(this);
         this.interceptor = ml.getManagedLedgerInterceptor();
-        this.readEntryTimeoutMillis = 
getManagedLedgerConfig().getReadEntryTimeoutSeconds();
         this.entries = new RangeCache<>(EntryImpl::getLength, 
EntryImpl::getTimestamp);
         this.copyEntries = copyEntries;
 
@@ -118,17 +119,18 @@ public class RangeEntryCacheImpl implements EntryCache {
 
     @Override
     public boolean insert(EntryImpl entry) {
+        int entryLength = entry.getLength();
         if (!manager.hasSpaceInCache()) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Skipping cache while doing eviction: {} - 
size: {}", ml.getName(), entry.getPosition(),
-                        entry.getLength());
+                        entryLength);
             }
             return false;
         }
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] Adding entry to cache: {} - size: {}", 
ml.getName(), entry.getPosition(),
-                    entry.getLength());
+                    entryLength);
         }
 
         Position position = entry.getPosition();
@@ -150,7 +152,9 @@ public class RangeEntryCacheImpl implements EntryCache {
         EntryImpl cacheEntry = EntryImpl.create(position, cachedData);
         cachedData.release();
         if (entries.put(position, cacheEntry)) {
-            manager.entryAdded(entry.getLength());
+            totalAddedEntriesSize.add(entryLength);
+            totalAddedEntriesCount.increment();
+            manager.entryAdded(entryLength);
             return true;
         } else {
             // entry was not inserted into cache, we need to discard it
@@ -226,7 +230,23 @@ public class RangeEntryCacheImpl implements EntryCache {
     public void asyncReadEntry(ReadHandle lh, Position position, final 
ReadEntryCallback callback,
             final Object ctx) {
         try {
-            asyncReadEntry0(lh, position, callback, ctx);
+            asyncReadEntriesByPosition(lh, position, position, 1,
+                    DEFAULT_CACHE_INDIVIDUAL_READ_ENTRY,
+                    new ReadEntriesCallback() {
+                @Override
+                public void readEntriesComplete(List<Entry> entries, Object 
ctx) {
+                    if (entries.isEmpty()) {
+                        callback.readEntryFailed(new 
ManagedLedgerException("Could not read given position"), ctx);
+                    } else {
+                        callback.readEntryComplete(entries.get(0), ctx);
+                    }
+                }
+
+                @Override
+                public void readEntriesFailed(ManagedLedgerException 
exception, Object ctx) {
+                    callback.readEntryFailed(exception, ctx);
+                }
+            }, ctx, true);
         } catch (Throwable t) {
             log.warn("failed to read entries for {}-{}", lh.getId(), position, 
t);
             // invalidate all entries related to ledger from the cache (it 
might happen if entry gets corrupt
@@ -237,47 +257,6 @@ public class RangeEntryCacheImpl implements EntryCache {
         }
     }
 
-    private void asyncReadEntry0(ReadHandle lh, Position position, final 
ReadEntryCallback callback,
-            final Object ctx) {
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Reading entry ledger {}: {}", ml.getName(), 
lh.getId(), position.getEntryId());
-        }
-        EntryImpl entry = entries.get(position);
-        if (entry != null) {
-            EntryImpl cachedEntry = EntryImpl.create(entry);
-            entry.release();
-            manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength());
-            callback.readEntryComplete(cachedEntry, ctx);
-        } else {
-            ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), 
position.getEntryId()).thenAcceptAsync(
-                    ledgerEntries -> {
-                        try {
-                            Iterator<LedgerEntry> iterator = 
ledgerEntries.iterator();
-                            if (iterator.hasNext()) {
-                                LedgerEntry ledgerEntry = iterator.next();
-                                EntryImpl returnEntry = 
RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);
-
-                                
ml.getMbean().recordReadEntriesOpsCacheMisses(1, returnEntry.getLength());
-                                manager.mlFactoryMBean.recordCacheMiss(1, 
returnEntry.getLength());
-                                ml.getMbean().addReadEntriesSample(1, 
returnEntry.getLength());
-                                callback.readEntryComplete(returnEntry, ctx);
-                            } else {
-                                // got an empty sequence
-                                callback.readEntryFailed(new 
ManagedLedgerException("Could not read given position"),
-                                                         ctx);
-                            }
-                        } finally {
-                            ledgerEntries.close();
-                        }
-                    }, ml.getExecutor()).exceptionally(exception -> {
-                        ml.invalidateLedgerHandle(lh);
-                        pendingReadsManager.invalidateLedger(lh.getId());
-                        
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
-                        return null;
-            });
-        }
-    }
-
     @Override
     public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, 
boolean shouldCacheEntry,
             final ReadEntriesCallback callback, Object ctx) {
@@ -295,38 +274,123 @@ public class RangeEntryCacheImpl implements EntryCache {
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
     void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, 
boolean shouldCacheEntry,
-            final ReadEntriesCallback callback, Object ctx, boolean 
withLimits) {
-        asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, 
callback, ctx, null, withLimits);
+            final ReadEntriesCallback callback, Object ctx, boolean 
acquirePermits) {
+        final long ledgerId = lh.getId();
+        final int numberOfEntries = (int) (lastEntry - firstEntry) + 1;
+        final Position firstPosition = PositionFactory.create(ledgerId, 
firstEntry);
+        final Position lastPosition = PositionFactory.create(ledgerId, 
lastEntry);
+        asyncReadEntriesByPosition(lh, firstPosition, lastPosition, 
numberOfEntries, shouldCacheEntry, callback, ctx,
+                acquirePermits);
     }
 
-    void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long 
lastEntry, boolean shouldCacheEntry,
-        final ReadEntriesCallback originalCallback, Object ctx, 
InflightReadsLimiter.Handle handle,
-                                   boolean withLimits) {
-        AsyncCallbacks.ReadEntriesCallback callback;
-        if (withLimits) {
-            callback = handlePendingReadsLimits(lh, firstEntry, lastEntry, 
shouldCacheEntry, originalCallback, ctx,
-                    handle);
+    void asyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, 
Position lastPosition, int numberOfEntries,
+                                    boolean shouldCacheEntry, final 
ReadEntriesCallback originalCallback,
+                                    Object ctx, boolean acquirePermits) {
+        checkArgument(firstPosition.getLedgerId() == 
lastPosition.getLedgerId(),
+                "Invalid range. Entries %s and %s should be in the same 
ledger.",
+                firstPosition, lastPosition);
+        checkArgument(firstPosition.getLedgerId() == lh.getId(),
+                "Invalid ReadHandle. The ledger %s of the range positions 
should match the handle's ledger %s.",
+                firstPosition.getLedgerId(), lh.getId());
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Reading {} entries in range {} to {}", 
ml.getName(), numberOfEntries, firstPosition,
+                    lastPosition);
+        }
+
+        InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
+        if (!acquirePermits || pendingReadsLimiter.isDisabled()) {
+            doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, 
numberOfEntries, shouldCacheEntry,
+                    originalCallback, ctx);
         } else {
-            callback = originalCallback;
+            long estimatedEntrySize = getEstimatedEntrySize();
+            long estimatedReadSize = numberOfEntries * estimatedEntrySize;
+            if (log.isDebugEnabled()) {
+                log.debug("Estimated read size: {} bytes for {} entries with 
{} estimated entry size",
+                        estimatedReadSize,
+                        numberOfEntries, estimatedEntrySize);
+            }
+            Optional<InflightReadsLimiter.Handle> optionalHandle =
+                    pendingReadsLimiter.acquire(estimatedReadSize, handle -> {
+                        // permits were not immediately available, callback 
will be executed when permits are acquired
+                        // or timeout
+                        ml.getExecutor().execute(() -> {
+                            doAsyncReadEntriesWithAcquiredPermits(lh, 
firstPosition, lastPosition, numberOfEntries,
+                                    shouldCacheEntry, originalCallback, ctx, 
handle, estimatedReadSize);
+                        });
+                    });
+            // permits were immediately available and acquired
+            if (optionalHandle.isPresent()) {
+                doAsyncReadEntriesWithAcquiredPermits(lh, firstPosition, 
lastPosition, numberOfEntries,
+                        shouldCacheEntry, originalCallback, ctx, 
optionalHandle.get(), estimatedReadSize);
+            }
         }
-        if (callback == null) {
+    }
+
+    void doAsyncReadEntriesWithAcquiredPermits(ReadHandle lh, Position 
firstPosition, Position lastPosition,
+                                               int numberOfEntries, boolean 
shouldCacheEntry,
+                                               final ReadEntriesCallback 
originalCallback, Object ctx,
+                                               InflightReadsLimiter.Handle 
handle, long estimatedReadSize) {
+        if (!handle.success()) {
+            String message = String.format(
+                    "Couldn't acquire enough permits on the max reads in 
flight limiter to read from ledger "
+                            + "%d, %s, estimated read size %d bytes for %d 
entries (check "
+                            + "managedLedgerMaxReadsInFlightSizeInMB, "
+                            + 
"managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis and "
+                            + 
"managedLedgerMaxReadsInFlightPermitsAcquireQueueSize)", lh.getId(), getName(),
+                    estimatedReadSize, numberOfEntries);
+            log.error(message);
+            originalCallback.readEntriesFailed(new 
ManagedLedgerException.TooManyRequestsException(message), ctx);
             return;
         }
+        InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
+        ReadEntriesCallback wrappedCallback = new ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx2) {
+                if (!entries.isEmpty()) {
+                    // release permits only when entries have been handled
+                    AtomicInteger remainingCount = new 
AtomicInteger(entries.size());
+                    for (Entry entry : entries) {
+                        ((EntryImpl) entry).onDeallocate(() -> {
+                            if (remainingCount.decrementAndGet() <= 0) {
+                                pendingReadsLimiter.release(handle);
+                            }
+                        });
+                    }
+                } else {
+                    pendingReadsLimiter.release(handle);
+                }
+                originalCallback.readEntriesComplete(entries, ctx2);
+            }
 
-        final long ledgerId = lh.getId();
-        final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
-        final Position firstPosition = PositionFactory.create(lh.getId(), 
firstEntry);
-        final Position lastPosition = PositionFactory.create(lh.getId(), 
lastEntry);
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx2) {
+                pendingReadsLimiter.release(handle);
+                originalCallback.readEntriesFailed(exception, ctx2);
+            }
+        };
+        doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, 
numberOfEntries, shouldCacheEntry,
+                wrappedCallback, ctx);
+    }
 
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Reading entries range ledger {}: {} to {}", 
ml.getName(), ledgerId, firstEntry, lastEntry);
+    void doAsyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, 
Position lastPosition, int numberOfEntries,
+                                      boolean shouldCacheEntry, final 
ReadEntriesCallback callback,
+                                      Object ctx) {
+        Collection<EntryImpl> cachedEntries;
+        if (firstPosition.compareTo(lastPosition) == 0) {
+            EntryImpl cachedEntry = entries.get(firstPosition);
+            if (cachedEntry == null) {
+                cachedEntries = Collections.emptyList();
+            } else {
+                cachedEntries = Collections.singleton(cachedEntry);
+            }
+        } else {
+            cachedEntries = entries.getRange(firstPosition, lastPosition);
         }
 
-        Collection<EntryImpl> cachedEntries = entries.getRange(firstPosition, 
lastPosition);
-
-        if (cachedEntries.size() == entriesToRead) {
+        if (cachedEntries.size() == numberOfEntries) {
             long totalCachedSize = 0;
-            final List<EntryImpl> entriesToReturn = 
Lists.newArrayListWithExpectedSize(entriesToRead);
+            final List<Entry> entriesToReturn = new 
ArrayList<>(numberOfEntries);
 
             // All entries found in cache
             for (EntryImpl entry : cachedEntries) {
@@ -337,11 +401,11 @@ public class RangeEntryCacheImpl implements EntryCache {
 
             manager.mlFactoryMBean.recordCacheHits(entriesToReturn.size(), 
totalCachedSize);
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Ledger {} -- Found in cache entries: {}-{}", 
ml.getName(), ledgerId, firstEntry,
-                        lastEntry);
+                log.debug("[{}] Cache hit for {} entries in range {} to {}", 
ml.getName(), numberOfEntries,
+                        firstPosition, lastPosition);
             }
 
-            callback.readEntriesComplete((List) entriesToReturn, ctx);
+            callback.readEntriesComplete(entriesToReturn, ctx);
 
         } else {
             if (!cachedEntries.isEmpty()) {
@@ -349,77 +413,24 @@ public class RangeEntryCacheImpl implements EntryCache {
             }
 
             // Read all the entries from bookkeeper
-            pendingReadsManager.readEntries(lh, firstEntry, lastEntry,
+            pendingReadsManager.readEntries(lh, firstPosition.getEntryId(), 
lastPosition.getEntryId(),
                     shouldCacheEntry, callback, ctx);
-
         }
     }
 
-    private AsyncCallbacks.ReadEntriesCallback 
handlePendingReadsLimits(ReadHandle lh,
-                                                                long 
firstEntry, long lastEntry,
-                                                                boolean 
shouldCacheEntry,
-                                                                
AsyncCallbacks.ReadEntriesCallback originalCallback,
-                                                                Object ctx, 
InflightReadsLimiter.Handle handle) {
-        InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
-        if (pendingReadsLimiter.isDisabled()) {
-            return originalCallback;
+    @VisibleForTesting
+    public long getEstimatedEntrySize() {
+        long estimatedEntrySize = getAvgEntrySize();
+        if (estimatedEntrySize == 0) {
+            estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE;
         }
-        long estimatedReadSize = (1 + lastEntry - firstEntry)
-                * (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
-        final AsyncCallbacks.ReadEntriesCallback callback;
-        InflightReadsLimiter.Handle newHandle = 
pendingReadsLimiter.acquire(estimatedReadSize, handle);
-        if (!newHandle.success) {
-            long now = System.currentTimeMillis();
-            if (now - newHandle.creationTime > readEntryTimeoutMillis) {
-                String message = "Time-out elapsed while acquiring enough 
permits "
-                        + "on the memory limiter to read from ledger "
-                        + lh.getId()
-                        + ", " + getName()
-                        + ", estimated read size " + estimatedReadSize + " 
bytes"
-                        + " for " + (1 + lastEntry - firstEntry)
-                        + " entries (check 
managedLedgerMaxReadsInFlightSizeInMB)";
-                log.error(message);
-                pendingReadsLimiter.release(newHandle);
-                originalCallback.readEntriesFailed(
-                        new 
ManagedLedgerException.TooManyRequestsException(message), ctx);
-                return null;
-            }
-            ml.getExecutor().execute(() -> {
-                asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, 
shouldCacheEntry,
-                        originalCallback, ctx, newHandle, true);
-            });
-            return null;
-        } else {
-            callback = new AsyncCallbacks.ReadEntriesCallback() {
-
-                @Override
-                public void readEntriesComplete(List<Entry> entries, Object 
ctx) {
-                    if (!entries.isEmpty()) {
-                        long size = entries.get(0).getLength();
-                        estimatedEntrySize = size;
-
-                        AtomicInteger remainingCount = new 
AtomicInteger(entries.size());
-                        for (Entry entry : entries) {
-                            ((EntryImpl) entry).onDeallocate(() -> {
-                                if (remainingCount.decrementAndGet() <= 0) {
-                                    pendingReadsLimiter.release(newHandle);
-                                }
-                            });
-                        }
-                    } else {
-                        pendingReadsLimiter.release(newHandle);
-                    }
-                    originalCallback.readEntriesComplete(entries, ctx);
-                }
+        return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+    }
 
-                @Override
-                public void readEntriesFailed(ManagedLedgerException 
exception, Object ctx) {
-                    pendingReadsLimiter.release(newHandle);
-                    originalCallback.readEntriesFailed(exception, ctx);
-                }
-            };
-        }
-        return callback;
+    private long getAvgEntrySize() {
+        long totalAddedEntriesCount = this.totalAddedEntriesCount.sum();
+        long totalAddedEntriesSize = this.totalAddedEntriesSize.sum();
+        return totalAddedEntriesCount != 0 ? totalAddedEntriesSize / 
totalAddedEntriesCount : 0;
     }
 
     /**
@@ -442,8 +453,7 @@ public class RangeEntryCacheImpl implements EntryCache {
                             try {
                                 // We got the entries, we need to transform 
them to a List<> type
                                 long totalSize = 0;
-                                final List<EntryImpl> entriesToReturn =
-                                        
Lists.newArrayListWithExpectedSize(entriesToRead);
+                                final List<EntryImpl> entriesToReturn = new 
ArrayList<>(entriesToRead);
                                 for (LedgerEntry e : ledgerEntries) {
                                     EntryImpl entry = 
RangeEntryCacheManagerImpl.create(e, interceptor);
                                     entriesToReturn.add(entry);
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
index 34be25df1f4..61d52aa3919 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
@@ -28,7 +28,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl;
@@ -57,12 +59,16 @@ public class RangeEntryCacheManagerImpl implements 
EntryCacheManager {
     private static final double evictionTriggerThresholdPercent = 0.98;
 
 
-    public RangeEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory, 
OpenTelemetry openTelemetry) {
-        this.maxSize = factory.getConfig().getMaxCacheSize();
-        this.inflightReadsLimiter = new InflightReadsLimiter(
-                factory.getConfig().getManagedLedgerMaxReadsInFlightSize(), 
openTelemetry);
+    public RangeEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory, 
OrderedScheduler scheduledExecutor,
+                                      OpenTelemetry openTelemetry) {
+        ManagedLedgerFactoryConfig config = factory.getConfig();
+        this.maxSize = config.getMaxCacheSize();
+        this.inflightReadsLimiter = new 
InflightReadsLimiter(config.getManagedLedgerMaxReadsInFlightSize(),
+                
config.getManagedLedgerMaxReadsInFlightPermitsAcquireQueueSize(),
+                
config.getManagedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis(),
+                scheduledExecutor, openTelemetry);
         this.evictionTriggerThreshold = (long) (maxSize * 
evictionTriggerThresholdPercent);
-        this.cacheEvictionWatermark = 
factory.getConfig().getCacheEvictionWatermark();
+        this.cacheEvictionWatermark = config.getCacheEvictionWatermark();
         this.evictionPolicy = new EntryCacheDefaultEvictionPolicy();
         this.mlFactory = factory;
         this.mlFactoryMBean = factory.getMbean();
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
index b57dea6a5bb..48f0cf08ddf 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
@@ -40,7 +40,6 @@ import 
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.awaitility.Awaitility;
-import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
 import org.testng.Assert;
@@ -142,8 +141,8 @@ public class InflightReadsLimiterIntegrationTest extends 
MockedBookKeeperTestCas
         SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback();
         entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx);
         cb0.entries.join();
-        Long sizePerEntry1 = WhiteboxImpl.getInternalState(entryCache, 
"estimatedEntrySize");
-        Assert.assertEquals(sizePerEntry1, 1);
+        Long sizePerEntry1 = entryCache.getEstimatedEntrySize();
+        Assert.assertEquals(sizePerEntry1, 1 + 
RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
         Awaitility.await().untilAsserted(() -> {
             long remainingBytes =limiter.getRemainingBytes();
             Assert.assertEquals(remainingBytes, totalCapacity);
@@ -179,8 +178,8 @@ public class InflightReadsLimiterIntegrationTest extends 
MockedBookKeeperTestCas
         Thread.sleep(3000);
         readCompleteSignal1.countDown();
         cb1.entries.join();
-        Long sizePerEntry2 = WhiteboxImpl.getInternalState(entryCache, 
"estimatedEntrySize");
-        Assert.assertEquals(sizePerEntry2, 1);
+        Long sizePerEntry2 = entryCache.getEstimatedEntrySize();
+        Assert.assertEquals(sizePerEntry2, 1 + 
RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
         long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, 
1);
         long remainingBytesExpected2 = totalCapacity - bytesAcquired2;
         log.info("acquired : {}", bytesAcquired2);
@@ -192,8 +191,8 @@ public class InflightReadsLimiterIntegrationTest extends 
MockedBookKeeperTestCas
 
         readCompleteSignal2.countDown();
         cb2.entries.join();
-        Long sizePerEntry3 = WhiteboxImpl.getInternalState(entryCache, 
"estimatedEntrySize");
-        Assert.assertEquals(sizePerEntry3, 1);
+        Long sizePerEntry3 = entryCache.getEstimatedEntrySize();
+        Assert.assertEquals(sizePerEntry3, 1 + 
RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
         Awaitility.await().untilAsserted(() -> {
             long remainingBytes = limiter.getRemainingBytes();
             log.info("remainingBytes 2: {}", remainingBytes);
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 04f8eecbe9a..5ec453a6d4e 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -94,6 +94,7 @@ import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
+import org.apache.bookkeeper.client.PulsarMockReadHandleInterceptor;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.client.api.ReadHandle;
@@ -3133,17 +3134,26 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         ManagedLedgerConfig config = new 
ManagedLedgerConfig().setReadEntryTimeoutSeconds(1);
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("timeout_ledger_test", config);
 
-        BookKeeper bk = mock(BookKeeper.class);
-        doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), 
any(), any(), any(), any(), any());
+        Position position = ledger.addEntry("entry-1".getBytes());
+
+        // ensure that the read isn't cached
+        factory.getEntryCacheManager().clear();
+
+        bkc.setReadHandleInterceptor(new PulsarMockReadHandleInterceptor() {
+            @Override
+            public CompletableFuture<LedgerEntries> interceptReadAsync(long 
ledgerId, long firstEntry, long lastEntry,
+                                                                       
LedgerEntries entries) {
+                return CompletableFuture.supplyAsync(() -> {
+                    return entries;
+                }, CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));
+            }
+        });
+
         AtomicReference<ManagedLedgerException> responseException1 = new 
AtomicReference<>();
         String ctxStr = "timeoutCtx";
-        CompletableFuture<LedgerEntries> entriesFuture = new 
CompletableFuture<>();
-        ReadHandle ledgerHandle = mock(ReadHandle.class);
-        
doReturn(entriesFuture).when(ledgerHandle).readAsync(PositionFactory.EARLIEST.getLedgerId(),
-                PositionFactory.EARLIEST.getEntryId());
 
         // (1) test read-timeout for: ManagedLedger.asyncReadEntry(..)
-        ledger.asyncReadEntry(ledgerHandle, PositionFactory.EARLIEST, new 
ReadEntryCallback() {
+        ledger.asyncReadEntry(position, new ReadEntryCallback() {
             @Override
             public void readEntryComplete(Entry entry, Object ctx) {
                 responseException1.set(null);
@@ -3155,18 +3165,20 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
                 responseException1.set(exception);
             }
         }, ctxStr);
-        ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {}, 
Collections.emptyMap());
-        retryStrategically((test) -> responseException1.get() != null, 5, 
1000);
-        assertNotNull(responseException1.get());
-        assertTrue(responseException1.get().getMessage()
-                
.startsWith(BKException.getMessage(BKException.Code.TimeoutException)));
 
-        // (2) test read-timeout for: ManagedLedger.asyncReadEntry(..)
-        AtomicReference<ManagedLedgerException> responseException2 = new 
AtomicReference<>();
-        Position readPositionRef = PositionFactory.EARLIEST;
-        ManagedCursorImpl cursor = new ManagedCursorImpl(bk, ledger, 
"cursor1");
-        OpReadEntry opReadEntry = OpReadEntry.create(cursor, readPositionRef, 
1, new ReadEntriesCallback() {
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(responseException1.get());
+            assertTrue(responseException1.get().getMessage()
+                    
.startsWith(BKException.getMessage(BKException.Code.TimeoutException)));
+        });
 
+        // ensure that the read isn't cached
+        factory.getEntryCacheManager().clear();
+
+        // (2) test read-timeout for: ManagedCursor.asyncReadEntries(..)
+        AtomicReference<ManagedLedgerException> responseException2 = new 
AtomicReference<>();
+        ManagedCursor cursor = ledger.openCursor("cursor1", 
InitialPosition.Earliest);
+        cursor.asyncReadEntries(1, new ReadEntriesCallback() {
             @Override
             public void readEntriesComplete(List<Entry> entries, Object ctx) {
             }
@@ -3176,16 +3188,13 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
                 assertEquals(ctxStr, (String) ctx);
                 responseException2.set(exception);
             }
+        }, ctxStr, PositionFactory.LATEST);
 
-        }, null, PositionFactory.LATEST, null);
-        ledger.asyncReadEntry(ledgerHandle, 
PositionFactory.EARLIEST.getEntryId(), PositionFactory.EARLIEST.getEntryId(),
-                opReadEntry, ctxStr);
-        retryStrategically((test) -> {
-            return responseException2.get() != null;
-        }, 5, 1000);
-        assertNotNull(responseException2.get());
-        assertTrue(responseException2.get().getMessage()
-                
.startsWith(BKException.getMessage(BKException.Code.TimeoutException)));
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(responseException2.get());
+            assertTrue(responseException2.get().getMessage()
+                    
.startsWith(BKException.getMessage(BKException.Code.TimeoutException)));
+        });
 
         ledger.close();
     }
@@ -3723,6 +3732,10 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         for (int i = 0; i < entries; i++) {
             ledger.addEntry(String.valueOf(i).getBytes(Encoding));
         }
+
+        // clear the cache to avoid flakiness
+        factory.getEntryCacheManager().clear();
+
         List<Entry> entryList = cursor.readEntries(3);
         assertEquals(entryList.size(), 3);
         Awaitility.await().untilAsserted(() -> {
@@ -3791,10 +3804,16 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         for (int i = 0; i < entries; i++) {
             ledger.addEntry(String.valueOf(i).getBytes(Encoding));
         }
-        List<Entry> entryList = cursor.readEntries(3);
-        assertEquals(entryList.size(), 3);
-        assertEquals(ledger.ledgers.size(), 4);
-        assertEquals(ledger.ledgerCache.size(), 3);
+
+        // clear the cache to avoid flakiness
+        factory.getEntryCacheManager().clear();
+
+        final List<Entry> entryList = cursor.readEntries(3);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(entryList.size(), 3);
+            assertEquals(ledger.ledgers.size(), 4);
+            assertEquals(ledger.ledgerCache.size(), 3);
+        });
         cursor.clearBacklog();
         cursor2.clearBacklog();
         ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
@@ -3803,11 +3822,17 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
             assertEquals(ledger.ledgerCache.size(), 0);
         });
 
+        // clear the cache to avoid flakiness
+        factory.getEntryCacheManager().clear();
+
         // Verify the ReadHandle can be reopened.
         ManagedCursor cursor3 = ledger.openCursor("test-cursor3", 
InitialPosition.Earliest);
-        entryList = cursor3.readEntries(3);
-        assertEquals(entryList.size(), 3);
-        assertEquals(ledger.ledgerCache.size(), 3);
+        final List<Entry> entryList2 = cursor3.readEntries(3);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(entryList2.size(), 3);
+            assertEquals(ledger.ledgerCache.size(), 3);
+        });
+
         cursor3.clearBacklog();
         ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
         Awaitility.await().untilAsserted(() -> {
@@ -3815,7 +3840,6 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
             assertEquals(ledger.ledgerCache.size(), 0);
         });
 
-
         cursor.close();
         cursor2.close();
         cursor3.close();
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
index 68135598e33..7475b620f57 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
@@ -21,40 +21,51 @@ package org.apache.bookkeeper.mledger.impl.cache;
 import static 
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
 import static 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.InflightReadLimiterUtilization.FREE;
 import static 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.InflightReadLimiterUtilization.USED;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.sdk.OpenTelemetrySdk;
 import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
 import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
+import org.assertj.core.api.Assertions;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
 public class InflightReadsLimiterTest {
+    private static final int ACQUIRE_QUEUE_SIZE = 1000;
+    private static final int ACQUIRE_TIMEOUT_MILLIS = 500;
 
     @DataProvider
     private static Object[][] isDisabled() {
-        return new Object[][] {
-            {0, true},
-            {-1, true},
-            {1, false},
+        return new Object[][]{
+                {0, true},
+                {-1, true},
+                {1, false},
         };
     }
 
+    @DataProvider
+    private static Object[] booleanValues() {
+        return new Object[]{ true, false };
+    }
+
     @Test(dataProvider = "isDisabled")
     public void testDisabled(long maxReadsInFlightSize, boolean 
shouldBeDisabled) throws Exception {
         var otel = buildOpenTelemetryAndReader();
         @Cleanup var openTelemetry = otel.getLeft();
         @Cleanup var metricReader = otel.getRight();
 
-        var limiter = new InflightReadsLimiter(maxReadsInFlightSize, 
openTelemetry);
-        assertEquals(limiter.isDisabled(), shouldBeDisabled);
+        var limiter = new InflightReadsLimiter(maxReadsInFlightSize, 
ACQUIRE_QUEUE_SIZE, ACQUIRE_TIMEOUT_MILLIS,
+                mock(ScheduledExecutorService.class), openTelemetry);
+        assertThat(limiter.isDisabled()).isEqualTo(shouldBeDisabled);
 
         if (shouldBeDisabled) {
             // Verify metrics are not present
@@ -72,136 +83,459 @@ public class InflightReadsLimiterTest {
         @Cleanup var openTelemetry = otel.getLeft();
         @Cleanup var metricReader = otel.getRight();
 
-        InflightReadsLimiter limiter = new InflightReadsLimiter(100, 
openTelemetry);
-        assertEquals(100, limiter.getRemainingBytes());
+        InflightReadsLimiter limiter =
+                new InflightReadsLimiter(100, ACQUIRE_QUEUE_SIZE, 
ACQUIRE_TIMEOUT_MILLIS,
+                        mock(ScheduledExecutorService.class), openTelemetry);
+        assertThat(limiter.getRemainingBytes()).isEqualTo(100);
         assertLimiterMetrics(metricReader, 100, 0, 100);
 
-        InflightReadsLimiter.Handle handle = limiter.acquire(100, null);
-        assertEquals(0, limiter.getRemainingBytes());
-        assertTrue(handle.success);
-        assertEquals(handle.acquiredPermits, 100);
-        assertEquals(1, handle.trials);
+        Optional<InflightReadsLimiter.Handle> optionalHandle = 
limiter.acquire(100, null);
+        assertThat(limiter.getRemainingBytes()).isZero();
+        assertThat(optionalHandle).isPresent();
+        InflightReadsLimiter.Handle handle = optionalHandle.get();
+        assertThat(handle.success()).isTrue();
+        assertThat(handle.permits()).isEqualTo(100);
         assertLimiterMetrics(metricReader, 100, 100, 0);
 
         limiter.release(handle);
-        assertEquals(100, limiter.getRemainingBytes());
+        assertThat(limiter.getRemainingBytes()).isEqualTo(100);
         assertLimiterMetrics(metricReader, 100, 0, 100);
     }
 
-
     @Test
     public void testNotEnoughPermits() throws Exception {
-        InflightReadsLimiter limiter = new InflightReadsLimiter(100, 
OpenTelemetry.noop());
-        assertEquals(100, limiter.getRemainingBytes());
-        InflightReadsLimiter.Handle handle = limiter.acquire(100, null);
-        assertEquals(0, limiter.getRemainingBytes());
-        assertTrue(handle.success);
-        assertEquals(handle.acquiredPermits, 100);
-        assertEquals(1, handle.trials);
-
-        InflightReadsLimiter.Handle handle2 = limiter.acquire(100, null);
-        assertEquals(0, limiter.getRemainingBytes());
-        assertFalse(handle2.success);
-        assertEquals(handle2.acquiredPermits, 0);
-        assertEquals(1, handle2.trials);
+        InflightReadsLimiter limiter =
+                new InflightReadsLimiter(100, ACQUIRE_QUEUE_SIZE, 
ACQUIRE_TIMEOUT_MILLIS,
+                        mock(ScheduledExecutorService.class), 
OpenTelemetry.noop());
+        assertThat(limiter.getRemainingBytes()).isEqualTo(100);
+        Optional<InflightReadsLimiter.Handle> optionalHandle = 
limiter.acquire(100, null);
+        assertThat(limiter.getRemainingBytes()).isZero();
+        assertThat(optionalHandle).isPresent();
+        InflightReadsLimiter.Handle handle = optionalHandle.get();
+        assertThat(handle.success()).isTrue();
+        assertThat(handle.permits()).isEqualTo(100);
+
+        AtomicReference<InflightReadsLimiter.Handle> handle2Reference = new 
AtomicReference<>();
+        Optional<InflightReadsLimiter.Handle> optionalHandle2 = 
limiter.acquire(100, handle2Reference::set);
+        assertThat(limiter.getRemainingBytes()).isZero();
+        assertThat(optionalHandle2).isNotPresent();
 
         limiter.release(handle);
-        assertEquals(100, limiter.getRemainingBytes());
-
-        handle2 = limiter.acquire(100, handle2);
-        assertEquals(0, limiter.getRemainingBytes());
-        assertTrue(handle2.success);
-        assertEquals(handle2.acquiredPermits, 100);
-        assertEquals(2, handle2.trials);
-
-        limiter.release(handle2);
-        assertEquals(100, limiter.getRemainingBytes());
+        assertThat(handle2Reference)
+                .hasValueSatisfying(h ->
+                        assertThat(h.success()).isTrue());
 
+        limiter.release(handle2Reference.get());
+        assertThat(limiter.getRemainingBytes()).isEqualTo(100);
     }
 
     @Test
-    public void testPartialAcquire() throws Exception {
-        InflightReadsLimiter limiter = new InflightReadsLimiter(100, 
OpenTelemetry.noop());
-        assertEquals(100, limiter.getRemainingBytes());
-
-        InflightReadsLimiter.Handle handle = limiter.acquire(30, null);
-        assertEquals(70, limiter.getRemainingBytes());
-        assertTrue(handle.success);
-        assertEquals(handle.acquiredPermits, 30);
-        assertEquals(1, handle.trials);
-
-        InflightReadsLimiter.Handle handle2 = limiter.acquire(100, null);
-        assertEquals(0, limiter.getRemainingBytes());
-        assertFalse(handle2.success);
-        assertEquals(handle2.acquiredPermits, 70);
-        assertEquals(1, handle2.trials);
-
-        limiter.release(handle);
+    public void testAcquireTimeout() throws Exception {
+        @Cleanup("shutdownNow")
+        ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+        InflightReadsLimiter limiter =
+                new InflightReadsLimiter(100, ACQUIRE_QUEUE_SIZE, 
ACQUIRE_TIMEOUT_MILLIS,
+                        executor, OpenTelemetry.noop());
+        assertThat(limiter.getRemainingBytes()).isEqualTo(100);
+        limiter.acquire(100, null);
+
+        AtomicReference<InflightReadsLimiter.Handle> handle2Reference = new 
AtomicReference<>();
+        Optional<InflightReadsLimiter.Handle> optionalHandle2 = 
limiter.acquire(100, handle2Reference::set);
+        assertThat(optionalHandle2).isNotPresent();
+
+        Thread.sleep(ACQUIRE_TIMEOUT_MILLIS + 100);
+
+        assertThat(handle2Reference).hasValueSatisfying(h -> 
assertThat(h.success()).isFalse());
+    }
 
-        handle2 = limiter.acquire(100, handle2);
-        assertEquals(0, limiter.getRemainingBytes());
-        assertTrue(handle2.success);
-        assertEquals(handle2.acquiredPermits, 100);
-        assertEquals(2, handle2.trials);
+    @Test
+    public void testMultipleQueuedEntriesWithExceptionInFirstCallback() throws 
Exception {
+        @Cleanup("shutdownNow")
+        ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+        InflightReadsLimiter limiter =
+                new InflightReadsLimiter(100, ACQUIRE_QUEUE_SIZE, 
ACQUIRE_TIMEOUT_MILLIS,
+                        executor, OpenTelemetry.noop());
+        assertThat(limiter.getRemainingBytes())
+                .as("Initial remaining bytes should be 100")
+                .isEqualTo(100);
+
+        // Acquire the initial permits
+        Optional<InflightReadsLimiter.Handle> handle1 = limiter.acquire(100, 
null);
+        assertThat(handle1)
+                .as("Initial handle should be present")
+                .isPresent();
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should be 0 after acquiring 100 permits")
+                .isEqualTo(0);
+
+        // Queue the first handle with a callback that throws an exception
+        AtomicReference<InflightReadsLimiter.Handle> handle2Reference = new 
AtomicReference<>();
+        Optional<InflightReadsLimiter.Handle> handle2 = limiter.acquire(50, 
handle -> {
+            handle2Reference.set(handle);
+            throw new RuntimeException("Callback exception");
+        });
+        assertThat(handle2)
+                .as("Second handle should not be present")
+                .isNotPresent();
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should still be 0 after failed 
acquisition")
+                .isEqualTo(0);
+
+        // Queue the second handle with a successful callback
+        AtomicReference<InflightReadsLimiter.Handle> handle3Reference = new 
AtomicReference<>();
+        Optional<InflightReadsLimiter.Handle> handle3 = limiter.acquire(50, 
handle3Reference::set);
+        assertThat(handle3)
+                .as("Third handle should not be present as permits are unavailable")
+                .isNotPresent();
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should still be 0")
+                .isEqualTo(0);
+
+        // Release the initial handle to trigger the queued callbacks
+        limiter.release(handle1.get());
+
+        // Verify the first callback threw an exception but the second 
callback was handled successfully
+        assertThat(handle2Reference)
+                .as("Handle2 should have been set in the callback despite the 
exception")
+                .hasValueSatisfying(handle -> assertThat(handle.success())
+                        .as("Handle2 should be marked as successful")
+                        .isTrue());
+        assertThat(handle3Reference)
+                .as("Handle3 should have been set successfully")
+                .hasValueSatisfying(handle -> assertThat(handle.success())
+                        .as("Handle3 should be marked as successful")
+                        .isTrue());
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should still be 0 after first releases 
are acquired")
+                .isEqualTo(0);
+
+        // Release handle3 (the second queued acquisition)
+        limiter.release(handle3Reference.get());
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should be 50 after releasing handle3")
+                .isEqualTo(50);
+
+        // NOTE(review): releases handle3 again; handle2Reference was probably intended - confirm
+        limiter.release(handle3Reference.get());
+        assertThat(limiter.getRemainingBytes())
+                .as("All bytes should be released, so remaining bytes should 
be 100")
+                .isEqualTo(100);
+    }
 
-        limiter.release(handle2);
-        assertEquals(100, limiter.getRemainingBytes());
+    @Test
+    public void 
testMultipleQueuedEntriesWithTimeoutAndExceptionInFirstCallback() throws 
Exception {
+        @Cleanup("shutdownNow")
+        ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+        InflightReadsLimiter limiter =
+                new InflightReadsLimiter(100, ACQUIRE_QUEUE_SIZE, 
ACQUIRE_TIMEOUT_MILLIS,
+                        executor, OpenTelemetry.noop());
+        assertThat(limiter.getRemainingBytes())
+                .as("Initial remaining bytes should be 100")
+                .isEqualTo(100);
+
+        // Acquire the initial permits
+        Optional<InflightReadsLimiter.Handle> handle1 = limiter.acquire(100, 
null);
+        assertThat(handle1)
+                .as("The first handle should be present after acquiring 100 
permits")
+                .isPresent();
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should be 0 after acquiring all permits")
+                .isEqualTo(0);
+
+        // Queue the first handle with a callback that times out and throws an 
exception
+        AtomicReference<InflightReadsLimiter.Handle> handle2Reference = new 
AtomicReference<>();
+        Optional<InflightReadsLimiter.Handle> handle2 = limiter.acquire(50, 
handle -> {
+            handle2Reference.set(handle);
+            throw new RuntimeException("Callback exception on timeout");
+        });
+        assertThat(handle2)
+                .as("The second handle should not be present as the callback 
throws an exception")
+                .isNotPresent();
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should still be 0 after failed 
acquisition")
+                .isEqualTo(0);
+
+        // Introduce a delay to differentiate operations between queued entries
+        Thread.sleep(50);
+
+        // Queue the second handle with a successful callback
+        AtomicReference<InflightReadsLimiter.Handle> handle3Reference = new 
AtomicReference<>();
+        Optional<InflightReadsLimiter.Handle> handle3 = limiter.acquire(50, 
handle3Reference::set);
+        assertThat(handle3)
+                .as("The third handle should not be present as permits are 
still unavailable")
+                .isNotPresent();
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should still be 0 after failed 
acquisition attempt")
+                .isEqualTo(0);
+
+        // Wait for the timeout to occur
+        Thread.sleep(ACQUIRE_TIMEOUT_MILLIS + 100);
+
+        // Verify the first callback timed out and threw an exception, and the 
second callback was handled
+        assertThat(handle2Reference)
+                .as("Handle2 should have been set in the callback despite the 
exception")
+                .hasValueSatisfying(handle -> assertThat(handle.success())
+                        .as("Handle2 should be marked as unsuccessful due to a 
timeout")
+                        .isFalse());
+        assertThat(handle3Reference)
+                .as("Handle3 should have been set in the callback after the 
acquire timeout elapsed")
+                .hasValueSatisfying(handle -> 
Assertions.assertThat(handle.success())
+                        .as("Handle3 should be marked as unsuccessful due to a 
timeout")
+                        .isFalse());
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should still be 0 as no permits were 
released")
+                .isEqualTo(0);
+
+        // Release the first handle
+        limiter.release(handle1.get());
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should be fully restored to 100 after 
releasing all permits")
+                .isEqualTo(100);
+    }
 
+    @Test
+    public void 
testMultipleQueuedEntriesWithTimeoutsThatAreTimedOutWhenPermitsAreAvailable() 
throws Exception {
+        // Use a mock executor to simulate scenarios where timed out queued 
handles are processed when permits become
+        // available
+        ScheduledExecutorService executor = 
mock(ScheduledExecutorService.class);
+        InflightReadsLimiter limiter =
+                new InflightReadsLimiter(100, ACQUIRE_QUEUE_SIZE, 
ACQUIRE_TIMEOUT_MILLIS,
+                        executor, OpenTelemetry.noop());
+        assertThat(limiter.getRemainingBytes())
+                .as("Initial remaining bytes should be 100")
+                .isEqualTo(100);
+
+        // Acquire the initial permits
+        Optional<InflightReadsLimiter.Handle> handle1 = limiter.acquire(100, 
null);
+        assertThat(handle1)
+                .as("The first handle should be present after acquiring 100 
permits")
+                .isPresent();
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should be 0 after acquiring all permits")
+                .isEqualTo(0);
+
+        // Queue the first handle
+        AtomicReference<InflightReadsLimiter.Handle> handle2Reference = new 
AtomicReference<>();
+        Optional<InflightReadsLimiter.Handle> handle2 = limiter.acquire(50, 
handle2Reference::set);
+        assertThat(handle2)
+                .as("The second handle should not be present as permits are 
unavailable")
+                .isNotPresent();
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should still be 0 after failed 
acquisition attempt for handle2")
+                .isEqualTo(0);
+
+        // Queue the second handle
+        AtomicReference<InflightReadsLimiter.Handle> handle3Reference = new 
AtomicReference<>();
+        Optional<InflightReadsLimiter.Handle> handle3 = limiter.acquire(50, 
handle3Reference::set);
+        assertThat(handle3)
+                .as("The third handle should not be present as permits are 
unavailable")
+                .isNotPresent();
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should still be 0 after failed 
acquisition attempt for handle3")
+                .isEqualTo(0);
+
+        // Wait for the timeout to occur
+        Thread.sleep(ACQUIRE_TIMEOUT_MILLIS + 100);
+
+        // Queue another handle
+        AtomicReference<InflightReadsLimiter.Handle> handle4Reference = new 
AtomicReference<>();
+        Optional<InflightReadsLimiter.Handle> handle4 = limiter.acquire(50, 
handle4Reference::set);
+        assertThat(handle4)
+                .as("The fourth handle should not be present because permits 
are unavailable")
+                .isNotPresent();
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should still be 0 after failed 
acquisition attempt for handle4")
+                .isEqualTo(0);
+
+        // Queue another handle
+        AtomicReference<InflightReadsLimiter.Handle> handle5Reference = new 
AtomicReference<>();
+        Optional<InflightReadsLimiter.Handle> handle5 = limiter.acquire(100, 
handle5Reference::set);
+        assertThat(handle5)
+                .as("The fifth handle should not be present as permits are 
unavailable")
+                .isNotPresent();
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should still be 0 after failed 
acquisition attempt for handle5")
+                .isEqualTo(0);
+
+        // Release the first handle
+        limiter.release(handle1.get());
+
+        assertThat(handle2Reference)
+                .as("Handle2 should have been set in the callback and marked 
unsuccessful")
+                .hasValueSatisfying(handle -> 
assertThat(handle.success()).isFalse());
+
+        assertThat(handle3Reference)
+                .as("Handle3 should have been set in the callback and marked 
unsuccessful")
+                .hasValueSatisfying(handle -> 
assertThat(handle.success()).isFalse());
+
+        assertThat(handle4Reference)
+                .as("Handle4 should have been set in the callback and marked 
successful")
+                .hasValueSatisfying(handle -> 
assertThat(handle.success()).isTrue());
+
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should be 50 after handle4 acquired 50 permits")
+                .isEqualTo(50);
+
+        limiter.release(handle4Reference.get());
+
+        assertThat(handle5Reference)
+                .as("Handle5 should have been set in the callback and marked 
successful")
+                .hasValueSatisfying(handle -> 
assertThat(handle.success()).isTrue());
+
+        limiter.release(handle5Reference.get());
+
+        assertThat(limiter.getRemainingBytes())
+                .as("All bytes should be released, so remaining bytes should 
be back to 100")
+                .isEqualTo(100);
     }
 
     @Test
-    public void testTooManyTrials() throws Exception {
-        InflightReadsLimiter limiter = new InflightReadsLimiter(100, 
OpenTelemetry.noop());
-        assertEquals(100, limiter.getRemainingBytes());
-
-        InflightReadsLimiter.Handle handle = limiter.acquire(30, null);
-        assertEquals(70, limiter.getRemainingBytes());
-        assertTrue(handle.success);
-        assertEquals(handle.acquiredPermits, 30);
-        assertEquals(1, handle.trials);
-
-        InflightReadsLimiter.Handle handle2 = limiter.acquire(100, null);
-        assertEquals(0, limiter.getRemainingBytes());
-        assertFalse(handle2.success);
-        assertEquals(handle2.acquiredPermits, 70);
-        assertEquals(1, handle2.trials);
-
-        handle2 = limiter.acquire(100, handle2);
-        assertEquals(0, limiter.getRemainingBytes());
-        assertFalse(handle2.success);
-        assertEquals(handle2.acquiredPermits, 70);
-        assertEquals(2, handle2.trials);
-
-        handle2 = limiter.acquire(100, handle2);
-        assertEquals(0, limiter.getRemainingBytes());
-        assertFalse(handle2.success);
-        assertEquals(handle2.acquiredPermits, 70);
-        assertEquals(3, handle2.trials);
-
-        handle2 = limiter.acquire(100, handle2);
-        assertEquals(0, limiter.getRemainingBytes());
-        assertFalse(handle2.success);
-        assertEquals(handle2.acquiredPermits, 70);
-        assertEquals(4, handle2.trials);
-
-        // too many trials, start from scratch
-        handle2 = limiter.acquire(100, handle2);
-        assertEquals(70, limiter.getRemainingBytes());
-        assertFalse(handle2.success);
-        assertEquals(handle2.acquiredPermits, 0);
-        assertEquals(1, handle2.trials);
+    public void testQueueSizeLimitReached() throws Exception {
+        @Cleanup("shutdownNow")
+        ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+
+        // Minimum queue size is 4.
+        final int queueSizeLimit = 4;
+        InflightReadsLimiter limiter =
+                new InflightReadsLimiter(100, queueSizeLimit, 
ACQUIRE_TIMEOUT_MILLIS, executor, OpenTelemetry.noop());
+
+        assertThat(limiter.getRemainingBytes())
+                .as("Initial remaining bytes should be 100")
+                .isEqualTo(100);
+
+        // Acquire all available permits (consume 100 bytes)
+        Optional<InflightReadsLimiter.Handle> handle1 = limiter.acquire(100, 
null);
+        assertThat(handle1)
+                .as("The first handle should be present after acquiring all 
available permits")
+                .isPresent()
+                .hasValueSatisfying(handle -> 
assertThat(handle.success()).isTrue());
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should be zero after acquiring all 
permits")
+                .isEqualTo(0);
+
+        // Queue up to the limit (4 requests)
+        AtomicReference<InflightReadsLimiter.Handle> handle2Reference = new 
AtomicReference<>();
+        assertThat(limiter.acquire(50, handle2Reference::set)).isNotPresent();
+
+        AtomicReference<InflightReadsLimiter.Handle> handle3Reference = new 
AtomicReference<>();
+        assertThat(limiter.acquire(50, handle3Reference::set)).isNotPresent();
+
+        AtomicReference<InflightReadsLimiter.Handle> handle4Reference = new 
AtomicReference<>();
+        assertThat(limiter.acquire(50, handle4Reference::set)).isNotPresent();
+
+        AtomicReference<InflightReadsLimiter.Handle> handle5Reference = new 
AtomicReference<>();
+        assertThat(limiter.acquire(50, handle5Reference::set)).isNotPresent();
+
+        // Attempt to add one more request, which should fail as the queue is 
full
+        Optional<InflightReadsLimiter.Handle> handle6 = limiter.acquire(50, 
null);
+        assertThat(handle6)
+                .as("The sixth handle should not be successful since the 
queue is full")
+                .hasValueSatisfying(handle -> 
assertThat(handle.success()).isFalse());
+    }
 
-        limiter.release(handle);
+    @Test(dataProvider = "booleanValues")
+    public void testAcquireExceedingMaxReadsInFlightSize(boolean firstInQueue) 
throws Exception {
+        @Cleanup("shutdownNow")
+        ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+
+        long maxReadsInFlightSize = 100;
+        InflightReadsLimiter limiter =
+                new InflightReadsLimiter(maxReadsInFlightSize, 
ACQUIRE_QUEUE_SIZE, ACQUIRE_TIMEOUT_MILLIS, executor,
+                        OpenTelemetry.noop());
+
+        // Initial state
+        assertThat(limiter.getRemainingBytes())
+                .as("Initial remaining bytes should match 
maxReadsInFlightSize")
+                .isEqualTo(maxReadsInFlightSize);
+
+        // Acquire all permits (consume 100 bytes)
+        Optional<InflightReadsLimiter.Handle> handle1 = limiter.acquire(100, 
null);
+        assertThat(handle1)
+                .as("The first handle should be present")
+                .isPresent();
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should be zero after acquiring all 
permits")
+                .isEqualTo(0);
+
+
+        AtomicReference<InflightReadsLimiter.Handle> handle2Reference = new 
AtomicReference<>();
+
+        if (!firstInQueue) {
+            Optional<InflightReadsLimiter.Handle> handle2 = 
limiter.acquire(50, handle2Reference::set);
+            assertThat(handle2)
+                    .as("The second handle should not be present as remaining 
permits are zero")
+                    .isNotPresent();
+        }
 
-        handle2 = limiter.acquire(100, handle2);
-        assertEquals(0, limiter.getRemainingBytes());
-        assertTrue(handle2.success);
-        assertEquals(handle2.acquiredPermits, 100);
-        assertEquals(2, handle2.trials);
+        // Attempt to acquire more than maxReadsInFlightSize while all permits 
are in use
+        AtomicReference<InflightReadsLimiter.Handle> 
handleExceedingMaxReference = new AtomicReference<>();
+        Optional<InflightReadsLimiter.Handle> handleExceedingMaxOptional =
+                limiter.acquire(200, handleExceedingMaxReference::set);
+        assertThat(handleExceedingMaxOptional)
+                .as("The second handle should not be present as remaining 
permits are zero")
+                .isNotPresent();
+
+        // Release handle1 permits
+        limiter.release(handle1.get());
+
+        if (!firstInQueue) {
+            assertThat(handle2Reference)
+                    .as("Handle2 should have been set in the callback and 
marked successful")
+                    .hasValueSatisfying(handle -> {
+                        assertThat(handle.success()).isTrue();
+                        assertThat(handle.permits()).isEqualTo(50);
+                    });
+            limiter.release(handle2Reference.get());
+        }
+
+        assertThat(handleExceedingMaxReference)
+                .as("The exceeding handle should have been set in the callback and marked 
successful")
+                .hasValueSatisfying(handle -> {
+                    assertThat(handle.success()).isTrue();
+                    
assertThat(handle.permits()).isEqualTo(maxReadsInFlightSize);
+                });
 
-        limiter.release(handle2);
-        assertEquals(100, limiter.getRemainingBytes());
+        limiter.release(handleExceedingMaxReference.get());
 
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should be fully replenished after 
releasing all permits")
+                .isEqualTo(maxReadsInFlightSize);
+    }
+
+    @Test
+    public void testAcquireExceedingMaxReadsWhenAllPermitsAvailable() throws 
Exception {
+        @Cleanup("shutdownNow")
+        ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+
+        long maxReadsInFlightSize = 100;
+        InflightReadsLimiter limiter =
+                new InflightReadsLimiter(maxReadsInFlightSize, 
ACQUIRE_QUEUE_SIZE, ACQUIRE_TIMEOUT_MILLIS, executor,
+                        OpenTelemetry.noop());
+
+        // Initial state
+        assertThat(limiter.getRemainingBytes())
+                .as("Initial remaining bytes should match 
maxReadsInFlightSize")
+                .isEqualTo(maxReadsInFlightSize);
+
+        // Acquire permits > maxReadsInFlightSize
+        Optional<InflightReadsLimiter.Handle> handleExceedingMaxOptional =
+                limiter.acquire(2 * maxReadsInFlightSize, null);
+        assertThat(handleExceedingMaxOptional)
+                .as("The handle for exceeding max permits should be present")
+                .hasValueSatisfying(handle -> {
+                    assertThat(handle.success()).isTrue();
+                    
assertThat(handle.permits()).isEqualTo(maxReadsInFlightSize);
+                });
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should be zero after acquiring all 
permits")
+                .isEqualTo(0);
+
+        // Release permits
+        limiter.release(handleExceedingMaxOptional.get());
+
+        assertThat(limiter.getRemainingBytes())
+                .as("Remaining bytes should be fully replenished after 
releasing all permits")
+                .isEqualTo(maxReadsInFlightSize);
     }
 
     private Pair<OpenTelemetrySdk, InMemoryMetricReader> 
buildOpenTelemetryAndReader() {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
index 383568c17e8..55068580f62 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -91,7 +92,8 @@ public class PendingReadsManagerTest  {
         config.setReadEntryTimeoutSeconds(10000);
         when(rangeEntryCache.getName()).thenReturn("my-topic");
         when(rangeEntryCache.getManagedLedgerConfig()).thenReturn(config);
-        inflighReadsLimiter = new InflightReadsLimiter(0, 
OpenTelemetry.noop());
+        inflighReadsLimiter = new InflightReadsLimiter(0, 0, 0,
+                mock(ScheduledExecutorService.class), OpenTelemetry.noop());
         
when(rangeEntryCache.getPendingReadsLimiter()).thenReturn(inflighReadsLimiter);
         pendingReadsManager = new PendingReadsManager(rangeEntryCache);
         doAnswer(new Answer() {
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index d6df2a00c3a..d948b3afece 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2114,6 +2114,15 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
             + " Consumer Netty channel. Use 0 to disable")
     private long managedLedgerMaxReadsInFlightSizeInMB = 0;
 
+    @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum time to wait 
for acquiring permits for max reads in "
+            + "flight when managedLedgerMaxReadsInFlightSizeInMB is set (>0) 
and the limit is reached.")
+    private long managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis = 
60000;
+
+    @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum number of 
reads that can be queued for acquiring "
+            + "permits for max reads in flight when 
managedLedgerMaxReadsInFlightSizeInMB is set (>0) and the limit "
+            + "is reached.")
+    private int managedLedgerMaxReadsInFlightPermitsAcquireQueueSize = 50000;
+
     @FieldContext(
         category = CATEGORY_STORAGE_ML,
         dynamic = true,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 3d945afe4c1..b060475a43f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -72,8 +72,21 @@ public class ManagedLedgerClientFactory implements 
ManagedLedgerStorage {
         managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis(
                 conf.getManagedLedgerCacheEvictionTimeThresholdMillis());
         
managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
-        managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightSize(
-                conf.getManagedLedgerMaxReadsInFlightSizeInMB() * 1024L * 
1024L);
+        long managedLedgerMaxReadsInFlightSizeBytes = 
conf.getManagedLedgerMaxReadsInFlightSizeInMB() * 1024L * 1024L;
+        if (managedLedgerMaxReadsInFlightSizeBytes > 0 && 
conf.getDispatcherMaxReadSizeBytes() > 0
+                && managedLedgerMaxReadsInFlightSizeBytes < 
conf.getDispatcherMaxReadSizeBytes()) {
+            log.warn("Invalid configuration for 
managedLedgerMaxReadsInFlightSizeInMB: {}, "
+                            + "dispatcherMaxReadSizeBytes: {}. 
managedLedgerMaxReadsInFlightSizeInMB in bytes should "
+                            + "be greater than dispatcherMaxReadSizeBytes. You 
should set "
+                            + "managedLedgerMaxReadsInFlightSizeInMB to at 
least {}",
+                    conf.getManagedLedgerMaxReadsInFlightSizeInMB(), 
conf.getDispatcherMaxReadSizeBytes(),
+                    (conf.getDispatcherMaxReadSizeBytes() / (1024L * 1024L)) + 
1);
+        }
+        
managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightSize(managedLedgerMaxReadsInFlightSizeBytes);
+        
managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis(
+                
conf.getManagedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis());
+        
managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightPermitsAcquireQueueSize(
+                
conf.getManagedLedgerMaxReadsInFlightPermitsAcquireQueueSize());
         managedLedgerFactoryConfig.setPrometheusStatsLatencyRolloverSeconds(
                 conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
         
managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution());

Reply via email to