This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d8fd80ffdc7faffd97c58ecd1205af824793a408 Author: Penghui Li <[email protected]> AuthorDate: Thu Jul 17 11:42:10 2025 -0700 [fix][broker] Fix wrong backlog age metrics when the mark delete position point to a deleted ledger (#24518) (cherry picked from commit 7282c06c55c7991123b964d19561e07913bf3ac4) --- .../broker/service/persistent/PersistentTopic.java | 11 +++++- .../PersistentTopicProtectedMethodsTest.java | 44 +++++++++++++++++++++- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index ceeb6533685..e44e05f0024 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3700,7 +3700,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal PositionImpl oldestMarkDeletePosition = oldestMarkDeleteCursorInfo.getPosition(); OldestPositionInfo lastOldestPositionInfo = oldestPositionInfo; if (lastOldestPositionInfo != null - && oldestMarkDeletePosition.compareTo(lastOldestPositionInfo.getOldestCursorMarkDeletePosition()) == 0) { + && oldestMarkDeletePosition.compareTo(lastOldestPositionInfo.getOldestCursorMarkDeletePosition()) == 0 + && oldestMarkDeletePosition.compareTo(((ManagedLedgerImpl) ledger).getFirstPosition()) >= 0) { // Same position, but the cursor causing it has changed? if (!lastOldestPositionInfo.getCursorName().equals(oldestMarkDeleteCursorInfo.getCursor().getName())) { updateResultIfNewer(new OldestPositionInfo( @@ -3830,6 +3831,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo markDeletePositionLedgerInfo = ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get(); + // If markDeletePositionLedgerInfo is null (ledger no longer exists due to retention/cleanup), + // use the next valid position instead to get a meaningful timestamp + if (markDeletePositionLedgerInfo == null) { + PositionImpl nextValidPosition = ((ManagedLedgerImpl) ledger).getNextValidPosition(markDeletePosition); + markDeletePositionLedgerInfo = ledger.getLedgerInfo(nextValidPosition.getLedgerId()).get(); + markDeletePosition = nextValidPosition; + } + org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo positionToCheckLedgerInfo = markDeletePositionLedgerInfo; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java index b582eb94d12..d6494ef1d8b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service.persistent; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; @@ -51,7 +52,6 @@ public class PersistentTopicProtectedMethodsTest extends ProducerConsumerBase { } protected void doInitConf() throws Exception { - this.conf.setPreciseTimeBasedBacklogQuotaCheck(true); this.conf.setManagedLedgerMaxEntriesPerLedger(2); this.conf.setManagedLedgerMaxLedgerRolloverTimeMinutes(10); this.conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0); @@ -111,4 +111,46 @@ public class PersistentTopicProtectedMethodsTest extends ProducerConsumerBase { c1.close(); admin.topics().delete(tp, false); } + + @Test + public void testEstimatedTimeBasedBacklogQuotaCheckWithTopicUnloading() throws Exception { + final String tp = BrokerTestUtil.newUniqueName("public/default/tp-with-topic-unloading"); + admin.topics().createNonPartitionedTopic(tp); + + Consumer<byte[]> c1 = pulsarClient.newConsumer().topic(tp).subscriptionName("s1").subscribe(); + Producer<byte[]> p1 = pulsarClient.newProducer().topic(tp).create(); + + byte[] content = new byte[]{1}; + for (int i = 0; i < 10; i++) { + p1.send(content); + } + + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(tp, false).join().get(); + + Awaitility.await().untilAsserted(() -> { + admin.brokers().backlogQuotaCheck(); + assertTrue(persistentTopic.getBestEffortOldestUnacknowledgedMessageAgeSeconds() > 0); + }); + + for (int i = 0; i < 10; i++) { + c1.acknowledge(c1.receive()); + } + + Awaitility.await().untilAsserted(() -> assertEquals(persistentTopic.getBacklogSize(), 0)); + admin.topics().unload(tp); + for (int i = 0; i < 10; i++) { + p1.send(content); + } + + PersistentTopic persistentTopicNew = (PersistentTopic) pulsar.getBrokerService() + .getTopic(tp, false).join().get(); + Awaitility.await().untilAsserted(() -> { + admin.brokers().backlogQuotaCheck(); + assertTrue(persistentTopicNew.getBestEffortOldestUnacknowledgedMessageAgeSeconds() > 0); + }); + + p1.close(); + c1.close(); + admin.topics().delete(tp, false); + } }
