Copilot commented on code in PR #25188:
URL: https://github.com/apache/pulsar/pull/25188#discussion_r2737340824


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -266,9 +321,17 @@ public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, lo
         CompletableFuture<TopicListWatcher> existingWatcherFuture = watchers.putIfAbsent(watcherId, watcherFuture);
 
         if (existingWatcherFuture != null) {
-            log.info("[{}] Watcher with the same watcherId={} is already created.", connection, watcherId);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Watcher with the same watcherId={} is already created. Refreshing.", connection,
+                        watcherId);
+            }
             // use the existing watcher if it's already created
-            watcherFuture = existingWatcherFuture;
+            watcherFuture = existingWatcherFuture.thenCompose(watcher -> {
+                watcher.prepareUpdateTopics();
+                CompletableFuture<TopicListWatcher> future = new CompletableFuture<>();
+                updateTopicListWatcher(watcher, () -> future.complete(watcher));
+                return future;
+            });

Review Comment:
   When handling an existing watcher (lines 329-334), the code calls
   prepareUpdateTopics, which clears the matchingTopics collection, clears the
   pending sendTopicListUpdateTasks, and sets the updatingTopics and
   sendingInProgress flags. If the watcher is in the middle of processing
   other events (e.g., topic add/delete events from the metadata store), this
   could lead to lost events or inconsistent state. Consider checking that the
   watcher is in a stable state before initiating a refresh, or document that
   this behavior is acceptable for reconciliation.
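
One way to picture the "stable state" concern is a set that defers incoming events while a refresh is in flight and replays them once the new snapshot is installed. The sketch below is only an illustration under that assumption; RefreshableTopicSet and all of its methods are hypothetical names and are not part of TopicListService.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration; not the Pulsar TopicListWatcher.
class RefreshableTopicSet {
    private final Set<String> topics = new TreeSet<>();
    private final List<Runnable> deferred = new ArrayList<>();
    private boolean refreshing;

    // Start a refresh: the current contents are discarded until a snapshot arrives.
    synchronized void beginRefresh() {
        refreshing = true;
        topics.clear();
    }

    synchronized void onTopicAdded(String topic) {
        applyOrDefer(() -> topics.add(topic));
    }

    synchronized void onTopicDeleted(String topic) {
        applyOrDefer(() -> topics.remove(topic));
    }

    // Events that arrive during a refresh are queued instead of being lost.
    private void applyOrDefer(Runnable change) {
        if (refreshing) {
            deferred.add(change);
        } else {
            change.run();
        }
    }

    // Install the snapshot, then replay the events received in the meantime.
    synchronized void completeRefresh(Collection<String> snapshot) {
        topics.addAll(snapshot);
        refreshing = false;
        deferred.forEach(Runnable::run);
        deferred.clear();
    }

    synchronized List<String> snapshot() {
        return new ArrayList<>(topics);
    }
}
```

Whether replaying deferred events on top of a fresh snapshot is correct depends on how the events are ordered relative to the snapshot, so this is a starting point for discussion rather than a proposed fix.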



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -80,37 +78,39 @@ public PatternMultiTopicsConsumerImpl(TopicsPattern topicsPattern,
         super(client, conf, executorProvider, subscribeFuture, schema, interceptors,
                 false /* createTopicIfDoesNotExist */);
         this.topicsPattern = topicsPattern;
-        this.topicsHash = topicsHash;
         this.subscriptionMode = subscriptionMode;
         this.namespaceName = topicsPattern.namespace();
-
         this.topicsChangeListener = new PatternTopicsChangedListener();
         this.updateTaskQueue = new PatternConsumerUpdateQueue(this);
         if (subscriptionMode == Mode.PERSISTENT) {
-            long watcherId = client.newTopicListWatcherId();
-            topicListWatcher = new TopicListWatcher(updateTaskQueue, client, topicsPattern, watcherId,
-                namespaceName, topicsHash, watcherFuture, () -> recheckTopicsChangeAfterReconnect());
-            watcherFuture
-               .exceptionally(ex -> {
-                   if (closed) {
-                       log.warn("Pattern consumer [{}] was closed while creating topic list watcher",
-                               conf.getSubscriptionName(), ex);
-                   } else {
-                       log.warn(
-                               "Pattern consumer [{}] unable to create topic list watcher. Falling back to only polling"
-                                       + " for new topics", conf.getSubscriptionName(), ex);
-                       this.recheckPatternTimeout = client.timer()
-                               .newTimeout(this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
-                   }
-                   return null;
-               });
+            subscribeFuture.whenComplete((__, exception) -> {
+                if (!closed && exception == null) {
+                    long watcherId = client.newTopicListWatcherId();
+                    topicListWatcher = new TopicListWatcher(updateTaskQueue, client, topicsPattern, watcherId,
+                            namespaceName, this::getLocalStateTopicsHash, watcherFuture,
+                            this::getNextRecheckPatternEpoch);

Review Comment:
   The topicListWatcher field is declared volatile and is assigned in a
   callback (line 89). This leaves a window for a race: any thread that reads
   topicListWatcher before that callback has run will see null, even though
   watcherFuture is already being processed. This may be acceptable given the
   usage pattern, but consider documenting that topicListWatcher should only
   be accessed after watcherFuture completes, or add null checks where it is
   used.
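
The null-check option mentioned above can be sketched as follows: read the volatile field once into a local variable, then test and use the local, so the check and the use see the same value. LateInitHolder, its fields, and its methods are hypothetical stand-ins for illustration and do not mirror PatternMultiTopicsConsumerImpl.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a consumer whose watcher is created in a callback.
class LateInitHolder {
    // Assigned from a callback, so readers may observe null until it has run.
    private volatile Runnable watcher;
    private final CompletableFuture<Void> subscribeFuture = new CompletableFuture<>();
    int closeCalls = 0;

    LateInitHolder() {
        subscribeFuture.whenComplete((ignored, ex) -> {
            if (ex == null) {
                // The "watcher" here just counts how often it is invoked.
                watcher = () -> closeCalls++;
            }
        });
    }

    void completeSubscribe() {
        subscribeFuture.complete(null);
    }

    // Read the volatile field once, then null-check and use the local copy.
    boolean closeWatcherIfPresent() {
        Runnable w = watcher;
        if (w == null) {
            return false;
        }
        w.run();
        return true;
    }
}
```

Calling closeWatcherIfPresent() before completeSubscribe() simply returns false instead of throwing a NullPointerException.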



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueue.java:
##########
@@ -173,12 +231,31 @@ synchronized void triggerNextTask() {
                 });
                 break;
             }
-            case TOPICS_ADDED: {
-                newTaskFuture = topicsChangeListener.onTopicsAdded(task.getRight());
-                break;
-            }
-            case TOPICS_REMOVED: {
-                newTaskFuture = topicsChangeListener.onTopicsRemoved(task.getRight());
+            case TOPICS_CHANGED: {
+                TopicsAddedOrRemovedTask topicsAddedOrRemovedTask = (TopicsAddedOrRemovedTask) task;
+                newTaskFuture = topicsChangeListener.onTopicsRemoved(topicsAddedOrRemovedTask.removedTopics)
+                        .thenCompose(__ ->
+                                topicsChangeListener.onTopicsAdded(topicsAddedOrRemovedTask.addedTopics))
+                        .thenRun(() -> {
+                            if (!patternConsumer.supportsTopicListWatcherReconcile()) {
+                                // ignore topics hash unless topic list watcher reconcile is supported since
+                                // the broker side state might be out of sync and could cause unnecessary
+                                // reconciliation.
+                                // reconciliation will happen later when the client requests the topic listing
+                                // after the next patternAutoDiscoveryPeriod interval
+                                // Broker versions that support topic list watcher reconcile will also update the
+                                // broker side state when reconciliation is requested.
+                                return;
+                            }
+                            String localHash = patternConsumer.getLocalStateTopicsHash();
+                            String brokerHash = topicsAddedOrRemovedTask.topicsHash;
+                            if (brokerHash != null && brokerHash.length() > 0 && !brokerHash.equals(localHash)) {
+                                log.info("[{}][{}] Hash mismatch detected (local: {}, broker: {}). Triggering "
+                                                + "reconciliation.", patternConsumer.getPattern().inputPattern(),
+                                        patternConsumer.getSubscription(), localHash, brokerHash);
+                                appendRecheckOp();

Review Comment:
   In the TOPICS_CHANGED case, when a hash mismatch is detected,
   appendRecheckOp is called to add a new recheck operation to the queue.
   Nothing at this call site prevents several recheck operations from being
   requested when multiple hash mismatches are detected in quick succession.
   PatternConsumerUpdateQueue does have a recheckTaskInQueue flag to handle
   this, but the logic would be clearer if the flag were checked before
   calling appendRecheckOp, which also avoids unnecessary queue operations.
   ```suggestion
                                   synchronized (PatternConsumerUpdateQueue.this) {
                                       if (!recheckTaskInQueue) {
                                           appendRecheckOp();
                                       }
                                   }
   ```



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java:
##########
@@ -280,7 +282,33 @@ private void cleanupAtClose(CompletableFuture<Void> closeFuture, Throwable excep
     }
 
     public void handleCommandWatchTopicUpdate(CommandWatchTopicUpdate update) {
-        patternConsumerUpdateQueue.appendTopicsRemovedOp(update.getDeletedTopicsList());
-        patternConsumerUpdateQueue.appendTopicsAddedOp(update.getNewTopicsList());
+        if (update == null) {
+            return;
+        }
+        patternConsumerUpdateQueue.appendTopicsChangedOp(update.getNewTopicsList(), update.getDeletedTopicsList(),
+                update.hasTopicsHash() ? update.getTopicsHash() : "");
+    }
+
+    /**
+     * Perform a single reconciliation request using the existing watcher id and the watcher's last-known topics hash.
+     * This will send a WatchTopicList request including the topics-hash to the broker. If the watcher is not connected,
+     * the returned future will be completed exceptionally.
+     */
+    public CompletableFuture<CommandWatchTopicListSuccess> reconcile(String localStateTopicsHash) {
+        ClientCnx c = cnx();
+        if (c == null || !isConnected()) {
+            CompletableFuture<CommandWatchTopicListSuccess> f = new CompletableFuture<>();
+            f.completeExceptionally(new IllegalStateException("Watcher is not connected"));
+            return f;

Review Comment:
   When the watcher is not connected (lines 299-302), the reconcile method
   returns a failed future with an IllegalStateException. However, a
   temporarily lost connection is expected during normal operation, so this
   is a transient condition rather than an illegal state. Consider using a
   more specific exception type (e.g., WatcherNotConnectedException) that
   conveys this. It would make error handling clearer for callers.
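
As a rough sketch of what a dedicated exception type buys the caller: the fallback path can match on the type instead of treating every failure alike. WatcherNotConnectedException is the name floated in the comment above and does not exist in Pulsar; ReconcileSketch and its methods are likewise hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical exception type signalling a transient "not connected" condition.
class WatcherNotConnectedException extends RuntimeException {
    WatcherNotConnectedException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the watcher and its caller.
class ReconcileSketch {
    static CompletableFuture<String> reconcile(boolean connected) {
        if (!connected) {
            CompletableFuture<String> f = new CompletableFuture<>();
            f.completeExceptionally(new WatcherNotConnectedException("Watcher is not connected"));
            return f;
        }
        return CompletableFuture.completedFuture("reconciled");
    }

    // Dependent stages may wrap the failure in CompletionException, so unwrap first.
    static boolean isTransient(Throwable ex) {
        Throwable cause = (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
        return cause instanceof WatcherNotConnectedException;
    }

    // Transient failures fall back to another path; anything else is reported as an error.
    static String reconcileOrFallback(boolean connected) {
        return reconcile(connected)
                .exceptionally(ex -> isTransient(ex) ? "fallback-lookup" : "error")
                .join();
    }
}
```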



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueue.java:
##########
@@ -173,12 +231,31 @@ synchronized void triggerNextTask() {
                 });
                 break;
             }
-            case TOPICS_ADDED: {
-                newTaskFuture = topicsChangeListener.onTopicsAdded(task.getRight());
-                break;
-            }
-            case TOPICS_REMOVED: {
-                newTaskFuture = topicsChangeListener.onTopicsRemoved(task.getRight());
+            case TOPICS_CHANGED: {
+                TopicsAddedOrRemovedTask topicsAddedOrRemovedTask = (TopicsAddedOrRemovedTask) task;
+                newTaskFuture = topicsChangeListener.onTopicsRemoved(topicsAddedOrRemovedTask.removedTopics)
+                        .thenCompose(__ ->
+                                topicsChangeListener.onTopicsAdded(topicsAddedOrRemovedTask.addedTopics))
+                        .thenRun(() -> {
+                            if (!patternConsumer.supportsTopicListWatcherReconcile()) {
+                                // ignore topics hash unless topic list watcher reconcile is supported since
+                                // the broker side state might be out of sync and could cause unnecessary
+                                // reconciliation.
+                                // reconciliation will happen later when the client requests the topic listing
+                                // after the next patternAutoDiscoveryPeriod interval
+                                // Broker versions that support topic list watcher reconcile will also update the
+                                // broker side state when reconciliation is requested.
+                                return;
+                            }
+                            String localHash = patternConsumer.getLocalStateTopicsHash();
+                            String brokerHash = topicsAddedOrRemovedTask.topicsHash;
+                            if (brokerHash != null && brokerHash.length() > 0 && !brokerHash.equals(localHash)) {
+                                log.info("[{}][{}] Hash mismatch detected (local: {}, broker: {}). Triggering "
+                                                + "reconciliation.", patternConsumer.getPattern().inputPattern(),
+                                        patternConsumer.getSubscription(), localHash, brokerHash);
+                                appendRecheckOp();

Review Comment:
   The check for broker support (line 240) will always return false for older 
brokers that don't support topic list watcher reconcile. This means that hash 
verification will be skipped entirely for those brokers, even though they could 
still benefit from detecting hash mismatches (though they couldn't fix them via 
reconciliation). Consider logging a warning when hash mismatches are detected 
but reconciliation is not supported, to help diagnose issues with older brokers.
   ```suggestion
                               String localHash = patternConsumer.getLocalStateTopicsHash();
                               String brokerHash = topicsAddedOrRemovedTask.topicsHash;
                               if (brokerHash != null && brokerHash.length() > 0 && !brokerHash.equals(localHash)) {
                                   if (patternConsumer.supportsTopicListWatcherReconcile()) {
                                       log.info("[{}][{}] Hash mismatch detected (local: {}, broker: {}). Triggering "
                                                       + "reconciliation.",
                                               patternConsumer.getPattern().inputPattern(),
                                               patternConsumer.getSubscription(), localHash, brokerHash);
                                       appendRecheckOp();
                                   } else {
                                       log.warn("[{}][{}] Hash mismatch detected (local: {}, broker: {}), but topic "
                                                       + "list watcher reconcile is not supported by the broker. "
                                                       + "This may indicate that the client and broker topic lists "
                                                       + "are out of sync and cannot be automatically reconciled.",
                                               patternConsumer.getPattern().inputPattern(),
                                               patternConsumer.getSubscription(), localHash, brokerHash);
                                   }
   ```



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -136,46 +136,113 @@ public void run(Timeout timeout) throws Exception {
     }
 
     CompletableFuture<Void> recheckTopicsChange() {
-        String pattern = topicsPattern.inputPattern();
-        final int epoch = recheckPatternEpoch.incrementAndGet();
-        return client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode, pattern, topicsHash)
-            .thenCompose(getTopicsResult -> {
-                // If "recheckTopicsChange" has been called more than one times, only make the last one take affects.
-                // Use "synchronized (recheckPatternTaskBackoff)" instead of
-                // `synchronized(PatternMultiTopicsConsumerImpl.this)` to avoid locking in a wider range.
-                synchronized (PatternMultiTopicsConsumerImpl.this) {
-                    if (recheckPatternEpoch.get() > epoch) {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                    if (log.isDebugEnabled()) {
-                        log.debug("Pattern consumer [{}] get topics under namespace {}, topics.size: {},"
-                                        + " topicsHash: {}, filtered: {}",
-                                PatternMultiTopicsConsumerImpl.this.getSubscription(),
-                                namespaceName, getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(),
-                                getTopicsResult.isFiltered());
-                        getTopicsResult.getTopics().forEach(topicName ->
-                                log.debug("Get topics under namespace {}, topic: {}", namespaceName, topicName));
-                    }
+        final int epoch = getNextRecheckPatternEpoch();
 
-                    final List<String> oldTopics = new ArrayList<>(getPartitions());
-                    return updateSubscriptions(topicsPattern, this::setTopicsHash, getTopicsResult,
-                            topicsChangeListener, oldTopics, subscription);
+        CompletableFuture<Void> recheckFuture;
+        // Prefer watcher-based reconcile when a watcher exists and is connected. Fallback to lookup if watcher
+        // is not available or the watcher-based request fails.
+        if (supportsTopicListWatcherReconcile()) {
+            String localStateTopicsHash = getLocalStateTopicsHash();
+            recheckFuture = topicListWatcher.reconcile(localStateTopicsHash).thenCompose(response -> {
+                return handleWatchTopicListSuccess(response, localStateTopicsHash, epoch);
+            }).handle((res, ex) -> {
+                if (ex != null) {
+                    // watcher-based reconcile failed -> fall back to lookup-based recheck
+                    return doLookupBasedRecheck(epoch);
+                } else {
+                    // watcher-based reconcile completed successfully
+                    return CompletableFuture.<Void>completedFuture(null);
                 }
-            }).thenAccept(__ -> {
-                if (recheckPatternTimeout != null) {
-                    this.recheckPatternTimeout = client.timer().newTimeout(
-                            this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
-                }
-            });
+            }).thenCompose(Function.identity());
+        } else {
+            // Fallback: perform the existing lookup-based recheck
+            recheckFuture = doLookupBasedRecheck(epoch);
+        }
+
+        return recheckFuture.handle((__, ex) -> {

Review Comment:
   The handle callback here swallows exceptions by returning null. If
   recheckTopicsChange fails with an exception, the error is neither
   propagated nor logged at this level. scheduleRecheckTopics will still be
   called, but the missing logging could make issues difficult to diagnose.
   Consider logging the exception before returning null, or at a minimum
   document that errors are expected to be logged at a lower level.
   ```suggestion
           return recheckFuture.handle((__, ex) -> {
               if (ex != null) {
                   log.warn("Failed to recheck topics change, will reschedule next check", ex);
               }
   ```
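
To illustrate the behavior in isolation: a `handle` stage that returns null turns a failed future into one that completes normally, so the failure stays visible only if the callback records it. The sketch below is self-contained; HandleSketch, its in-memory `logged` list (standing in for a logger), and the `reschedule` parameter are hypothetical names, not code from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of logging inside a handle() that swallows errors.
class HandleSketch {
    // Stand-in for a logger so the behavior can be observed.
    static final List<String> logged = new ArrayList<>();

    static CompletableFuture<Void> recheckAndReschedule(CompletableFuture<Void> recheckFuture,
                                                        Runnable reschedule) {
        return recheckFuture.handle((ignored, ex) -> {
            if (ex != null) {
                // Without this line the failure would disappear silently.
                logged.add("recheck failed: " + ex.getMessage());
            }
            // Rescheduling happens on both the success and the failure path.
            reschedule.run();
            // Returning null completes the resulting future normally.
            return null;
        });
    }
}
```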



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -173,10 +175,63 @@ public synchronized void close() {
             sendTopicListUpdateTasks.clear();
         }
 
+        synchronized void prepareUpdateTopics() {
+            updatingTopics = true;
+            sendingInProgress = true;
+            sendTopicListUpdateTasks.clear();
+            matchingTopics.clear();
+        }
+
         synchronized void updateTopics(List<String> topics) {
+            if (closed) {
+                return;
+            }
             matchingTopics.clear();
             TopicList.filterTopicsToStream(topics, topicsPattern).forEach(matchingTopics::add);
             updatingTopics = false;
+            if (disconnected) {
+                handleNewAndDeletedTopicsWhileDisconnected();
+                matchingTopicsBeforeDisconnected = null;
+                disconnected = false;
+            }
+            sendingCompleted();
+        }
+
+        private synchronized void handleNewAndDeletedTopicsWhileDisconnected() {
+            List<String> newTopics = new ArrayList<>();
+            List<String> deletedTopics = new ArrayList<>();
+            Set<String> remainingTopics = new HashSet<>(matchingTopics);
+            for (String topic : matchingTopicsBeforeDisconnected) {
+                if (!remainingTopics.remove(topic)) {
+                    deletedTopics.add(topic);
+                }
+            }
+            newTopics.addAll(remainingTopics);
+            if (!newTopics.isEmpty() || !deletedTopics.isEmpty()) {
+                String hash = TopicList.calculateHash(matchingTopics);
+                sendTopicListUpdate(hash, deletedTopics, newTopics);
+            }
+        }
+
+        @Override
+        public NamespaceName getNamespaceName() {
+            return namespace;
+        }
+
+        @Override
+        public synchronized void onSessionEvent(SessionEvent event) {
+            switch (event) {
+                case SessionReestablished:
+                case Reconnected:
+                    executor.execute(() -> topicListService.updateTopicListWatcher(this, null));
+                    break;

Review Comment:
   The onSessionEvent method schedules updateTopicListWatcher on the executor 
for Reconnected and SessionReestablished events. However, if multiple session 
events occur in quick succession (e.g., ConnectionLost followed immediately by 
Reconnected), multiple updateTopicListWatcher calls could be scheduled. While 
the updatingTopics flag provides some protection, consider adding logic to 
cancel or skip pending updates if a new session event arrives before the 
previous update completes.
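
One possible shape for the "skip pending updates" idea is a flag that coalesces refresh requests: while a refresh is queued but has not started, further requests are dropped. This is a minimal sketch under that assumption; CoalescingRefresher and requestRefresh are hypothetical names and nothing here is taken from TopicListService.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper that coalesces bursts of refresh requests.
class CoalescingRefresher {
    private final Executor executor;
    private final Runnable refresh;
    private final AtomicBoolean scheduled = new AtomicBoolean(false);

    CoalescingRefresher(Executor executor, Runnable refresh) {
        this.executor = executor;
        this.refresh = refresh;
    }

    // Returns true if a refresh was scheduled, false if one was already pending.
    boolean requestRefresh() {
        if (!scheduled.compareAndSet(false, true)) {
            return false;
        }
        executor.execute(() -> {
            // Clear the flag before running, so an event that arrives during
            // the refresh schedules another one instead of being dropped.
            scheduled.set(false);
            refresh.run();
        });
        return true;
    }
}
```

A session listener could call requestRefresh() for each Reconnected or SessionReestablished event; a burst of events then results in a single queued refresh.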



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
