Hi Bo,

Overall it makes sense to me.
It is basically the same as the broker-side deduplication mechanism used
when producing messages, which relies on `sequenceId`.
In your case, the messageId is used for deduplication instead. It should
work as long as the received messageId increases monotonically.

So we should be careful about any operation that resets the cursor, for
example when the user resets it with the admin client. We need more
detailed information on this matter.
And I am not sure whether there are other operations that reset the
cursor implicitly.
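The client-side filtering described above could be sketched roughly like this (a hypothetical helper for illustration only, not actual Pulsar client code; the comparable type parameter stands in for Pulsar's `org.apache.pulsar.client.api.MessageId`, which implements `Comparable`):

```java
// Hypothetical sketch: track the largest messageId delivered so far and
// drop anything at or below it. T stands in for Pulsar's comparable
// MessageId type.
class DedupFilter<T extends Comparable<T>> {
    private T maxSeen = null;

    // Returns true if the id is new and should be delivered to the
    // application, false if it is a duplicate caused by redelivery
    // after a reconnection (e.g. a topic unload).
    synchronized boolean accept(T id) {
        if (maxSeen != null && id.compareTo(maxSeen) <= 0) {
            return false; // already delivered to the application
        }
        maxSeen = id;
        return true;
    }

    // Would be called when the user explicitly asks for redelivery
    // (redeliverUnacknowledgedMessages) or resets the cursor, so that
    // previously seen ids are accepted again.
    synchronized void reset() {
        maxSeen = null;
    }
}
```

This is exactly why a cursor reset is dangerous for such a scheme: unless the filter is cleared at the same time, every redelivered message would be silently dropped as a "duplicate".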

Thanks,
Haiting

On Thu, Sep 8, 2022 at 11:36 PM 丛搏 <congbobo...@gmail.com> wrote:
>
> Hi Haiting
>
> When using cumulative ack, we can save the maximum received MessageId
> on the consumer client side to filter out the duplicate messages caused
> by reconnection. If the consumer client process restarts, the maximum
> received MessageId will no longer exist in the client. This requires
> the user to be responsible for the received messages; if the user wants
> to re-consume the received messages, they need to call
> `void redeliverUnacknowledgedMessages()`, which then clears the maximum
> received MessageId from the consumer client.
>
> Thanks,
> Bo
>
> Haiting Jiang <jianghait...@gmail.com> 于2022年9月8日周四 14:51写道:
> >
> > From the user's perspective, I think we should always avoid delivering
> > repeated messages.
> > But can the broker tell whether the reconnection is caused by topic
> > unloading or by the consumer client process restarting?
> > In the latter case, the messages should be redelivered; that is the
> > whole point of explicit acking by the user.
> >
> > Thanks,
> > Haiting
> >
> > On Thu, Sep 8, 2022 at 10:56 AM 丛搏 <bog...@apache.org> wrote:
> > >
> > > Hello, Pulsar community:
> > >
> > >
> > > Currently the consumer does not filter messages that have already been
> > > consumed. After a consumer reconnects, the broker dispatches
> > > messages to it starting from the markDeletePosition. In the Failover and
> > > Exclusive subscription types, all messages in a topic are
> > > dispatched to the same consumer. Let's look at the following example:
> > >
> > > ```
> > >     @Test
> > >     public void testConsumerReconnectionRepeatedConsumeMessage() throws Exception {
> > >         final String topic = "testConsumerReconnectionRepeatedConsumeMessage";
> > >
> > >         @Cleanup
> > >         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
> > >                 .topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
> > >
> > >         @Cleanup
> > >         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
> > >                 .subscriptionType(SubscriptionType.Exclusive)
> > >                 .topic(topic).subscriptionName("test").subscribe();
> > >
> > >         // send 5 messages
> > >         for (int i = 0; i < 5; i++) {
> > >             producer.send("test" + i);
> > >         }
> > >
> > >         // the consumer receives 5 messages
> > >         for (int i = 0; i < 5; i++) {
> > >             consumer.receive();
> > >         }
> > >
> > >         admin.topics().unload(topic);
> > >
> > >         // after reconnection, the consumer receives the same 5 messages again
> > >         Message<String> message = null;
> > >         for (int i = 0; i < 5; i++) {
> > >             message = consumer.receive();
> > >         }
> > >         consumer.acknowledgeCumulativeAsync(message.getMessageId());
> > >     }
> > > ```
> > >
> > > In the above example, the consumer consumes the 5 messages sent by the
> > > producer twice and acks them with a cumulative ack. If messages are
> > > cumulatively acked only once per 1,000 or 10,000 messages, consumer
> > > reconnection can cause a large amount of repeated consumption.
> > > Although this does not break at-least-once semantics, it causes a lot
> > > of useless overhead.
> > >
> > >
> > > Most importantly, it breaks the exactly-once semantics of Pulsar
> > > transactions.
> > >
> > >
> > > I want to discuss whether we should fix both normal and transactional
> > > cumulative acks in the same way: prevent repeated consumption of
> > > messages due to consumer reconnection, and filter out messages that
> > > users have already received through `consumer.receive()`. Or should we
> > > only guarantee exactly-once semantics, i.e. only guarantee that a
> > > cumulative ack within a transaction will not receive the same messages?
> > >
> > >
> > > Please leave your opinion, thanks! :)
> > >
> > >
> > >
> > > Thanks,
> > >
> > > Bo
