The issue with selectors stems from the fact that the broker can't do random access into a persistent store; instead, a cursor reads the earliest N messages into memory. Within that (large) batch, things like priorities, selectors, and message groups are respected. But if the cursor fills up with messages of one type while the store contains messages of other types, those other messages can't be evaluated until enough messages of the first type are consumed that the others get read into the cursor (because they're now within the first N).

What's implemented is a trade-off: it lets the broker read each message only once and linearly (which is more efficient for some message stores), but it ultimately causes more problems than I think the performance and the simplicity are worth. One partial workaround is to make the cursor bigger, but since it still has to fit in memory, that's not a full cure. The other is to have enough consumers online at any time that messages never back up beyond what the cursor can hold, which is possible for the use cases you've previously described but might cost you money for more hardware.
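If you try the bigger-cursor workaround, the setting to look at is maxPageSize on the destination policy (the default is 200 messages paged in from the store at a time). A sketch of what that might look like in activemq.xml; the value here is purely illustrative and needs to be sized against your broker's memory:

```xml
<!-- activemq.xml (sketch): raise maxPageSize so more messages are paged
     from the store into the cursor at once. Value is illustrative only. -->
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <!-- default is 200; a larger page lets selectors/priorities/groups
           "see" deeper into the queue, at the cost of broker memory -->
      <policyEntry queue=">" maxPageSize="2000"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>
```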
Using message groups will probably be more flexible than using hard-coded buckets, though with buckets each cursor is less likely to fill up with messages for only a few consumers. Also, if you search this list for threads about message groups, you'll see a feature suggestion for rebalancing groups across consumers, which might be important at the volumes you typically handle. I don't think anyone has tackled it yet, but if you discover you need it, implement something, and are willing to contribute it back, we'd gladly take it.

Rolling back your messages until you either have a full batch or you DLQ them seems less than ideal. You'll process each message repeatedly (so it's inefficient), and your decision to give up is based on an arbitrary number of attempts, not on whether you have space to keep holding the message, how much time has elapsed, etc. Better to consume each message once but not ack it until the whole batch has arrived. The only problem with INDIVIDUAL_ACK is that there's no way to nack a message individually, so your only option is to roll back the consumer and nack everything at once, including messages you don't actually want to nack. I submitted an enhancement request for individual nacks (https://issues.apache.org/jira/browse/AMQ-6098, http://activemq.2283324.n4.nabble.com/How-to-use-INDIVIDUAL-ACKNOWLEDGE-mode-td4704793.html), but Tim Bish was unconvinced that we should implement the capability, on the grounds that it's not required by the JMS spec (we're allowed to extend the spec in non-standard ways, so that's not a convincing argument), and to the best of my knowledge no one (him, me, or anyone else) has attempted to implement it since.
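To make the "consume once but don't ack till the whole batch has arrived" idea concrete (and the single-bitmap-per-group bookkeeping suggested further down this thread), here's roughly what the tracking side of a B-style consumer could look like. This is a pure-Java sketch; the class and method names are mine, not an ActiveMQ API, and the actual JMS receive/acknowledge calls (keying on the JMSXGroupID header plus a sequence-number property you'd set when producing) are deliberately omitted:

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Tracks which members of each fixed-size message group have arrived.
// The consumer calls recordArrival() for every message it receives and
// only acknowledges the held (unacked) messages once the group completes.
public class GroupCompletionTracker {
    private final int groupSize;                        // e.g. 15 in this thread
    private final Map<String, BitSet> seen = new HashMap<>();

    public GroupCompletionTracker(int groupSize) {
        this.groupSize = groupSize;
    }

    /** Records message #seq (0-based) of the given group; returns true
     *  when the group is now complete and it's safe to ack the batch.
     *  Duplicate deliveries of the same seq are harmless (BitSet.set is
     *  idempotent). */
    public boolean recordArrival(String groupId, int seq) {
        BitSet bits = seen.computeIfAbsent(groupId, k -> new BitSet(groupSize));
        bits.set(seq);
        if (bits.cardinality() == groupSize) {
            seen.remove(groupId);                       // done; forget the group
            return true;
        }
        return false;
    }

    /** Number of groups still waiting on at least one message. */
    public int openGroups() {
        return seen.size();
    }
}
```

Because the per-group state is just a BitSet, this is also the shape you'd persist (a single bit-mapped integer per group) if you overflow incomplete groups to a database instead of holding the messages in memory.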
But with that feature implemented, I think INDIVIDUAL_ACK plus message groups plus enough consumers to keep all messages in memory is the way to go. Without it, things get much harder, and maybe you need one destination per message group, with a Camel route to pull messages from a queue and move them to the right per-group destination?

Tim

Kevin Burton wrote:

I think part of this is that I'm unsure how some of the ActiveMQ internals work here.

For example, I've run into problems with message selectors where they don't operate properly if one message subset produces fewer messages than the other message subsets, due to the way the 'head of queue' system works in ActiveMQ. The memory gets choked out, and so you end up with some subset of your messages having higher latencies.

Another thing I was considering was building a sort of bucket system, similar to the way Storm or DHTs operate, where we pre-allocate something like 20k 'buckets' and put messages into these buckets. That way a consumer only sees the buckets it's responsible for, and some smaller amount of volume. It can then group the messages in bulk when it receives them all.

Another way to handle the incomplete groups (i.e. if some of them get dropped) is to keep rolling them back until they go into the DLQ...

On Sun, Jan 24, 2016 at 2:06 PM, Tim Bain <[email protected]> wrote:

> I'd use message groups for the messages produced by A, ensuring that all
> 15 go to a single B consumer. Then B can use CLIENT_ACK or INDIVIDUAL_ACK
> mode to only ack the messages when all 15 have been received (holding
> them in memory and unacknowledged until all 15 arrive), and then B can
> publish a single message to be consumed by C that encompasses all of the
> information from the 15 messages. This only requires two queues (AB for
> messages between A and B, and BC for messages between B and C), so it'll
> be simpler to manage than having multiple aggregation_N queues.
>
> You'll want to consider what happens if you never get all 15 messages
> (I'd eventually write them into a database so the batch can be
> reassembled later if they eventually show up), as well as how to do your
> acking if a single B consumer can get interleaved message groups (which
> is why I'd suggest considering INDIVIDUAL_ACK mode). Also, although I
> agree that doing all of this in a database isn't likely to be performant,
> I think any database (Cassandra, Oracle, whatever) would be able to
> handle just your incomplete-groups use case, since the percentage of
> groups that end up routed through that mechanism is presumably small.
>
> All of this assumes that most message groups will be completed quickly
> enough that it's reasonable to hold all messages from most open groups in
> the memory (and the prefetch buffer) of one or another of your B
> consumers, and that the number of groups that would have to be overflowed
> elsewhere is small. If that's not reasonable, this gets harder, but you
> could potentially write the messages to some type of database but have
> the consumer just keep track of which messages it has (so you're storing
> a single bit-mapped integer for each message group instead of storing the
> full message content), and then read them back in when the 15th one
> arrives. That means the database doesn't have to query to know which
> batches are complete, only be capable of retrieving the messages for a
> single group on demand.
>
> Tim
>
> On Sun, Jan 24, 2016 at 1:16 PM, Kevin Burton <[email protected]> wrote:
>
> > I have a pattern which I think I need advice on...
> >
> > I have three tasks... each a type of message consumer.
> >
> > Let's call them A, B, and C.
> >
> > A runs once, creates 15 messages, and sends them to B... then B
> > processes these messages and generates 15 new messages.
> >
> > However, they need to be combined into a group and sent to C all at
> > once, in one composite message.
> >
> > So it's similar to map/reduce, in a way, in that C should execute once
> > with a block of these 15 messages.
> >
> > Conceptually I'm calling them (message group checkpoints)... but I'm
> > wondering if there's already a more formal name for this concept.
> >
> > I'm not sure the best way to handle this with ActiveMQ.
> >
> > One strategy is that I could have one queue per C task (the final
> > tasks) and then have C running and consuming them one at a time, and
> > then performing the execution (and message commit) once it receives
> > all 15 messages.
> >
> > I HAVE to get all the messages before I can make a decision; I can't
> > stream-process them, unfortunately, because the algorithm needs all
> > data points.
> >
> > I could use a database... but the problem there is that it would incur
> > some database load, and Cassandra (in our case) doesn't handle this
> > queue pattern very well.
> >
> > Another idea is to use a series of queues... say aggregation_0,
> > aggregation_1, aggregation_2, ...
> >
> > Then I receive these messages into the first queue (aggregation_0),
> > then sort it; if any of the groups are finished I send them on to the
> > final destination task. If any are unfinished then I overflow them on
> > to aggregation_1 (pre-sorted)...
> >
> > Thoughts?
> >
> > --
> >
> > We’re hiring if you know of any awesome Java Devops or Linux
> > Operations Engineers!
> >
> > Founder/CEO Spinn3r.com
> > Location: San Francisco, CA
> > blog: http://burtonator.wordpress.com
> > ... or check out my Google+ profile
> > <https://plus.google.com/102718274791889610666/posts>

--

We’re hiring if you know of any awesome Java Devops or Linux Operations
Engineers!

Founder/CEO Spinn3r.com
Location: San Francisco, CA
blog: http://burtonator.wordpress.com
... or check out my Google+ profile
<https://plus.google.com/102718274791889610666/posts>
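P.S. If you do end up needing one destination per message group, the Camel fan-out I mentioned at the top might look roughly like this (a sketch only: the endpoint URIs and queue names are made up, it assumes the producer sets JMSXGroupID, and you'd want to cap the number of distinct groups before doing this for real):

```xml
<!-- camel.xml (sketch): move each message from the shared queue to a
     per-group queue named after its JMSXGroupID header -->
<route>
  <from uri="activemq:queue:AB.incoming"/>
  <recipientList>
    <simple>activemq:queue:group.${header.JMSXGroupID}</simple>
  </recipientList>
</route>
```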
