I have taken a stab at a "Fair" StoreQueueCursor.

It would be great if we could discuss whether something like this is
desirable, and feasible to get into ActiveMQ.

I am not sure of the semantics wrt. the store - whether this could leave a
message stranded in case of power loss or similar. I hope not, in that I do
not .remove() a message from the underlying "sub cursor" until remove() is
actually invoked on the Fair_StoreQueueCursor. I find the code a tad hard
to get my head fully around, i.e. the semantics of how these cursors, the
paging and the prefetching work together with the store.
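
To make the deferred-removal idea concrete, here is a minimal, self-contained
sketch. It is a toy model only: it is neither the code in the repo nor any
ActiveMQ API. Msg, FairTwoBucketCursor and the "sequence" arrival counter are
hypothetical names made up for illustration, and each bucket is assumed to
already be ordered by priority and then arrival. Unlike a real cursor, next()
here does not advance an iterator; it just peeks.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy stand-in for a message reference; 'sequence' is a hypothetical arrival counter. */
final class Msg {
    final String id;
    final int priority;
    final long sequence;

    Msg(String id, int priority, long sequence) {
        this.id = id;
        this.priority = priority;
        this.sequence = sequence;
    }
}

/** Hypothetical two-bucket cursor: next() only peeks, remove() does the actual removal. */
final class FairTwoBucketCursor {
    private final Deque<Msg> persistent = new ArrayDeque<>();
    private final Deque<Msg> nonPersistent = new ArrayDeque<>();
    // Bucket that supplied the message most recently returned by next().
    private Deque<Msg> lastSource;

    void add(Msg msg, boolean isPersistent) {
        (isPersistent ? persistent : nonPersistent).add(msg);
    }

    boolean hasNext() {
        return !persistent.isEmpty() || !nonPersistent.isEmpty();
    }

    /** Returns the "true next" message across both buckets, leaving it in its bucket. */
    Msg next() {
        Msg p = persistent.peek();
        Msg n = nonPersistent.peek();
        if (p == null && n == null) {
            return null;
        }
        boolean takePersistent = (n == null) || (p != null && comesBefore(p, n));
        lastSource = takePersistent ? persistent : nonPersistent;
        return lastSource.peek();
    }

    /** Only here is the message actually dropped from the bucket it came from. */
    void remove() {
        if (lastSource == null) {
            throw new IllegalStateException("next() has not been called");
        }
        lastSource.poll();
        lastSource = null;
    }

    /** Higher priority first; on equal priority, the earlier arrival first. */
    private static boolean comesBefore(Msg a, Msg b) {
        if (a.priority != b.priority) {
            return a.priority > b.priority;
        }
        return a.sequence < b.sequence;
    }
}
```

In this model, calling next() twice without remove() in between hands back
the same message, so nothing leaves a bucket before remove() is invoked.
Whether the real store-backed cursors give the same guarantee is exactly the
part I am unsure about.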

The code resides at the repo
https://github.com/stolsvik/activemq_priority_nonpersistent_issue, and the
README shows the interesting pieces of the two solutions. The
"SomewhatFair" is a very small change (just the one change in
getNextCursor(), alternating between the sub cursors), while the "Fair" is
rather more intrusive.
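
For readers who do not want to open the repo, the difference between the
existing switching rule and the "SomewhatFair" one can be illustrated with a
small stand-alone simulation. This is a sketch under simplifying assumptions:
plain strings in java.util.Deque stand in for messages and buckets, the class
and method names (BucketSwitchingDemo, drainUntilEmpty, alternate) are made up
for the illustration, and nothing here touches ActiveMQ classes or the store.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of dequeuing from two buckets; strings stand in for messages. */
final class BucketSwitchingDemo {

    /** Models the existing rule: stay on the current bucket until it is empty. */
    static List<String> drainUntilEmpty(Deque<String> persistent, Deque<String> nonPersistent) {
        List<String> out = new ArrayList<>();
        Deque<String> current = persistent;
        while (!persistent.isEmpty() || !nonPersistent.isEmpty()) {
            if (current.isEmpty()) {
                current = (current == persistent) ? nonPersistent : persistent;
            }
            out.add(current.poll());
        }
        return out;
    }

    /** Models "SomewhatFair": switch whenever the opposite bucket has a message. */
    static List<String> alternate(Deque<String> persistent, Deque<String> nonPersistent) {
        List<String> out = new ArrayList<>();
        Deque<String> current = persistent;
        while (!persistent.isEmpty() || !nonPersistent.isEmpty()) {
            Deque<String> opposite = (current == persistent) ? nonPersistent : persistent;
            if (!opposite.isEmpty()) {
                current = opposite;
            }
            out.add(current.poll());
        }
        return out;
    }

    private static Deque<String> bucket(String... ids) {
        Deque<String> d = new ArrayDeque<>();
        for (String id : ids) {
            d.add(id);
        }
        return d;
    }

    public static void main(String[] args) {
        System.out.println(drainUntilEmpty(bucket("P1", "P2", "P3"), bucket("N1")));
        System.out.println(alternate(bucket("P1", "P2", "P3"), bucket("N1")));
    }
}
```

With three persistent messages and one non-persistent message queued, the
first rule delivers the non-persistent message last, while the alternating
rule gets to it right away. Neither rule looks at priority; that is what the
"Fair" variant adds.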

Kind regards,
Endre.


On Mon, Jun 19, 2023 at 11:29 PM Endre Stølsvik <en...@stolsvik.com> wrote:

> Hmm..
>
> You asked for code. As you hopefully saw, there was already code in my
> initial message - the minor suggested very simple change actually makes the
> situation better, by consuming "fairly" between the two buckets.
>
> And here's some more code - my research-repo for this, with a set of
> tests, and also an example of the "hack solution" utilizing reflection to
> implement the suggested half-way fix:
> https://github.com/stolsvik/activemq_priority_nonpersistent_issue
>
> If you read the spec as ActiveMQ passing - even though you can effectively
> starve the "other side" - then ok. But AFAIU, it would be rather simple to
> get it much better: Use the implementation of the getNextCursor() I
> suggested - and it would actually still then be compliant with the spec as
> you explain it.
>
> For developers more familiar with the ActiveMQ code, it would most
> probably also be quite simple to make it fully and fairly handle such
> mixing - this would, as far as I have gotten into this, not be a large
> change. (If the "iterator" semantics hold all the way to the store, and
> messages are not removed from the store until the remove(msgId)-method is
> actually called, then it can possibly be implemented with the existing
> atoms present with no need for the "peek" functionality I initially
> suggested).
>
> (Note that even if the dequeuing was fully fair, you'd still not get
> "immediate prioritization" due to the page and prefetch: If a prioritized
> message was sent, but the paging had just fetched its 200 messages, and the
> client just filled up its prefetch of 1000, the client would still have to
> chug through these before any chance of getting the prioritized message.
> But it would at least not be fully starved, as seemingly can happen now).
>
> Note: You maintainers would obviously have to evaluate whether such a
> change could be shoved into a revision or minor version. Even though it
> would then behave much "nicer" wrt. mixing of persistence and priorities,
> existing users might have gotten some inadvertent dependencies on the
> existing behaviour.
>
> Kind regards,
> Endre.
>
> On Mon, Jun 19, 2023 at 10:51 PM Matt Pavlovich <mattr...@gmail.com>
> wrote:
>
>> Hi Endre-
>>
>> > On Jun 19, 2023, at 2:49 PM, Endre Stølsvik <en...@stolsvik.com> wrote:
>> >
>> > I have explained why. I have suggested a half-way fix, which for most
>> users
>> > probably would be better than the current situation. I have also
>> suggested
>> > the gist of a complete fix.
>>
>> The ActiveMQ v5.18.2 release is in progress, so it is not realistic to
>> expect a change to be included in the next release. The ActiveMQ
>> contribution process is pretty standard— make a JIRA, post a PR and some
>> tests for others to review and a discussion can be had. PR’s are welcome,
>> and will be reviewed. This request is a
>> source-code-is-worth-a-thousand-emails type of deal.
>>
>> Keep in mind that the Queue configuration is already extensible without
>> changing package scope. There is nothing preventing you or another user
>> from extending an existing policy and writing it into a running broker.
>> Instead of extending (the very thin) QueueStorePrefetch class, you can
>> extend the AbstractStoreCursor — which is already public.
>>
>> In regards to ‘completely broken’ and ‘a feature of JMS’
>>
>> The spec does not call out that all features must be exactly supported
>> when *combined*. All providers must make trade-offs to support features.
>> For example, IBM MQ supports configuring a queue for only one of
>> ‘priority’ or ‘ordering’. From that point of comparison, ActiveMQ is more
>> advanced as it allows both to occur on the same queue.
>>
>> JMS spec in regard to priority and message ordering:
>>
>> 3.4.10 JMSPriority
>>
>> JMS does not require that a provider strictly implement priority ordering
>> of messages; however, it should do its best to deliver expedited messages
>> ahead of normal messages.
>>
>> JMS spec in regard to mixing persistent and non-persistent messages:
>>
>> 6.2.9.2 Order of message sends
>> …
>>      If both PERSISTENT and NON_PERSISTENT messages are sent to a
>> destination, order is only guaranteed within delivery mode.
>>      That is, a later NON_PERSISTENT message may arrive ahead of an
>> earlier PERSISTENT message; however, it will never arrive
>>      ahead of an earlier NON_PERSISTENT message with the same priority.
>> …
>>
>> I disagree with the statement that ActiveMQ is broken by default. Mixing
>> priority and ordering in the same destination is an anti-pattern. Mixing
>> persistence and non-persistence in the same destination is an anti-pattern.
>> Mixing both at the same time, as your use case suggests, is an extreme edge
>> case. ActiveMQ happily supports both these features in separate queues— and
>> has for a long time and meets the functionality as laid out in the JMS
>> specification.
>>
>> Best,
>> Matt Pavlovich
>>
>> > On Mon, Jun 19, 2023 at 4:01 PM Matt Pavlovich <mattr...@gmail.com>
>> wrote:
>> >
>> >> Hi Endre-
>> >>
>> >> Contributions are always welcome =).  Sample unit test scenarios and/or
>> >> PRs that demonstrate the issue and an approach to solve for it.
>> >>
>> >> That being said, keep in mind that any system that deals with the big
>> >> three (network I/O, disk I/O and CPU), such as messaging and databases,
>> >> always has to accept some level of trade-offs to accommodate different
>> >> use cases and maintain efficiency.
>> >>
>> >> For this specific use case— this feels like one of those scenarios
>> where
>> >> the solution is better served by segmenting the message flow into
>> separate
>> >> queues by priority (and optionally priority+persistence mode). The
>> >> statements about wanting priority and message order maintained are
>> >> conflicting by nature. It suggests that this isn’t one single message
>> >> flow, but "an application publishing several related message flows to
>> >> one destination."
>> >>
>> >> If the volume of the use case can operate with pageSize=1 and
>> prefetch=1,
>> >> then message volume must be nominal.
>> >>
>> >> Recommendation:
>> >>
>> >> 1. Use a server-side CompositeQueue to fan out the messages by priority
>> >> (and possibly priority + persistence) to separate queues using filter
>> >> pattern (or use Camel, etc)
>> >>
>> >>    ie. queue://mats3.events -> filter and fan out to ->
>> >> queue://mats3.prio.0, queue://mats3.prio.1, queue://mats3.prio.2,
>> >> queue://mats3.prio.3, etc..
>> >>
>> >> 2. Optionally, (if multiple consumers are not feasible) use a
>> client-side
>> >> CompositeQueue (or wildcard queue) to have the application read from
>> all
>> >> the queues at once.
>> >>
>> >>     ie.
>> >> session.createConsumer(session.createQueue("queue://mats3.prio.*"));
>> >>
>> >> This approach always puts the ’next’ message by priority into a queue,
>> >> and the consumers can be separately scaled and managed. You may want a
>> >> dedicated thread for priority = 0, or to adjust prefetch, etc.
>> >>
>> >> I suspect if you break up this “one flow” into “several flows” that
>> your
>> >> issues will be resolved and you won’t need to patch or maintain any
>> changes
>> >> to ActiveMQ itself.
>> >>
>> >> Hope this helps!
>> >>
>> >> Thanks,
>> >> Matt Pavlovich
>> >>
>> >>> On Jun 18, 2023, at 6:17 AM, Endre Stølsvik <en...@stolsvik.com>
>> wrote:
>> >>>
>> >>> Hi!
>> >>>
>> >>> tl;dr: It would be great if the
>> >> 'org.apache.activemq.broker.region.cursors.
>> >>> *QueueStorePrefetch*' could have its visibility increased from
>> package to
>> >>> public (take along the corresponding *TopicStorePrefetch* too while at
>> >>> it!). This so that I can make an alternative
>> *StoreQueueCursor*-variant
>> >>> outside of the package, i.e. in my own code. This so that I can get
>> >> around
>> >>> a starvation issue that this class exhibits when mixing persistent and
>> >>> non-persistent messages, which cannot be overcome by priorities.
>> >>>
>> >>> I am researching, and have identified, a problem with the
>> >> StoreQueueCursor.
>> >>> I've found a RedHat issue from 2014 (
>> >>> https://issues.redhat.com/browse/ENTMQ-872) which describes the
>> issue I
>> >>> angled in from (persistent messages are always "preferred" in front of
>> >>> non-persistent messages, even if the latter have high priority) - but
>> it
>> >>> turns out the problem is deeper than that.
>> >>>
>> >>> As described on the message-cursors page (
>> >>> https://activemq.apache.org/message-cursors), in the "paging for
>> >>> non-persistent messages" section and image, the store queue cursor has
>> >> two
>> >>> "buckets" to handle persistent and non-persistent messages.
>> >>>
>> >>> The problem arises from how it handles fetching messages from these
>> two
>> >>> buckets. Basically, it switches between these two buckets *only when
>> the
>> >>> current bucket is (effectively) empty.*
>> >>>
>> >>> This affects ordering *(if you on a producer alternate between
>> persistent
>> >>> and non-persistent messages, they will not be consumed in order, as
>> the
>> >>> "current bucket" will be emptied first)*, and can lead to starvation
>> >> *(the
>> >>> "current bucket" is emptied before switching, so if producers keep up,
>> >> you
>> >>> will never get a message from the "opposite" bucket)*, and also thus
>> >>> effectively ignores prioritization *(since it doesn't even consider
>> the
>> >>> opposite bucket while the current is non-empty).*
>> >>>
>> >>> My situation is that in the library Mats3 (https://mats3.io/), one
>> often
>> >>> employs "interactive" messages (priority=9) *combined with*
>> non-persistent
>> >>> messaging - on the same queues. This then obviously leads to the
>> >> completely
>> >>> opposite result than the intention: The supposedly "fast, prioritized,
>> >> but
>> >>> not entirely reliable" safe or idempotent GET-style messages/commands
>> >> will
>> >>> be starved if there also are a batch of "ordinary" messages going on
>> >> using
>> >>> the same queues.
>> >>>
>> >>> I have come up with a minimal solution that fixes *my* problem: I
>> need to
>> >>> remove the starvation, and thus the ignoring of prioritization. But
>> this
>> >>> solution will possibly make the dispatch in-order situation worse.
>> What I
>> >>> do, is to change the 'getNextCursor()' method to *always* alternate
>> >> between
>> >>> the buckets if there are messages in both. That is, if there are
>> messages
>> >>> in the opposite bucket, then switch. This fixes much - *and is
>> probably
>> >>> better for most users, *without any discernible side effects.
>> >>>
>> >>> If the QueueStorePrefetch class was public, as all other classes in
>> this
>> >>> package except the corresponding TopicStorePrefetch is, then I could
>> have
>> >>> made an alternate implementation in my own code. I have actually
>> >>> successfully made an extension of the class now by using reflection on
>> >> the
>> >>> private fields and overriding the 'getNextCursor()' method (which
>> luckily
>> >>> is protected), but this is obviously not ideal.
>> >>>
>> >>> More detailed:
>> >>>
>> >>> The problem is the combination of these three methods:
>> >>>
>> >>> @Override
>> >>> public synchronized boolean hasNext() {
>> >>>   try {
>> >>>       getNextCursor();
>> >>>   } catch (Exception e) {
>> >>>       LOG.error("Failed to get current cursor ", e);
>> >>>       throw new RuntimeException(e);
>> >>>   }
>> >>>   return currentCursor != null ? currentCursor.hasNext() : false;
>> >>> }
>> >>>
>> >>> @Override
>> >>> public synchronized MessageReference next() {
>> >>>   MessageReference result = currentCursor != null ? currentCursor.next() : null;
>> >>>   return result;
>> >>> }
>> >>>
>> >>>
>> >>> protected synchronized PendingMessageCursor getNextCursor() throws Exception {
>> >>>   if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) {
>> >>>       currentCursor = currentCursor == persistent ? nonPersistent : persistent;
>> >>>       // sanity check
>> >>>       if (currentCursor.isEmpty()) {
>> >>>           currentCursor = currentCursor == persistent ? nonPersistent : persistent;
>> >>>       }
>> >>>   }
>> >>>   return currentCursor;
>> >>> }
>> >>>
>> >>>
>> >>> If I change the getNextCursor to this, most things get better:
>> >>>
>> >>> protected synchronized PendingMessageCursor getNextCursor() throws Exception {
>> >>>   // ?: Sanity check that nonPersistent has been set, i.e. that start() has been invoked.
>> >>>   if (nonPersistent == null) {
>> >>>       // -> No, not set, so don't switch currentCursor to it - so that currentCursor never becomes null.
>> >>>       return currentCursor;
>> >>>   }
>> >>>
>> >>>   // Get opposite cursor:
>> >>>   PendingMessageCursor oppositeCursor = currentCursor == persistent ? nonPersistent : persistent;
>> >>>   // ?: Do we have any messages in the opposite?
>> >>>   if (oppositeCursor.hasNext()) {
>> >>>       // -> Yes, so do the switch
>> >>>       currentCursor = oppositeCursor;
>> >>>   }
>> >>>   return currentCursor;
>> >>> }
>> >>>
>> >>> .. but with this, producing a bunch of persistent messages, and then
>> >>> non-persistent, will lead to them being fetched alternating (even
>> though
>> >>> you wanted all the persistent first, then non-persistent). Then
>> again, if
>> >>> you did the opposite - produced a bunch of non-persistent, then
>> >> persistent
>> >>> - the current solution will first dequeue all the persistent. So, it's
>> >> bad
>> >>> anyhow.
>> >>>
>> >>> (As an aside, not for now: IMHO the defensive null-checking prevalent
>> in
>> >>> this class should also be removed.)
>> >>>
>> >>> Note: A complete solution would require a "peek" functionality to see
>> >> which
>> >>> bucket had the highest priority message, and identify a way to find
>> the
>> >>> order between two messages when their priority was equal. You'd then
>> >> always
>> >>> switch to the bucket with the "true next" message.
>> >>>
>> >>> *Note that to easily experience this, you should set both the
>> maxPageSize
>> >>> and client prefetch to 1. *Otherwise, it seems like several of these
>> >> issues
>> >>> are *masked* by either the layer above, or on the client - i.e. it
>> >>> reorders, and takes into consideration the prioritization. However,
>> when
>> >>> you produce thousands of messages, the page size of 200 and prefetch
>> of
>> >>> 1000 cannot mask it anymore, and the problem shows itself (in
>> production,
>> >>> obviously!). But it is harder to observe, and reason about, such large
>> >>> amounts of messages, thus setting these values to 1 gives you the full
>> >>> experience right away.
>> >>>
>> >>> I do have some code and tests for this if interesting - but for now,
>> it
>> >>> would be really nice if this one rather non-intrusive change was
>> >>> implemented ASAP, maybe even before 5.18.2? - so that I can implement
>> a
>> >>> better fix on our side than heavy-handed reflection of private fields:
>> >> That
>> >>> is, I humbly request that the classes QueueStorePrefetch (and
>> >> corresponding
>> >>> TopicStorePrefetch) are made public, so that one can make alternative
>> >>> implementations outside of the ActiveMQ code.
>> >>>
>> >>> PS: It seems like the corresponding topic side of this,
>> >>> StoreDurableSubscriberCursor with TopicStorePrefetch, has had some
>> work
>> >>> done for it in 2010 for probably the same type of issue, adding a
>> >>> "immediatePriorityDispatch" flag and corresponding functionality:
>> >> *"ensure
>> >>> new high priority messages get dispatched immediately rather than at
>> the
>> >>> end of the next batch, configurable via
>> >>> PendingDurableSubscriberMessageStoragePolicy.immediatePriorityDispatch
>> >>> default true, most relevant with prefetch=1"*. I don't fully
>> understand
>> >>> this solution, but can't shake the feeling that it is a literal patch
>> >>> instead of handling the underlying problem: Dequeuing from two
>> underlying
>> >>> queues ("buckets") must take into account the head of both, finding
>> the
>> >>> "true next" wrt. order and priority.
>> >>>
>> >>> Thanks a bunch,
>> >>> Kind regards,
>> >>> Endre Stølsvik.
>> >>
>> >>
>>
>>
