Hi, Asaf, Baodi:

I'm very sorry for my late reply. Thanks for your discussion.

> - receive() - get the following message
>   - cumulativeAck(msgId) - acknowledge all messages up to msgId.
>     - Maybe we can try to come up with a self-explanatory name like
>     ackAllUpTo(msgId).

If the user wants the messages in order, `receive()` and
`cumulativeAck()` must be called from a single thread. Otherwise,
`cumulativeAck` loses its meaning.

Suppose users write cumulative ack code like this:
```
while (true) {
    Message<String> message = consumer.receive();
    process(message);
    consumer.acknowledgeCumulative(message.getMessageId());
}
```
I don't think this is a good way to use `acknowledgeCumulative`:
acknowledging a single message cumulatively is meaningless.
Instead, `acknowledgeCumulative` should be used like this:
```
while (true) {
    Messages<String> messages = consumer.batchReceive();
    process(messages);
    consumer.acknowledgeCumulative(messages);
}
```
Then we need to think about how the user reprocesses the messages
when `process(messages)` throws an exception.

1. In one case, the user reprocesses the messages themselves, and the
`process(messages)` code looks like:
```
private void process(Messages<String> messages) {
    try {
        // do something
    } catch (Exception e) {
        // retry the same in-memory batch
        process(messages);
    }
}
```
In this way, the consumer doesn't need to do anything.

2. Pulsar rewinds the cursor and redelivers these messages:

```
while (true) {
    Messages<String> messages = consumer.batchReceive();
    try {
        process(messages);
    } catch (Exception e) {
        // This method redelivers the messages, whatever it ends up
        // being named. Until it succeeds, the consumer can't invoke
        // consumer.batchReceive() again.
        consumer.rewind();
        continue;
    }
    consumer.acknowledgeCumulative(messages);
}
```
In this way, the consumer needs a method that can redeliver these
messages. `redeliverUnacknowledgedMessages` is an async method that
can't guarantee the messages stay in order, so we need a new,
synchronous method.

Both solutions above keep messages in order. But in the first
solution, we don't know how many messages the user processes before
each cumulative ack. If there are 10000000 messages, the user may not
be able to hold them in memory for reprocessing, so users need a
method to redeliver these messages.
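
To make the ordering argument concrete, here is a toy in-memory sketch
of the second solution. None of it is the Pulsar client API:
`ToyConsumer`, its `rewind()` method, and the `consume` helper are all
hypothetical names, and the "broker" is just a list. It only assumes
that a synchronous rewind resets the read position to the mark-delete
position before the next `batchReceive()` returns.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only; every name here is hypothetical, not Pulsar's API.
final class RewindSketch {

    /** Stand-in for an ordered consumer on an Exclusive/Failover subscription. */
    static final class ToyConsumer {
        private final List<String> log;  // the topic, in publish order
        private int markDelete = 0;      // index of the first unacknowledged message
        private int readPosition = 0;    // index of the next message to hand out

        ToyConsumer(List<String> log) {
            this.log = log;
        }

        /** Returns up to maxMessages messages, in order, from the read position. */
        List<String> batchReceive(int maxMessages) {
            int end = Math.min(log.size(), readPosition + maxMessages);
            List<String> batch = new ArrayList<>(log.subList(readPosition, end));
            readPosition = end;
            return batch;
        }

        /** Cumulatively acknowledges everything handed out so far. */
        void acknowledgeCumulative() {
            markDelete = readPosition;
        }

        /** Synchronous rewind: the next batchReceive() starts at the mark-delete position. */
        void rewind() {
            readPosition = markDelete;
        }
    }

    /** Runs the batch/rewind loop, simulating one processing failure on failOnce. */
    static List<String> consume(List<String> log, int batchSize, String failOnce) {
        ToyConsumer consumer = new ToyConsumer(log);
        List<String> committed = new ArrayList<>();
        boolean alreadyFailed = false;
        while (true) {
            List<String> batch = consumer.batchReceive(batchSize);
            if (batch.isEmpty()) {
                return committed;
            }
            if (!alreadyFailed && batch.contains(failOnce)) {
                alreadyFailed = true;  // simulate process(messages) throwing
                consumer.rewind();     // nothing from this batch is committed
                continue;
            }
            committed.addAll(batch);   // processing succeeded
            consumer.acknowledgeCumulative();
        }
    }

    public static void main(String[] args) {
        System.out.println(consume(List.of("m1", "m2", "m3", "m4", "m5"), 2, "m3"));
    }
}
```

In this sketch the failed batch is handed out again from the
mark-delete position, so each message is committed once and in
publish order, and the application never has to keep the failed batch
in memory itself.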

> I think we should disable nack under Exclusive/Failover subscription.

Failover can also use individual ack, so we can't disable
`reconsumeLater` and `negativeAcknowledge`.

Thanks,
Bo

On Sun, Dec 18, 2022 at 18:36, Asaf Mesika <asaf.mes...@gmail.com> wrote:
>
> Hi Baodi,
>
> Yes, in effect, I suggest that we have new Consumer interfaces, one per
> subscription type perhaps then we can “correct” the current interface
> without breaking backward compatibility.
>
> For Exclusive/Failover, since the idea in those subscription types was to
> maintain order, it makes sense we would offer the following:
>
>
>    - receive() - get the following message
>    - cumulativeAck(msgId) - acknowledge all messages up to msgId.
>       - Maybe we can try to come up with a self-explanatory name like
>       ackAllUpTo(msgId).
>
>
>
> Like you I’m interested in knowing what the experienced folks in the
> community think about this.
>
>
> On 30 Nov 2022 at 4:43:22, Baodi Shi <baodi....@icloud.com.invalid> wrote:
>
> > Hi, Asaf:
> >
> > Thank you for the comprehensive summary.
> >
> > So in effect, what you would have expected here is that nack(4) in
> >
> > exclusive/shared will happen immediately - clear queue, write redeliver
> >
> > command to broker async and return immediately, hence next receive() will
> >
> > block until messages have been received.
> >
> >
> > In this way, the nack interface in Exclusive/Failover subscription also
> > doesn’t make sense.
> >
> > For 1, 2, 3, 4, 5.
> > If message 3 processing fails, the application can choose to wait for a
> > period of time to process or directly ack this message (skip this message).
> >
> > I think we should disable nack under Exclusive/Failover subscription.
> >
> > 4. reconsumeLater(msg): ack existing message and write it to the same topic
> >
> > or a different one. This is an explicit out-of-order consumption, but it
> >
> > can be clearly stated in docs.
> >
> >
> > Same as above, we should also disable it in Exclusive/Failover
> > subscription.
> >
> > I think we should have a different consumer interface holding those
> >
> > commands above.
> >
> >
> > It's a transformative idea. I'd give it a +1. Let's see what other contributors think.
> >
> >
> > On Nov 30, 2022, at 00:19, Asaf Mesika <asaf.mes...@gmail.com> wrote:
> >
> >
> > Ok, I'll try to summarize what I read here to make sure we're all on the
> >
> > same page :)
> >
> >
> > Exclusive and Failover subscription types are subscriptions that guarantee
> >
> > two things:
> >
> > 1. Single active consumer per topic (partition).
> >
> > 2. Message processing in the order they were written to the
> >
> > topic (partition).
> >
> >
> > (1) is guaranteed by the broker by allowing only a single consumer per
> >
> > topic.
> >
> > (2) is guaranteed by the broker. Since we only have a single consumer, the
> >
> > only thing for the broker to take care of is delivering messages precisely
> >
> > in the same order they were received.
> >
> > Normal dispatching dispatches messages in the order written to the topic.
> >
> > When the consumer calls redeliverUnacknowledgedMessages(), it clears the
> >
> > incoming queue, and the broker rewinds the cursor to the mark delete
> >
> > position, disregarding any individual acks done after the mark delete. So
> >
> > messages are always delivered without any gaps.
> >
> >
> > Since the queue is empty, the next receive() call will block until the
> >
> > broker redelivers the messages and fills the consumer's internal queue.
> >
> >
> > The problem not raised in this discussion thread is the client
> >
> > implementation of negativeAcknowledgment().
> >
> > Negative Acknowledgment in today's implementation
> >
> >
> > Adds the negatively-acked message into the NegativeAckTracker, and sets a
> >
> > timer, if not already present, to send all pending acks in X seconds. Once
> >
> > that time is up, it sees that negative ack belongs on an Exclusive/Failover
> >
> > subscription type and hence translates that into
> >
> > redeliverUnacknowledgedMessages(). So in X seconds, it clears the queue and
> >
> > asks for messages to be redelivered. Since adding to NegativeAckTracker is
> >
> > an immediate action (add a message to the queue and return), it just
> >
> > returns. If you receive() 1,2,3, call nack(4) and then receive() and get
> >
> > 4,5,6,7,... After X seconds pass, your next receive suddenly gives you
> >
> > 4,5,6 again.
> >
> >
> > So in effect, what you would have expected here is that nack(4) in
> >
> > exclusive/shared will happen immediately - clear queue, write redeliver
> >
> > command to broker async and return immediately, hence next receive() will
> >
> > block until messages have been received.
> >
> >
> >
> > I do side with the suggestion to change the API for exclusive / shared to
> >
> > be more clear.
> >
> > In those types of subscriptions, it seems that the only actions you are
> >
> > supposed to do are:
> >
> >
> > 1. receive(): get the next message.
> >
> > 2. cumulativeAck(msg): acknowledge all messages up to msg have been
> >
> > successfully processed.
> >
> > 3. redeliverUnacknowledgedMessages() - clear the internal queue and ask the
> >
> > broker to resend messages from the last mark delete position.
> >
> >
> > There is one additional action in which you explicitly push the messages to
> >
> > a different topic or even the same topic, and that is:
> >
> > 4. reconsumeLater(msg): ack existing message and write it to the same topic
> >
> > or a different one. This is an explicit out-of-order consumption, but it
> >
> > can be clearly stated in docs.
> >
> >
> > I think we should have a different consumer interface holding those
> >
> > commands above.
> >
> >
> >
> >
> > On Thu, Nov 24, 2022 at 1:43 PM 丛搏 <congbobo...@gmail.com> wrote:
> >
> >
> > > Hi, Joe:
> >
> > >
> >
> > >> This "brokenness" is not clear to me.
> >
> > > https://github.com/apache/pulsar/pull/10478 This PIP solves some
> >
> > > problems of "brokenness",
> >
> > >> The sequence 3,4,5,6,7,8,9,10,11 12,13,14,15, 16
> >
> > > ,9,10,11,12,13,14,15,16,17, 18, 19, 20 ...does not break
> >
> > >> the ordering guarantees of Pulsar
> >
> > > If we don't use transaction ack, this order is fine. But when we use
> >
> > > transaction ack, in this case, message 9 and message 10 will be
> >
> > > handled twice. Therefore, we need redeliver and receive to be
> >
> > > synchronized to ensure that messages received before redeliver will
> >
> > > not be repeated and ordered, and will not be repeatedly consumed after
> >
> > > redeliver. To achieve these goals, we need to redeliver to be a
> >
> > > synchronous method instead of async and need to retry automatically.
> >
> > >
> >
> >
> >
