This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d3929765749 [improve][ml] Optimize ledger opening by skipping fully acknowledged ledgers (#24655)
d3929765749 is described below

commit d39297657490caec19b22bdf8848a53504bf83f5
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Aug 25 10:21:37 2025 +0800

    [improve][ml] Optimize ledger opening by skipping fully acknowledged ledgers (#24655)
    
    (cherry picked from commit da0d11644f097ae657e79b2eb7835478d79ccd0e)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  4 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 63 +++++++++++++++++
 .../bookkeeper/mledger/impl/OpReadEntry.java       |  7 +-
 .../org/apache/bookkeeper/mledger/impl/OpScan.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 79 +++++++++++++++++++++-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  2 +-
 6 files changed, 150 insertions(+), 7 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 40acb40e98d..8a6af494b81 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -901,7 +901,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         // Skip deleted entries.
         skipCondition = skipCondition == null ? this::isMessageDeleted : 
skipCondition.or(this::isMessageDeleted);
         OpReadEntry op =
-                OpReadEntry.create(this, readPosition, numOfEntriesToRead, 
callback, ctx, maxPosition, skipCondition);
+            OpReadEntry.create(this, readPosition, numOfEntriesToRead, 
callback, ctx, maxPosition, skipCondition, true);
         ledger.asyncReadEntries(op);
     }
 
@@ -1061,7 +1061,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             // Skip deleted entries.
             skipCondition = skipCondition == null ? this::isMessageDeleted : 
skipCondition.or(this::isMessageDeleted);
             OpReadEntry op = OpReadEntry.create(this, readPosition, 
numberOfEntriesToRead, callback,
-                    ctx, maxPosition, skipCondition);
+                    ctx, maxPosition, skipCondition, true);
             int opReadId = op.id;
             if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
                 op.recycle();
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 960ae57db76..54f1da61430 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1945,6 +1945,19 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 return;
             }
 
+            // Optimization: Check if all entries in this ledger have been 
deleted (acknowledged)
+            // If so, skip opening the ledger and move to the next one
+            if (opReadEntry.cursor != null && 
opReadEntry.skipOpenLedgerFullyAcked
+                && isLedgerFullyAcked(ledgerId, ledgerInfo, 
opReadEntry.cursor)) {
+                log.info("[{}] All entries in ledger {} have been acked, 
skipping ledger opening", name, ledgerId);
+                // Move to the next ledger
+                Long nextLedgerId = ledgers.ceilingKey(ledgerId + 1);
+                opReadEntry.updateReadPosition(
+                    
PositionImpl.get(Objects.requireNonNullElseGet(nextLedgerId, () -> ledgerId + 
1), 0));
+                opReadEntry.checkReadCompletion();
+                return;
+            }
+
             // Get a ledger handle to read from
             getLedgerHandle(ledgerId).thenAccept(ledger -> 
internalReadFromLedger(ledger, opReadEntry)).exceptionally(ex
                     -> {
@@ -1957,6 +1970,56 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
     }
 
+    /**
+     * Check if all entries in the specified ledger have been acknowledged by 
the cursor.
+     * This optimization helps avoid opening ledgers that have no unacked 
entries.
+     *
+     * @param ledgerId the ledger ID to check
+     * @param ledgerInfo the ledger information
+     * @param cursor the cursor reading from this ledger
+     * @return true if all entries in the ledger have been acknowledged, false 
otherwise
+     */
+    private boolean isLedgerFullyAcked(long ledgerId, LedgerInfo ledgerInfo, 
ManagedCursorImpl cursor) {
+        if (ledgerInfo == null || ledgerInfo.getEntries() == 0) {
+            return true;
+        }
+
+        // Get the cursor's mark delete position
+        Position markDeletedPosition = cursor.getMarkDeletedPosition();
+        if (markDeletedPosition == null) {
+            return false;
+        }
+
+        // If the mark delete position is in a later ledger, then this ledger 
is fully acknowledged
+        if (markDeletedPosition.getLedgerId() > ledgerId) {
+            return true;
+        }
+
+        // Check if all entries in this ledger are individually deleted
+        if (markDeletedPosition.getLedgerId() <= ledgerId) {
+            final long lastEntryInLedger = ledgerInfo.getEntries() - 1;
+            Position startPosition = PositionImpl.get(ledgerId, 0);
+            if (markDeletedPosition.getLedgerId() == ledgerId) {
+                // The mark delete position represents the last acknowledged 
entry
+                // If it points to the last entry in the ledger, then the 
ledger is fully acknowledged
+                if (markDeletedPosition.getEntryId() >= lastEntryInLedger) {
+                    return true;
+                }
+
+                startPosition = markDeletedPosition;
+            }
+
+            Range<PositionImpl> scanRange =
+                Range.closed((PositionImpl) startPosition, 
PositionImpl.get(ledgerId, lastEntryInLedger));
+            long unackMessages = cursor.getNumberOfEntries(scanRange);
+            // All entries are individually deleted
+            return unackMessages == 0;
+        }
+
+        // If mark delete position is in an earlier ledger, this ledger is not 
consumed
+        return false;
+    }
+
     public CompletableFuture<String> getLedgerMetadata(long ledgerId) {
         LedgerHandle currentLedger = this.currentLedger;
         if (currentLedger != null && ledgerId == currentLedger.getId()) {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 20c7742129b..a99269bd691 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -54,9 +54,12 @@ class OpReadEntry implements ReadEntriesCallback {
     PositionImpl maxPosition;
 
     Predicate<PositionImpl> skipCondition;
+    boolean skipOpenLedgerFullyAcked = false;
 
     public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl 
readPositionRef, int count,
-            ReadEntriesCallback callback, Object ctx, PositionImpl 
maxPosition, Predicate<PositionImpl> skipCondition) {
+                                     ReadEntriesCallback callback, Object ctx, 
PositionImpl maxPosition,
+                                     Predicate<PositionImpl> skipCondition,
+                                     boolean skipOpenLedgerFullyAcked) {
         OpReadEntry op = RECYCLER.get();
         op.id = opReadIdGenerator.getAndIncrement();
         op.readPosition = 
cursor.ledger.startReadOperationOnLedger(readPositionRef);
@@ -69,6 +72,7 @@ class OpReadEntry implements ReadEntriesCallback {
         }
         op.maxPosition = maxPosition;
         op.skipCondition = skipCondition;
+        op.skipOpenLedgerFullyAcked = skipOpenLedgerFullyAcked;
         op.ctx = ctx;
         op.nextReadPosition = PositionImpl.get(op.readPosition);
         return op;
@@ -244,6 +248,7 @@ class OpReadEntry implements ReadEntriesCallback {
         nextReadPosition = null;
         maxPosition = null;
         skipCondition = null;
+        skipOpenLedgerFullyAcked = false;
         recyclerHandle.recycle(this);
     }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
index 6d68b042a7a..8e1b4da54ae 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
@@ -128,7 +128,7 @@ class OpScan implements ReadEntriesCallback {
         }
         if (cursor.hasMoreEntries(searchPosition)) {
             OpReadEntry opReadEntry = OpReadEntry.create(cursor, 
searchPosition, batchSize,
-            this, OpScan.this.ctx, null, null);
+            this, OpScan.this.ctx, null, null, false);
             ledger.asyncReadEntries(opReadEntry);
         } else {
             callback.scanComplete(lastSeenPosition, ScanOutcome.COMPLETED, 
OpScan.this.ctx);
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 5f80cdf43b4..fff2a61530e 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -20,11 +20,15 @@ package org.apache.bookkeeper.mledger.impl;
 
 import static 
org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize;
 import static 
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -4341,7 +4345,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
 
         // op readPosition is bigger than maxReadPosition
         OpReadEntry opReadEntry = OpReadEntry.create(cursor, 
ledger.lastConfirmedEntry, 10, callback,
-                null, PositionImpl.get(lastPosition.getLedgerId(), -1), null);
+                null, PositionImpl.get(lastPosition.getLedgerId(), -1), null, 
true);
         Field field = ManagedCursorImpl.class.getDeclaredField("readPosition");
         field.setAccessible(true);
         field.set(cursor, PositionImpl.EARLIEST);
@@ -4363,7 +4367,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         };
 
         @Cleanup final MockedStatic<OpReadEntry> mockedStaticOpReadEntry = 
Mockito.mockStatic(OpReadEntry.class);
-        mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(), 
anyInt(), any(), any(), any(), any()))
+        mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(), 
anyInt(), any(), any(), any(), any(), anyBoolean()))
                 .thenAnswer(__ -> createOpReadEntry.get());
 
         final ManagedLedgerConfig ledgerConfig = new ManagedLedgerConfig();
@@ -5340,5 +5344,76 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         assertEquals(future1.get(2, TimeUnit.SECONDS).get(0).getData(), 
"msg".getBytes());
     }
 
+    @Test
+    public void testSkipOpenLedgerFullyAcked() throws Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(10);
+        managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        ManagedLedger ledger = factory.open("testSkipOpenLedgerFullyAcked", 
managedLedgerConfig);
+        ManagedCursor cursor = ledger.openCursor("cursor");
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            Position pos = ledger.addEntry(("entry-" + i).getBytes());
+            positions.add(pos);
+        }
+
+        ((ManagedLedgerImpl) ledger).rollCurrentLedgerIfFull();
+
+        for (int i = 10; i < 20; i++) {
+            Position pos = ledger.addEntry(("entry-" + i).getBytes());
+            positions.add(pos);
+        }
+
+        ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
+        ManagedCursorImpl cursorImpl = (ManagedCursorImpl) cursor;
+
+        for (int i = 0; i < 5; i++) {
+            cursor.markDelete(positions.get(i));
+        }
+
+        for (int i = 5; i < 10; i++) {
+            cursor.delete(positions.get(i));
+        }
+
+        long firstLedgerId = positions.get(0).getLedgerId();
+        long secondLedgerId = positions.get(10).getLedgerId();
+        log.info("First ledger id is {}, Second ledger id is {}", 
firstLedgerId, secondLedgerId);
+
+        ManagedLedgerImpl spyLedger = spy(ledgerImpl);
+
+        Position readPosition = PositionFactory.create(firstLedgerId, 0);
+        CountDownLatch readLatch = new CountDownLatch(1);
+
+        ReadEntriesCallback callback = new ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                try {
+                    if (!entries.isEmpty()) {
+                        entries.forEach(Entry::release);
+                    }
+                } finally {
+                    readLatch.countDown();
+                }
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
+                log.error("Read failed", exception);
+                readLatch.countDown();
+            }
+        };
+
+        OpReadEntry opReadEntry = OpReadEntry.create(cursorImpl, readPosition, 
5, callback, null, null, null, true);
+
+        spyLedger.asyncReadEntries(opReadEntry);
+
+        assertTrue(readLatch.await(10, TimeUnit.SECONDS));
+
+        verify(spyLedger, never()).getLedgerHandle(firstLedgerId);
+
+        ledger.close();
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorTest.class);
 }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 8972ade225f..ca4abdc41b8 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -686,7 +686,7 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
                     public void readEntriesFailed(ManagedLedgerException 
exception, Object ctx) {
 
                     }
-                }, null, maxPosition, null);
+                }, null, maxPosition, null, false);
         Assert.assertEquals(opReadEntry.readPosition, position);
     }
 

Reply via email to