This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 2741ef3d395 [fix][broker] Fix topic status for 
oldestBacklogMessageAgeSeconds continuously increases even when there is no 
backlog. (#22907)
2741ef3d395 is described below

commit 2741ef3d395b8458c4d483f772624a89896ee9c7
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Fri Jun 14 20:05:09 2024 +0800

    [fix][broker] Fix topic status for oldestBacklogMessageAgeSeconds 
continuously increases even when there is no backlog. (#22907)
    
    (cherry picked from commit 6831231e7aeffa39c4d79f5983ef9dc7ba25c449)
---
 .../broker/service/persistent/PersistentTopic.java |  24 +--
 .../broker/service/BacklogQuotaManagerTest.java    | 180 ++++++++++++++++++++-
 2 files changed, 194 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 40f4814d6f4..56e3603ad6e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1462,7 +1462,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     return FutureUtil.failedFuture(
                             new TopicBusyException("Topic has subscriptions: " 
+ subscriptions.keys()));
                 } else if (failIfHasBacklogs) {
-                    if (hasBacklogs()) {
+                    if (hasBacklogs(false)) {
                         List<String> backlogSubs =
                                 subscriptions.values().stream()
                                         .filter(sub -> 
sub.getNumberOfEntriesInBacklog(false) > 0)
@@ -2633,12 +2633,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         stats.backlogQuotaLimitTime = 
getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
 
         TimeBasedBacklogQuotaCheckResult backlogQuotaCheckResult = 
timeBasedBacklogQuotaCheckResult;
-        stats.oldestBacklogMessageAgeSeconds = (backlogQuotaCheckResult == 
null)
-            ? (long) -1
-                : TimeUnit.MILLISECONDS.toSeconds(
-                Clock.systemUTC().millis() - 
backlogQuotaCheckResult.getPositionPublishTimestampInMillis());
-
+        stats.oldestBacklogMessageAgeSeconds = 
getBestEffortOldestUnacknowledgedMessageAgeSeconds();
         stats.oldestBacklogMessageSubscriptionName = (backlogQuotaCheckResult 
== null)
+                || !hasBacklogs(getStatsOptions.isGetPreciseBacklog())
             ? null
             : backlogQuotaCheckResult.getCursorName();
 
@@ -2896,7 +2893,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 }
                 break;
             case delete_when_subscriptions_caught_up:
-                if (hasBacklogs()) {
+                if (hasBacklogs(false)) {
                     return true;
                 }
                 break;
@@ -2909,8 +2906,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         }
     }
 
-    private boolean hasBacklogs() {
-        return subscriptions.values().stream().anyMatch(sub -> 
sub.getNumberOfEntriesInBacklog(false) > 0);
+    private boolean hasBacklogs(boolean getPreciseBacklog) {
+        return subscriptions.values().stream().anyMatch(sub -> 
sub.getNumberOfEntriesInBacklog(getPreciseBacklog) > 0);
     }
 
     @Override
@@ -3456,6 +3453,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     @Override
     public long getBestEffortOldestUnacknowledgedMessageAgeSeconds() {
+        if (!hasBacklogs(false)) {
+            return 0;
+        }
         TimeBasedBacklogQuotaCheckResult result = 
timeBasedBacklogQuotaCheckResult;
         if (result == null) {
             return -1;
@@ -3543,6 +3543,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         }
 
         if 
(brokerService.pulsar().getConfiguration().isPreciseTimeBasedBacklogQuotaCheck())
 {
+            if (!hasBacklogs(true)) {
+                return CompletableFuture.completedFuture(false);
+            }
             CompletableFuture<Boolean> future = new CompletableFuture<>();
             // Check if first unconsumed message(first message after mark 
delete position)
             // for slowest cursor's has expired.
@@ -3596,6 +3599,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             return future;
         } else {
             try {
+                if (!hasBacklogs(false)) {
+                    return CompletableFuture.completedFuture(false);
+                }
                 EstimateTimeBasedBacklogQuotaCheckResult checkResult =
                         
estimatedTimeBasedBacklogQuotaCheck(oldestMarkDeletePosition);
                 if 
(checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp() != null) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 6be7023b161..56f9f4f9124 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -29,6 +29,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.within;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
@@ -296,8 +297,12 @@ public class BacklogQuotaManagerTest {
     }
 
     private TopicStats getTopicStats(String topic1) throws 
PulsarAdminException {
+        return getTopicStats(topic1, true);
+    }
+
+    private TopicStats getTopicStats(String topic1, boolean getPreciseBacklog) 
throws PulsarAdminException {
         TopicStats stats =
-                admin.topics().getStats(topic1, 
GetStatsOptions.builder().getPreciseBacklog(true).build());
+                admin.topics().getStats(topic1, 
GetStatsOptions.builder().getPreciseBacklog(getPreciseBacklog).build());
         return stats;
     }
 
@@ -502,9 +507,117 @@ public class BacklogQuotaManagerTest {
 
             // Cache should be used, since position hasn't changed
             assertThat(getReadEntries(topic1)).isEqualTo(readEntries);
+
+            // Move subscription 1 and 2 to end
+            Message<byte[]> msg = consumer1.receive();
+            consumer1.acknowledge(msg);
+            consumer2.acknowledge(secondOldestMessage);
+            for (int i = 0; i < 2; i++) {
+                Message<byte[]> message = consumer2.receive();
+                log.info("Subscription 2 about to ack message ID {}", 
message.getMessageId());
+                consumer2.acknowledge(message);
+            }
+
+            log.info("Subscription 1 and 2 moved to end. Now should not 
backlog");
+            waitForMarkDeletePositionToChange(topic1, subName1, 
c1MarkDeletePositionBefore);
+            waitForQuotaCheckToRunTwice();
+
+            topicStats = getTopicStats(topic1);
+            assertThat(topicStats.getBacklogSize()).isEqualTo(0);
+            
assertThat(topicStats.getSubscriptions().get(subName1).getMsgBacklog()).isEqualTo(0);
+            
assertThat(topicStats.getSubscriptions().get(subName2).getMsgBacklog()).isEqualTo(0);
+            
assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isEqualTo(0);
+            
assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isNull();
+
+            metrics = prometheusMetricsClient.getMetrics();
+            backlogAgeMetric =
+                    
metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds",
+                            Pair.of("topic", topic1));
+            assertThat(backlogAgeMetric.tags).containsExactly(
+                    entry("cluster", CLUSTER_NAME),
+                    entry("namespace", namespace),
+                    entry("topic", topic1));
+            assertThat((long) backlogAgeMetric.value).isEqualTo(0);
+
+            // producer should create success.
+            Producer<byte[]> producer2 = createProducer(client, topic1);
+            assertNotNull(producer2);
         }
     }
 
+    @Test
+    public void backlogsStatsPreciseWithNoBacklog() throws 
PulsarAdminException, PulsarClientException, InterruptedException {
+        config.setPreciseTimeBasedBacklogQuotaCheck(true);
+        config.setExposePreciseBacklogInPrometheus(true);
+        final String namespace = "prop/ns-quota";
+        assertEquals(admin.namespaces().getBacklogQuotaMap(namespace), new 
HashMap<>());
+        final int timeLimitSeconds = 2;
+        admin.namespaces().setBacklogQuota(
+                namespace,
+                BacklogQuota.builder()
+                        .limitTime(timeLimitSeconds)
+                        
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+                        .build(),
+                message_age);
+
+        try (PulsarClient client = 
PulsarClient.builder().serviceUrl(adminUrl.toString())
+                .maxBackoffInterval(5, SECONDS)
+                .statsInterval(0, SECONDS).build()) {
+            final String topic1 = "persistent://prop/ns-quota/topic2" + 
UUID.randomUUID();
+
+            final String subName1 = "c1";
+            final int numMsgs = 4;
+
+            Consumer<byte[]> consumer1 = 
client.newConsumer().topic(topic1).subscriptionName(subName1)
+                    .acknowledgmentGroupTime(0, SECONDS)
+                    .subscribe();
+            Producer<byte[]> producer = createProducer(client, topic1);
+
+            byte[] content = new byte[1024];
+            for (int i = 0; i < numMsgs; i++) {
+                MessageId send = producer.send(content);
+                System.out.println(i + ":msg:" + 
MILLISECONDS.toSeconds(System.currentTimeMillis()));
+            }
+
+            String c1MarkDeletePositionBefore =
+                    
admin.topics().getInternalStats(topic1).cursors.get(subName1).markDeletePosition;
+
+            // Move subscription 1 to end
+            for (int i = 0; i < numMsgs; i++) {
+                Message<byte[]> message1 = consumer1.receive();
+                consumer1.acknowledge(message1);
+            }
+
+            // This code will wait about 4~5 Seconds, to make sure the oldest 
message is 4~5 seconds old
+            c1MarkDeletePositionBefore = 
waitForMarkDeletePositionToChange(topic1, subName1, c1MarkDeletePositionBefore);
+            waitForQuotaCheckToRunTwice();
+
+            Metrics metrics = prometheusMetricsClient.getMetrics();
+            TopicStats topicStats = getTopicStats(topic1);
+
+            
assertThat(topicStats.getBacklogQuotaLimitTime()).isEqualTo(timeLimitSeconds);
+            assertThat(topicStats.getBacklogSize()).isEqualTo(0);
+            
assertThat(topicStats.getSubscriptions().get(subName1).getMsgBacklog()).isEqualTo(0);
+            
assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isEqualTo(0);
+            
assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isNull();
+
+            Metric backlogAgeMetric =
+                    
metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds",
+                            Pair.of("topic", topic1));
+            assertThat(backlogAgeMetric.tags).containsExactly(
+                    entry("cluster", CLUSTER_NAME),
+                    entry("namespace", namespace),
+                    entry("topic", topic1));
+            assertThat((long) backlogAgeMetric.value).isEqualTo(0);
+
+            // producer should create success.
+            Producer<byte[]> producer2 = createProducer(client, topic1);
+            assertNotNull(producer2);
+        }
+        config.setPreciseTimeBasedBacklogQuotaCheck(false);
+        config.setExposePreciseBacklogInPrometheus(false);
+    }
+
     private long getReadEntries(String topic1) {
         return ((PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topic1).get())
                 .getManagedLedger().getStats().getEntriesReadTotalCount();
@@ -609,6 +722,71 @@ public class BacklogQuotaManagerTest {
         }
     }
 
+    @Test
+    public void backlogsStatsNotPreciseWithNoBacklog() throws 
PulsarAdminException, PulsarClientException, InterruptedException {
+        config.setPreciseTimeBasedBacklogQuotaCheck(false);
+        config.setExposePreciseBacklogInPrometheus(false);
+        config.setManagedLedgerMaxEntriesPerLedger(6);
+        final String namespace = "prop/ns-quota";
+        assertEquals(admin.namespaces().getBacklogQuotaMap(namespace), new 
HashMap<>());
+        final int timeLimitSeconds = 2;
+        admin.namespaces().setBacklogQuota(
+                namespace,
+                BacklogQuota.builder()
+                        .limitTime(timeLimitSeconds)
+                        
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+                        .build(),
+                message_age);
+
+        try (PulsarClient client = 
PulsarClient.builder().serviceUrl(adminUrl.toString())
+                .maxBackoffInterval(3, SECONDS)
+                .statsInterval(0, SECONDS).build()) {
+            final String topic1 = "persistent://prop/ns-quota/topic2" + 
UUID.randomUUID();
+
+            final String subName1 = "brandNewC1";
+            final int numMsgs = 5;
+
+            Consumer<byte[]> consumer1 = 
client.newConsumer().topic(topic1).subscriptionName(subName1)
+                    .acknowledgmentGroupTime(0, SECONDS)
+                    .isAckReceiptEnabled(true)
+                    .subscribe();
+            Producer<byte[]> producer = createProducer(client, topic1);
+
+            byte[] content = new byte[1024];
+            for (int i = 0; i < numMsgs; i++) {
+                producer.send(content);
+            }
+
+            String c1MarkDeletePositionBefore =
+                    
admin.topics().getInternalStats(topic1).cursors.get(subName1).markDeletePosition;
+
+            log.info("Moved subscription 1 to end");
+            for (int i = 0; i < numMsgs; i++) {
+                consumer1.acknowledge(consumer1.receive());
+            }
+
+            c1MarkDeletePositionBefore = 
waitForMarkDeletePositionToChange(topic1, subName1, c1MarkDeletePositionBefore);
+            waitForQuotaCheckToRunTwice();
+
+            // backlog and backlogAceSeconds should be 0
+            TopicStats topicStats = getTopicStats(topic1, false);
+            Metrics metrics = prometheusMetricsClient.getMetrics();
+            
assertEquals(topicStats.getSubscriptions().get(subName1).getMsgBacklog(), 0);
+            
assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isNull();
+            
assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isEqualTo(0);
+            Metric backlogAgeMetric =
+                    
metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds",
+                            Pair.of("topic", topic1));
+            assertThat(backlogAgeMetric.value).isEqualTo(0);
+
+            // producer should create success.
+            Producer<byte[]> producer2 = createProducer(client, topic1);
+            assertNotNull(producer2);
+
+            config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
+        }
+    }
+
     private void unloadAndLoadTopic(String topic, Producer producer) throws 
PulsarAdminException,
             PulsarClientException {
         admin.topics().unload(topic);

Reply via email to