This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1300f0a4b1c [fix][client] rollback TopicListWatcher retry behavior (#24752)
1300f0a4b1c is described below

commit 1300f0a4b1c015e278ff321d58c12a7e85ba12e7
Author: Oneby <[email protected]>
AuthorDate: Wed Sep 17 21:06:48 2025 +0800

    [fix][client] rollback TopicListWatcher retry behavior (#24752)
    
    Co-authored-by: oneby-wang <[email protected]>
---
 .../impl/PatternMultiTopicsConsumerImpl.java       | 30 ++++++++++++++--------
 .../pulsar/client/impl/TopicListWatcher.java       | 17 +++++-------
 2 files changed, 26 insertions(+), 21 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index ee949593c19..8a3798a670c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -52,6 +52,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
     private final TopicsPattern topicsPattern;
     final TopicsChangedListener topicsChangeListener;
     private final Mode subscriptionMode;
+    private final TopicListWatcher topicListWatcher;
     private final CompletableFuture<TopicListWatcher> watcherFuture = new 
CompletableFuture<>();
     protected NamespaceName namespaceName;
 
@@ -62,6 +63,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
     private volatile String topicsHash;
 
     private PatternConsumerUpdateQueue updateTaskQueue;
+    private volatile boolean closed = false;
 
     /***
      * @param topicsPattern The regexp for the topic name(not contains 
partition suffix).
@@ -86,19 +88,26 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
         this.updateTaskQueue = new PatternConsumerUpdateQueue(this);
         if (subscriptionMode == Mode.PERSISTENT) {
             long watcherId = client.newTopicListWatcherId();
-            new TopicListWatcher(updateTaskQueue, client, topicsPattern, 
watcherId,
+            topicListWatcher = new TopicListWatcher(updateTaskQueue, client, 
topicsPattern, watcherId,
                 namespaceName, topicsHash, watcherFuture, () -> 
recheckTopicsChangeAfterReconnect());
             watcherFuture
                .exceptionally(ex -> {
-                   log.warn("Pattern consumer [{}] unable to create topic list 
watcher. Falling back to only polling"
-                           + " for new topics", conf.getSubscriptionName(), 
ex);
-                   this.recheckPatternTimeout = client.timer().newTimeout(
-                           this, Math.max(1, 
conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
+                   if (closed) {
+                       log.warn("Pattern consumer [{}] was closed while 
creating topic list watcher",
+                               conf.getSubscriptionName(), ex);
+                   } else {
+                       log.warn(
+                               "Pattern consumer [{}] unable to create topic 
list watcher. Falling back to only polling"
+                                       + " for new topics", 
conf.getSubscriptionName(), ex);
+                       this.recheckPatternTimeout = client.timer()
+                               .newTimeout(this, Math.max(1, 
conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
+                   }
                    return null;
                });
         } else {
             log.debug("Pattern consumer [{}] not creating topic list watcher 
for subscription mode {}",
                     conf.getSubscriptionName(), subscriptionMode);
+            topicListWatcher = null;
             watcherFuture.complete(null);
             this.recheckPatternTimeout = client.timer().newTimeout(
                     this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), 
TimeUnit.SECONDS);
@@ -120,7 +129,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
     // TimerTask to recheck topics change, and trigger subscribe/unsubscribe 
based on the change.
     @Override
     public void run(Timeout timeout) throws Exception {
-        if (timeout.isCancelled()) {
+        if (timeout.isCancelled() || closed) {
             return;
         }
         updateTaskQueue.appendRecheckOp();
@@ -381,16 +390,17 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
     @Override
     @SuppressFBWarnings
     public CompletableFuture<Void> closeAsync() {
+        closed = true;
         Timeout timeout = recheckPatternTimeout;
         if (timeout != null) {
             timeout.cancel();
             recheckPatternTimeout = null;
         }
-        CompletableFuture<Void> watcherCloseFuture = watcherFuture.thenCompose(
-                topicListWatcher -> 
Optional.ofNullable(topicListWatcher).map(TopicListWatcher::closeAsync)
-                        
.orElse(CompletableFuture.completedFuture(null))).exceptionally(t -> null);
+        CompletableFuture<Void> topicListWatcherCloseFuture =
+                
Optional.ofNullable(topicListWatcher).map(TopicListWatcher::closeAsync)
+                        
.orElse(CompletableFuture.completedFuture(null)).exceptionally(t -> null);
         CompletableFuture<Void> runningTaskCancelFuture = 
updateTaskQueue.cancelAllAndWaitForTheRunningTask();
-        return FutureUtil.waitForAll(Lists.newArrayList(watcherCloseFuture, 
runningTaskCancelFuture))
+        return 
FutureUtil.waitForAll(Lists.newArrayList(topicListWatcherCloseFuture, 
runningTaskCancelFuture))
                 .exceptionally(t -> null).thenCompose(__ -> 
super.closeAsync());
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index d83ae3e7337..49cf64656fe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -47,7 +47,6 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
     private final ConnectionHandler connectionHandler;
     private final TopicsPattern topicsPattern;
     private final long watcherId;
-    private final long lookupDeadline;
     private volatile long createWatcherDeadline = 0;
     private final NamespaceName namespace;
     // TODO maintain the value based on updates from broker and warn the user 
if inconsistent with hash from polling
@@ -81,7 +80,6 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
                 this);
         this.topicsPattern = topicsPattern;
         this.watcherId = watcherId;
-        this.lookupDeadline = System.currentTimeMillis() + 
client.getConfiguration().getLookupTimeoutMs();
         this.namespace = namespace;
         this.topicsHash = topicsHash;
         this.watcherFuture = watcherFuture;
@@ -93,17 +91,12 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
     @Override
     public boolean connectionFailed(PulsarClientException exception) {
         boolean nonRetriableError = 
!PulsarClientException.isRetriableError(exception);
-        boolean timeout = System.currentTimeMillis() > lookupDeadline;
-        if (nonRetriableError || timeout) {
+        if (nonRetriableError) {
             exception.setPreviousExceptionCount(previousExceptionCount);
             if (watcherFuture.completeExceptionally(exception)) {
                 setState(State.Failed);
-                if (nonRetriableError) {
-                    log.info("[{}] Watcher creation failed for {} with 
non-retriable error {}", topic, name,
-                            exception.getMessage());
-                } else {
-                    log.info("[{}] Watcher creation failed for {} after 
timeout", topic, name);
-                }
+                log.info("[{}] Watcher creation failed for {} with 
non-retriable error {}",
+                        topic, name, exception.getMessage());
                 deregisterFromClientCnx();
                 return false;
             }
@@ -209,8 +202,10 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
     }
 
     public CompletableFuture<Void> closeAsync() {
-
         CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+        // since we set closed flag in PatternMultiTopicsConsumerImpl, it is 
ok to directly cancel watcherFuture whether
+        // it's completed or not to make sure watcherFuture is completed
+        watcherFuture.cancel(false);
 
         if (getState() == State.Closing || getState() == State.Closed) {
             closeFuture.complete(null);

Reply via email to