berg223 commented on issue #25145: URL: https://github.com/apache/pulsar/issues/25145#issuecomment-3798271727
@YinY1 @lhotari I have reproduced the issue in my branch named [reproduce_issue_25145](https://github.com/apache/pulsar/commit/437a68a4b5ddb5c9b5949b0d9410613fccb4e901), and I will submit a PR in the next few days.

Consumer loses messages due to a race condition when acknowledging batch messages

There are two threads:

1. Thread A: receives batch messages from the broker. It filters out duplicate messages by checking the ackSet.
```
org.apache.pulsar.client.impl.ConsumerImpl#receiveIndividualMessagesFromBatch
org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker#isDuplicate
```
2. Thread B: flushes ack requests to the broker. It clears the ackSet at the end.
```
org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker#flushAsync
org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker#newMessageAckCommandAndWrite
org.apache.pulsar.common.protocol.Commands#newMultiMessageAck
org.apache.pulsar.common.protocol.Commands#newMultiMessageAckCommon
org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable#recycle
```

In the race condition, thread B clears the bitSet first. Thread A then mistakenly believes the message is a duplicate and ignores it.
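The interleaving described above can be replayed deterministically in a single thread. This is only a minimal sketch, not the actual client code: it uses a plain `java.util.BitSet` in place of `ConcurrentBitSetRecyclable`, the class and method names (`AckSetRaceSketch`, `newAckSet`, `replayRace`) are hypothetical, and it assumes that a set bit means "batch index not yet acked", so a cleared bit is read as "already acked, therefore duplicate".

```java
import java.util.BitSet;

// Hypothetical, simplified model of the race. Not the real Pulsar classes.
class AckSetRaceSketch {

    // Assumption: a set bit means "not yet acked", a cleared bit means "acked".
    static BitSet newAckSet(int batchSize) {
        BitSet ackSet = new BitSet(batchSize);
        ackSet.set(0, batchSize);
        return ackSet;
    }

    // Simplified duplicate check: a cleared bit is treated as an acked index.
    static boolean isDuplicate(BitSet ackSet, int batchIndex) {
        return ackSet != null && !ackSet.get(batchIndex);
    }

    // Replays the steps in order; returns {resultBeforeRecycle, resultAfterRecycle}.
    static boolean[] replayRace() {
        int batchIndex = 589;
        BitSet ackSet = newAckSet(1000);

        // Thread A obtains a reference to the ack set of the entry.
        BitSet seenByThreadA = ackSet;
        boolean before = isDuplicate(seenByThreadA, batchIndex);

        // Thread B flushes the ack and then recycles (clears) the same object.
        ackSet.clear();

        // Thread A finishes its check against the now-empty bit set.
        boolean after = isDuplicate(seenByThreadA, batchIndex);
        return new boolean[] {before, after};
    }

    public static void main(String[] args) {
        boolean[] result = replayRace();
        System.out.println("duplicate before recycle: " + result[0]);
        System.out.println("duplicate after recycle:  " + result[1]);
    }
}
```

In this model the same batch index is reported as not duplicate before the clear and as duplicate after it, even though it was never acknowledged. That matches the symptom of a message being dropped on receipt.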
I have extracted the relevant log lines:
```
2628511 2026-01-26T02:04:10,170+0800 [pulsar-client-io-1-5] DEBUG org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker - [ConsumerBase{subscription='sub-2', consumerName='zfGrH', topic='persistent://public/default/partitioned_topic_25145-partition-6'}] Before pendingIndividualBatchIndexAcks messageId 17:33:6:589
2628602 2026-01-26T02:04:10,170+0800 [pulsar-client-io-1-5] DEBUG org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker - [ConsumerBase{subscription='sub-2', consumerName='zfGrH', topic='persistent://public/default/partitioned_topic_25145-partition-6'}] After pendingIndividualBatchIndexAcks messageId 17:33:6:589 isEmpty true
2628589 2026-01-26T02:04:10,171+0800 [pulsar-client-io-1-3] DEBUG org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker - [ConsumerBase{subscription='sub-2', consumerName='zfGrH', topic='persistent://public/default/partitioned_topic_25145-partition-6'}] polled pending batch index ack, messageId 17:33:6
2628599 2026-01-26T02:04:10,171+0800 [pulsar-client-io-1-3] DEBUG org.apache.pulsar.common.protocol.Commands - before recycle 17:33
2628601 2026-01-26T02:04:10,171+0800 [pulsar-client-io-1-3] DEBUG org.apache.pulsar.common.protocol.Commands - after recycle 17:33
2628604 2026-01-26T02:04:10,171+0800 [pulsar-client-io-1-5] DEBUG org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker - [ConsumerBase{subscription='sub-2', consumerName='zfGrH', topic='persistent://public/default/partitioned_topic_25145-partition-6'}] complete ackSet check messageId 17:33:6:589
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
