YinY1 commented on issue #25204:
URL: https://github.com/apache/pulsar/issues/25204#issuecomment-3839587533

   I suspect the bug is caused by 
`MultiTopicsConsumerImpl.receiveMessageFromConsumer()`.
   More specifically, the internal consumer receives a batch of messages 
asynchronously and then validates the consumer epoch of each message. If 
`redeliverUnacknowledgedMessages` increments the epoch concurrently, only the 
messages checked after the update are filtered out; messages from the same 
batch, with the same epoch, that were checked before the update are still 
delivered.
   
   For example, take a batch of messages with epochs [0, 0, 0]. The user calls 
`redeliverUnacknowledgedMessages`, which updates the epoch to 1, so after that 
call the user expects to receive only messages with epoch 1.
   But in the internal consumer, `messagesFuture` may complete before the 
user's call, and the callback then iterates over the messages and validates 
the epoch of each one. msg[0] is not filtered out because the current epoch is 
still 0. Then the user's call happens and the epoch is updated to 1, so 
msg[1..] are filtered out.
   
   The timeline looks like this:
   `CONSUMER_EPOCH` = 0 -->
   `messagesFuture.thenAcceptAsync` with messages **epochs** [0, 0, 0] -->
   `isValidConsumerEpoch(messages[0])` = **true** -->
   `redeliverUnacknowledgedMessages()` -->
   `CONSUMER_EPOCH.incrementAndGet` (now 1) -->
   `isValidConsumerEpoch(messages[1])` = **false** -->
   `isValidConsumerEpoch(messages[2])` = **false** -->
   `consumer.receive()` returns *messages[0]*
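
   The interleaving above can be replayed deterministically with a small 
stand-alone sketch. This is not the Pulsar code: `EpochRaceSketch`, 
`isValidEpoch`, and `deliver` are hypothetical names, and the validity check is 
simplified to "message epoch is not older than the current epoch", which I 
assume is close enough to `isValidConsumerEpoch` for this scenario. The epoch 
bump is injected at a chosen loop index instead of coming from a second thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class EpochRaceSketch {
    // Hypothetical stand-in for the consumer's CONSUMER_EPOCH counter.
    static final AtomicLong consumerEpoch = new AtomicLong(0);

    // Simplified check (assumption): a message is valid if its epoch is
    // not older than the epoch the consumer holds at the time of the check.
    static boolean isValidEpoch(long messageEpoch) {
        return messageEpoch >= consumerEpoch.get();
    }

    // Walks the batch like the receive callback would, checking each message
    // against the live epoch. The epoch is incremented right before the
    // message at index redeliverBeforeIndex is checked, which simulates a
    // concurrent redeliverUnacknowledgedMessages() call (-1 means no call).
    // Returns the indexes of the messages that would be delivered.
    static List<Integer> deliver(List<Long> batchEpochs, int redeliverBeforeIndex) {
        consumerEpoch.set(0);
        List<Integer> delivered = new ArrayList<>();
        for (int i = 0; i < batchEpochs.size(); i++) {
            if (i == redeliverBeforeIndex) {
                consumerEpoch.incrementAndGet();
            }
            if (isValidEpoch(batchEpochs.get(i))) {
                delivered.add(i);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Long> batch = List.of(0L, 0L, 0L);
        System.out.println(deliver(batch, 1));  // [0]       partial batch, as in the timeline
        System.out.println(deliver(batch, 0));  // []        redeliver happened before any check
        System.out.println(deliver(batch, -1)); // [0, 1, 2] no redeliver at all
    }
}
```

   The first case is the problematic one: the batch is split by the epoch 
update, so one stale message is delivered while the rest of the same batch is 
dropped.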
   
   The relevant code is in 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:250-284


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
