lhotari opened a new pull request, #25188:
URL: https://github.com/apache/pulsar/pull/25188

   Fixes #25020
   
   ### Motivation
   
   The `TopicListService` implementation does not handle metadata store session 
events (`SessionEvent`), which causes the client-side topic list to become out 
of sync when the metadata store session experiences temporary disruptions. 
During metadata store disconnections, topic add/delete events are missed, 
leading to inconsistent state between the broker and pattern topic consumers. 
Additionally, metadata store change events may be missed since their delivery 
is not guaranteed. Therefore, state reconciliation is necessary to maintain 
consistency.
   Another motivation is to improve the topic hash handling so that the state 
of the client and the broker can be properly kept in sync while avoiding to 
send topics from the broker to the client when the hash matches.
   
   ### Modifications
   
   This PR introduces session event handling and hash-based verification to 
ensure topic list consistency:
   
   **Server-side changes:**
   - Added `TopicListener` interface to handle both topic events and session 
events
   - Modified `TopicResources` to register session listeners and dispatch 
session events to topic listeners
   - Enhanced `TopicListService.TopicListWatcher` to:
     - Retrieve the topic list after metadata store reconnection
     - Reconstruct missed add/delete events by comparing pre-disconnection and 
post-reconnection states
   - Changed `PulsarResources` to use `MetadataStoreExtended` instead of 
`MetadataStore` to access session event functionality
   - Changed `TopicListService` to accept topic list watcher commands for 
already existing watchers. This is used to reconcile the state when there's a 
mismatch in the client size topic listing hash.
   - Postponed creation of the topic list watcher after the initial topic 
subscription has completed
     - prevents possible race conditions
   
   **Client-side changes:**
   - Schedule periodic state refresh using the configured 
`patternAutoDiscoveryPeriod` interval to detect and correct any drift
   - Implemented hash-based verification to detect inconsistencies:
     - Updated `CommandWatchTopicList` to include the client's current topics 
hash
     - Modified `PatternConsumerUpdateQueue` to combine add and delete 
operations into a single task, enabling hash comparison after applying changes 
to local state
     - Added hash mismatch detection in `PatternMultiTopicsConsumerImpl` that 
triggers reconciliation via `recheckTopicsChange()` when the broker's hash 
differs from the expected local state
     - Enhanced reconciliation logic to compare local state with broker state 
and trigger a refresh when differences are detected
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update 
later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to