This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f97f3650972fedc29164f000bc6d32ffcb977a9e Author: Baodi Shi <[email protected]> AuthorDate: Fri Oct 31 18:29:02 2025 +0800 [fix][broker] BacklogMessageAge is not reset when cursor mdPosition is on an open ledger (#24915) (cherry picked from commit 54da0c8cef54480b09272db34e770894ff0ef474) --- .../broker/service/persistent/PersistentTopic.java | 2 + .../broker/service/BacklogQuotaManagerTest.java | 55 +++++++++++++++++----- 2 files changed, 44 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 9486c204cf1..e82e0730680 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3826,6 +3826,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal oldestMarkDeleteCursorInfo.getCursor().getName(), checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp(), oldestMarkDeleteCursorInfo.getVersion())); + } else { + TIME_BASED_BACKLOG_QUOTA_CHECK_RESULT_UPDATER.set(this, null); } return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 3c8dcfdc12f..19383d50f2e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -46,7 +46,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import lombok.Cleanup; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.BrokerTestUtil; @@ -151,6 +153,7 @@ public class BacklogQuotaManagerTest { config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER); config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); + config.setManagedLedgerDefaultMarkDeleteRateLimit(1000); config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); config.setSystemTopicEnabled(true); config.setTopicLevelPoliciesEnabled(true); @@ -704,7 +707,7 @@ public class BacklogQuotaManagerTest { final String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID(); final String subName1 = "c1"; - final int numMsgs = 5; + final int numMsgs = 7; Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1) .acknowledgmentGroupTime(0, SECONDS) @@ -712,27 +715,35 @@ public class BacklogQuotaManagerTest { Producer<byte[]> producer = createProducer(client, topic1); byte[] content = new byte[1024]; + // 1. Send messages + // The manager ledger max entries is 5, so we can send 7 messages to make sure we have multiple ledgers + // When send msg 4, the ledger closed. + // Second: 1 2 3 4 5 6 7 + // msg idx: [0 1 2 3 4] [5 6] for (int i = 0; i < numMsgs; i++) { - Thread.sleep(3000); // Guarantees if we use wrong message in age, to show up in failed test - producer.send(content); + Thread.sleep(1000); + MessageId send = producer.send(content); } + long lastLedgerCloseTime = System.currentTimeMillis() - 2000; + // 2. Receive msg-0 and ack it. + String c1MarkDeletePositionBefore = + admin.topics().getInternalStats(topic1).cursors.get(subName1).markDeletePosition; Message<byte[]> oldestMessage = consumer1.receive(); consumer1.acknowledge(oldestMessage); - log.info("Moved subscription 1, by 1 message"); - - // Unload topic to trigger the ledger close - unloadAndLoadTopic(topic1, producer); - long unloadTime = System.currentTimeMillis(); - waitForQuotaCheckToRunTwice(); + c1MarkDeletePositionBefore = waitForMarkDeletePositionToChange(topic1, subName1, + c1MarkDeletePositionBefore); + log.info("Moved subscription 1, by 1 message {}", oldestMessage.getMessageId()); - Metrics metrics = prometheusMetricsClient.getMetrics(); + // 3. Expected the oldestBacklogMessageAgeSeconds is based on last ledger close time + long expectedMessageAgeSeconds = + MILLISECONDS.toSeconds(System.currentTimeMillis() - lastLedgerCloseTime); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic1).get(); + topicRef.updateOldPositionInfo(); TopicStats topicStats = getTopicStats(topic1); - - long expectedMessageAgeSeconds = MILLISECONDS.toSeconds(System.currentTimeMillis() - unloadTime); assertThat(topicStats.getOldestBacklogMessageAgeSeconds()) .isCloseTo(expectedMessageAgeSeconds, within(1L)); - + Metrics metrics = prometheusMetricsClient.getMetrics(); Metric backlogAgeMetric = metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds", Pair.of("topic", topic1)); @@ -741,6 +752,24 @@ public class BacklogQuotaManagerTest { entry("namespace", namespace), entry("topic", topic1)); assertThat((long) backlogAgeMetric.value).isCloseTo(expectedMessageAgeSeconds, within(2L)); + + // 4. Move consumer to `end - 1`, then OldestBacklogMessageAgeSeconds should be `-1`, because the + // second ledger is not closed yet. + for (int i = 1; i < numMsgs - 1; i++) { + Message<byte[]> msg = consumer1.receive(); + consumer1.acknowledge(msg); + } + waitForMarkDeletePositionToChange(topic1, subName1, c1MarkDeletePositionBefore); + ManagedCursorContainer cursors = (ManagedCursorContainer) topicRef.getManagedLedger().getCursors(); + ManagedCursor subCursor = cursors.get(subName1); + Awaitility.await().pollInterval(100, MILLISECONDS).atMost(5, SECONDS).until( + () -> subCursor.getMarkDeletedPosition().equals(subCursor.getPersistentMarkDeletedPosition())); + topicRef.updateOldPositionInfo(); + topicStats = getTopicStats(topic1, true); + assertThat(topicStats.getSubscriptions().get(subName1).getMsgBacklog()) + .isEqualTo(1L); + assertThat(topicStats.getOldestBacklogMessageAgeSeconds()) + .isEqualTo(-1L); } }
