Hi Haiting,

In this case, the user wants to reset the cursor to the first message to
re-consume all the messages, but the consumer can no longer receive any
message. Since `admin.topics().resetCursor(topic, subName, MessageId.earliest)`
may be called over HTTP, separately from the consumer, this is wrong behavior.

Thanks,
Bo

On Thu, Sep 15, 2022 at 11:27 Haiting Jiang <jianghait...@gmail.com> wrote:
>
> Hi Bo,
>
> > In this case, the consumer ends up unable to receive any more messages.
> > We have to fix this because it causes message loss.
> > I think we need to redefine the semantics of cumulative acks rather than
> > only solving the problem of receiving messages repeatedly.
> >
>
> There seems to be nothing wrong in this case. You already acked the last
> message, even though you reset the cursor before that.
> Currently, we don't place any restriction on the message IDs that users
> acknowledge. They don't have to come from a valid message, and they can
> even be created directly by the user.
>
> Thanks,
> Haiting
>
> On Wed, Sep 14, 2022 at 8:34 PM 丛搏 <congbobo...@gmail.com> wrote:
> >
> > Hi Haiting, Michael ,
> >
> > I tested an example, and there may be some problems when resetting the
> > subscription cursor and using cumulative ack at the same time, e.g.:
> >
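> > ```
> > Message<byte[]> message = null;
> > // receive the first 3 messages without acknowledging them
> > for (int i = 0; i < 3; i++) {
> >     message = consumer.receive();
> > }
> > // rewind the subscription to the earliest position
> > admin.topics().resetCursor(topic, subName, MessageId.earliest);
> >
> > // cumulatively acknowledge the 3rd message received before the reset
> > consumer.acknowledgeCumulative(message.getMessageId());
> >
> > // force the consumer to reconnect
> > admin.topics().unload(topic);
> > // in this test, receive() never returns a message
> > consumer.receive();
> > ```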
> > In this case, the consumer ends up unable to receive any more messages.
> > We have to fix this because it causes message loss.
> > I think we need to redefine the semantics of cumulative acks rather than
> > only solving the problem of receiving messages repeatedly.
> >
> > Thanks!
> > Bo
> >
> > On Fri, Sep 9, 2022 at 12:05 Michael Marshall <mmarsh...@apache.org> wrote:
> > >
> > > I agree that reducing unnecessary duplicates is a good goal.
> > >
> > > For the topic unloading case, it might help to think about how we can
> > > improve the protocol. The current handover is very blunt. A new
> > > solution could focus on reducing duplicate message delivery while
> > > also reducing the time during which the topic is unavailable.
> > >
> > > The requirement raised in this thread is probably worth keeping in mind
> > > when thinking about ways to improve the topic handover logic, which has
> > > been discussed as a potential load-balancing enhancement or a 3.0
> > > enhancement.
> > >
> > > > So we should be careful with any operation that resets the cursor,
> > > > for example, when the user resets the cursor with the admin client. We
> > > > need more detailed information on this.
> > >
> > > This is a great point! In this case, it seems important to redeliver
> > > these messages. (For those unfamiliar, I just confirmed that the
> > > broker disconnects consumers when a cursor is reset.)
> > >
> > > Thanks,
> > > Michael
> > >
> > > On Thu, Sep 8, 2022 at 9:33 PM Haiting Jiang <jianghait...@gmail.com> wrote:
> > > >
> > > > Hi Bo,
> > > >
> > > > Overall it makes sense to me.
> > > > It is basically the same as the broker-side deduplication mechanism
> > > > for produced messages, which uses `sequenceId`.
> > > > In your case, the messageId is used for deduplication. It should work
> > > > as long as the received messageIds increase monotonically.
> > > >
> > > > So we should be careful with any operation that resets the cursor,
> > > > for example, when the user resets the cursor with the admin client. We
> > > > need more detailed information on this.
> > > > And I am not sure whether other operations reset the cursor
> > > > implicitly.
> > > >
> > > > Thanks,
> > > > Haiting
> > > >
> > > > On Thu, Sep 8, 2022 at 11:36 PM 丛搏 <congbobo...@gmail.com> wrote:
> > > > >
> > > > > Hi Haiting
> > > > >
> > > > > When using cumulative ack, we can save the maximum received MessageId
> > > > > on the consumer client side to filter out duplicate messages caused
> > > > > by reconnection. If the consumer client process restarts, the maximum
> > > > > received MessageId no longer exists in the consumer client. This
> > > > > requires users to be responsible for the messages they have received:
> > > > > if a user wants to re-consume received messages, they need to call
> > > > > `void redeliverUnacknowledgedMessages()` and then clear the maximum
> > > > > received MessageId from the consumer client.
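
A minimal sketch of the client-side filter described above, assuming message IDs can be totally ordered by ledger ID and then entry ID. `MaxReceivedFilter`, `Position`, `accept` and `clear` are hypothetical names for illustration only; they are not part of the Pulsar client API, and a real implementation would also have to deal with batch indexes and partitions.

```java
import java.util.ArrayList;
import java.util.List;

class MaxReceivedFilter {

    // Hypothetical stand-in for a Pulsar message ID, ordered by ledger ID, then entry ID.
    record Position(long ledgerId, long entryId) implements Comparable<Position> {
        @Override
        public int compareTo(Position other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    // Highest position handed to the application so far; null before the first message.
    private Position maxReceived;

    // Returns true if the message should be delivered, false if it was already received.
    synchronized boolean accept(Position position) {
        if (maxReceived != null && position.compareTo(maxReceived) <= 0) {
            return false;
        }
        maxReceived = position;
        return true;
    }

    // Forget the high-water mark, e.g. when the user asks to re-consume messages.
    synchronized void clear() {
        maxReceived = null;
    }

    public static void main(String[] args) {
        MaxReceivedFilter filter = new MaxReceivedFilter();
        List<Long> delivered = new ArrayList<>();

        // First connection: entries 0..4 of ledger 1 are received.
        for (long entry = 0; entry < 5; entry++) {
            if (filter.accept(new Position(1, entry))) delivered.add(entry);
        }
        // Reconnect before any ack: the broker re-dispatches entries 0..4, all filtered out.
        for (long entry = 0; entry < 5; entry++) {
            if (filter.accept(new Position(1, entry))) delivered.add(entry);
        }
        System.out.println("after reconnect: " + delivered); // after reconnect: [0, 1, 2, 3, 4]

        // The user deliberately rewinds: without clear(), entries 0..4 would be dropped again.
        filter.clear();
        for (long entry = 0; entry < 5; entry++) {
            if (filter.accept(new Position(1, entry))) delivered.add(entry);
        }
        System.out.println("after clear: " + delivered.size()); // after clear: 10
    }
}
```

Here `clear()` plays the role of the step where the user re-consumes via `redeliverUnacknowledgedMessages()`. A cursor reset done through the admin API happens outside the client, so a filter like this would have no signal to clear itself, which is the case raised about resetting the cursor with the admin client.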
> > > > >
> > > > > Thanks,
> > > > > Bo
> > > > >
> > > > > On Thu, Sep 8, 2022 at 14:51 Haiting Jiang <jianghait...@gmail.com> wrote:
> > > > > >
> > > > > > From the user's perspective, I think we should always avoid delivering
> > > > > > repeated messages.
> > > > > > But can the broker tell whether the reconnection is caused by topic
> > > > > > unloading or by the consumer client process restarting?
> > > > > > In the latter case, the messages should be redelivered; that's the
> > > > > > whole point of explicit acking by the user.
> > > > > >
> > > > > > Thanks,
> > > > > > Haiting
> > > > > >
> > > > > > On Thu, Sep 8, 2022 at 10:56 AM 丛搏 <bog...@apache.org> wrote:
> > > > > > >
> > > > > > > Hello, Pulsar community:
> > > > > > >
> > > > > > >
> > > > > > > Currently, the consumer does not filter out messages that have
> > > > > > > already been consumed. After the consumer reconnects, the broker
> > > > > > > dispatches messages to it starting from the markDeletePosition. With
> > > > > > > the Failover and Exclusive subscription types, all messages in a
> > > > > > > topic are dispatched to the same consumer. Consider the following
> > > > > > > example:
> > > > > > >
> > > > > > > ```
> > > > > > > @Test
> > > > > > > public void testConsumerReconnectionRepeatedConsumeMessage() throws Exception {
> > > > > > >     final String topic = "testConsumerReconnectionRepeatedConsumeMessage";
> > > > > > >
> > > > > > >     @Cleanup
> > > > > > >     Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
> > > > > > >             .topic(topic)
> > > > > > >             .sendTimeout(0, TimeUnit.SECONDS)
> > > > > > >             .enableBatching(false)
> > > > > > >             .create();
> > > > > > >
> > > > > > >     @Cleanup
> > > > > > >     Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
> > > > > > >             .subscriptionType(SubscriptionType.Exclusive)
> > > > > > >             .topic(topic)
> > > > > > >             .subscriptionName("test")
> > > > > > >             .subscribe();
> > > > > > >
> > > > > > >     // send 5 messages
> > > > > > >     for (int i = 0; i < 5; i++) {
> > > > > > >         producer.send("test" + i);
> > > > > > >     }
> > > > > > >
> > > > > > >     // the consumer receives the 5 messages (none acknowledged yet)
> > > > > > >     for (int i = 0; i < 5; i++) {
> > > > > > >         consumer.receive();
> > > > > > >     }
> > > > > > >
> > > > > > >     // unloading the topic forces the consumer to reconnect
> > > > > > >     admin.topics().unload(topic);
> > > > > > >
> > > > > > >     // after reconnecting, the consumer receives the same 5 messages again
> > > > > > >     Message<String> message = null;
> > > > > > >     for (int i = 0; i < 5; i++) {
> > > > > > >         message = consumer.receive();
> > > > > > >     }
> > > > > > >
> > > > > > >     consumer.acknowledgeCumulativeAsync(message.getMessageId());
> > > > > > > }
> > > > > > > ```
> > > > > > >
> > > > > > > In the example above, the consumer consumes the 5 messages sent by
> > > > > > > the producer twice and then acknowledges them with a cumulative ack.
> > > > > > > If the consumer sends one cumulative ack every 1,000 or 10,000
> > > > > > > messages, consumer reconnection may cause a lot of repeated
> > > > > > > consumption. Although this does not break at-least-once semantics,
> > > > > > > it causes a lot of unnecessary overhead.
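
To put the overhead in numbers, here is a simplified model, assuming each cumulative ack is sent right after every `ackEvery`-th message and reaches the broker immediately (it ignores client-side ack grouping and messages merely prefetched into the receiver queue), and that after a reconnect the broker re-dispatches everything after the mark-delete position. `RedeliveryOverhead` and `duplicatesAfterReconnect` are hypothetical names.

```java
class RedeliveryOverhead {

    // Simplified model (assumption): a cumulative ack is sent after every `ackEvery`
    // messages and reaches the broker immediately; after a reconnect the broker
    // re-dispatches every message after the mark-delete position.
    static long duplicatesAfterReconnect(long receivedSoFar, long ackEvery) {
        if (receivedSoFar < 0 || ackEvery <= 0) {
            throw new IllegalArgumentException("need receivedSoFar >= 0 and ackEvery > 0");
        }
        // Messages covered by the most recent cumulative ack.
        long acked = (receivedSoFar / ackEvery) * ackEvery;
        // Received but not yet acked: these are delivered to the application again.
        return receivedSoFar - acked;
    }

    public static void main(String[] args) {
        for (long ackEvery : new long[] {1, 1000, 10000}) {
            // Worst case in this model: the reconnect happens one message before the next ack.
            long worst = duplicatesAfterReconnect(2 * ackEvery - 1, ackEvery);
            System.out.println("ackEvery=" + ackEvery + ": up to " + worst + " duplicates per reconnect");
        }
    }
}
```

In this model, acking every 1,000 messages means a badly timed reconnect makes the application see up to 999 messages a second time, and up to 9,999 when acking every 10,000; with no ack at all before the reconnect, as in the test above, every received message comes back.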
> > > > > > >
> > > > > > >
> > > > > > > Most importantly, it breaks the exactly-once semantics of Pulsar
> > > > > > > transactions.
> > > > > > >
> > > > > > >
> > > > > > > I want to discuss whether we should fix normal and transactional
> > > > > > > cumulative acks in the same way, that is, prevent repeated
> > > > > > > consumption caused by consumer reconnection by filtering out
> > > > > > > messages that users have already received through
> > > > > > > `consumer.receive()`. Or should we only guarantee exactly-once
> > > > > > > semantics, that is, only guarantee that a consumer using cumulative
> > > > > > > acks within a transaction does not receive the same messages again?
> > > > > > >
> > > > > > >
> > > > > > > Please leave your opinion, thanks! :)
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Bo
