This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 793e9472eeb [fix][ml] Fix cursor backlog size to account for
individual acks (#25089)
793e9472eeb is described below
commit 793e9472eebcbfc142c3f0e2df2b7be2de21bdca
Author: Penghui Li <[email protected]>
AuthorDate: Fri Jan 2 05:28:22 2026 -0800
[fix][ml] Fix cursor backlog size to account for individual acks (#25089)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 64 +++++++++++++++++-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 75 ++++++++++++++++++++++
2 files changed, 138 insertions(+), 1 deletion(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 137be071d67..c0f29ea5c22 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1282,7 +1282,69 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public long getEstimatedSizeSinceMarkDeletePosition() {
- return ledger.estimateBacklogFromPosition(markDeletePosition);
+ long totalSize =
ledger.estimateBacklogFromPosition(markDeletePosition);
+
+ // Need to subtract size of individual deleted messages
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Calculating backlog size for cursor {} from
position {}, totalSize: {}",
+ ledger.getName(), name, markDeletePosition, totalSize);
+ }
+
+ // Get count of individually deleted entries in the backlog range
+ long deletedCount = 0;
+ lock.readLock().lock();
+ try {
+ Range<Position> backlogRange =
Range.openClosed(markDeletePosition, ledger.getLastPosition());
+
+ if (getConfig().isUnackedRangesOpenCacheSetEnabled()) {
+ deletedCount = individualDeletedMessages.cardinality(
+ backlogRange.lowerEndpoint().getLedgerId(),
backlogRange.lowerEndpoint().getEntryId(),
+ backlogRange.upperEndpoint().getLedgerId(),
backlogRange.upperEndpoint().getEntryId());
+ } else {
+ AtomicLong deletedCounter = new AtomicLong(0);
+ individualDeletedMessages.forEach((r) -> {
+ if (r.isConnected(backlogRange)) {
+ Range<Position> intersection =
r.intersection(backlogRange);
+ long countInRange =
ledger.getNumberOfEntries(intersection);
+ deletedCounter.addAndGet(countInRange);
+ }
+ return true;
+ }, recyclePositionRangeConverter);
+ deletedCount = deletedCounter.get();
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+
+ if (deletedCount == 0) {
+ return totalSize;
+ }
+
+ // Estimate size by using average entry size from the backlog range
+ Range<Position> backlogRange = Range.openClosed(markDeletePosition,
ledger.getLastPosition());
+ long totalEntriesInBacklog = ledger.getNumberOfEntries(backlogRange);
+
+ if (totalEntriesInBacklog <= deletedCount || totalEntriesInBacklog ==
0) {
+ // Should not happen, but avoid division by zero
+ log.warn("[{}] [{}] Inconsistent state: totalEntriesInBacklog={},
deletedCount={}",
+ ledger.getName(), name, totalEntriesInBacklog,
deletedCount);
+ return Math.max(0, totalSize); // Return the total size and log
the issue
+ }
+
+ // Calculate average size in the backlog range
+ long averageSize = totalSize / totalEntriesInBacklog;
+
+ // Subtract size of deleted entries
+ long deletedSize = deletedCount * averageSize;
+ long adjustedSize = totalSize - deletedSize;
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Adjusted backlog size: totalSize={},
deletedCount={}, averageSize={}, "
+ + "deletedSize={}, adjustedSize={}",
+ ledger.getName(), name, totalSize, deletedCount,
averageSize, deletedSize, adjustedSize);
+ }
+
+ return adjustedSize;
}
private long getNumberOfEntriesInBacklog() {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index a65d830096e..123a5cf048d 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -3685,6 +3685,81 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 10 *
entryData.length);
}
+ /**
+ * Test that cursor.getEstimatedSizeSinceMarkDeletePosition() correctly
accounts for individual
+ * message deletions (asyncDelete/individual ack).
+ *
+ * This verifies the fix: when messages are acknowledged out of order
using asyncDelete,
+ * the backlog size is now correctly adjusted to reflect the individually
deleted messages.
+ */
+ @Test(timeOut = 20000)
+ public void testEstimatedSizeWithIndividualAcks() throws Exception {
+ ManagedLedger ledger =
factory.open("test_cursor_backlog_size_individual_acks");
+ ManagedCursor cursor = ledger.openCursor("c1");
+
+ // Each entry is 100 bytes
+ byte[] entryData = new byte[100];
+
+ // Add 5 entries: positions should be 0:0, 0:1, 0:2, 0:3, 0:4
+ List<Position> positions = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ positions.add(ledger.addEntry(entryData));
+ }
+
+ // Initial state: 5 entries * 100 bytes = 500 bytes
+ assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 500);
+
+ // Read all entries so they can be acknowledged
+ List<Entry> entries = cursor.readEntries(5);
+ assertEquals(entries.size(), 5);
+ entries.forEach(Entry::release);
+
+ // Individual acknowledge positions 1, 3, 4 (leaving 0:0 and 0:2
unacknowledged)
+ AtomicInteger callbackCount = new AtomicInteger(0);
+ CountDownLatch latch = new CountDownLatch(3);
+
+ DeleteCallback callback = new DeleteCallback() {
+ @Override
+ public void deleteComplete(Object ctx) {
+ callbackCount.incrementAndGet();
+ latch.countDown();
+ }
+
+ @Override
+ public void deleteFailed(ManagedLedgerException exception, Object
ctx) {
+ latch.countDown();
+ }
+ };
+
+ cursor.asyncDelete(positions.get(1), callback, null);
+ cursor.asyncDelete(positions.get(3), callback, null);
+ cursor.asyncDelete(positions.get(4), callback, null);
+
+ // Wait for async operations to complete
+ assertTrue(latch.await(5, TimeUnit.SECONDS), "Deletes should
complete");
+ assertEquals(callbackCount.get(), 3, "All 3 deletes should succeed");
+
+ // Get current state
+ // After fix: should now account for individual deleted messages
+ long expectedBacklogSize = 200; // 2 remaining entries (0:0, 0:2) *
100 bytes
+ long actualBacklogSize =
cursor.getEstimatedSizeSinceMarkDeletePosition();
+ Position markDeletePos = cursor.getMarkDeletedPosition();
+
+ log.info("Backlog size after individual acks:");
+ log.info(" Expected: {}. Actual: {}", expectedBacklogSize,
actualBacklogSize);
+ log.info(" Mark delete position: {}", markDeletePos);
+ log.info(" Individual deleted: {}", ((ManagedCursorImpl)
cursor).getIndividuallyDeletedMessagesSet());
+
+ // After fix: backlog size should now correctly account for individual
deletions
+ assertEquals(actualBacklogSize, expectedBacklogSize,
+ "Backlog size should account for individual deletions");
+
+ // Verify both count and size are correct
+ assertEquals(cursor.getNumberOfEntriesInBacklog(true), 2, "Backlog
count should be 2");
+
+ ledger.close();
+ }
+
@Test(timeOut = 20000)
public void testRecoverCursorAheadOfLastPosition() throws Exception {
final String mlName = "my_test_ledger";