codelipenghui commented on code in PR #18245:
URL: https://github.com/apache/pulsar/pull/18245#discussion_r1019032971
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java:
##########
@@ -44,6 +44,8 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
private long entryId;
ByteBuf data;
+ private Runnable onDellocate;
Review Comment:
```suggestion
private Runnable onDeallocate;
```
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java:
##########
@@ -63,6 +66,7 @@ public static EntryImpl create(long ledgerId, long entryId,
byte[] data) {
entry.entryId = entryId;
entry.data = Unpooled.wrappedBuffer(data);
entry.setRefCnt(1);
+ entry.onDellocate = null;
Review Comment:
We can remove this line, default should be null.
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -313,6 +349,74 @@ void asyncReadEntry0(ReadHandle lh, long firstEntry, long
lastEntry, boolean sho
}
}
+ private AsyncCallbacks.ReadEntriesCallback
handlePendingReadsLimits(ReadHandle lh,
+ long
firstEntry, long lastEntry,
+ boolean
shouldCacheEntry,
+
AsyncCallbacks.ReadEntriesCallback originalCallback,
+ Object ctx,
InflightReadsLimiter.Handle handle) {
+ InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
+ if (pendingReadsLimiter.isDisabled()) {
+ return originalCallback;
+ }
+ long estimatedReadSize = (1 + lastEntry - firstEntry)
+ * (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+ final AsyncCallbacks.ReadEntriesCallback callback;
+ InflightReadsLimiter.Handle newHandle =
pendingReadsLimiter.acquire(estimatedReadSize, handle);
+ if (!newHandle.success) {
+ long now = System.currentTimeMillis();
+ if (now - newHandle.creationTime > readEntryTimeoutMillis) {
+ String message = "Time-out elapsed while acquiring enough
permits "
+ + "on the memory limiter to read from ledger "
+ + lh.getId()
+ + ", " + getName()
+ + ", estimated read size " + estimatedReadSize + "
bytes"
+ + " for " + (1 + lastEntry - firstEntry)
+ + " entries (check
managedLedgerMaxReadsInFlightSizeInMB)";
+ log.error(message);
+ pendingReadsLimiter.release(newHandle);
+ originalCallback.readEntriesFailed(
+ new
ManagedLedgerException.TooManyRequestsException(message), ctx);
+ return null;
+ }
+ ml.getExecutor().submitOrdered(lh.getId(), () -> {
+ asyncReadEntry0WithLimits(lh, firstEntry, lastEntry,
shouldCacheEntry,
+ originalCallback, ctx, newHandle);
+ return null;
+ });
+ return null;
+ } else {
+ callback = new AsyncCallbacks.ReadEntriesCallback() {
+
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object
ctx) {
+ if (!entries.isEmpty()) {
+ long size = entries.get(0).getLength();
+ estimatedEntrySize = size;
Review Comment:
Can we use the `avgMessagesPerEntry` from the consumer?
The `RangeEntryCacheImpl.java` is shared across all the topics. If
calculated at the topic level, we should be able to get a more precise
estimated entry size.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]