shibd opened a new pull request, #24730:
URL: https://github.com/apache/pulsar/pull/24730
### Motivation
This PR fixes a critical bug where the `KeyShared` sticky mode consumer in
Pulsar (reproduced with the Node.js client) would incorrectly consume messages
from hash ranges not explicitly assigned to it. This deviates from the expected
"sticky" behavior and instead mimics "auto-split" logic.
Specifically, if a single `KeyShared` consumer is configured with
non-contiguous sticky ranges, e.g.:
- `[0, 9999]`
- `[20000, 29999]`
- `[40000, 49999]`
The consumer would incorrectly receive messages for keys whose hash falls into
the gaps (e.g., a hash of `15000`, which lies between `9999` and `20000`). The
root cause lies in `HashRangeExclusiveStickyKeyConsumerSelector`: its
`rangeMap` previously stored only the start/end points of each range, and its
`select` method inferred ownership from them. For a given hash, `select` looked
up the nearest stored boundaries below and above it (`floorEntry`/`ceilingEntry`)
and, if both belonged to the same consumer, returned that consumer. As a result,
any hash between the `end` of one of a consumer's ranges and the `start` of its
next range was assigned to that consumer, effectively closing the gaps.
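To make the root cause concrete, here is a simplified, hypothetical model of a boundary-only lookup that compares the `floorEntry` and `ceilingEntry` owners. The class name `BoundaryOnlySelector` and the string consumer names are illustrative only, and the hash is assumed to be already reduced to the selector's hash range; this is a sketch of the behavior described above, not the broker code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * Hypothetical model of the pre-fix lookup: only range boundaries are kept,
 * each mapped to the name of the consumer that owns the range.
 */
class BoundaryOnlySelector {
    private final ConcurrentSkipListMap<Integer, String> rangeMap = new ConcurrentSkipListMap<>();

    void addRange(int start, int end, String consumer) {
        // Only the two end points are recorded; the range itself is lost.
        rangeMap.put(start, consumer);
        rangeMap.put(end, consumer);
    }

    String select(int hash) {
        Map.Entry<Integer, String> floor = rangeMap.floorEntry(hash);
        Map.Entry<Integer, String> ceiling = rangeMap.ceilingEntry(hash);
        if (floor == null || ceiling == null) {
            return null;
        }
        // If the nearest boundaries on both sides belong to the same consumer,
        // the hash is attributed to it, even when it sits in a gap.
        return floor.getValue().equals(ceiling.getValue()) ? floor.getValue() : null;
    }

    public static void main(String[] args) {
        BoundaryOnlySelector selector = new BoundaryOnlySelector();
        selector.addRange(0, 9999, "c1");
        selector.addRange(20000, 29999, "c1");
        selector.addRange(40000, 49999, "c1");
        System.out.println(selector.select(5000));  // c1 (inside [0, 9999])
        System.out.println(selector.select(15000)); // c1 (in the gap, wrongly selected)
        System.out.println(selector.select(55000)); // null (no boundary above 49999)
    }
}
```

Because a single consumer owns both `9999` and `20000`, every hash in between is routed to it.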
This issue is demonstrable with:
- The `testConsumerSelectWithMultipRanges` unit test in this PR.
- The `testCustomStickyRange` integration test in this PR.
### Modifications
This PR refactors the `HashRangeExclusiveStickyKeyConsumerSelector` to
ensure strict enforcement of explicitly defined sticky ranges:
1. **Refactored `rangeMap`**: Changed `rangeMap` from
`ConcurrentSkipListMap<Integer, Consumer>` to `ConcurrentSkipListMap<Integer,
Pair<Range, Consumer>>`. This now allows the selector to store the complete
`Range` object for each assignment, eliminating the need to infer range
boundaries and enabling precise range containment checks.
2. **Enhanced `select(int hash)` Logic**: The `select` method is updated to
directly utilize the stored `Range` objects. It now accurately checks if a
given `hash` is explicitly contained within an assigned `Range` using
`pair.getLeft().contains(hash)`, rather than relying on potentially ambiguous
`floorEntry`/`ceilingEntry` comparisons.
3. **Improved `validateKeySharedMeta(Consumer consumer)` for Strict
   Validation**:
   * **Internal Range Validity**: A new validation step ensures that a
     consumer's own `KeySharedMeta` ranges are valid. It verifies that
     `start <= end` for all `IntRange` objects.
   * **Internal Range Overlap**: Critically, it now detects and rejects
     `KeySharedMeta` where a consumer's own assigned `IntRange`s internally overlap.
     This prevents consumers from being registered with self-conflicting
     configurations.
4. **Refined `findConflictingConsumer(List<IntRange> newConsumerRanges)`**:
This method's logic was updated to leverage the explicit `Pair<Range,
Consumer>` storage in `rangeMap`. It now accurately detects overlaps between a
new consumer's ranges and ranges already assigned to other active consumers,
utilizing `floorEntry`, `tailMap`, and a `checkRangesOverlap` helper for
efficient and precise conflict detection.
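Items 1, 2 and 4 can be illustrated together with a hypothetical, simplified selector. `RangeAwareSelector`, `SketchRange` and `Assignment` are illustrative stand-ins for the selector, `Range` and `Pair<Range, Consumer>`, and consumers are plain names. The sketch assumes stored ranges never overlap (which conflict detection is meant to guarantee) and uses `floorEntry`/`higherEntry` for its lookups; it is not the PR's implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical sketch: stores the full range next to its owner, keyed by range start. */
class RangeAwareSelector {
    static final class SketchRange {
        final int start;
        final int end; // inclusive

        SketchRange(int start, int end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(int hash) {
            return hash >= start && hash <= end;
        }

        boolean overlaps(SketchRange other) {
            return start <= other.end && other.start <= end;
        }
    }

    static final class Assignment {
        final SketchRange range;
        final String consumer;

        Assignment(SketchRange range, String consumer) {
            this.range = range;
            this.consumer = consumer;
        }
    }

    private final ConcurrentSkipListMap<Integer, Assignment> rangeMap = new ConcurrentSkipListMap<>();

    void addRange(SketchRange range, String consumer) {
        rangeMap.put(range.start, new Assignment(range, consumer));
    }

    String select(int hash) {
        // Only the range with the greatest start <= hash can contain the hash.
        Map.Entry<Integer, Assignment> entry = rangeMap.floorEntry(hash);
        if (entry == null || !entry.getValue().range.contains(hash)) {
            return null; // below every range, or in a gap after that range's end
        }
        return entry.getValue().consumer;
    }

    /** Returns the owner of the first stored range that overlaps a new range, or null. */
    String findConflictingConsumer(List<SketchRange> newRanges) {
        for (SketchRange r : newRanges) {
            // A stored range starting at or before r.start may still reach into r.
            Map.Entry<Integer, Assignment> floor = rangeMap.floorEntry(r.start);
            if (floor != null && floor.getValue().range.overlaps(r)) {
                return floor.getValue().consumer;
            }
            // A stored range starting inside (r.start, r.end] always overlaps r.
            Map.Entry<Integer, Assignment> next = rangeMap.higherEntry(r.start);
            if (next != null && next.getKey() <= r.end) {
                return next.getValue().consumer;
            }
        }
        return null;
    }
}
```

With `c1` owning `[0, 9999]` and `[20000, 29999]`, `select(15000)` returns `null`, while `findConflictingConsumer` reports `c1` for a new range `[9000, 12000]` and `null` for `[10000, 19999]`.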
These modifications collectively guarantee that
`HashRangeExclusiveStickyKeyConsumerSelector` strictly enforces range
exclusivity and assignment according to the `KeyShared` sticky policy,
resolving the unintended message consumption from unassigned ranges.
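The self-checks from item 3 (each range satisfies `start <= end`, and one consumer's ranges do not overlap each other) can be sketched as a sort-and-scan. `OwnRangesValidator` and the `int[]{start, end}` encoding are hypothetical stand-ins for the broker's validation and `IntRange`, assuming inclusive ends; this is an illustration, not the PR's code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of validating a single consumer's own sticky ranges. */
class OwnRangesValidator {
    /** Returns null if the ranges are valid, otherwise a description of the problem. */
    static String validate(List<int[]> ranges) {
        for (int[] r : ranges) {
            if (r[0] > r[1]) {
                return "invalid range [" + r[0] + ", " + r[1] + "]: start > end";
            }
        }
        // Sort a copy by start; if any two ranges overlap, some adjacent pair does too.
        List<int[]> sorted = new ArrayList<>(ranges);
        sorted.sort(Comparator.comparingInt(r -> r[0]));
        for (int i = 1; i < sorted.size(); i++) {
            int[] prev = sorted.get(i - 1);
            int[] cur = sorted.get(i);
            if (cur[0] <= prev[1]) { // inclusive ends: sharing a single hash is an overlap
                return "ranges [" + prev[0] + ", " + prev[1] + "] and ["
                        + cur[0] + ", " + cur[1] + "] overlap";
            }
        }
        return null;
    }
}
```

Sorting first keeps the overlap check at O(n log n) instead of comparing every pair of ranges.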
### Verifying this change
- [x] Make sure that the change passes the CI checks.
This change adds new unit tests and an integration test, and can be verified
as follows:
- **`HashRangeExclusiveStickyKeyConsumerSelectorTest` (Unit Tests)**:
  * `testConsumerSelect`: Updated to verify that `select(hash)` returns the
    correct consumer only when `hash` is within an explicitly defined range, and
    `null` for hashes in gaps.
  * `testConsumerSelectWithMultipRanges`: Added to confirm that a single
    consumer with multiple distinct sticky ranges selects messages only
    within those ranges and returns `null` for hashes in gaps.
  * `testOneConsumerRangeConflict`: Added to verify that a consumer cannot
    be added if its own `KeySharedMeta` contains internally conflicting or invalid
    (`start > end`) ranges.
  * `testSingleRangeConflict` and `testMultipleRangeConflict`: Updated to
    assert the expected conflicts and non-conflicts under the new strict
    range overlap detection logic.
- **`KeySharedSubscriptionTest.testCustomStickyRange` (Integration Test)**:
  * A new end-to-end integration test. It simulates the
    reported scenario with a partitioned topic and two consumers assigned
    non-overlapping sticky ranges, and verifies that each consumer *only*
    receives messages for keys whose hashes fall within its *explicitly assigned
    ranges*, confirming the fix for the unintended auto-split behavior.
### Does this pull request potentially affect one of the following parts:
*If the box was checked, please highlight the changes*
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [x] The binary protocol (Changes to `KeySharedMeta` validation might
affect how clients sending invalid `IntRange` lists are handled by the broker,
potentially rejecting previously accepted invalid configurations.)
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [x] Anything that affects deployment (The broker's consumer selection
logic is fundamentally changed for `KeyShared` sticky subscriptions, enforcing
strict range adherence. This alters message distribution behavior for affected
subscriptions.)
### Documentation
- [ ] `doc`
- [x] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc-complete`
### Matching PR in forked repository
PR in forked repository: