Hi Haiting

When using cumulative ack, we can save the maximum received MessageId
on the consumer client side and use it to filter the duplicate
messages caused by reconnection. If the consumer client process
restarts, the maximum received MessageId no longer exists in the
client, so the user is responsible for the messages they have already
received. If the user wants to re-consume the received messages, they
need to call `void redeliverUnacknowledgedMessages()`, and the maximum
received MessageId is then cleared from the consumer client.
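
To make the idea concrete, here is a minimal sketch of such a client-side filter. This is not Pulsar's actual implementation: `MaxIdFilter` is a hypothetical name, and the generic `T` stands in for Pulsar's `MessageId`, which implements `Comparable`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of client-side duplicate filtering under cumulative ack:
// track the largest id delivered to the user and drop anything at or below it
// after a reconnection. T stands in for Pulsar's Comparable MessageId.
class MaxIdFilter<T extends Comparable<T>> {
    private final AtomicReference<T> maxReceived = new AtomicReference<>();

    /** Returns true if the id is new and should be delivered to the user. */
    boolean accept(T id) {
        while (true) {
            T current = maxReceived.get();
            if (current != null && id.compareTo(current) <= 0) {
                return false; // duplicate caused by reconnection: filter it out
            }
            if (maxReceived.compareAndSet(current, id)) {
                return true;
            }
        }
    }

    /** Cleared when the user asks for redelivery of unacked messages. */
    void reset() {
        maxReceived.set(null);
    }
}

public class MaxIdFilterDemo {
    public static void main(String[] args) {
        MaxIdFilter<Long> filter = new MaxIdFilter<>();
        System.out.println(filter.accept(1L)); // true: first delivery
        System.out.println(filter.accept(2L)); // true: new maximum
        System.out.println(filter.accept(1L)); // false: duplicate after reconnection
        filter.reset();                        // user requested redelivery
        System.out.println(filter.accept(1L)); // true: re-consumed after reset
    }
}
```

The reset path matches the point above: after `redeliverUnacknowledgedMessages()` the filter state must be dropped, otherwise the redelivered messages would be filtered as duplicates.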

Thanks,
Bo

Haiting Jiang <jianghait...@gmail.com> wrote on Thu, Sep 8, 2022 at 14:51:
>
> From the user's perspective, I think we should always avoid delivering
> repeated messages.
> But can the broker tell whether the reconnection is caused by topic
> unloading or by a consumer client process restart?
> In the latter case, the messages should be redelivered; that is the
> whole point of explicit user acking.
>
> Thanks,
> Haiting
>
> On Thu, Sep 8, 2022 at 10:56 AM 丛搏 <bog...@apache.org> wrote:
> >
> > Hello, Pulsar community:
> >
> >
> > Currently the consumer does not filter messages that have already been
> > consumed. After a consumer reconnects, the broker dispatches messages
> > to the consumer from the markDeletePosition. In the Failover and
> > Exclusive subscription types, all messages in a topic are dispatched
> > to the same consumer. Let's look at the following example:
> >
> > ```
> >     @Test
> >     public void testConsumerReconnectionRepeatedConsumeMessage() throws Exception {
> >         final String topic = "testConsumerReconnectionRepeatedConsumeMessage";
> >
> >         @Cleanup
> >         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
> >                 .topic(topic)
> >                 .sendTimeout(0, TimeUnit.SECONDS)
> >                 .enableBatching(false)
> >                 .create();
> >
> >         @Cleanup
> >         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
> >                 .subscriptionType(SubscriptionType.Exclusive)
> >                 .topic(topic)
> >                 .subscriptionName("test")
> >                 .subscribe();
> >
> >         // send 5 messages
> >         for (int i = 0; i < 5; i++) {
> >             producer.send("test" + i);
> >         }
> >
> >         // the consumer receives 5 messages
> >         for (int i = 0; i < 5; i++) {
> >             consumer.receive();
> >         }
> >
> >         admin.topics().unload(topic);
> >
> >         // after reconnection the consumer receives the same 5 messages again
> >         Message<String> message = null;
> >         for (int i = 0; i < 5; i++) {
> >             message = consumer.receive();
> >         }
> >         consumer.acknowledgeCumulativeAsync(message.getMessageId());
> >     }
> > ```
> >
> > In the above example, the consumer consumes the 5 messages sent by the
> > producer twice and acks through cumulative ack. If a cumulative ack is
> > sent only once per 1000 or 10000 messages, consumer reconnection can
> > cause a lot of repeated consumption. Although this does not break
> > at-least-once semantics, it causes a lot of useless overhead.
> >
> >
> > Most importantly, it breaks the exactly-once semantics of Pulsar
> > transactions.
> >
> >
> > I want to discuss whether we should fix both normal and transactional
> > cumulative acks in the same way: prevent repeated consumption of
> > messages caused by consumer reconnection by filtering out messages the
> > user has already received through `consumer.receive()`. Or should we
> > only guarantee exactly-once semantics, i.e. only guarantee that a
> > cumulative ack used with a transaction will not deliver the same
> > messages again?
> >
> >
> > Please leave your opinion, thanks! :)
> >
> >
> >
> > Thanks,
> >
> > Bo
