Praveenkumar76 opened a new pull request, #25679:
URL: https://github.com/apache/pulsar/pull/25679
Fixes #25141
### Motivation
`OneWayReplicatorDeduplicationTest.testDeduplication` is flaky and
occasionally fails with a `ConditionTimeoutException`.
The root cause is a race condition in `MessageDeduplication.takeSnapshot`.
When multiple snapshot requests occur concurrently, the current implementation
uses a `compareAndSet` guard to allow only one active snapshot. If another
request arrives while a snapshot is already in progress, the method immediately
returns a completed future, effectively dropping the new request.
Since the test depends on the snapshot reaching a specific state, dropping
requests leads to inconsistent behavior and eventual timeouts.
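The drop described above can be illustrated with a minimal sketch. This is not the actual `MessageDeduplication` code: the class `DroppingSnapshotter`, the field `snapshotTaking`, and the `snapshotAction` supplier are hypothetical names chosen for illustration. It assumes only what the description states, namely that a `compareAndSet` guard admits one snapshot and that a concurrent caller gets back an already-completed future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical stand-in for the guarded snapshot path; not the Pulsar class.
class DroppingSnapshotter {
    private final AtomicBoolean snapshotTaking = new AtomicBoolean(false);
    // Assumed placeholder for the asynchronous work that persists a snapshot.
    private final Supplier<CompletableFuture<Void>> snapshotAction;

    DroppingSnapshotter(Supplier<CompletableFuture<Void>> snapshotAction) {
        this.snapshotAction = snapshotAction;
    }

    CompletableFuture<Void> takeSnapshot() {
        if (!snapshotTaking.compareAndSet(false, true)) {
            // A snapshot is already running: the caller sees success,
            // but no snapshot will ever cover this request.
            return CompletableFuture.completedFuture(null);
        }
        // Release the guard once the snapshot finishes, successfully or not.
        return snapshotAction.get()
                .whenComplete((ignored, error) -> snapshotTaking.set(false));
    }
}
```

In this sketch, a second call made while the first snapshot is still pending returns a completed future immediately, and the snapshot action runs only once.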
### Modifications
- Implemented request coalescing in `MessageDeduplication.takeSnapshot` to
ensure snapshot requests are not silently ignored.
- Introduced a `nextSnapshotFuture` to track pending snapshot requests.
- When a snapshot is already in progress:
  - Subsequent requests are grouped into a shared `CompletableFuture`.
- After the current snapshot completes:
  - Exactly one additional snapshot is triggered to process all queued
    requests.
- Ensured minimal synchronization to avoid performance impact while
maintaining correctness.
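The coalescing behavior listed above could look roughly like the following sketch. It is an illustration under assumptions, not the patch itself: only the name `nextSnapshotFuture` comes from this description, while `CoalescingSnapshotter`, `inProgress`, `snapshotAction`, and `runSnapshot` are hypothetical. The lock is held only while reading or updating the two state fields, never while the snapshot itself runs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical illustration of request coalescing; not the Pulsar class.
class CoalescingSnapshotter {
    // Assumed placeholder for the asynchronous work that persists a snapshot.
    private final Supplier<CompletableFuture<Void>> snapshotAction;
    private boolean inProgress;                          // guarded by this
    private CompletableFuture<Void> nextSnapshotFuture;  // guarded by this

    CoalescingSnapshotter(Supplier<CompletableFuture<Void>> snapshotAction) {
        this.snapshotAction = snapshotAction;
    }

    CompletableFuture<Void> takeSnapshot() {
        synchronized (this) {
            if (inProgress) {
                // Join (or create) the single future shared by all requests
                // that arrive while a snapshot is running.
                if (nextSnapshotFuture == null) {
                    nextSnapshotFuture = new CompletableFuture<>();
                }
                return nextSnapshotFuture;
            }
            inProgress = true;
        }
        CompletableFuture<Void> result = new CompletableFuture<>();
        runSnapshot(result);
        return result;
    }

    private void runSnapshot(CompletableFuture<Void> result) {
        CompletableFuture<Void> action;
        try {
            action = snapshotAction.get();
        } catch (RuntimeException e) {
            // Treat a synchronous failure like an asynchronous one.
            action = new CompletableFuture<>();
            action.completeExceptionally(e);
        }
        action.whenComplete((ignored, error) -> {
            CompletableFuture<Void> queued;
            synchronized (this) {
                queued = nextSnapshotFuture;
                nextSnapshotFuture = null;
                if (queued == null) {
                    inProgress = false; // nothing waiting: release the guard
                }
            }
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(null);
            }
            if (queued != null) {
                // Exactly one follow-up snapshot serves every queued request.
                runSnapshot(queued);
            }
        });
    }
}
```

With this shape, any number of calls made during a running snapshot share one future, and that future completes only after a snapshot that started after those calls were made.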
### Verifying this change
- Verified that the flaky test now runs consistently without failures.
- Tested by introducing artificial delays in snapshot execution to reproduce
the race condition.
- Confirmed that queued requests are properly handled and no longer dropped.
This change is already covered by existing tests, such as:
- `org.apache.pulsar.broker.service.OneWayReplicatorDeduplicationTest.testDeduplication`
**Highlight of changes:**
- **Threading model:** Added lightweight synchronization in `takeSnapshot`
to safely coordinate snapshot request batching. This avoids dropping concurrent
requests without introducing significant contention or blocking.
### Does this pull request potentially affect one of the following parts:
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [x] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment