eolivelli commented on code in PR #18245:
URL: https://github.com/apache/pulsar/pull/18245#discussion_r1019103930


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -313,6 +349,74 @@ void asyncReadEntry0(ReadHandle lh, long firstEntry, long 
lastEntry, boolean sho
         }
     }
 
+    private AsyncCallbacks.ReadEntriesCallback 
handlePendingReadsLimits(ReadHandle lh,
+                                                                long 
firstEntry, long lastEntry,
+                                                                boolean 
shouldCacheEntry,
+                                                                
AsyncCallbacks.ReadEntriesCallback originalCallback,
+                                                                Object ctx, 
InflightReadsLimiter.Handle handle) {
+        InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
+        if (pendingReadsLimiter.isDisabled()) {
+            return originalCallback;
+        }
+        long estimatedReadSize = (1 + lastEntry - firstEntry)
+                * (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+        final AsyncCallbacks.ReadEntriesCallback callback;
+        InflightReadsLimiter.Handle newHandle = 
pendingReadsLimiter.acquire(estimatedReadSize, handle);
+        if (!newHandle.success) {
+            long now = System.currentTimeMillis();
+            if (now - newHandle.creationTime > readEntryTimeoutMillis) {
+                String message = "Time-out elapsed while acquiring enough 
permits "
+                        + "on the memory limiter to read from ledger "
+                        + lh.getId()
+                        + ", " + getName()
+                        + ", estimated read size " + estimatedReadSize + " 
bytes"
+                        + " for " + (1 + lastEntry - firstEntry)
+                        + " entries (check 
managedLedgerMaxReadsInFlightSizeInMB)";
+                log.error(message);
+                pendingReadsLimiter.release(newHandle);
+                originalCallback.readEntriesFailed(
+                        new 
ManagedLedgerException.TooManyRequestsException(message), ctx);
+                return null;
+            }
+            ml.getExecutor().submitOrdered(lh.getId(), () -> {
+                asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, 
shouldCacheEntry,
+                        originalCallback, ctx, newHandle);
+                return null;
+            });
+            return null;
+        } else {
+            callback = new AsyncCallbacks.ReadEntriesCallback() {
+
+                @Override
+                public void readEntriesComplete(List<Entry> entries, Object 
ctx) {
+                    if (!entries.isEmpty()) {
+                        long size = entries.get(0).getLength();
+                        estimatedEntrySize = size;

Review Comment:
   Unfortunately here we are in "managed-ledger" module and in order to get a 
value from the Dispatcher/Consumer I would have to change many internal APIs.
   
   If the size of entries in the topics is similar for all the entries that I 
think that this is a good estimate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to