This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c969b02cf5c [fix][ml] Fix `getNumberOfEntries` may point to deleted
ledger (#24852)
c969b02cf5c is described below
commit c969b02cf5cb429ae03e9b6fa595ff13a5747e5e
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Oct 17 11:02:19 2025 +0800
[fix][ml] Fix `getNumberOfEntries` may point to deleted ledger (#24852)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 16 +++++++----
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 32 ++++++++++++++++++++++
.../apache/pulsar/compaction/CompactionTest.java | 2 +-
3 files changed, 44 insertions(+), 6 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7faf6047e02..960ae57db76 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3621,11 +3621,17 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
- // If the 2 positions are in the same ledger
- long count = toPosition.getEntryId() - fromPosition.getEntryId() -
1;
- count += fromIncluded ? 1 : 0;
- count += toIncluded ? 1 : 0;
- return count;
+ LedgerInfo li = ledgers.get(toPosition.getLedgerId());
+ if (li != null) {
+ // If the 2 positions are in the same ledger
+ long count = toPosition.getEntryId() -
fromPosition.getEntryId() - 1;
+ count += fromIncluded ? 1 : 0;
+ count += toIncluded ? 1 : 0;
+ return count;
+ } else {
+ // if the ledgerId is not in the ledgers, it means it has been
deleted
+ return 0;
+ }
} else {
long count = 0;
// If the from & to are pointing to different ledgers, then we
need to :
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index fb671dcc8c0..2cc0e19a34c 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -42,6 +42,7 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -2659,6 +2660,37 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
assertEquals(length, numberOfEntries);
}
+ @Test
+ public void testGetNumberOfEntries() throws Exception {
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ initManagedLedgerConfig(managedLedgerConfig);
+ managedLedgerConfig.setMaxEntriesPerLedger(5);
+ ManagedLedgerImpl managedLedger =
+ (ManagedLedgerImpl) factory.open("testGetNumberOfEntries",
managedLedgerConfig);
+ // open cursor to prevent ledger to be deleted when ledger rollover
+ ManagedCursorImpl managedCursor = (ManagedCursorImpl)
managedLedger.openCursor("cursor");
+ int numberOfEntries = 10;
+ List<Position> positions = new ArrayList<>(numberOfEntries);
+ for (int i = 0; i < numberOfEntries; i++) {
+ positions.add(managedLedger.addEntry(("entry-" +
i).getBytes(Encoding)));
+ }
+ Position mdPos = positions.get(numberOfEntries - 1);
+ Position rdPos = PositionFactory.create(mdPos.getLedgerId(),
mdPos.getEntryId() + 1);
+ managedCursor.delete(positions);
+ // trigger ledger rollover and wait for the new ledger created
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals("LedgerOpened",
WhiteboxImpl.getInternalState(managedLedger, "state").toString());
+ });
+ managedLedger.rollCurrentLedgerIfFull();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(managedLedger.getLedgersInfo().size(), 1);
+ assertEquals(managedLedger.getState(),
ManagedLedgerImpl.State.LedgerOpened);
+ });
+
+ long length = managedLedger.getNumberOfEntries(Range.closed(mdPos,
rdPos));
+ assertEquals(length, 0);
+ }
+
@Test
public void testEstimatedBacklogSize() throws Exception {
ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("testEstimatedBacklogSize");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 6f697d9c07e..696875c22ee 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -2265,7 +2265,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getStats(topicName,
true).getSubscriptions().get(subName).getMsgBacklog(),
- 5));
+ 0));
// Make consumer reconnect to broker
admin.topics().unload(topicName);