Hi Endre-

> On Jun 19, 2023, at 2:49 PM, Endre Stølsvik <en...@stolsvik.com> wrote:
>
> I have explained why. I have suggested a half-way fix, which for most users
> probably would be better than the current situation. I have also suggested
> the gist of a complete fix.
The ActiveMQ v5.18.2 release is in progress, so it is not realistic to expect a change to be included in the next release. The ActiveMQ contribution process is pretty standard: open a JIRA, post a PR with some tests for others to review, and a discussion can be had. PRs are welcome, and will be reviewed. This request is one of those source-code-is-worth-a-thousand-emails type of deals.

Keep in mind that the Queue configuration is already extensible without changing package scope. There is nothing preventing you or another user from extending an existing policy and wiring it into a running broker. Instead of extending (the very thin) QueueStorePrefetch class, you can extend AbstractStoreCursor, which is already public (see the sketch below).

In regard to 'completely broken' and 'a feature of JMS':

The spec does not call out that all features must be exactly supported when *combined*. All providers must make trade-offs to support features. For example, IBM MQ only supports configuring a queue for one of 'priority' or 'ordering'. By that point of comparison, ActiveMQ is more advanced, as it allows both on the same queue.

The JMS spec with regard to priority and message ordering:

    3.4.10 JMSPriority
    JMS does not require that a provider strictly implement priority ordering
    of messages; however, it should do its best to deliver expedited messages
    ahead of normal messages.

The JMS spec with regard to mixing persistent and non-persistent messages:

    6.2.9.2 Order of message sends
    ... If both PERSISTENT and NON_PERSISTENT messages are sent to a
    destination, order is only guaranteed within delivery mode. That is, a
    later NON_PERSISTENT message may arrive ahead of an earlier PERSISTENT
    message; however, it will never arrive ahead of an earlier NON_PERSISTENT
    message with the same priority. ...

I disagree with the statement that ActiveMQ is broken by default. Mixing priority and ordering in the same destination is an anti-pattern. Mixing persistence and non-persistence in the same destination is an anti-pattern. Mixing both at the same time, as your use case suggests, is an extreme edge case. ActiveMQ happily supports both of these features in separate queues, has done so for a long time, and meets the functionality as laid out in the JMS specification.

Best,
Matt Pavlovich
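A minimal sketch of the extension point mentioned above: installing a custom pending-queue cursor through the public policy API, without touching package scope. For illustration it returns the stock StoreQueueCursor; a custom AbstractStoreCursor-based variant would be returned in its place. This is an untested sketch of the usual wiring, not a definitive configuration.

    import org.apache.activemq.broker.Broker;
    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.broker.region.Queue;
    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
    import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
    import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
    import org.apache.activemq.broker.region.policy.PolicyEntry;
    import org.apache.activemq.broker.region.policy.PolicyMap;

    public class CustomCursorPolicy implements PendingQueueMessageStoragePolicy {

        @Override
        public PendingMessageCursor getQueuePendingMessageCursor(Broker broker, Queue queue) {
            // Swap in your own cursor here, e.g. a subclass of the public
            // AbstractStoreCursor; the stock StoreQueueCursor stands in for it.
            return new StoreQueueCursor(broker, queue);
        }

        public static void main(String[] args) throws Exception {
            // Apply the policy to all queues via the default policy entry.
            PolicyEntry entry = new PolicyEntry();
            entry.setPendingQueuePolicy(new CustomCursorPolicy());

            PolicyMap policyMap = new PolicyMap();
            policyMap.setDefaultEntry(entry);

            BrokerService brokerService = new BrokerService();
            brokerService.setDestinationPolicy(policyMap);
            brokerService.start();
        }
    }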
> On Mon, Jun 19, 2023 at 4:01 PM Matt Pavlovich <mattr...@gmail.com> wrote:
>
>> Hi Endre-
>>
>> Contributions are always welcome =). Sample unit test scenarios and/or
>> PRs that demonstrate the issue and an approach to solve for it.
>>
>> That being said, keep in mind that any system that deals with the big
>> three (network I/O, disk I/O & CPU), like messaging and databases, always
>> has to accept some level of trade-offs to accommodate different use cases
>> and maintain efficiency.
>>
>> For this specific use case, this feels like one of those scenarios where
>> the solution is better served by segmenting the message flow into separate
>> queues by priority (and optionally priority + persistence mode). The
>> statements about wanting priority and message order maintained are
>> conflicting by nature. It suggests that this isn't one single message flow,
>> but "an application publishing several related message flows to one
>> destination."
>>
>> If the use case can operate with pageSize=1 and prefetch=1, then message
>> volume must be nominal.
>>
>> Recommendation:
>>
>> 1. Use a server-side CompositeQueue to fan out the messages by priority
>> (and possibly priority + persistence) to separate queues using a filter
>> pattern (or use Camel, etc.)
>>
>> i.e. queue://mats3.events -> filter and fan out to ->
>> queue://mats3.prio.0, queue://mats3.prio.1, queue://mats3.prio.2,
>> queue://mats3.prio.3, etc.
>>
>> 2. Optionally (if multiple consumers are not feasible), use a client-side
>> CompositeQueue (or wildcard queue) to have the application read from all
>> the queues at once.
>>
>> i.e. session.createConsumer(session.createQueue("queue://mats3.prio.*"));
>>
>> This approach always puts the 'next' message by priority into a queue, and
>> the consumers can be separately scaled and managed. You may want a dedicated
>> thread for priority = 0, or adjust prefetch, etc.
>>
>> I suspect that if you break up this "one flow" into "several flows", your
>> issues will be resolved and you won't need to patch or maintain any changes
>> to ActiveMQ itself.
>>
>> Hope this helps!
>>
>> Thanks,
>> Matt Pavlovich
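A sketch of recommendation 1 above, configured programmatically rather than in activemq.xml. The queue names follow the example in the message; the three priority buckets and their selectors are illustrative assumptions, not a prescribed layout.

    import java.util.Arrays;
    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.broker.region.DestinationInterceptor;
    import org.apache.activemq.broker.region.virtual.CompositeQueue;
    import org.apache.activemq.broker.region.virtual.FilteredDestination;
    import org.apache.activemq.broker.region.virtual.VirtualDestination;
    import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;

    public class PriorityFanOutBroker {

        public static void main(String[] args) throws Exception {
            // queue://mats3.events fans out to per-priority queues by JMSPriority.
            CompositeQueue composite = new CompositeQueue();
            composite.setName("mats3.events");
            composite.setForwardTo(Arrays.asList(
                    filtered("mats3.prio.9", "JMSPriority = 9"),
                    filtered("mats3.prio.4", "JMSPriority BETWEEN 1 AND 8"),
                    filtered("mats3.prio.0", "JMSPriority = 0")));

            VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
            interceptor.setVirtualDestinations(new VirtualDestination[] { composite });

            BrokerService broker = new BrokerService();
            broker.setDestinationInterceptors(new DestinationInterceptor[] { interceptor });
            broker.start();
        }

        // Helper: route messages matching the selector to the named queue.
        private static FilteredDestination filtered(String queue, String selector) {
            FilteredDestination dest = new FilteredDestination();
            dest.setQueue(queue);
            dest.setSelector(selector);
            return dest;
        }
    }

Consumers could then read the per-priority queues individually, or use the wildcard consumer from recommendation 2.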
>>> On Jun 18, 2023, at 6:17 AM, Endre Stølsvik <en...@stolsvik.com> wrote:
>>>
>>> Hi!
>>>
>>> tl;dr: It would be great if
>>> 'org.apache.activemq.broker.region.cursors.*QueueStorePrefetch*' could
>>> have its visibility increased from package-private to public (take along
>>> the corresponding *TopicStorePrefetch* too while at it!). This is so that
>>> I can make an alternative *StoreQueueCursor* variant outside of the
>>> package, i.e. in my own code, and thereby get around a starvation issue
>>> that this class exhibits when mixing persistent and non-persistent
>>> messages, which cannot be overcome by priorities.
>>>
>>> I am researching, and have identified, a problem with the
>>> StoreQueueCursor. I've found a RedHat issue from 2014
>>> (https://issues.redhat.com/browse/ENTMQ-872) which describes the issue I
>>> angled in from (persistent messages are always "preferred" ahead of
>>> non-persistent messages, even if the latter have high priority), but it
>>> turns out the problem is deeper than that.
>>>
>>> As described on the message-cursors page
>>> (https://activemq.apache.org/message-cursors), in the "paging for
>>> non-persistent messages" section and image, the store queue cursor has
>>> two "buckets" to handle persistent and non-persistent messages.
>>>
>>> The problem arises from how it fetches messages from these two buckets:
>>> basically, it switches between them *only when the current bucket is
>>> (effectively) empty*.
>>>
>>> This affects ordering *(if a producer alternates between persistent and
>>> non-persistent messages, they will not be consumed in order, as the
>>> current bucket will be emptied first)*, can lead to starvation *(the
>>> current bucket is emptied before switching, so if producers keep up, you
>>> will never get a message from the opposite bucket)*, and thus also
>>> effectively ignores prioritization *(since the opposite bucket is not
>>> even considered while the current one is non-empty)*.
>>>
>>> My situation is that in the library Mats3 (https://mats3.io/), one often
>>> employs "interactive" messages (priority=9) *combined with*
>>> non-persistent messaging, on the same queues. This obviously leads to
>>> the completely opposite result of the intention: the supposedly "fast,
>>> prioritized, but not entirely reliable" safe or idempotent GET-style
>>> messages/commands will be starved if there is also a batch of "ordinary"
>>> messages going on using the same queues.
>>>
>>> I have come up with a minimal solution that fixes *my* problem: I need
>>> to remove the starvation, and thus the ignoring of prioritization,
>>> although this solution may make the in-order dispatch situation worse.
>>> What I do is change the 'getNextCursor()' method to *always* alternate
>>> between the buckets if there are messages in both: that is, if there are
>>> messages in the opposite bucket, switch to it. This fixes much, *and is
>>> probably better for most users*, without any discernible side effects.
>>>
>>> If the QueueStorePrefetch class were public, as all other classes in
>>> this package except the corresponding TopicStorePrefetch are, then I
>>> could have made an alternative implementation in my own code. I have
>>> actually successfully made an extension of the class by using reflection
>>> on the private fields and overriding the 'getNextCursor()' method (which
>>> luckily is protected), but this is obviously not ideal.
>>>
>>> In more detail, the problem is the combination of these three methods:
>>>
>>>     @Override
>>>     public synchronized boolean hasNext() {
>>>         try {
>>>             getNextCursor();
>>>         } catch (Exception e) {
>>>             LOG.error("Failed to get current cursor ", e);
>>>             throw new RuntimeException(e);
>>>         }
>>>         return currentCursor != null ? currentCursor.hasNext() : false;
>>>     }
>>>
>>>     @Override
>>>     public synchronized MessageReference next() {
>>>         MessageReference result = currentCursor != null ? currentCursor.next() : null;
>>>         return result;
>>>     }
>>>
>>>     protected synchronized PendingMessageCursor getNextCursor() throws Exception {
>>>         if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) {
>>>             currentCursor = currentCursor == persistent ? nonPersistent : persistent;
>>>             // sanity check
>>>             if (currentCursor.isEmpty()) {
>>>                 currentCursor = currentCursor == persistent ? nonPersistent : persistent;
>>>             }
>>>         }
>>>         return currentCursor;
>>>     }
>>>
>>> If I change getNextCursor() to this, most things get better:
>>>
>>>     protected synchronized PendingMessageCursor getNextCursor() throws Exception {
>>>         // ?: Sanity check that nonPersistent has been set, i.e. that start() has been invoked.
>>>         if (nonPersistent == null) {
>>>             // -> No, not set, so don't switch currentCursor to it - so that
>>>             // currentCursor never becomes null.
>>>             return currentCursor;
>>>         }
>>>
>>>         // Get the opposite cursor:
>>>         PendingMessageCursor oppositeCursor = currentCursor == persistent ? nonPersistent : persistent;
>>>         // ?: Do we have any messages in the opposite?
>>>         if (oppositeCursor.hasNext()) {
>>>             // -> Yes, so do the switch
>>>             currentCursor = oppositeCursor;
>>>         }
>>>         return currentCursor;
>>>     }
>>>
>>> .. but with this, producing a bunch of persistent messages and then
>>> non-persistent ones will lead to them being fetched alternately (even
>>> though you wanted all the persistent first, then the non-persistent).
>>> Then again, if you did the opposite (produced a bunch of non-persistent,
>>> then persistent), the current solution will first dequeue all the
>>> persistent. So it's bad either way.
>>>
>>> (As an aside, not for now: IMHO, the defensive null-checking prevalent
>>> in this class should also be removed.)
>>>
>>> Note: A complete solution would require a "peek" functionality to see
>>> which bucket has the highest-priority message, plus a way to determine
>>> the order between two messages when their priority is equal. You'd then
>>> always switch to the bucket holding the "true next" message.
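A sketch of what that "true next" selection could look like. It assumes a hypothetical peek() on PendingMessageCursor that returns the head MessageReference without advancing the cursor (no such method exists today), and uses broker arrival time as a stand-in tie-breaker for equal priorities.

    protected synchronized PendingMessageCursor getNextCursor() throws Exception {
        if (nonPersistent == null) {
            // start() not yet invoked; keep the current cursor.
            return currentCursor;
        }
        // peek() is hypothetical: the head MessageReference, without advancing.
        MessageReference p = persistent.hasNext() ? persistent.peek() : null;
        MessageReference n = nonPersistent.hasNext() ? nonPersistent.peek() : null;
        if (p == null || n == null) {
            // At most one bucket has messages; pick the non-empty one, or keep current.
            currentCursor = p != null ? persistent : (n != null ? nonPersistent : currentCursor);
        } else if (p.getMessage().getPriority() != n.getMessage().getPriority()) {
            // Highest priority wins across the two buckets.
            currentCursor = p.getMessage().getPriority() > n.getMessage().getPriority()
                    ? persistent : nonPersistent;
        } else {
            // Equal priority: approximate the "true next" by broker arrival time.
            currentCursor = p.getMessage().getBrokerInTime() <= n.getMessage().getBrokerInTime()
                    ? persistent : nonPersistent;
        }
        return currentCursor;
    }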
>>> *Note that to easily experience this, you should set both the
>>> maxPageSize and the client prefetch to 1* (a sketch of these settings
>>> follows after this message). Otherwise, several of these issues seem to
>>> be *masked* by either the layer above or the client - i.e. it reorders,
>>> and takes the prioritization into consideration. However, when you
>>> produce thousands of messages, the page size of 200 and prefetch of 1000
>>> can no longer mask it, and the problem shows itself (in production,
>>> obviously!). But it is harder to observe, and reason about, such large
>>> amounts of messages, so setting these values to 1 gives you the full
>>> experience right away.
>>>
>>> I do have some code and tests for this if interesting - but for now, it
>>> would be really nice if this one rather non-intrusive change could be
>>> implemented ASAP, maybe even before 5.18.2, so that I can implement a
>>> better fix on our side than heavy-handed reflection on private fields:
>>> that is, I humbly request that the classes QueueStorePrefetch (and the
>>> corresponding TopicStorePrefetch) be made public, so that one can make
>>> alternative implementations outside of the ActiveMQ code.
>>>
>>> PS: It seems like the corresponding topic side of this,
>>> StoreDurableSubscriberCursor with TopicStorePrefetch, had some work done
>>> in 2010 for probably the same type of issue, adding an
>>> "immediatePriorityDispatch" flag and corresponding functionality:
>>> *"ensure new high priority messages get dispatched immediately rather
>>> than at the end of the next batch, configurable via
>>> PendingDurableSubscriberMessageStoragePolicy.immediatePriorityDispatch,
>>> default true, most relevant with prefetch=1"*. I don't fully understand
>>> this solution, but I can't shake the feeling that it is a literal patch
>>> instead of a handling of the underlying problem: dequeuing from two
>>> underlying queues ("buckets") must take into account the head of both,
>>> finding the "true next" wrt. order and priority.
>>>
>>> Thanks a bunch,
>>> Kind regards,
>>> Endre Stølsvik
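For completeness, a sketch of the reproduction settings referred to in the note above: maxPageSize=1 on the broker via a PolicyEntry, and prefetch=1 on the client via the connection factory's prefetch policy. The broker URI is illustrative.

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.broker.region.policy.PolicyEntry;
    import org.apache.activemq.broker.region.policy.PolicyMap;

    public class ReproSettings {

        public static void main(String[] args) throws Exception {
            // Broker side: page in only one message at a time (default is 200).
            PolicyEntry entry = new PolicyEntry();
            entry.setMaxPageSize(1);
            PolicyMap policyMap = new PolicyMap();
            policyMap.setDefaultEntry(entry);

            BrokerService broker = new BrokerService();
            broker.setDestinationPolicy(policyMap);
            broker.start();

            // Client side: prefetch only one message at a time (default is 1000).
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
            factory.getPrefetchPolicy().setQueuePrefetch(1);
        }
    }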