I agree that reducing unnecessary duplicates is a good goal.

For the topic unloading case, it might help to think about how we can
improve the protocol. The current handover is very blunt. A new
solution could focus on decreasing duplicate message delivery while
also decreasing the time during which the topic is unavailable.

This thread is probably a good source of requirements when thinking
about ways to improve the topic handover logic, which has been
discussed as a potential load balancing or 3.0 enhancement.

> So we should be careful of any operations that would reset the cursor.
> For example, the user may reset the cursor with the admin client. We
> need more detailed information on this matter.

This is a great point! In this case, it seems important to redeliver
these messages. (For those unfamiliar, I just confirmed that the
broker disconnects consumers when a cursor is reset.)
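
For anyone who wants to see that behavior, here is a minimal sketch of
triggering a cursor reset with the Java admin client (the service URL,
topic, and subscription names below are placeholders, not from this
thread):

```
// Hedged sketch: reset a subscription's cursor via PulsarAdmin.
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;

public class ResetCursorDisconnectExample {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build()) {
            // Rewind the "test" subscription to the earliest message. The
            // broker disconnects this subscription's consumers, and after
            // reconnecting they receive messages from the reset position.
            admin.topics().resetCursor("my-topic", "test", MessageId.earliest);
        }
    }
}
```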

Thanks,
Michael

On Thu, Sep 8, 2022 at 9:33 PM Haiting Jiang <jianghait...@gmail.com> wrote:
>
> Hi Bo,
>
> Overall it makes sense to me.
> It is basically the same as the broker-side deduplication mechanism
> used when producing messages, which relies on `sequenceId`.
> In your case, messageId is used for deduplication. It should work as
> long as the received messageId increases monotonically.
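>
> For comparison, a minimal sketch of that producer-side mechanism (the
> namespace, topic, and producer name here are illustrative):
>
> ```
> // Enable broker-side deduplication for a namespace, then send with
> // explicit sequence IDs. The broker drops any message whose sequenceId
> // is not greater than the last one stored for this producer name.
> admin.namespaces().setDeduplicationStatus("public/default", true);
> Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
>         .topic("my-topic")
>         .producerName("dedup-producer") // identity used for deduplication
>         .create();
> producer.newMessage().sequenceId(1L).value("a").send();
> producer.newMessage().sequenceId(1L).value("a").send(); // deduplicated
> ```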
>
> So we should be careful of any operations that would reset the cursor.
> For example, the user may reset the cursor with the admin client. We
> need more detailed information on this matter.
> And I am not sure if there are other operations that would reset the
> cursor implicitly.
>
> Thanks,
> Haiting
>
> On Thu, Sep 8, 2022 at 11:36 PM 丛搏 <congbobo...@gmail.com> wrote:
> >
> > Hi Haiting
> >
> > When using cumulative ack, we can save the maximum received MessageId
> > on the consumer client side to filter the duplicate messages caused
> > by reconnection. If the consumer client process restarts, the maximum
> > received MessageId will no longer exist in the consumer client. This
> > requires the user to take responsibility for the received messages:
> > if the user wants to re-consume the received messages, they need to
> > call `void redeliverUnacknowledgedMessages()`, which then clears the
> > maximum received MessageId from the consumer client.
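> >
> > A rough sketch of that client-side filter (the field and method here
> > are made up to illustrate the idea, not actual client internals):
> >
> > ```
> > // Track the largest MessageId the application has received and drop
> > // anything at or below it; MessageId is Comparable in the Java client.
> > private volatile MessageId maxReceivedMessageId = null;
> >
> > boolean shouldDeliver(Message<?> msg) {
> >     if (maxReceivedMessageId != null
> >             && msg.getMessageId().compareTo(maxReceivedMessageId) <= 0) {
> >         return false; // duplicate caused by reconnection, filter it
> >     }
> >     maxReceivedMessageId = msg.getMessageId();
> >     return true;
> > }
> >
> > // redeliverUnacknowledgedMessages() would also reset the tracked id:
> > // maxReceivedMessageId = null;
> > ```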
> >
> > Thanks,
> > Bo
> >
> > On Thu, Sep 8, 2022 at 2:51 PM Haiting Jiang <jianghait...@gmail.com> wrote:
> > >
> > > From the user's perspective, I think we should always avoid delivering
> > > repeated messages.
> > > But can the broker tell whether the reconnection is caused by topic
> > > unloading or by the consumer client process restarting?
> > > For the latter case, the messages should be redelivered; that is the
> > > whole point of explicit user acking.
> > >
> > > Thanks,
> > > Haiting
> > >
> > > On Thu, Sep 8, 2022 at 10:56 AM 丛搏 <bog...@apache.org> wrote:
> > > >
> > > > Hello, Pulsar community:
> > > >
> > > >
> > > > Currently, the consumer does not filter messages that have already
> > > > been consumed. After a consumer reconnects, the broker will dispatch
> > > > messages to it from the markDeletePosition. In the Failover and
> > > > Exclusive subscription types, all messages in a topic are dispatched
> > > > to the same consumer. Let's look at the following example:
> > > >
> > > > ```
> > > >     @Test
> > > >     public void testConsumerReconnectionRepeatedConsumeMessage() throws Exception {
> > > >         final String topic = "testConsumerReconnectionRepeatedConsumeMessage";
> > > >
> > > >         @Cleanup
> > > >         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
> > > >                 .topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
> > > >
> > > >         @Cleanup
> > > >         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
> > > >                 .subscriptionType(SubscriptionType.Exclusive)
> > > >                 .topic(topic).subscriptionName("test").subscribe();
> > > >
> > > >         // send 5 messages
> > > >         for (int i = 0; i < 5; i++) {
> > > >             producer.send("test" + i);
> > > >         }
> > > >
> > > >         // the consumer receives the 5 messages
> > > >         for (int i = 0; i < 5; i++) {
> > > >             consumer.receive();
> > > >         }
> > > >
> > > >         // unloading the topic forces the consumer to reconnect
> > > >         admin.topics().unload(topic);
> > > >
> > > >         // after reconnecting, the consumer receives the same 5 messages again
> > > >         Message<String> message = null;
> > > >         for (int i = 0; i < 5; i++) {
> > > >             message = consumer.receive();
> > > >         }
> > > >         consumer.acknowledgeCumulativeAsync(message.getMessageId());
> > > >     }
> > > > ```
> > > >
> > > > In the above example, the consumer consumes the 5 messages sent by
> > > > the producer twice and acks through cumulative ack. If the consumer
> > > > cumulatively acks only once per 1,000 or 10,000 messages, a consumer
> > > > reconnection can cause a large amount of repeated consumption.
> > > > Although this does not break at-least-once semantics, it causes a
> > > > lot of useless overhead.
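> > > >
> > > > As a sketch of that ack pattern (the batch size and loop bound are
> > > > illustrative):
> > > >
> > > > ```
> > > > // Cumulatively ack once per 1000 received messages. If the consumer
> > > > // reconnects inside a window, the broker redispatches from the
> > > > // markDeletePosition, so up to 1000 already-received messages are
> > > > // delivered again.
> > > > Message<String> msg;
> > > > for (int i = 1; i <= 100_000; i++) {
> > > >     msg = consumer.receive();
> > > >     if (i % 1000 == 0) {
> > > >         consumer.acknowledgeCumulative(msg.getMessageId());
> > > >     }
> > > > }
> > > > ```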
> > > >
> > > >
> > > > Most importantly, it breaks the exactly-once semantics of Pulsar
> > > > transactions.
> > > >
> > > >
> > > > I want to discuss whether we should fix normal and transactional
> > > > cumulative acks in the same way: prevent repeated consumption of
> > > > messages caused by consumer reconnection by filtering out messages
> > > > that users have already received through `consumer.receive()`. Or
> > > > should we only guarantee exactly-once semantics, i.e., only
> > > > guarantee that a transaction using cumulative ack will not receive
> > > > the same messages?
> > > >
> > > >
> > > > Please leave your opinion, thanks! :)
> > > >
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Bo
