This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7282c06c55c [fix][broker] Fix wrong backlog age metrics when the mark
delete position point to a deleted ledger (#24518)
7282c06c55c is described below
commit 7282c06c55c7991123b964d19561e07913bf3ac4
Author: Penghui Li <[email protected]>
AuthorDate: Thu Jul 17 11:42:10 2025 -0700
[fix][broker] Fix wrong backlog age metrics when the mark delete position
point to a deleted ledger (#24518)
---
.../broker/service/persistent/PersistentTopic.java | 11 +++++-
.../PersistentTopicProtectedMethodsTest.java | 44 +++++++++++++++++++++-
2 files changed, 53 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index fb41bc52ea6..70765548b98 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3651,7 +3651,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
Position oldestMarkDeletePosition =
oldestMarkDeleteCursorInfo.getPosition();
OldestPositionInfo lastOldestPositionInfo = oldestPositionInfo;
if (lastOldestPositionInfo != null
- && oldestMarkDeletePosition.compareTo(lastOldestPositionInfo.getOldestCursorMarkDeletePosition()) == 0) {
+ && oldestMarkDeletePosition.compareTo(lastOldestPositionInfo.getOldestCursorMarkDeletePosition()) == 0
+ && oldestMarkDeletePosition.compareTo(ledger.getFirstPosition()) >= 0) {
// Same position, but the cursor causing it has changed?
if
(!lastOldestPositionInfo.getCursorName().equals(oldestMarkDeleteCursorInfo.getCursor().getName()))
{
updateResultIfNewer(new OldestPositionInfo(
@@ -3780,6 +3781,14 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
markDeletePositionLedgerInfo =
ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get();
+ // If markDeletePositionLedgerInfo is null (ledger no longer exists due to retention/cleanup),
+ // use the next valid position instead to get a meaningful timestamp
+ if (markDeletePositionLedgerInfo == null) {
+ Position nextValidPosition = ledger.getNextValidPosition(markDeletePosition);
+ markDeletePositionLedgerInfo = ledger.getLedgerInfo(nextValidPosition.getLedgerId()).get();
+ markDeletePosition = nextValidPosition;
+ }
+
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
positionToCheckLedgerInfo =
markDeletePositionLedgerInfo;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
index 1d841483ed7..bba0d2050d8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
@@ -51,7 +52,6 @@ public class PersistentTopicProtectedMethodsTest extends
ProducerConsumerBase {
}
protected void doInitConf() throws Exception {
- this.conf.setPreciseTimeBasedBacklogQuotaCheck(true);
this.conf.setManagedLedgerMaxEntriesPerLedger(2);
this.conf.setManagedLedgerMaxLedgerRolloverTimeMinutes(10);
this.conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
@@ -111,4 +111,46 @@ public class PersistentTopicProtectedMethodsTest extends
ProducerConsumerBase {
c1.close();
admin.topics().delete(tp, false);
}
+
+ @Test
+ public void testEstimatedTimeBasedBacklogQuotaCheckWithTopicUnloading() throws Exception {
+ final String tp =
BrokerTestUtil.newUniqueName("public/default/tp-with-topic-unloading");
+ admin.topics().createNonPartitionedTopic(tp);
+
+ Consumer<byte[]> c1 =
pulsarClient.newConsumer().topic(tp).subscriptionName("s1").subscribe();
+ Producer<byte[]> p1 = pulsarClient.newProducer().topic(tp).create();
+
+ byte[] content = new byte[]{1};
+ for (int i = 0; i < 10; i++) {
+ p1.send(content);
+ }
+
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService().getTopic(tp, false).join().get();
+
+ Awaitility.await().untilAsserted(() -> {
+ admin.brokers().backlogQuotaCheck();
+ assertTrue(persistentTopic.getBestEffortOldestUnacknowledgedMessageAgeSeconds() > 0);
+ });
+
+ for (int i = 0; i < 10; i++) {
+ c1.acknowledge(c1.receive());
+ }
+
+ Awaitility.await().untilAsserted(() -> assertEquals(persistentTopic.getBacklogSize(), 0));
+ admin.topics().unload(tp);
+ for (int i = 0; i < 10; i++) {
+ p1.send(content);
+ }
+
+ PersistentTopic persistentTopicNew = (PersistentTopic)
pulsar.getBrokerService()
+ .getTopic(tp, false).join().get();
+ Awaitility.await().untilAsserted(() -> {
+ admin.brokers().backlogQuotaCheck();
+ assertTrue(persistentTopicNew.getBestEffortOldestUnacknowledgedMessageAgeSeconds() > 0);
+ });
+
+ p1.close();
+ c1.close();
+ admin.topics().delete(tp, false);
+ }
}