This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new da0d11644f0 [improve][ml] Optimize ledger opening by skipping fully
acknowledged ledgers (#24655)
da0d11644f0 is described below
commit da0d11644f097ae657e79b2eb7835478d79ccd0e
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Aug 25 10:21:37 2025 +0800
[improve][ml] Optimize ledger opening by skipping fully acknowledged
ledgers (#24655)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 4 +-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 63 +++++++++++++++++
.../bookkeeper/mledger/impl/OpReadEntry.java | 7 +-
.../org/apache/bookkeeper/mledger/impl/OpScan.java | 2 +-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 79 +++++++++++++++++++++-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +-
6 files changed, 150 insertions(+), 7 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 9cba4d863e2..2a0c5d06e4b 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -912,7 +912,7 @@ public class ManagedCursorImpl implements ManagedCursor {
// Skip deleted entries.
skipCondition = skipCondition == null ? this::isMessageDeleted :
skipCondition.or(this::isMessageDeleted);
OpReadEntry op =
- OpReadEntry.create(this, readPosition, numOfEntriesToRead,
callback, ctx, maxPosition, skipCondition);
+ OpReadEntry.create(this, readPosition, numOfEntriesToRead,
callback, ctx, maxPosition, skipCondition, true);
ledger.asyncReadEntries(op);
}
@@ -1072,7 +1072,7 @@ public class ManagedCursorImpl implements ManagedCursor {
// Skip deleted entries.
skipCondition = skipCondition == null ? this::isMessageDeleted :
skipCondition.or(this::isMessageDeleted);
OpReadEntry op = OpReadEntry.create(this, readPosition,
numberOfEntriesToRead, callback,
- ctx, maxPosition, skipCondition);
+ ctx, maxPosition, skipCondition, true);
int opReadId = op.id;
if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
op.recycle();
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 00a3387df3c..895984ab470 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2044,6 +2044,19 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
return;
}
+ // Optimization: Check if all entries in this ledger have been
deleted (acknowledged)
+ // If so, skip opening the ledger and move to the next one
+ if (opReadEntry.cursor != null &&
opReadEntry.skipOpenLedgerFullyAcked
+ && isLedgerFullyAcked(ledgerId, ledgerInfo,
opReadEntry.cursor)) {
+ log.info("[{}] All entries in ledger {} have been acked,
skipping ledger opening", name, ledgerId);
+ // Move to the next ledger
+ Long nextLedgerId = ledgers.ceilingKey(ledgerId + 1);
+ opReadEntry.updateReadPosition(
+
PositionFactory.create(Objects.requireNonNullElseGet(nextLedgerId, () ->
ledgerId + 1), 0));
+ opReadEntry.checkReadCompletion();
+ return;
+ }
+
// Get a ledger handle to read from
getLedgerHandle(ledgerId).thenAccept(ledger ->
internalReadFromLedger(ledger, opReadEntry)).exceptionally(ex
-> {
@@ -2056,6 +2069,56 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
}
+    /**
+     * Checks whether the given cursor has already acknowledged every entry of a ledger, so that a
+     * read operation can skip opening that ledger.
+     *
+     * <p>The ledger is reported as fully acknowledged when any of the following holds:
+     * <ul>
+     *   <li>{@code ledgerInfo} is {@code null} or reports zero entries;</li>
+     *   <li>the cursor's mark-delete position is in a later ledger;</li>
+     *   <li>the mark-delete position is in this ledger, at or beyond its last entry;</li>
+     *   <li>{@code cursor.getNumberOfEntries} returns 0 for the closed range that starts at the
+     *       mark-delete position (when it is in this ledger) or at entry 0 (when it is in an
+     *       earlier ledger) and ends at the last entry of this ledger.</li>
+     * </ul>
+     *
+     * @param ledgerId the ledger ID to check
+     * @param ledgerInfo the ledger information; may be {@code null}
+     * @param cursor the cursor reading from this ledger
+     * @return true if all entries in the ledger have been acknowledged, false otherwise
+     */
+    private boolean isLedgerFullyAcked(long ledgerId, LedgerInfo ledgerInfo, ManagedCursorImpl cursor) {
+        if (ledgerInfo == null || ledgerInfo.getEntries() == 0) {
+            // There is nothing to read from this ledger.
+            return true;
+        }
+
+        Position markDeletedPosition = cursor.getMarkDeletedPosition();
+        if (markDeletedPosition == null) {
+            return false;
+        }
+
+        final long markDeletedLedgerId = markDeletedPosition.getLedgerId();
+        if (markDeletedLedgerId > ledgerId) {
+            // The mark-delete position already moved past this ledger.
+            return true;
+        }
+
+        // From here on the mark-delete position is in this ledger or in an earlier one.
+        final long lastEntryInLedger = ledgerInfo.getEntries() - 1;
+        final Position startPosition;
+        if (markDeletedLedgerId == ledgerId) {
+            // The mark-delete position is the last acknowledged entry; if it is the last entry
+            // of the ledger, nothing is left to read.
+            if (markDeletedPosition.getEntryId() >= lastEntryInLedger) {
+                return true;
+            }
+            startPosition = markDeletedPosition;
+        } else {
+            startPosition = PositionFactory.create(ledgerId, 0);
+        }
+
+        // NOTE(review): getNumberOfEntries presumably excludes individually deleted entries,
+        // so 0 means every entry in the range was acked one by one - confirm against the cursor.
+        Range<Position> scanRange =
+                Range.closed(startPosition, PositionFactory.create(ledgerId, lastEntryInLedger));
+        return cursor.getNumberOfEntries(scanRange) == 0;
+    }
+
public CompletableFuture<LedgerMetadata> getLedgerMetadata(long ledgerId) {
LedgerHandle currentLedger = this.currentLedger;
if (currentLedger != null && ledgerId == currentLedger.getId()) {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 4576d7cd67e..b618a25aa3d 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -55,9 +55,12 @@ class OpReadEntry implements ReadEntriesCallback {
Position maxPosition;
Predicate<Position> skipCondition;
+ boolean skipOpenLedgerFullyAcked = false;
public static OpReadEntry create(ManagedCursorImpl cursor, Position
readPositionRef, int count,
- ReadEntriesCallback callback, Object ctx, Position maxPosition,
Predicate<Position> skipCondition) {
+ ReadEntriesCallback callback, Object ctx,
Position maxPosition,
+ Predicate<Position> skipCondition,
+ boolean skipOpenLedgerFullyAcked) {
OpReadEntry op = RECYCLER.get();
op.id = opReadIdGenerator.getAndIncrement();
op.readPosition =
cursor.ledger.startReadOperationOnLedger(readPositionRef);
@@ -70,6 +73,7 @@ class OpReadEntry implements ReadEntriesCallback {
}
op.maxPosition = maxPosition;
op.skipCondition = skipCondition;
+ op.skipOpenLedgerFullyAcked = skipOpenLedgerFullyAcked;
op.ctx = ctx;
op.nextReadPosition = PositionFactory.create(op.readPosition);
return op;
@@ -247,6 +251,7 @@ class OpReadEntry implements ReadEntriesCallback {
nextReadPosition = null;
maxPosition = null;
skipCondition = null;
+ skipOpenLedgerFullyAcked = false;
recyclerHandle.recycle(this);
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
index 72d05ede3a0..732071ee01a 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
@@ -128,7 +128,7 @@ class OpScan implements ReadEntriesCallback {
}
if (cursor.hasMoreEntries(searchPosition)) {
OpReadEntry opReadEntry = OpReadEntry.create(cursor,
searchPosition, batchSize,
- this, OpScan.this.ctx, null, null);
+ this, OpScan.this.ctx, null, null, false);
ledger.asyncReadEntries(opReadEntry);
} else {
callback.scanComplete(lastSeenPosition, ScanOutcome.COMPLETED,
OpScan.this.ctx);
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index a88605dbb34..b77cd8dbf62 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -20,11 +20,15 @@ package org.apache.bookkeeper.mledger.impl;
import static
org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize;
import static
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -4391,7 +4395,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
// op readPosition is bigger than maxReadPosition
OpReadEntry opReadEntry = OpReadEntry.create(cursor,
ledger.lastConfirmedEntry, 10, callback,
- null, PositionFactory.create(lastPosition.getLedgerId(), -1),
null);
+ null, PositionFactory.create(lastPosition.getLedgerId(), -1),
null, true);
Field field = ManagedCursorImpl.class.getDeclaredField("readPosition");
field.setAccessible(true);
field.set(cursor, PositionFactory.EARLIEST);
@@ -4414,7 +4418,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
@Cleanup final MockedStatic<OpReadEntry> mockedStaticOpReadEntry =
Mockito.mockStatic(OpReadEntry.class);
mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(),
anyInt(), any(),
- any(), any(), any())).thenAnswer(__ ->
createOpReadEntry.get());
+ any(), any(), any(), anyBoolean())).thenAnswer(__ ->
createOpReadEntry.get());
final ManagedLedgerConfig ledgerConfig = new ManagedLedgerConfig();
ledgerConfig.setNewEntriesCheckDelayInMillis(10);
@@ -5468,6 +5472,77 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
}).toList(), IntStream.range(0, 10).mapToObj(i -> "msg-" +
i).toList());
}
+    /**
+     * Verifies that a read starting in a ledger whose entries were all acknowledged by the cursor
+     * completes successfully without requesting a handle for that ledger.
+     */
+    @Test
+    public void testSkipOpenLedgerFullyAcked() throws Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(10);
+        managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        ManagedLedger ledger = factory.open("testSkipOpenLedgerFullyAcked", managedLedgerConfig);
+        try {
+            ManagedCursor cursor = ledger.openCursor("cursor");
+
+            // Write 10 entries, roll the ledger over, then write 10 more entries.
+            List<Position> positions = new ArrayList<>();
+            for (int i = 0; i < 10; i++) {
+                Position pos = ledger.addEntry(("entry-" + i).getBytes());
+                positions.add(pos);
+            }
+
+            ((ManagedLedgerImpl) ledger).rollCurrentLedgerIfFull();
+
+            for (int i = 10; i < 20; i++) {
+                Position pos = ledger.addEntry(("entry-" + i).getBytes());
+                positions.add(pos);
+            }
+
+            ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
+            ManagedCursorImpl cursorImpl = (ManagedCursorImpl) cursor;
+
+            // Acknowledge the first 10 entries: half through mark-delete, half individually.
+            for (int i = 0; i < 5; i++) {
+                cursor.markDelete(positions.get(i));
+            }
+
+            for (int i = 5; i < 10; i++) {
+                cursor.delete(positions.get(i));
+            }
+
+            long firstLedgerId = positions.get(0).getLedgerId();
+            long secondLedgerId = positions.get(10).getLedgerId();
+            log.info("First ledger id is {}, Second ledger id is {}", firstLedgerId, secondLedgerId);
+
+            ManagedLedgerImpl spyLedger = spy(ledgerImpl);
+
+            Position readPosition = PositionFactory.create(firstLedgerId, 0);
+            CountDownLatch readLatch = new CountDownLatch(1);
+            // Single-slot holder so the callback can report a failure to the test thread; the
+            // latch countDown/await pair makes the write visible to the assertion below.
+            ManagedLedgerException[] readFailure = new ManagedLedgerException[1];
+
+            ReadEntriesCallback callback = new ReadEntriesCallback() {
+                @Override
+                public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                    try {
+                        entries.forEach(Entry::release);
+                    } finally {
+                        readLatch.countDown();
+                    }
+                }
+
+                @Override
+                public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+                    log.error("Read failed", exception);
+                    readFailure[0] = exception;
+                    readLatch.countDown();
+                }
+            };
+
+            OpReadEntry opReadEntry =
+                    OpReadEntry.create(cursorImpl, readPosition, 5, callback, null, null, null, true);
+
+            spyLedger.asyncReadEntries(opReadEntry);
+
+            assertTrue(readLatch.await(10, TimeUnit.SECONDS));
+            // A failed read must fail the test instead of silently satisfying the verification.
+            assertTrue(readFailure[0] == null, "Read failed: " + readFailure[0]);
+
+            verify(spyLedger, never()).getLedgerHandle(firstLedgerId);
+        } finally {
+            // Close the ledger even when an assertion above throws.
+            ledger.close();
+        }
+    }
+
class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
Map<Long, Integer> ledgerErrors = new HashMap<>();
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 91cac258bc2..b8c268a5ba4 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -692,7 +692,7 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
public void readEntriesFailed(ManagedLedgerException
exception, Object ctx) {
}
- }, null, maxPosition, null);
+ }, null, maxPosition, null, false);
Assert.assertEquals(opReadEntry.readPosition, position);
}