This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c969b02cf5c [fix][ml] Fix `getNumberOfEntries` may point to deleted ledger (#24852)
c969b02cf5c is described below

commit c969b02cf5cb429ae03e9b6fa595ff13a5747e5e
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Oct 17 11:02:19 2025 +0800

    [fix][ml] Fix `getNumberOfEntries` may point to deleted ledger (#24852)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 16 +++++++----
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 32 ++++++++++++++++++++++
 .../apache/pulsar/compaction/CompactionTest.java   |  2 +-
 3 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7faf6047e02..960ae57db76 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3621,11 +3621,17 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
 
         if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
-            // If the 2 positions are in the same ledger
-            long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1;
-            count += fromIncluded ? 1 : 0;
-            count += toIncluded ? 1 : 0;
-            return count;
+            LedgerInfo li = ledgers.get(toPosition.getLedgerId());
+            if (li != null) {
+                // If the 2 positions are in the same ledger
+                long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1;
+                count += fromIncluded ? 1 : 0;
+                count += toIncluded ? 1 : 0;
+                return count;
+            } else {
+                // if the ledgerId is not in the ledgers, it means it has been deleted
+                return 0;
+            }
         } else {
             long count = 0;
             // If the from & to are pointing to different ledgers, then we need to :
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index fb671dcc8c0..2cc0e19a34c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -42,6 +42,7 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -2659,6 +2660,37 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         assertEquals(length, numberOfEntries);
     }
 
+    @Test
+    public void testGetNumberOfEntries() throws Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        initManagedLedgerConfig(managedLedgerConfig);
+        managedLedgerConfig.setMaxEntriesPerLedger(5);
+        ManagedLedgerImpl managedLedger =
+                (ManagedLedgerImpl) factory.open("testGetNumberOfEntries", managedLedgerConfig);
+        // open cursor to prevent ledger to be deleted when ledger rollover
+        ManagedCursorImpl managedCursor = (ManagedCursorImpl) managedLedger.openCursor("cursor");
+        int numberOfEntries = 10;
+        List<Position> positions = new ArrayList<>(numberOfEntries);
+        for (int i = 0; i < numberOfEntries; i++) {
+            positions.add(managedLedger.addEntry(("entry-" + i).getBytes(Encoding)));
+        }
+        Position mdPos = positions.get(numberOfEntries - 1);
+        Position rdPos = PositionFactory.create(mdPos.getLedgerId(), mdPos.getEntryId() + 1);
+        managedCursor.delete(positions);
+        // trigger ledger rollover and wait for the new ledger created
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals("LedgerOpened", WhiteboxImpl.getInternalState(managedLedger, "state").toString());
+        });
+        managedLedger.rollCurrentLedgerIfFull();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(managedLedger.getLedgersInfo().size(), 1);
+            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.LedgerOpened);
+        });
+
+        long length = managedLedger.getNumberOfEntries(Range.closed(mdPos, rdPos));
+        assertEquals(length, 0);
+    }
+
     @Test
     public void testEstimatedBacklogSize() throws Exception {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testEstimatedBacklogSize");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 6f697d9c07e..696875c22ee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -2265,7 +2265,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
 
         Awaitility.await().untilAsserted(() ->
                 assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(),
-                        5));
+                        0));
 
         // Make consumer reconnect to broker
         admin.topics().unload(topicName);

Reply via email to