lhotari commented on code in PR #25188:
URL: https://github.com/apache/pulsar/pull/25188#discussion_r2741565075
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -266,9 +370,27 @@ public void handleWatchTopicList(NamespaceName
namespaceName, long watcherId, lo
CompletableFuture<TopicListWatcher> existingWatcherFuture =
watchers.putIfAbsent(watcherId, watcherFuture);
if (existingWatcherFuture != null) {
- log.info("[{}] Watcher with the same watcherId={} is already
created.", connection, watcherId);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Watcher with the same watcherId={} is already
created. Refreshing.", connection,
+ watcherId);
+ }
// use the existing watcher if it's already created
- watcherFuture = existingWatcherFuture;
+ watcherFuture = existingWatcherFuture.thenCompose(watcher -> {
+ CompletableFuture<TopicListWatcher> future = new
CompletableFuture<>();
+ Runnable callback = () -> future.complete(watcher);
+ // trigger a new update unless an update is already ongoing.
Register the callback to complete
+ // when the update completes.
+ if (watcher.prepareUpdateTopics(callback)) {
+ updateTopicListWatcher(watcher)
+ // run the callback also in failure cases
+ // prepareUpdateTopics handles it for success cases
+ .exceptionally(ex -> {
+ callback.run();
+ return null;
+ });
+ }
+ return future;
Review Comment:
Added tests
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]