Hi Baodi,

Yes, in effect, I suggest that we have new Consumer interfaces, perhaps one
per subscription type. That way we can “correct” the current interface
without breaking backward compatibility.

For Exclusive/Failover, since the idea in those subscription types was to
maintain order, it makes sense we would offer the following:


   - receive() - get the next message
   - cumulativeAck(msgId) - acknowledge all messages up to msgId.
      - Maybe we can try to come up with a self-explanatory name like
      ackAllUpTo(msgId).
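
To make this concrete, here is a rough sketch of what such an interface could look like. Everything in it is hypothetical: OrderedConsumer, OrderedMessage, and InMemoryOrderedConsumer do not exist in the Pulsar client, a plain long stands in for MessageId, and the in-memory queue only imitates a broker so the example can run on its own.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical interface: none of these names exist in the Pulsar client today.
interface OrderedConsumer<T> {
    // Returns the next message in topic order, or null if nothing is queued.
    // (A real implementation would block instead of returning null.)
    OrderedMessage<T> receive();

    // Acknowledges every message up to and including msgId.
    void ackAllUpTo(long msgId);
}

// Minimal message holder; a plain long stands in for Pulsar's MessageId.
final class OrderedMessage<T> {
    final long id;
    final T value;

    OrderedMessage(long id, T value) {
        this.id = id;
        this.value = value;
    }
}

// In-memory stand-in for a broker-backed consumer, for illustration only.
final class InMemoryOrderedConsumer<T> implements OrderedConsumer<T> {
    private final Deque<OrderedMessage<T>> incoming = new ArrayDeque<>();
    private long nextId = 1;
    private long markDeletePosition = 0; // highest cumulatively acknowledged id

    // Test helper that plays the role of a producer writing to the topic.
    void publish(T value) {
        incoming.addLast(new OrderedMessage<>(nextId++, value));
    }

    @Override
    public OrderedMessage<T> receive() {
        return incoming.pollFirst();
    }

    @Override
    public void ackAllUpTo(long msgId) {
        // A cumulative ack only ever moves the position forward.
        markDeletePosition = Math.max(markDeletePosition, msgId);
    }

    long markDeletePosition() {
        return markDeletePosition;
    }
}

class OrderedConsumerDemo {
    public static void main(String[] args) {
        InMemoryOrderedConsumer<String> consumer = new InMemoryOrderedConsumer<>();
        for (String s : new String[] {"a", "b", "c"}) {
            consumer.publish(s);
        }
        OrderedMessage<String> last = null;
        OrderedMessage<String> msg;
        while ((msg = consumer.receive()) != null) {
            System.out.println("processing " + msg.id + " -> " + msg.value);
            last = msg;
        }
        if (last != null) {
            consumer.ackAllUpTo(last.id); // one ack covers everything processed so far
        }
        System.out.println("mark delete position: " + consumer.markDeletePosition());
    }
}
```

The point of the sketch is only that the interface exposes nothing that can reorder messages: no individual ack, no nack, no reconsumeLater.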



Like you, I’m interested in knowing what the experienced folks in the
community think about this.


On 30 Nov 2022 at 4:43:22, Baodi Shi <baodi....@icloud.com.invalid> wrote:

> Hi, Asaf:
>
> Thank you for the comprehensive summary.
>
> So in effect, what you would have expected here is that nack(4) in
>
> exclusive/failover will happen immediately - clear queue, write redeliver
>
> command to broker async and return immediately, hence next receive() will
>
> block until messages have been received.
>
>
> In this way, the nack interface also doesn’t make sense for Exclusive/Failover
> subscriptions.
>
> For 1, 2, 3, 4, 5.
> If message 3 processing fails, the application can choose to wait for a
> period of time to process or directly ack this message (skip this message).
>
> I think we should disable nack under Exclusive/Failover subscription.
>
> 4. reconsumeLater(msg): ack existing message and write it to the same topic
>
> or a different one. This is an explicit out-of-order consumption, but it
>
> can be clearly stated in docs.
>
>
> Same as above, we should also disable it in Exclusive/Failover
> subscription.
>
> I think we should have a different consumer interface holding those
>
> commands above.
>
>
> It's a transformative idea; +1 from me. Let's see what other contributors think.
>
>
> On 30 Nov 2022, at 00:19, Asaf Mesika <asaf.mes...@gmail.com> wrote:
>
>
> Ok, I'll try to summarize what I read here to make sure we're all on the
>
> same page :)
>
>
> Exclusive and Failover subscription types are subscriptions that guarantee
>
> two things:
>
> 1. Single active consumer per topic (partition).
>
> 2. Message processing in the order they were written to the
>
> topic (partition).
>
>
> (1) is guaranteed by the broker by allowing only a single consumer per
>
> topic.
>
> (2) is guaranteed by the broker. Since we only have a single consumer, the
>
> only thing for the broker to take care of is delivering messages precisely
>
> in the same order they were received.
>
> Normal dispatching dispatches messages in the order written to the topic.
>
> When the consumer calls redeliverUnacknowledgedMessages(), it clears the
>
> incoming queue, and the broker rewinds the cursor to the mark delete
>
> position, disregarding any individual acks done after the mark delete. So
>
> messages are always delivered without any gaps.
>
>
> Since the queue is empty, the next receive() call will block until the
>
> broker redelivers the messages and fills the consumer's internal queue.
>
>
> The problem not raised in this discussion thread is the client
>
> implementation of negativeAcknowledge().
>
> Negative acknowledgment in today's implementation:
>
>
> It adds the negatively-acked message to the NegativeAckTracker and sets a
>
> timer, if not already present, to send all pending acks in X seconds. Once
>
> that time is up, it sees that the negative ack belongs to an Exclusive/Failover
>
> subscription type and hence translates that into
>
> redeliverUnacknowledgedMessages(). So in X seconds, it clears the queue and
>
> asks for messages to be redelivered. Since adding to NegativeAckTracker is
>
> an immediate action (add a message to the queue and return), it just
>
> returns. Suppose you receive() 1,2,3, call nack(4), and then keep calling
>
> receive() and get 4,5,6,7,... After X seconds pass, your next receive()
>
> suddenly gives you 4,5,6 again.
>
>
> So in effect, what you would have expected here is that nack(4) in
>
> exclusive/failover will happen immediately - clear queue, write redeliver
>
> command to broker async and return immediately, hence next receive() will
>
> block until messages have been received.
>
>
>
> I do side with the suggestion to change the API for exclusive/failover to
>
> be clearer.
>
> In those types of subscriptions, it seems that the only actions you are
>
> supposed to do are:
>
>
> 1. receive(): get the next message.
>
> 2. cumulativeAck(msg): acknowledge that all messages up to msg have been
>
> successfully processed.
>
> 3. redeliverUnacknowledgedMessages() - clear the internal queue and ask the
>
> broker to resend messages from the last mark delete position.
>
>
> There is one additional action in which you explicitly push the messages to
>
> a different topic or even the same topic, and that is:
>
> 4. reconsumeLater(msg): ack existing message and write it to the same topic
>
> or a different one. This is an explicit out-of-order consumption, but it
>
> can be clearly stated in docs.
>
>
> I think we should have a different consumer interface holding those
>
> commands above.
>
>
>
>
> On Thu, Nov 24, 2022 at 1:43 PM 丛搏 <congbobo...@gmail.com> wrote:
>
>
> > Hi, Joe:
>
> >
>
> >> This "brokenness" is not clear to me.
>
> > https://github.com/apache/pulsar/pull/10478 This PIP solves some
>
> > problems of "brokenness",
>
> >> The sequence 3,4,5,6,7,8,9,10,11 12,13,14,15, 16
>
> > ,9,10,11,12,13,14,15,16,17, 18, 19, 20 ...does not break
>
> >> the ordering guarantees of Pulsar
>
> > If we don't use transaction ack, this order is fine. But when we use
>
> > transaction ack, in this case, message 9 and message 10 will be
>
> > handled twice. Therefore, we need redeliver and receive to be
>
> > synchronized to ensure that messages received before redeliver stay
>
> > ordered and are not repeated, and are not consumed again after
>
> > redeliver. To achieve these goals, we need redeliver to be a
>
> > synchronous method instead of async, and it needs to retry automatically.
>
> >
>
>
>
