lhotari opened a new pull request, #25188:
URL: https://github.com/apache/pulsar/pull/25188
Fixes #25020
### Motivation
The `TopicListService` implementation does not handle metadata store session
events (`SessionEvent`), which causes the client-side topic list to become out
of sync when the metadata store session experiences temporary disruptions.
During metadata store disconnections, topic add/delete events are missed,
leading to inconsistent state between the broker and pattern topic consumers.
Additionally, metadata store change events may be missed since their delivery
is not guaranteed. Therefore, state reconciliation is necessary to maintain
consistency.
Another motivation is to improve topic hash handling so that the client and
broker state can be reliably kept in sync, without the broker having to send
the topic list to the client when the hashes already match.
### Modifications
This PR introduces session event handling and hash-based verification to
ensure topic list consistency:
**Server-side changes:**
- Added `TopicListener` interface to handle both topic events and session
events
- Modified `TopicResources` to register session listeners and dispatch
session events to topic listeners
- Enhanced `TopicListService.TopicListWatcher` to:
  - Retrieve the topic list after metadata store reconnection
  - Reconstruct missed add/delete events by comparing pre-disconnection and
    post-reconnection states
- Changed `PulsarResources` to use `MetadataStoreExtended` instead of
`MetadataStore` to access session event functionality
- Changed `TopicListService` to accept topic list watcher commands for
already existing watchers. This is used to reconcile the state when there's a
mismatch in the client-side topic listing hash.
- Postponed creation of the topic list watcher until after the initial topic
subscription has completed
  - This prevents possible race conditions
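
As an illustration of the "reconstruct missed add/delete events" step, the comparison of pre-disconnection and post-reconnection states can be thought of as a plain set difference. The sketch below is not taken from the PR: the class `TopicListDiff` and its methods are hypothetical names, and it assumes the two states are available as sets of topic names.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not part of the Pulsar code base.
final class TopicListDiff {

    /** Topics present after reconnection but not before: treated as missed "add" events. */
    static Set<String> added(Set<String> before, Set<String> after) {
        Set<String> result = new TreeSet<>(after);
        result.removeAll(before);
        return result;
    }

    /** Topics present before disconnection but not after: treated as missed "delete" events. */
    static Set<String> removed(Set<String> before, Set<String> after) {
        Set<String> result = new TreeSet<>(before);
        result.removeAll(after);
        return result;
    }

    public static void main(String[] args) {
        // Snapshot taken before the metadata store session was lost.
        Set<String> before = Set.of(
                "persistent://public/default/a",
                "persistent://public/default/b");
        // Listing retrieved after the session was re-established.
        Set<String> after = Set.of(
                "persistent://public/default/b",
                "persistent://public/default/c");

        System.out.println("added=" + added(before, after));
        System.out.println("removed=" + removed(before, after));
    }
}
```

Topics that appear in both snapshots produce no event, so a watcher that replays only the two difference sets sends the client nothing when the listing did not change during the disconnection.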
**Client-side changes:**
- Scheduled a periodic state refresh using the configured
`patternAutoDiscoveryPeriod` interval to detect and correct any drift
- Implemented hash-based verification to detect inconsistencies:
  - Updated `CommandWatchTopicList` to include the client's current topics
    hash
  - Modified `PatternConsumerUpdateQueue` to combine add and delete
    operations into a single task, enabling hash comparison after applying
    changes to local state
  - Added hash mismatch detection in `PatternMultiTopicsConsumerImpl` that
    triggers reconciliation via `recheckTopicsChange()` when the broker's hash
    differs from the expected local state
- Enhanced reconciliation logic to compare local state with broker state
and trigger a refresh when differences are detected
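
The client-side hash check can be sketched roughly as follows. This is a simplified illustration, not the PR's code: `TopicsHashCheck`, `hash`, and `applyAndVerify` are hypothetical names, and the hash function (CRC32C over the sorted, comma-joined topic names) is an assumption made for the example. The point it shows is why adds and deletes are applied as one step: the hash is only comparable with the broker's once both have been applied.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.zip.CRC32C;

// Hypothetical sketch for illustration only; names and hash function are assumptions.
final class TopicsHashCheck {

    /** Order-independent hash: sort the topic names, join them, and checksum the result. */
    static String hash(Collection<String> topics) {
        String joined = String.join(",", new TreeSet<>(topics));
        CRC32C crc = new CRC32C();
        crc.update(joined.getBytes(StandardCharsets.UTF_8));
        return Long.toHexString(crc.getValue());
    }

    /**
     * Applies deletions and additions to the local state as a single step, then
     * compares the resulting hash with the one reported by the broker.
     * Returns false when the caller should trigger a reconciliation.
     */
    static boolean applyAndVerify(Set<String> local,
                                  Collection<String> added,
                                  Collection<String> removed,
                                  String brokerHash) {
        local.removeAll(removed);
        local.addAll(added);
        return hash(local).equals(brokerHash);
    }

    public static void main(String[] args) {
        Set<String> local = new HashSet<>(List.of("topic-a", "topic-b"));
        // Hash the broker would report for its own view of the topic list.
        String brokerHash = hash(List.of("topic-b", "topic-c"));

        boolean inSync = applyAndVerify(
                local, List.of("topic-c"), List.of("topic-a"), brokerHash);
        System.out.println("in sync: " + inSync);
    }
}
```

When `applyAndVerify` returns `false`, the local state has drifted from the broker's, which corresponds to the case where the PR triggers reconciliation via `recheckTopicsChange()`.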
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update
later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->