This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d3929765749 [improve][ml] Optimize ledger opening by skipping fully
acknowledged ledgers (#24655)
d3929765749 is described below
commit d39297657490caec19b22bdf8848a53504bf83f5
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Aug 25 10:21:37 2025 +0800
[improve][ml] Optimize ledger opening by skipping fully acknowledged
ledgers (#24655)
(cherry picked from commit da0d11644f097ae657e79b2eb7835478d79ccd0e)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 4 +-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 63 +++++++++++++++++
.../bookkeeper/mledger/impl/OpReadEntry.java | 7 +-
.../org/apache/bookkeeper/mledger/impl/OpScan.java | 2 +-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 79 +++++++++++++++++++++-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +-
6 files changed, 150 insertions(+), 7 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 40acb40e98d..8a6af494b81 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -901,7 +901,7 @@ public class ManagedCursorImpl implements ManagedCursor {
// Skip deleted entries.
skipCondition = skipCondition == null ? this::isMessageDeleted :
skipCondition.or(this::isMessageDeleted);
OpReadEntry op =
- OpReadEntry.create(this, readPosition, numOfEntriesToRead,
callback, ctx, maxPosition, skipCondition);
+ OpReadEntry.create(this, readPosition, numOfEntriesToRead,
callback, ctx, maxPosition, skipCondition, true);
ledger.asyncReadEntries(op);
}
@@ -1061,7 +1061,7 @@ public class ManagedCursorImpl implements ManagedCursor {
// Skip deleted entries.
skipCondition = skipCondition == null ? this::isMessageDeleted :
skipCondition.or(this::isMessageDeleted);
OpReadEntry op = OpReadEntry.create(this, readPosition,
numberOfEntriesToRead, callback,
- ctx, maxPosition, skipCondition);
+ ctx, maxPosition, skipCondition, true);
int opReadId = op.id;
if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
op.recycle();
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 960ae57db76..54f1da61430 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1945,6 +1945,19 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
return;
}
+ // Optimization: Check if all entries in this ledger have been
deleted (acknowledged)
+ // If so, skip opening the ledger and move to the next one
+ if (opReadEntry.cursor != null &&
opReadEntry.skipOpenLedgerFullyAcked
+ && isLedgerFullyAcked(ledgerId, ledgerInfo,
opReadEntry.cursor)) {
+ log.info("[{}] All entries in ledger {} have been acked,
skipping ledger opening", name, ledgerId);
+ // Move to the next ledger
+ Long nextLedgerId = ledgers.ceilingKey(ledgerId + 1);
+ opReadEntry.updateReadPosition(
+
PositionImpl.get(Objects.requireNonNullElseGet(nextLedgerId, () -> ledgerId +
1), 0));
+ opReadEntry.checkReadCompletion();
+ return;
+ }
+
// Get a ledger handle to read from
getLedgerHandle(ledgerId).thenAccept(ledger ->
internalReadFromLedger(ledger, opReadEntry)).exceptionally(ex
-> {
@@ -1957,6 +1970,56 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
}
+    /**
+     * Check if all entries in the specified ledger have been acknowledged by the cursor.
+     * This optimization helps avoid opening ledgers that have no unacked entries.
+     *
+     * <p>A ledger is treated as fully acknowledged when the cursor's mark delete position is in a
+     * later ledger, when it points at (or past) the last entry of this ledger, or when every entry
+     * of this ledger after the mark delete position has been individually deleted.
+     *
+     * @param ledgerId the ledger ID to check
+     * @param ledgerInfo the ledger information; a missing or empty ledger is treated as fully acked
+     * @param cursor the cursor reading from this ledger
+     * @return true if all entries in the ledger have been acknowledged, false otherwise
+     */
+    private boolean isLedgerFullyAcked(long ledgerId, LedgerInfo ledgerInfo, ManagedCursorImpl cursor) {
+        if (ledgerInfo == null || ledgerInfo.getEntries() == 0) {
+            return true;
+        }
+
+        // Get the cursor's mark delete position
+        Position markDeletedPosition = cursor.getMarkDeletedPosition();
+        if (markDeletedPosition == null) {
+            return false;
+        }
+
+        // If the mark delete position is in a later ledger, then this ledger is fully acknowledged
+        if (markDeletedPosition.getLedgerId() > ledgerId) {
+            return true;
+        }
+
+        final long lastEntryInLedger = ledgerInfo.getEntries() - 1;
+        final PositionImpl lastPositionInLedger = PositionImpl.get(ledgerId, lastEntryInLedger);
+        final Range<PositionImpl> scanRange;
+        if (markDeletedPosition.getLedgerId() == ledgerId) {
+            // The mark delete position represents the last acknowledged entry. If it reaches the last
+            // entry in the ledger, the ledger is fully acknowledged.
+            if (markDeletedPosition.getEntryId() >= lastEntryInLedger) {
+                return true;
+            }
+            // Only entries strictly after the mark delete position can still be unacked. The mark delete
+            // position itself is already acknowledged but is not tracked in the individually deleted
+            // ranges, so a closed lower bound would always count it as an unacked entry.
+            scanRange = Range.openClosed((PositionImpl) markDeletedPosition, lastPositionInLedger);
+        } else {
+            // The mark delete position is in an earlier ledger: every entry of this ledger must have
+            // been individually deleted.
+            scanRange = Range.closed(PositionImpl.get(ledgerId, 0), lastPositionInLedger);
+        }
+
+        // Entries in the range that are not individually deleted; zero means the ledger is fully acked
+        return cursor.getNumberOfEntries(scanRange) == 0;
+    }
+
public CompletableFuture<String> getLedgerMetadata(long ledgerId) {
LedgerHandle currentLedger = this.currentLedger;
if (currentLedger != null && ledgerId == currentLedger.getId()) {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 20c7742129b..a99269bd691 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -54,9 +54,12 @@ class OpReadEntry implements ReadEntriesCallback {
PositionImpl maxPosition;
Predicate<PositionImpl> skipCondition;
+ boolean skipOpenLedgerFullyAcked = false;
public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl
readPositionRef, int count,
- ReadEntriesCallback callback, Object ctx, PositionImpl
maxPosition, Predicate<PositionImpl> skipCondition) {
+ ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition,
+ Predicate<PositionImpl> skipCondition,
+ boolean skipOpenLedgerFullyAcked) {
OpReadEntry op = RECYCLER.get();
op.id = opReadIdGenerator.getAndIncrement();
op.readPosition =
cursor.ledger.startReadOperationOnLedger(readPositionRef);
@@ -69,6 +72,7 @@ class OpReadEntry implements ReadEntriesCallback {
}
op.maxPosition = maxPosition;
op.skipCondition = skipCondition;
+ op.skipOpenLedgerFullyAcked = skipOpenLedgerFullyAcked;
op.ctx = ctx;
op.nextReadPosition = PositionImpl.get(op.readPosition);
return op;
@@ -244,6 +248,7 @@ class OpReadEntry implements ReadEntriesCallback {
nextReadPosition = null;
maxPosition = null;
skipCondition = null;
+ skipOpenLedgerFullyAcked = false;
recyclerHandle.recycle(this);
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
index 6d68b042a7a..8e1b4da54ae 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
@@ -128,7 +128,7 @@ class OpScan implements ReadEntriesCallback {
}
if (cursor.hasMoreEntries(searchPosition)) {
OpReadEntry opReadEntry = OpReadEntry.create(cursor,
searchPosition, batchSize,
- this, OpScan.this.ctx, null, null);
+ this, OpScan.this.ctx, null, null, false);
ledger.asyncReadEntries(opReadEntry);
} else {
callback.scanComplete(lastSeenPosition, ScanOutcome.COMPLETED,
OpScan.this.ctx);
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 5f80cdf43b4..fff2a61530e 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -20,11 +20,15 @@ package org.apache.bookkeeper.mledger.impl;
import static
org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize;
import static
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -4341,7 +4345,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
// op readPosition is bigger than maxReadPosition
OpReadEntry opReadEntry = OpReadEntry.create(cursor,
ledger.lastConfirmedEntry, 10, callback,
- null, PositionImpl.get(lastPosition.getLedgerId(), -1), null);
+ null, PositionImpl.get(lastPosition.getLedgerId(), -1), null,
true);
Field field = ManagedCursorImpl.class.getDeclaredField("readPosition");
field.setAccessible(true);
field.set(cursor, PositionImpl.EARLIEST);
@@ -4363,7 +4367,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
};
@Cleanup final MockedStatic<OpReadEntry> mockedStaticOpReadEntry =
Mockito.mockStatic(OpReadEntry.class);
- mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(),
anyInt(), any(), any(), any(), any()))
+ mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(),
anyInt(), any(), any(), any(), any(), anyBoolean()))
.thenAnswer(__ -> createOpReadEntry.get());
final ManagedLedgerConfig ledgerConfig = new ManagedLedgerConfig();
@@ -5340,5 +5344,76 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
assertEquals(future1.get(2, TimeUnit.SECONDS).get(0).getData(),
"msg".getBytes());
}
+    @Test
+    public void testSkipOpenLedgerFullyAcked() throws Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(10);
+        managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        // Keep consumed ledgers around: if the first ledger were trimmed in the background, the read
+        // would skip it because its LedgerInfo is gone, not because of the fully-acked optimization.
+        managedLedgerConfig.setRetentionTime(-1, TimeUnit.SECONDS);
+        managedLedgerConfig.setRetentionSizeInMB(-1);
+        ManagedLedger ledger = factory.open("testSkipOpenLedgerFullyAcked", managedLedgerConfig);
+        ManagedCursor cursor = ledger.openCursor("cursor");
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes()));
+        }
+
+        ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
+        ledgerImpl.rollCurrentLedgerIfFull();
+
+        for (int i = 10; i < 20; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes()));
+        }
+
+        ManagedCursorImpl cursorImpl = (ManagedCursorImpl) cursor;
+
+        // Acknowledge the first half of the first ledger through mark-delete ...
+        for (int i = 0; i < 5; i++) {
+            cursor.markDelete(positions.get(i));
+        }
+        // ... and the second half individually, so the whole first ledger is acknowledged.
+        for (int i = 5; i < 10; i++) {
+            cursor.delete(positions.get(i));
+        }
+
+        long firstLedgerId = positions.get(0).getLedgerId();
+        long secondLedgerId = positions.get(10).getLedgerId();
+        log.info("First ledger id is {}, Second ledger id is {}", firstLedgerId, secondLedgerId);
+
+        ManagedLedgerImpl spyLedger = spy(ledgerImpl);
+
+        // OpReadEntry.create takes a PositionImpl on this branch.
+        PositionImpl readPosition = PositionImpl.get(firstLedgerId, 0);
+        CountDownLatch readLatch = new CountDownLatch(1);
+        // Filled by the callback thread; the latch makes the writes visible to the test thread.
+        List<Long> readLedgerIds = new ArrayList<>();
+        List<ManagedLedgerException> readFailures = new ArrayList<>();
+
+        ReadEntriesCallback callback = new ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                try {
+                    for (Entry entry : entries) {
+                        readLedgerIds.add(entry.getLedgerId());
+                        entry.release();
+                    }
+                } finally {
+                    readLatch.countDown();
+                }
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+                log.error("Read failed", exception);
+                readFailures.add(exception);
+                readLatch.countDown();
+            }
+        };
+
+        OpReadEntry opReadEntry = OpReadEntry.create(cursorImpl, readPosition, 5, callback, null, null, null, true);
+
+        spyLedger.asyncReadEntries(opReadEntry);
+
+        assertTrue(readLatch.await(10, TimeUnit.SECONDS));
+        assertTrue(readFailures.isEmpty(), "Read should not fail: " + readFailures);
+        // The fully acknowledged first ledger is skipped, so every entry comes from the second ledger.
+        assertFalse(readLedgerIds.isEmpty());
+        for (long readLedgerId : readLedgerIds) {
+            assertEquals(readLedgerId, secondLedgerId);
+        }
+
+        verify(spyLedger, never()).getLedgerHandle(firstLedgerId);
+
+        ledger.close();
+    }
+
private static final Logger log =
LoggerFactory.getLogger(ManagedCursorTest.class);
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 8972ade225f..ca4abdc41b8 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -686,7 +686,7 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
public void readEntriesFailed(ManagedLedgerException
exception, Object ctx) {
}
- }, null, maxPosition, null);
+ }, null, maxPosition, null, false);
Assert.assertEquals(opReadEntry.readPosition, position);
}