lhotari commented on code in PR #25188:
URL: https://github.com/apache/pulsar/pull/25188#discussion_r2741437109


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -136,46 +146,117 @@ public void run(Timeout timeout) throws Exception {
     }
 
     CompletableFuture<Void> recheckTopicsChange() {
-        String pattern = topicsPattern.inputPattern();
-        final int epoch = recheckPatternEpoch.incrementAndGet();
-        return client.getLookup().getTopicsUnderNamespace(namespaceName, 
subscriptionMode, pattern, topicsHash)
-            .thenCompose(getTopicsResult -> {
-                // If "recheckTopicsChange" has been called more than one 
times, only make the last one take affects.
-                // Use "synchronized (recheckPatternTaskBackoff)" instead of
-                // `synchronized(PatternMultiTopicsConsumerImpl.this)` to 
avoid locking in a wider range.
-                synchronized (PatternMultiTopicsConsumerImpl.this) {
-                    if (recheckPatternEpoch.get() > epoch) {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                    if (log.isDebugEnabled()) {
-                        log.debug("Pattern consumer [{}] get topics under 
namespace {}, topics.size: {},"
-                                        + " topicsHash: {}, filtered: {}",
-                                
PatternMultiTopicsConsumerImpl.this.getSubscription(),
-                                namespaceName, 
getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(),
-                                getTopicsResult.isFiltered());
-                        getTopicsResult.getTopics().forEach(topicName ->
-                                log.debug("Get topics under namespace {}, 
topic: {}", namespaceName, topicName));
-                    }
+        final int epoch = getNextRecheckPatternEpoch();
 
-                    final List<String> oldTopics = new 
ArrayList<>(getPartitions());
-                    return updateSubscriptions(topicsPattern, 
this::setTopicsHash, getTopicsResult,
-                            topicsChangeListener, oldTopics, subscription);
-                }
-            }).thenAccept(__ -> {
-                if (recheckPatternTimeout != null) {
-                    this.recheckPatternTimeout = client.timer().newTimeout(
-                            this, Math.max(1, 
conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
+        CompletableFuture<Void> recheckFuture;
+        // Prefer watcher-based reconcile when a watcher exists and is 
connected. Fallback to lookup if watcher
+        // is not available or the watcher-based request fails.
+        if (supportsTopicListWatcherReconcile()) {
+            String localStateTopicsHash = getLocalStateTopicsHash();
+            recheckFuture = 
topicListWatcher.reconcile(localStateTopicsHash).thenCompose(response -> {
+                return handleWatchTopicListSuccess(response, 
localStateTopicsHash, epoch);
+            }).handle((res, ex) -> {
+                if (ex != null) {
+                    // watcher-based reconcile failed -> fall back to 
lookup-based recheck
+                    return doLookupBasedRecheck(epoch);
+                } else {
+                    // watcher-based reconcile completed successfully
+                    return CompletableFuture.<Void>completedFuture(null);
                 }
-            });
+            }).thenCompose(Function.identity());
+        } else {
+            // Fallback: perform the existing lookup-based recheck
+            recheckFuture = doLookupBasedRecheck(epoch);
+        }
+
+        return recheckFuture.handle((__, ex) -> {
+            if (ex != null) {
+                log.info("[{}][{}] Pattern consumer failed to recheck topics 
changes: {}",
+                        getPattern().inputPattern(), getSubscription(), 
ex.getMessage());
+            }
+            scheduleRecheckTopics();
+            return null;
+        });
+    }
+
+    int getNextRecheckPatternEpoch() {
+        return recheckPatternEpoch.incrementAndGet();
+    }
+
+    CompletableFuture<Void> 
handleWatchTopicListSuccess(CommandWatchTopicListSuccess response,
+                                                        String 
localStateTopicsHash, int epoch) {
+        synchronized (PatternMultiTopicsConsumerImpl.this) {
+            if (recheckPatternEpoch.get() > epoch) {
+                return CompletableFuture.completedFuture(null);
+            }
+            // Build a GetTopicsResult-like object from the watch response
+            // so we can reuse updateSubscriptions
+            final List<String> topics = (response != null)
+                    ? response.getTopicsList()
+                    : Collections.emptyList();
+            final String hash = (response != null && response.hasTopicsHash())
+                    ? response.getTopicsHash()
+                    : null;
+            final boolean changed = !localStateTopicsHash.equals(hash);
+            final GetTopicsResult getTopicsResult =
+                    new GetTopicsResult(topics, hash, true, changed);
+
+            final List<String> oldTopics = new ArrayList<>(getPartitions());
+            return updateSubscriptions(topicsPattern, getTopicsResult, 
topicsChangeListener, oldTopics,
+                    subscription);
+        }
+    }
+
+    boolean supportsTopicListWatcherReconcile() {
+        return topicListWatcher != null && 
topicListWatcher.supportsReconcile() && watcherFuture.isDone()
+                && !watcherFuture.isCompletedExceptionally() && 
topicListWatcher.isConnected();
+    }
+
+    private void scheduleRecheckTopics() {
+        if (!closed) {
+            this.recheckPatternTimeout = client.timer().newTimeout(this,
+                    Math.max(1, conf.getPatternAutoDiscoveryPeriod()), 
TimeUnit.SECONDS);
+        }

Review Comment:
   Good point. Adding the logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to