poorbarcode commented on code in PR #24104:
URL: https://github.com/apache/pulsar/pull/24104#discussion_r2007250072


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -173,6 +158,11 @@ CompletableFuture<Void> recheckTopicsChange() {
                     return updateSubscriptions(topicsPattern, 
this::setTopicsHash, getTopicsResult,
                             topicsChangeListener, oldTopics, subscription);
                 }
+            }).thenAccept(__ -> {
+                if (recheckPatternTimeout != null) {

Review Comment:
   Sure, we'd better add a comment for this variable to mark that it has two 
meanings



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -102,23 +86,24 @@ public PatternMultiTopicsConsumerImpl(Pattern 
topicsPattern,
 
         this.topicsChangeListener = new PatternTopicsChangedListener();
         this.updateTaskQueue = new PatternConsumerUpdateQueue(this);
-        this.recheckPatternTimeout = client.timer()
-                .newTimeout(this, Math.max(1, 
conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
         if (subscriptionMode == Mode.PERSISTENT) {
             long watcherId = client.newTopicListWatcherId();
             new TopicListWatcher(updateTaskQueue, client, topicsPattern, 
watcherId,
                 namespaceName, topicsHash, watcherFuture, () -> 
recheckTopicsChangeAfterReconnect());
             watcherFuture
-               .thenAccept(__ -> recheckPatternTimeout.cancel())
                .exceptionally(ex -> {
                    log.warn("Pattern consumer [{}] unable to create topic list 
watcher. Falling back to only polling"
                            + " for new topics", conf.getSubscriptionName(), 
ex);
+                   this.recheckPatternTimeout = client.timer().newTimeout(
+                           this, Math.max(1, 
conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
                    return null;
                });
         } else {
             log.debug("Pattern consumer [{}] not creating topic list watcher 
for subscription mode {}",
                     conf.getSubscriptionName(), subscriptionMode);
             watcherFuture.complete(null);
+            this.recheckPatternTimeout = client.timer().newTimeout(

Review Comment:
   How about adding a test that covers the scenarios that use non-persistent 
topic regexp?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -102,23 +86,24 @@ public PatternMultiTopicsConsumerImpl(Pattern 
topicsPattern,
 
         this.topicsChangeListener = new PatternTopicsChangedListener();
         this.updateTaskQueue = new PatternConsumerUpdateQueue(this);
-        this.recheckPatternTimeout = client.timer()
-                .newTimeout(this, Math.max(1, 
conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
         if (subscriptionMode == Mode.PERSISTENT) {
             long watcherId = client.newTopicListWatcherId();
             new TopicListWatcher(updateTaskQueue, client, topicsPattern, 
watcherId,
                 namespaceName, topicsHash, watcherFuture, () -> 
recheckTopicsChangeAfterReconnect());
             watcherFuture
-               .thenAccept(__ -> recheckPatternTimeout.cancel())
                .exceptionally(ex -> {
                    log.warn("Pattern consumer [{}] unable to create topic list 
watcher. Falling back to only polling"
                            + " for new topics", conf.getSubscriptionName(), 
ex);
+                   this.recheckPatternTimeout = client.timer().newTimeout(
+                           this, Math.max(1, 
conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);

Review Comment:
   We'd better set a larger min value because the frequent calling of 
`getTopicsUnderNamespace` will cause the issue that the CPU is fully used when 
there are thousands of topics under the same namespace.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to