Hi all,

You can see the problem and the related comments in the Google Doc below.

Google Doc link:
https://docs.google.com/document/d/1J1xGcj8YORrdlCa_XDt28uV0TMp03gSmX_z43ZRhwZo/edit?usp=sharing

Thanks!
Bo

丛搏 <bog...@apache.org> wrote on Thu, Sep 8, 2022, at 10:55:
>
> Hello, Pulsar community:
>
>
> Currently, the consumer does not filter out messages that have already
> been consumed. After a consumer reconnects, the broker dispatches
> messages to it starting from the markDeletePosition. In the Failover
> and Exclusive subscription types, all messages in a topic are
> dispatched to the same consumer. Let's look at the following example:
>
> ```
>     @Test
>     public void testConsumerReconnectionRepeatedConsumeMessage() throws Exception {
>         final String topic = "testConsumerReconnectionRepeatedConsumeMessage";
>
>         @Cleanup
>         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
>                 .topic(topic)
>                 .sendTimeout(0, TimeUnit.SECONDS)
>                 .enableBatching(false)
>                 .create();
>
>         @Cleanup
>         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
>                 .subscriptionType(SubscriptionType.Exclusive)
>                 .topic(topic)
>                 .subscriptionName("test")
>                 .subscribe();
>
>         // send 5 messages
>         for (int i = 0; i < 5; i++) {
>             producer.send("test" + i);
>         }
>
>         // the consumer receives the 5 messages
>         for (int i = 0; i < 5; i++) {
>             consumer.receive();
>         }
>
>         // unloading the topic forces the consumer to reconnect
>         admin.topics().unload(topic);
>
>         // after reconnection, the consumer receives the same 5 messages again
>         Message<String> message = null;
>         for (int i = 0; i < 5; i++) {
>             message = consumer.receive();
>         }
>         consumer.acknowledgeCumulativeAsync(message.getMessageId());
>     }
> ```
>
> In the above example, the consumer consumes the 5 messages sent by the
> producer twice, acknowledging them with a cumulative ack. If a consumer
> cumulatively acks only once per 1,000 or 10,000 messages, consumer
> reconnection can cause a large amount of repeated consumption. Although
> this does not violate at-least-once semantics, it causes a lot of
> useless overhead.
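>
> Here is a minimal sketch of that batched cumulative-ack pattern (the
> service URL, topic name, and batch size of 1,000 are assumptions for
> illustration, not from the test above). A reconnection between two acks
> makes the broker redeliver everything after markDeletePosition, so up
> to 999 already-processed messages come back:
>
> ```
> import org.apache.pulsar.client.api.*;
>
> public class CumulativeAckEveryN {
>     public static void main(String[] args) throws Exception {
>         PulsarClient client = PulsarClient.builder()
>                 .serviceUrl("pulsar://localhost:6650") // assumed local broker
>                 .build();
>         Consumer<String> consumer = client.newConsumer(Schema.STRING)
>                 .topic("my-topic") // hypothetical topic name
>                 .subscriptionName("test")
>                 .subscriptionType(SubscriptionType.Exclusive)
>                 .subscribe();
>
>         Message<String> last = null;
>         for (int i = 1; i <= 10_000; i++) {
>             last = consumer.receive();
>             // process the message ...
>             if (i % 1_000 == 0) {
>                 // one cumulative ack covers the whole preceding batch; a
>                 // reconnection before this point makes the broker redeliver
>                 // everything after markDeletePosition
>                 consumer.acknowledgeCumulative(last.getMessageId());
>             }
>         }
>         consumer.close();
>         client.close();
>     }
> }
> ```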
>
>
> Most importantly, it breaks the exactly-once semantics of Pulsar
> transactions.
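>
> For example, with a transactional cumulative ack (a sketch only; it
> assumes a client built with .enableTransaction(true) and a consumer
> like the one above), a reconnection in the middle of the batch hands
> the same messages to the application a second time before the ack is
> ever committed:
>
> ```
> import java.util.concurrent.TimeUnit;
> import org.apache.pulsar.client.api.*;
> import org.apache.pulsar.client.api.transaction.Transaction;
>
> public class TxnCumulativeAck {
>     // assumes `client` was built with .enableTransaction(true)
>     static void consumeBatchInTransaction(PulsarClient client,
>                                           Consumer<String> consumer) throws Exception {
>         Transaction txn = client.newTransaction()
>                 .withTransactionTimeout(5, TimeUnit.MINUTES)
>                 .build()
>                 .get();
>         Message<String> last = null;
>         for (int i = 0; i < 1000; i++) {
>             last = consumer.receive();
>             // process within the transaction ...
>         }
>         // the cumulative ack is bound to the transaction, but if the
>         // consumer reconnected mid-batch, the earlier messages were already
>         // redelivered and processed a second time, breaking exactly-once
>         consumer.acknowledgeCumulativeAsync(last.getMessageId(), txn).get();
>         txn.commit().get();
>     }
> }
> ```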
>
>
> I want to discuss whether we should fix normal and transactional
> cumulative acks in the same way: prevent repeated consumption of
> messages caused by consumer reconnection by filtering out messages that
> users have already received through `consumer.receive()` (see the
> sketch after this paragraph). Or should we only guarantee exactly-once
> semantics, i.e., only guarantee that messages cumulatively acked within
> a transaction are not received again?
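>
> To illustrate the first option, a client-side filter could remember the
> last MessageId handed out by `consumer.receive()` and drop redelivered
> messages at or before it. This is a sketch of the idea only, not the
> proposed fix, and plain MessageId comparison is an assumption that may
> not cover every batch-message corner case:
>
> ```
> import org.apache.pulsar.client.api.*;
>
> public class DedupOnReceive {
>     static void receiveWithoutRedelivered(Consumer<String> consumer)
>             throws PulsarClientException {
>         MessageId lastReceived = null;
>         while (true) {
>             Message<String> msg = consumer.receive();
>             // drop anything already handed to the application before a reconnection
>             if (lastReceived != null
>                     && msg.getMessageId().compareTo(lastReceived) <= 0) {
>                 continue;
>             }
>             lastReceived = msg.getMessageId();
>             // process the message ...
>         }
>     }
> }
> ```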
>
>
> Please leave your opinion, thanks! :)
>
>
>
> Thanks,
>
> Bo
