This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ea6641e3e51 [improve][broker] Replaced checkBackloggedCursors with checkBackloggedCursor(single subscription check) upon subscription (#19343)
ea6641e3e51 is described below

commit ea6641e3e51d7681670fea111cbac34080036e1a
Author: Heesung Sohn <[email protected]>
AuthorDate: Sat Jan 28 23:19:56 2023 -0800

    [improve][broker] Replaced checkBackloggedCursors with checkBackloggedCursor(single subscription check) upon subscription (#19343)
---
 .../broker/service/persistent/PersistentTopic.java | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 15bf568cbb2..ace27c3be96 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -812,7 +812,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         readCompacted, keySharedMeta, startMessageId, consumerEpoch);
 
                 return addConsumerToSubscription(subscription, consumer).thenCompose(v -> {
-                    checkBackloggedCursors();
+                    if (subscription instanceof PersistentSubscription persistentSubscription) {
+                        checkBackloggedCursor(persistentSubscription);
+                    }
                     if (!cnx.isActive()) {
                         try {
                             consumer.close();
@@ -2566,17 +2568,21 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
     @Override
     public void checkBackloggedCursors() {
-        // activate caught up cursors which include consumers
         subscriptions.forEach((subName, subscription) -> {
-            if (!subscription.getConsumers().isEmpty()
-                && subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) {
-                subscription.getCursor().setActive();
-            } else {
-                subscription.getCursor().setInactive();
-            }
+            checkBackloggedCursor(subscription);
         });
     }
 
+    private void checkBackloggedCursor(PersistentSubscription subscription) {
+        // activate caught up cursor which include consumers
+        if (!subscription.getConsumers().isEmpty()
+                && subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) {
+            subscription.getCursor().setActive();
+        } else {
+            subscription.getCursor().setInactive();
+        }
+    }
+
     public void checkInactiveLedgers() {
         ledger.checkInactiveLedgerAndRollOver();
     }

Reply via email to