Vanlightly opened a new pull request #11691:
URL: https://github.com/apache/pulsar/pull/11691


   Fixes #11689
   
   ### Motivation
   
   When `Consumer.batchReceive()` is called concurrently by different threads, 
a race condition in `ConsumerBase.java` can cause a `CompletableFuture` in the 
`pendingBatchReceives` queue to be removed from the queue but never completed, 
leaving the consumer blocked forever. This has occurred a few times in 
production recently.
   
   The issue is that peek and poll are called concurrently in 
`peekNextBatchReceive`, and the code is only correct when the entry that was 
peeked is the entry that is polled. If another thread calls poll between one 
thread's peek and poll, the bug occurs. When it does, there is an error message: 
`Bug: Removed entry wasn't the expected one`.
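
The interleaving described above can be replayed deterministically on a single thread. The following is a minimal sketch, not the actual `ConsumerBase` code: the class name `PeekPollRaceSketch`, the method `replayInterleaving`, and the use of `String` payloads are all hypothetical and exist only for illustration.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration class; it is not part of the Pulsar client.
class PeekPollRaceSketch {

    /**
     * Replays the problematic peek/poll interleaving of two threads, A and B,
     * as sequential steps. Returns the future that ends up removed from the
     * queue without ever being completed.
     */
    static CompletableFuture<String> replayInterleaving() {
        Queue<CompletableFuture<String>> pending = new ConcurrentLinkedQueue<>();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        pending.add(first);
        pending.add(second);

        // Thread A: peeks the head and intends to complete exactly that entry.
        CompletableFuture<String> peekedByA = pending.peek();   // first

        // Thread B: polls in between and completes the entry it removed.
        CompletableFuture<String> polledByB = pending.poll();   // first
        polledByB.complete("messages");

        // Thread A: polls, assuming it removes what it peeked.
        // It removes `second` instead, which is not the expected entry.
        CompletableFuture<String> polledByA = pending.poll();   // second

        // Thread A completes the entry it peeked; that is a no-op because
        // thread B already completed it.
        peekedByA.complete("messages");

        // `second` has left the queue and nothing will complete it.
        return polledByA;
    }
}
```

A caller waiting on the returned future would never be released, which matches the hang described above.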
   
   ### Modifications
   
   - Added a lock around the places in `ConsumerBase` where a peek and a poll 
of `pendingBatchReceives` must happen together.
   - Eagerly instantiate the `pendingBatchReceives` in `ConsumerImpl` and 
`MultiTopicsConsumerImpl` given that the lazy instantiation was not thread safe.
   - Replaced the use of peek for checking if a pending queue is empty or not.
   - Renamed the peeking and polling methods to an iterator style of "has" and 
"next".
   - The housekeeping of clearing any completed futures from the queue is now 
handled only by `nextBatchReceive`.
   - Name changes caused some minor changes in some tests.
   - Moved some imports to fix checkstyle errors.
   - Removed the arguments from `failPendingBatchReceives` and 
`failPendingReceives` as they were superfluous.
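
The first few modifications can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the code in this PR: the class `PendingBatchReceivesSketch`, the `add` helper, the method name `hasNextBatchReceive`, and the `String` payload type are hypothetical. Only `pendingBatchReceives` and `nextBatchReceive` are names taken from the description above.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration class; it is not part of the Pulsar client.
class PendingBatchReceivesSketch {
    // Eagerly instantiated, so no thread can race on lazy creation of the queue.
    private final Queue<CompletableFuture<String>> pendingBatchReceives =
            new ConcurrentLinkedQueue<>();
    // Guards every peek-then-poll sequence on the queue.
    private final ReentrantLock lock = new ReentrantLock();

    void add(CompletableFuture<String> future) {
        pendingBatchReceives.add(future);
    }

    // Emptiness check that does not rely on peek (assumed method name).
    boolean hasNextBatchReceive() {
        return !pendingBatchReceives.isEmpty();
    }

    /**
     * Discards already-completed futures and returns the first incomplete one,
     * or null if none is left. Peek and poll run under the same lock, so the
     * polled entry is always the one that was just peeked, provided all
     * removals go through this method.
     */
    CompletableFuture<String> nextBatchReceive() {
        lock.lock();
        try {
            CompletableFuture<String> head;
            while ((head = pendingBatchReceives.peek()) != null) {
                CompletableFuture<String> removed = pendingBatchReceives.poll();
                if (!removed.isDone()) {
                    return removed;
                }
                // Completed entries are simply dropped (housekeeping).
            }
            return null;
        } finally {
            lock.unlock();
        }
    }
}
```

Keeping all removal and cleanup in one locked method means no other code path can take an entry between the peek and the poll.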
   
   ### Verifying this change
   
   All existing tests that cover the consumer pass. 
   
   Reproduction was difficult, but I have seen it in production. The only way 
to make it trigger within a few seconds or minutes was to add a short thread 
sleep between the peek and poll while calling `batchReceive` concurrently from 
many threads with a non-zero `maxNumMessages`. I ran the repro steps from the 
issue with this fix applied and a temporary thread sleep added between peek and 
poll, and verified that the problem no longer occurs.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
   #### For contributor
   
   Nothing to document here, purely internal implementation details.
   
   #### For committer
   
   For this PR, do we need to update docs?
   
   - If yes,
     
     - if you update docs in this PR, label this PR with the `doc` label.
     
     - if you plan to update docs later, label this PR with the `doc-required` 
label.
   
     - if you need help on updating docs, create a follow-up issue with the 
`doc-required` label.
     
   - If no, label this PR with the `no-need-doc` label and explain why.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

