Vanlightly opened a new pull request #11691: URL: https://github.com/apache/pulsar/pull/11691
Fixes #11689

### Motivation

When `Consumer.batchReceive()` is called concurrently by different threads, a race condition in `ConsumerBase.java` can cause a `CompletableFuture` in the `pendingBatchReceives` queue to be removed from the queue without being completed, so the consumer blocks forever. This has occurred a few times in production recently.

The issue is that `peekNextBatchReceive` makes separate, concurrent calls to `peek` and `poll`, and the code is only correct when the element that is peeked is the same one that is polled. If another thread calls `poll` between a peek and the following poll, this bug occurs. When it does, the following error message appears: `Bug: Removed entry wasn't the expected one`.

### Modifications

- Added a lock to protect the places in `ConsumerBase` where a peek and a poll of `pendingBatchReceives` must happen together.
- Eagerly instantiate `pendingBatchReceives` in `ConsumerImpl` and `MultiTopicsConsumerImpl`, because the lazy instantiation was not thread safe.
- Replaced the use of `peek` for checking whether a pending queue is empty.
- Changed the peeking and polling method names to an iterator style of `has` and `next`.
- The housekeeping of clearing any completed futures from the queue is now handled only by `nextBatchReceive`.
- The name changes required some minor changes in some tests.
- Moved some imports to fix checkstyle errors.
- Removed the arguments of `failPendingBatchReceives` and `failPendingReceives`, as they were superfluous.

### Verifying this change

All existing tests that cover the consumer pass.

Reproducing the bug was difficult, although I have seen it in production. The only way I found to trigger it within a few seconds or minutes was to add a short thread sleep between the peek and the poll while calling `batchReceive` concurrently from many threads with a non-zero `maxNumMessages`. With this fix applied, I ran the reproduction steps from the issue, again with a temporary thread sleep between the peek and the poll, and verified that the bug no longer occurs.
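The peek/poll race and the lock-based fix described above can be illustrated with a small, self-contained model. This is a hedged sketch, not Pulsar's actual `ConsumerBase` code: the class `PendingBatchReceives` and the methods `batchReceiveAsync`, `hasNextBatchReceive` and `completeNext` are hypothetical names, the queue holds plain `CompletableFuture<List<String>>` values rather than Pulsar's internal types, and `ConcurrentLinkedQueue` plus `ReentrantLock` are assumed as the concurrency primitives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of a consumer's pending batch-receive queue.
// It is NOT Pulsar's ConsumerBase; names and types are illustrative assumptions.
//
// Without a lock, two threads can interleave like this:
//   T1: peek() -> F1        T2: peek() -> F1
//   T1: poll() -> F1        T2: poll() -> F2
// T2 then tries to complete F1 (already done) while F2 has been removed but is
// never completed, so whoever waits on F2 blocks forever.
public class PendingBatchReceives {
    // Created eagerly: lazy initialization without synchronization is not thread safe.
    private final Queue<CompletableFuture<List<String>>> pending = new ConcurrentLinkedQueue<>();
    // Guards every peek-then-poll sequence on `pending`.
    private final ReentrantLock lock = new ReentrantLock();

    // Registers a new pending batch receive and returns its future.
    public CompletableFuture<List<String>> batchReceiveAsync() {
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    // Emptiness check that does not peek at a particular element.
    public boolean hasNextBatchReceive() {
        return !pending.isEmpty();
    }

    // Completes the oldest incomplete pending future with `batch`.
    // Returns false if there was no incomplete future to complete.
    public boolean completeNext(List<String> batch) {
        lock.lock();
        try {
            CompletableFuture<List<String>> head;
            // Housekeeping: drop futures already completed elsewhere (e.g. cancelled).
            while ((head = pending.peek()) != null && head.isDone()) {
                pending.poll();
            }
            if (head == null) {
                return false;
            }
            // While the lock is held, the polled element is the one just peeked.
            CompletableFuture<List<String>> removed = pending.poll();
            if (removed != head) {
                throw new IllegalStateException("Removed entry wasn't the expected one");
            }
            return head.complete(batch);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PendingBatchReceives receives = new PendingBatchReceives();
        int n = 1000;
        List<CompletableFuture<List<String>>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            futures.add(receives.batchReceiveAsync());
        }
        // Many threads complete pending receives concurrently.
        ExecutorService pool = Executors.newFixedThreadPool(8);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            final String msg = "msg-" + i;
            pool.execute(() -> {
                if (receives.completeNext(List.of(msg))) {
                    completed.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        boolean allDone = futures.stream().allMatch(CompletableFuture::isDone);
        System.out.println("completed=" + completed.get() + " allDone=" + allDone);
    }
}
```

Running `main` should print `completed=1000 allDone=true`: every pending future is removed and completed exactly once. The design point is that the lock makes each peek-then-poll pair atomic with respect to every other remover, so the element that is inspected is always the element that is removed and completed. In this sketch, adding to the tail does not take the lock, because a concurrent `add` cannot change which element sits at the head of a non-empty `ConcurrentLinkedQueue`.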
### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)
- The rest endpoints: (no)
- The admin cli options: (no)
- Anything that affects deployment: (no)

### Documentation

#### For contributor

Nothing to document here; these are purely internal implementation details.

#### For committer

For this PR, do we need to update docs?

- If yes,
  - if you update docs in this PR, label this PR with the `doc` label.
  - if you plan to update docs later, label this PR with the `doc-required` label.
  - if you need help on updating docs, create a follow-up issue with the `doc-required` label.
- If no, label this PR with the `no-need-doc` label and explain why.