Hi all, you can see the problem and the discussion comments in a Google Doc.
Google Doc link: https://docs.google.com/document/d/1J1xGcj8YORrdlCa_XDt28uV0TMp03gSmX_z43ZRhwZo/edit?usp=sharing

Thanks!
Bo

丛搏 <bog...@apache.org> wrote on Thu, Sep 8, 2022 at 10:55:
>
> Hello, Pulsar community:
>
> Currently the consumer does not filter messages that have already been
> consumed. After a consumer reconnects, the broker dispatches messages
> to the consumer starting from the markDeletePosition. In the Failover
> and Exclusive subscription types, all messages in a topic are
> dispatched to the same consumer. Consider the following example:
>
> ```java
> @Test
> public void testConsumerReconnectionRepeatedConsumeMessage() throws Exception {
>     final String topic = "testConsumerReconnectionRepeatedConsumeMessage";
>     @Cleanup
>     Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
>             .topic(topic)
>             .sendTimeout(0, TimeUnit.SECONDS)
>             .enableBatching(false)
>             .create();
>     @Cleanup
>     Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
>             .subscriptionType(SubscriptionType.Exclusive)
>             .topic(topic)
>             .subscriptionName("test")
>             .subscribe();
>
>     // send 5 messages
>     for (int i = 0; i < 5; i++) {
>         producer.send("test" + i);
>     }
>
>     // the consumer receives the 5 messages
>     for (int i = 0; i < 5; i++) {
>         consumer.receive();
>     }
>
>     admin.topics().unload(topic);
>
>     // after the unload, the consumer receives the same 5 messages again
>     Message<String> message = null;
>     for (int i = 0; i < 5; i++) {
>         message = consumer.receive();
>     }
>     consumer.acknowledgeCumulativeAsync(message.getMessageId());
> }
> ```
>
> In the example above, the consumer consumes the 5 messages sent by the
> producer twice, and acknowledges them with a cumulative ack. If the
> consumer cumulatively acks only once per 1,000 or 10,000 messages,
> consumer reconnection can cause a large amount of repeated
> consumption. Although this does not break at-least-once semantics, it
> causes a lot of useless overhead.
>
> Most importantly, it breaks the exactly-once semantics of Pulsar
> transactions.
>
> I want to discuss whether we should fix normal and transactional
> cumulative acks in the same way: prevent repeated consumption of
> messages caused by consumer reconnection by filtering out messages the
> user has already received through `consumer.receive()`. Or should we
> only guarantee exactly-once semantics for transactions, i.e. only
> guarantee that cumulative acks within a transaction will not deliver
> the same messages twice?
>
> Please leave your opinion, thanks! :)
>
> Thanks,
> Bo
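The client-side filtering idea quoted above can be sketched roughly as follows. This is only an illustrative standalone demo, not Pulsar code: `RedeliveryFilter` is a hypothetical class, and message IDs are modeled as plain `long`s (real Pulsar `MessageId`s are `Comparable`, so the same "drop anything at or below the last delivered position" check would apply).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical client-side filter: remember the highest message ID already
// handed to the application, and drop any redelivered message at or below it.
class RedeliveryFilter {
    private long lastDelivered = -1;

    // Returns true if the message is new and should reach the application.
    boolean shouldDeliver(long messageId) {
        if (messageId <= lastDelivered) {
            return false; // already delivered before the reconnect
        }
        lastDelivered = messageId;
        return true;
    }
}

public class RedeliveryFilterDemo {
    public static void main(String[] args) {
        RedeliveryFilter filter = new RedeliveryFilter();
        List<Long> delivered = new ArrayList<>();

        // First delivery of messages 0..4, then the broker redelivers the
        // same messages from the markDeletePosition after a topic unload.
        long[] firstPass = {0, 1, 2, 3, 4};
        long[] redelivery = {0, 1, 2, 3, 4};

        for (long id : firstPass) {
            if (filter.shouldDeliver(id)) delivered.add(id);
        }
        for (long id : redelivery) {
            if (filter.shouldDeliver(id)) delivered.add(id);
        }

        System.out.println(delivered); // prints [0, 1, 2, 3, 4]
    }
}
```

The point of the sketch is that the filter state lives in the consumer, so the application sees each message exactly once even though the broker redispatches from the mark-delete position after reconnection.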