Denovo1998 commented on code in PR #25188:
URL: https://github.com/apache/pulsar/pull/25188#discussion_r2741338643


##########
pulsar-common/src/main/proto/PulsarApi.proto:
##########
@@ -315,6 +315,7 @@ message FeatureFlags {
   optional bool supports_topic_watchers = 4 [default = false];
  optional bool supports_get_partitioned_metadata_without_auto_creation = 5 [default = false];
   optional bool supports_repl_dedup_by_lid_and_eid = 6 [default = false];
+  optional bool supports_topic_watcher_reconcile = 7 [default = false];

Review Comment:
   The comment in this file suggests that any new feature flag should also be added to `PulsarClientException.FailedFeatureCheck` to keep the two in sync. Should `supports_topic_watcher_reconcile` be added there as well?
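   
   A minimal sketch of what keeping the enum in sync could look like, assuming `FailedFeatureCheck` keeps its current naming convention (the new constant name is illustrative, not part of this PR):
   
   ```java
   // Sketch only: mirror the new proto flag in PulsarClientException's
   // FailedFeatureCheck enum, following the existing naming pattern.
   public enum FailedFeatureCheck {
       SupportsGetPartitionedMetadataWithoutAutoCreation,
       // hypothetical constant for supports_topic_watcher_reconcile = 7
       SupportsTopicWatcherReconcile;
   }
   ```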



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -136,46 +146,117 @@ public void run(Timeout timeout) throws Exception {
     }
 
     CompletableFuture<Void> recheckTopicsChange() {
-        String pattern = topicsPattern.inputPattern();
-        final int epoch = recheckPatternEpoch.incrementAndGet();
-        return client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode, pattern, topicsHash)
-            .thenCompose(getTopicsResult -> {
-                // If "recheckTopicsChange" has been called more than one times, only make the last one take affects.
-                // Use "synchronized (recheckPatternTaskBackoff)" instead of
-                // `synchronized(PatternMultiTopicsConsumerImpl.this)` to avoid locking in a wider range.
-                synchronized (PatternMultiTopicsConsumerImpl.this) {
-                    if (recheckPatternEpoch.get() > epoch) {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                    if (log.isDebugEnabled()) {
-                        log.debug("Pattern consumer [{}] get topics under namespace {}, topics.size: {},"
-                                        + " topicsHash: {}, filtered: {}",
-                                PatternMultiTopicsConsumerImpl.this.getSubscription(),
-                                namespaceName, getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(),
-                                getTopicsResult.isFiltered());
-                        getTopicsResult.getTopics().forEach(topicName ->
-                                log.debug("Get topics under namespace {}, topic: {}", namespaceName, topicName));
-                    }
+        final int epoch = getNextRecheckPatternEpoch();
 
-                    final List<String> oldTopics = new ArrayList<>(getPartitions());
-                    return updateSubscriptions(topicsPattern, this::setTopicsHash, getTopicsResult,
-                            topicsChangeListener, oldTopics, subscription);
-                }
-            }).thenAccept(__ -> {
-                if (recheckPatternTimeout != null) {
-                    this.recheckPatternTimeout = client.timer().newTimeout(
-                            this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
+        CompletableFuture<Void> recheckFuture;
+        // Prefer watcher-based reconcile when a watcher exists and is connected. Fallback to lookup if watcher
+        // is not available or the watcher-based request fails.
+        if (supportsTopicListWatcherReconcile()) {
+            String localStateTopicsHash = getLocalStateTopicsHash();
+            recheckFuture = topicListWatcher.reconcile(localStateTopicsHash).thenCompose(response -> {
+                return handleWatchTopicListSuccess(response, localStateTopicsHash, epoch);
+            }).handle((res, ex) -> {
+                if (ex != null) {
+                    // watcher-based reconcile failed -> fall back to lookup-based recheck
+                    return doLookupBasedRecheck(epoch);
+                } else {
+                    // watcher-based reconcile completed successfully
+                    return CompletableFuture.<Void>completedFuture(null);
                 }
-            });
+            }).thenCompose(Function.identity());
+        } else {
+            // Fallback: perform the existing lookup-based recheck
+            recheckFuture = doLookupBasedRecheck(epoch);
+        }
+
+        return recheckFuture.handle((__, ex) -> {
+            if (ex != null) {
+                log.info("[{}][{}] Pattern consumer failed to recheck topics changes: {}",
+                        getPattern().inputPattern(), getSubscription(), ex.getMessage());
+            }
+            scheduleRecheckTopics();
+            return null;
+        });
+    }
+
+    int getNextRecheckPatternEpoch() {
+        return recheckPatternEpoch.incrementAndGet();
+    }
+
+    CompletableFuture<Void> handleWatchTopicListSuccess(CommandWatchTopicListSuccess response,
+                                                        String localStateTopicsHash, int epoch) {
+        synchronized (PatternMultiTopicsConsumerImpl.this) {
+            if (recheckPatternEpoch.get() > epoch) {
+                return CompletableFuture.completedFuture(null);
+            }
+            // Build a GetTopicsResult-like object from the watch response
+            // so we can reuse updateSubscriptions
+            final List<String> topics = (response != null)
+                    ? response.getTopicsList()
+                    : Collections.emptyList();
+            final String hash = (response != null && response.hasTopicsHash())
+                    ? response.getTopicsHash()
+                    : null;
+            final boolean changed = !localStateTopicsHash.equals(hash);
+            final GetTopicsResult getTopicsResult =
+                    new GetTopicsResult(topics, hash, true, changed);
+
+            final List<String> oldTopics = new ArrayList<>(getPartitions());
+            return updateSubscriptions(topicsPattern, getTopicsResult, topicsChangeListener, oldTopics,
+                    subscription);
+        }
+    }
+
+    boolean supportsTopicListWatcherReconcile() {
+        return topicListWatcher != null && topicListWatcher.supportsReconcile() && watcherFuture.isDone()
+                && !watcherFuture.isCompletedExceptionally() && topicListWatcher.isConnected();
+    }
+
+    private void scheduleRecheckTopics() {
+        if (!closed) {
+            this.recheckPatternTimeout = client.timer().newTimeout(this,
+                    Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
+        }

Review Comment:
   Could this schedule duplicate tasks? Should the old `recheckPatternTimeout` reference be cancelled before a new timeout is scheduled?
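   
   If cancelling is the right fix, a hedged sketch (names taken from the diff; `io.netty.util.Timeout.cancel()` is a no-op once the task has already run):
   
   ```java
   // Sketch only: cancel any previously scheduled recheck before arming a new
   // one, so concurrent triggers cannot leave multiple timer tasks outstanding.
   private void scheduleRecheckTopics() {
       if (!closed) {
           Timeout previous = this.recheckPatternTimeout;
           if (previous != null) {
               previous.cancel(); // safe even if the timeout already fired
           }
           this.recheckPatternTimeout = client.timer().newTimeout(this,
                   Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
       }
   }
   ```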



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -266,9 +370,27 @@ public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, lo
         CompletableFuture<TopicListWatcher> existingWatcherFuture = watchers.putIfAbsent(watcherId, watcherFuture);
 
         if (existingWatcherFuture != null) {
-            log.info("[{}] Watcher with the same watcherId={} is already created.", connection, watcherId);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Watcher with the same watcherId={} is already created. Refreshing.", connection,
+                        watcherId);
+            }
             // use the existing watcher if it's already created
-            watcherFuture = existingWatcherFuture;
+            watcherFuture = existingWatcherFuture.thenCompose(watcher -> {
+                CompletableFuture<TopicListWatcher> future = new CompletableFuture<>();
+                Runnable callback = () -> future.complete(watcher);
+                // trigger a new update unless an update is already ongoing. Register the callback to complete
+                // when the update completes.
+                if (watcher.prepareUpdateTopics(callback)) {
+                    updateTopicListWatcher(watcher)
+                            // run the callback also in failure cases
+                            // prepareUpdateTopics handles it for success cases
+                            .exceptionally(ex -> {
+                                callback.run();
+                                return null;
+                            });
+                }
+                return future;

Review Comment:
   Possible hang when refreshing an existing watcher: when CommandWatchTopicList hits an already-existing watcher and `prepareUpdateTopics(callback)` returns false (an update is in progress), we return a future that is only completed when the callbacks are drained in `TopicListWatcher.updateTopics()`, i.e. on the success path.
   
   If the in-flight update fails terminally (the non-permit / non-retry case), the callbacks are never drained, so this watch request can hang indefinitely (and potentially block the lookup semaphore release / response)?
   
   Maybe we should either drain/complete the callbacks on update failure as well (and reset the update state), or track the in-flight update CompletableFuture and attach the request completion to it (success + failure)?
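   
   A rough sketch of the second option; `getOrStartUpdate` is a hypothetical helper on `TopicListWatcher` that would return the in-flight update future (starting one via the supplier if none is running), so the request completes on both outcomes:
   
   ```java
   // Sketch only: attach the watch-request completion to the in-flight
   // update future instead of a success-only callback list.
   watcherFuture = existingWatcherFuture.thenCompose(watcher ->
           watcher.getOrStartUpdate(() -> updateTopicListWatcher(watcher))
                   // handle() runs on success AND failure, so the request
                   // cannot hang if the update fails terminally
                   .handle((ignore, ex) -> watcher));
   ```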



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
