Thanks for the clarification

I have left comments on the PR.
PLAT.

Thanks,
Penghui

On Sat, May 7, 2022 at 9:56 PM Enrico Olivelli <eolive...@gmail.com> wrote:

> Peng Hui
>
> Il giorno ven 6 mag 2022 alle ore 12:54 PengHui Li
> <peng...@apache.org> ha scritto:
> >
> > > The problem is that If I use multiple subscriptions then I am not
> > guaranteed that each message is delivered only to one consumer.
> >
> > It is a weird use case, but it is something that is possible according
> > to the JMS specs.
> > I am mapping a JMS Queue with a Subscription, and everything works well,
> > but you can have multiple Consumers, with different Selectors, that
> > compete on the same Queue (Subscription).
> >
> > You can have two Consumers with overlapping selectors and we have to
> > guarantee that each message is processed
> > only by one of the two consumers.
> > This is why I need consumer level filtering and not only
> > per-subscription filters.
> >
> > It is an edge case and this is why I don't want to build a brand new
> > dispatcher to cover this feature.
> >
> > Currently, without server-side filtering, my implementation uses
> > client side negative acks for a Consumer to reject the message and
> > leave
> > it to the other consumers attached to the subscription.
> >
> > Oh I see. I understand the difference now.
> >
> > Looks like the proposal is a more simple way to cope with the
> requirement.
> >
> > > How do we handle the case that all the consumers `RESCHEDULE` the
> message?
> > Some consumers cannot start normally in a period of time or
> user-specified
> > matching
> > rules cannot cover all messages.
> >
> > How to handle this case currently? I think we need to ensure the dispatch
> > thread will not
> > run into an infinite loop.
>
> The infinite loop is not happening for two reasons in the current
> implementation in my PR:
> - at each run of the dispatcher we select a List<Entry> and we process
> each entry only once, we are not re-adding the entry to the list of
> entries to process immediately
> - when we do RESCHEDULE we do the negative ack with a delay
> (simulating the client side negative acknowledgement delay), this
> allows the dispatcher to select new entries at the next round
>
> If it happens that there is no Consumer that is able to process the
> message we get to the same situation we have in Key Shared
> subscription when there is no consumer available for a given key:
> the dispatcher cannot make progress, but it is not in a busy loop.
>
> I hope that this clarifies
>
> Enrico
>
>
> >
> > Thanks,
> > Penghui
> >
> > On Fri, May 6, 2022 at 5:45 PM Haiting Jiang <jianghait...@apache.org>
> > wrote:
> >
> > > It looks like we can add something like  `EntryDispatcher`  before the
> > > EntryFilter.
> > > Mixing entry filtering and consumer selecting seems a little confusing.
> > >
> > > The `EntryDispatcher` could works as a consumer selector in
> > > `PersistentDispatcherMultipleConsumers`.
> > > It accepts an entry and a consumer list, returns the consumer this
> entry
> > > should dispatch to.
> > > The implementation could be provided by user like  EntryDispatcher.
> > >
> > > Thanks,
> > > Haiting
> > >
> > > On 2022/05/05 12:41:10 Enrico Olivelli wrote:
> > > > Hello,
> > > > I am trying to use PIP-105 and I found out that we are missing a few
> > > > little things to cover my user case.
> > > > In my case I have two consumers who compete on the same SHARED
> > > > subscription with a "message filter".
> > > > The filter is passed as Consumer metadata.
> > > >
> > > > When you have two Consumers connected on the Subscription the
> > > > dispatcher prepares to send the message to one consumer at a time.
> > > >
> > > > The Message goes through the EntryFilter that decides if the Entry
> > > > matches the requirements of the Consumer.
> > > > - if the message matches the consumer then it returns ACCEPT
> > > > - if the message does not match the consumer then it has to be
> > > > rescheduled (RESCHEDULE)
> > > >
> > > > With this small extension to PIP-105 we can cover this simple
> scenario
> > > > without the need to introduce a new Dispatcher policy
> > > >
> > > > I sent out a patch with the implementation and a test case that
> shows my
> > > usecase
> > > > https://github.com/apache/pulsar/pull/15391
> > > >
> > > > Introducing RESCHEDULE needs some level of discussion here.
> > > >
> > > > With PIP-105 we are anticipating in the broker a decision that the
> > > > Consumer would take when the Message is already dispatched to the
> > > > application:
> > > > A) ignore the message: acknowledge immediately, without processing.
> > > (REJECT)
> > > > B) postpone the message (or let it be processed from another
> > > > consumer): negatively acknowledge immediately, without processing.
> > > > (RESCHEDULE)
> > > >
> > > > With the initial implementation of PIP-105 we are covering case A,
> and
> > > > with my proposal I want to give the opportunity to implement case B.
> > > >
> > > > The only point that is not covered by my proposal is that the NACK on
> > > > the client happens only after a delay on the client, and this has
> some
> > > > side effects.
> > > > In fact that "delay" from the client allows the dispatcher to read
> > > > more entries because it thinks that the message has been dispatched
> > > > successfully, and it is allowed to move forward.
> > > > I would prefer to start a separate discussion for this "problem",
> that
> > > > is in part related to how we deal with messages to be replayed and it
> > > > is not strictly related to my PR.
> > > >
> > > >
> > > > Cheers
> > > > Enrico
> > > >
> > >
>

Reply via email to