This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d8fd80ffdc7faffd97c58ecd1205af824793a408
Author: Penghui Li <[email protected]>
AuthorDate: Thu Jul 17 11:42:10 2025 -0700

    [fix][broker] Fix wrong backlog age metrics when the mark delete position 
points to a deleted ledger (#24518)
    
    (cherry picked from commit 7282c06c55c7991123b964d19561e07913bf3ac4)
---
 .../broker/service/persistent/PersistentTopic.java | 11 +++++-
 .../PersistentTopicProtectedMethodsTest.java       | 44 +++++++++++++++++++++-
 2 files changed, 53 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ceeb6533685..e44e05f0024 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3700,7 +3700,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         PositionImpl oldestMarkDeletePosition = 
oldestMarkDeleteCursorInfo.getPosition();
         OldestPositionInfo lastOldestPositionInfo = oldestPositionInfo;
         if (lastOldestPositionInfo != null
-            && 
oldestMarkDeletePosition.compareTo(lastOldestPositionInfo.getOldestCursorMarkDeletePosition())
 == 0) {
+            && 
oldestMarkDeletePosition.compareTo(lastOldestPositionInfo.getOldestCursorMarkDeletePosition())
 == 0
+            && oldestMarkDeletePosition.compareTo(((ManagedLedgerImpl) 
ledger).getFirstPosition()) >= 0) {
             // Same position, but the cursor causing it has changed?
             if 
(!lastOldestPositionInfo.getCursorName().equals(oldestMarkDeleteCursorInfo.getCursor().getName()))
 {
                 updateResultIfNewer(new OldestPositionInfo(
@@ -3830,6 +3831,14 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
                 markDeletePositionLedgerInfo = 
ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get();
 
+        // If markDeletePositionLedgerInfo is null (ledger no longer exists 
due to retention/cleanup),
+        // use the next valid position instead to get a meaningful timestamp
+        if (markDeletePositionLedgerInfo == null) {
+            PositionImpl nextValidPosition = ((ManagedLedgerImpl) 
ledger).getNextValidPosition(markDeletePosition);
+            markDeletePositionLedgerInfo = 
ledger.getLedgerInfo(nextValidPosition.getLedgerId()).get();
+            markDeletePosition = nextValidPosition;
+        }
+
         
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo 
positionToCheckLedgerInfo =
                 markDeletePositionLedgerInfo;
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
index b582eb94d12..d6494ef1d8b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
@@ -51,7 +52,6 @@ public class PersistentTopicProtectedMethodsTest extends 
ProducerConsumerBase {
     }
 
     protected void doInitConf() throws Exception {
-        this.conf.setPreciseTimeBasedBacklogQuotaCheck(true);
         this.conf.setManagedLedgerMaxEntriesPerLedger(2);
         this.conf.setManagedLedgerMaxLedgerRolloverTimeMinutes(10);
         this.conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
@@ -111,4 +111,46 @@ public class PersistentTopicProtectedMethodsTest extends 
ProducerConsumerBase {
         c1.close();
         admin.topics().delete(tp, false);
     }
+
+    @Test
+    public void testEstimatedTimeBasedBacklogQuotaCheckWithTopicUnloading() 
throws Exception {
+        final String tp = 
BrokerTestUtil.newUniqueName("public/default/tp-with-topic-unloading");
+        admin.topics().createNonPartitionedTopic(tp);
+
+        Consumer<byte[]> c1 = 
pulsarClient.newConsumer().topic(tp).subscriptionName("s1").subscribe();
+        Producer<byte[]> p1 = pulsarClient.newProducer().topic(tp).create();
+
+        byte[] content = new byte[]{1};
+        for (int i = 0; i < 10; i++) {
+            p1.send(content);
+        }
+
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getTopic(tp, false).join().get();
+
+        Awaitility.await().untilAsserted(() -> {
+            admin.brokers().backlogQuotaCheck();
+            
assertTrue(persistentTopic.getBestEffortOldestUnacknowledgedMessageAgeSeconds() 
> 0);
+        });
+
+        for (int i = 0; i < 10; i++) {
+            c1.acknowledge(c1.receive());
+        }
+
+        Awaitility.await().untilAsserted(() -> 
assertEquals(persistentTopic.getBacklogSize(), 0));
+        admin.topics().unload(tp);
+        for (int i = 0; i < 10; i++) {
+            p1.send(content);
+        }
+
+        PersistentTopic persistentTopicNew = (PersistentTopic) 
pulsar.getBrokerService()
+                .getTopic(tp, false).join().get();
+        Awaitility.await().untilAsserted(() -> {
+            admin.brokers().backlogQuotaCheck();
+            
assertTrue(persistentTopicNew.getBestEffortOldestUnacknowledgedMessageAgeSeconds()
 > 0);
+        });
+
+        p1.close();
+        c1.close();
+        admin.topics().delete(tp, false);
+    }
 }

Reply via email to