lhotari commented on code in PR #25188:
URL: https://github.com/apache/pulsar/pull/25188#discussion_r2737785863


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -173,10 +175,63 @@ public synchronized void close() {
             sendTopicListUpdateTasks.clear();
         }
 
+        synchronized void prepareUpdateTopics() {
+            updatingTopics = true;
+            sendingInProgress = true;
+            sendTopicListUpdateTasks.clear();
+            matchingTopics.clear();
+        }
+
         synchronized void updateTopics(List<String> topics) {
+            if (closed) {
+                return;
+            }
             matchingTopics.clear();
             TopicList.filterTopicsToStream(topics, topicsPattern).forEach(matchingTopics::add);
             updatingTopics = false;
+            if (disconnected) {
+                handleNewAndDeletedTopicsWhileDisconnected();
+                matchingTopicsBeforeDisconnected = null;
+                disconnected = false;
+            }
+            sendingCompleted();
+        }
+
+        private synchronized void handleNewAndDeletedTopicsWhileDisconnected() {
+            List<String> newTopics = new ArrayList<>();
+            List<String> deletedTopics = new ArrayList<>();
+            Set<String> remainingTopics = new HashSet<>(matchingTopics);
+            for (String topic : matchingTopicsBeforeDisconnected) {
+                if (!remainingTopics.remove(topic)) {
+                    deletedTopics.add(topic);
+                }
+            }
+            newTopics.addAll(remainingTopics);
+            if (!newTopics.isEmpty() || !deletedTopics.isEmpty()) {
+                String hash = TopicList.calculateHash(matchingTopics);
+                sendTopicListUpdate(hash, deletedTopics, newTopics);
+            }
+        }
+
+        @Override
+        public NamespaceName getNamespaceName() {
+            return namespace;
+        }
+
+        @Override
+        public synchronized void onSessionEvent(SessionEvent event) {
+            switch (event) {
+                case SessionReestablished:
+                case Reconnected:
+                    executor.execute(() -> topicListService.updateTopicListWatcher(this, null));
+                    break;

Review Comment:
   addressing this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to