Hi Valdis,

Many thanks for your feedback, I appreciate it.

I believe I was looking for static or dynamic routing:

- Aggregation is not suitable for my use case, as I don't have a
  deterministic message flow. I also want to process messages in real time.
- Static or dynamic message routing might be suitable, though the groups
  should be treated as a dynamic set. In my use case, I have `m` groups and
  `n` available executor threads. Here, `m` can be any (large) number while
  `n` is usually small, say 8.

I will have to try it out with static or dynamic routing. It would be great
if I could apply a modulo hash to map an arbitrary group identifier to an
executor thread.

It's important that the RabbitMQ ACK/NACK happens *after* the successful
execution on the executor thread, not at the moment the message has been
routed by the router. I don't want to risk dropping messages, for example
during a service restart.

I'll experiment and come back with my results.

Thanks.

On Mon, Dec 17, 2018 at 4:41 PM Valdis Andersons <valdis.anders...@vhi.ie>
wrote:

> Hi Peter,
>
> Given you have a header already identifying the message categories, you
> could try to look into the Aggregator EIP:
>
> http://camel.apache.org/aggregator2.html
>
> It will allow you to group all the messages by the header into 'batches'
> for each of the groups. Each batch can then be processed by a single
> consumer and it would allow you to have multiple consumers taking care of
> the parallel processing of different groups. This is very suitable for
> periodic batch processing of messages, but requires finite batch sizes
> and/or finite time limits.
>
> If you have a fixed set of group headers but not a deterministic size of
> the messages per group or a non-deterministic time period then a Message
> Router might be suitable:
>
> http://camel.apache.org/message-router.html
>
> You can directly check for the header in the choice expression and then
> route the different group messages to their respective group queues where
> you can have single consumers processing them in sequence in a continuous
> way. The separate queues would allow to fulfil the parallel processing per
> group requirement.
>
> There is also a Dynamic Router option, but I personally wouldn't have any
> experience with it, but if none of the above suit your needs it might be
> something worth looking into:
>
> http://camel.apache.org/dynamic-router.html
>
>
> Thanks,
> Valdis
>
> -----Original Message-----
> From: Peter Billen [mailto:peter.bil...@gmail.com]
> Sent: 17 December 2018 11:08
> To: users@camel.apache.org
> Subject: How to read messages from a queue in parallel
>
> Hi all,
>
> I am reading from a RabbitMQ queue as following:
>
> from("rabbitmq://localhost/?queue=camel&autoAck=false&concurrentConsumers=1&threadPoolSize=1&prefetchEnabled=true&prefetchCount=50")
>
> Some remarks about the configuration parameters:
>
> - I set `autoAck` to false to be able to acknowledge manually in my
>   implementation.
> - I set `concurrentConsumers` and `threadPoolSize` to 1 to guarantee that
>   I consume the messages in the same order as they were added to the queue.
> - I set `prefetchCount` to 50 to have at most 50 inflight messages in
>   memory.
>
> Now, I want to process these 50 messages in an asynchronous fashion and
> manually acknowledge when done. Each message has a `group identifier`
> header set. Messages from the same group will be processed sequentially,
> while messages from other groups will be processed concurrently.
>
> I tried to start with the following:
>
> from("rabbitmq://...")
>     .process(new AsyncProcessor() {
>         @Override
>         public boolean process(final Exchange exchange, final AsyncCallback callback) {
>             System.out.println("ASYNC");
>             // TODO: (1) read group identifier
>             //       (2) submit task to executor responsible for that particular group
>             //       (3) call callback.done() in the task once done
>             return false;
>         }
>
>         @Override
>         public void process(final Exchange exchange) {
>             throw new UnsupportedOperationException();
>         }
>     })
>
> The problem is here that only the first message is given to
> `process(exchange, callback)`. Is there a way to also receive the other
> inflight messages?
>
> Note that I do *not* want to increase the number of RabbitMQ consumers, as
> this would skew with the message order. It is important that messages from
> the same group will be executed sequentially, hence the necessity to have
> one single RabbitMQ consumer.
>
> Thanks!
>
> Vhi Group DAC (Vhi) is a holding company for insurance and healthcare
> services, which include Vhi Healthcare DAC, Vhi Insurance DAC, Vhi Health
> Services DAC and Vhi Investments DAC. Vhi Healthcare DAC trading as Vhi
> Healthcare and Vhi Insurance DAC trading as Vhi Insurance are regulated by
> the Central Bank of Ireland. Vhi Healthcare is tied to Vhi Insurance DAC
> for health insurance in Ireland which is underwritten by Vhi Insurance DAC.
> Vhi Healthcare is tied to Zurich Life Assurance plc for Vhi Life Term
> Insurance and Vhi Mortgage Protection which is underwritten by Zurich Life
> Assurance plc. Vhi Healthcare is tied to Collinson Insurance Services
> Limited for MultiTrip Travel Insurance, Backpacker Travel Insurance and Vhi
> Dental Insurance which are underwritten by Great Lakes Insurance SE, UK
> branch and for Vhi Canada Cover and Vhi International Health Insurance
> which are underwritten by Astrenska Insurance Limited. For more information
> about the Vhi Group please go to: https://www.vhi.ie/about-vhi.
>
> This e-mail and any files transmitted with it contain information which
> may be confidential and which may also be privileged and is intended solely
> for the use of the individual or entity to whom it is addressed. Unless you
> are the intended recipient you may not copy or use it, or disclose it to
> anyone else. Any opinions expressed are that of the individual and not
> necessarily that of the Vhi Group. If you have received this e-mail in
> error please notify the sender by return.
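
P.S. To make the hashing idea from the top of this mail concrete, here is a
minimal sketch in plain Java, without any Camel code. The class
`GroupExecutors` and its methods `indexFor`, `submit` and `shutdown` are
hypothetical names I made up for illustration. The `onDone` hook is where I
would expect to call the Camel `AsyncCallback` (and set any exception on the
exchange first), but that is an assumption: I have not yet verified when
camel-rabbitmq actually sends the ACK/NACK relative to that callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: pins every group to one of n single-threaded executors. */
class GroupExecutors {
    private final List<ExecutorService> executors = new ArrayList<>();

    GroupExecutors(int n) {
        for (int i = 0; i < n; i++) {
            // One thread per executor, so tasks on the same executor run in submission order.
            executors.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Maps an arbitrary group identifier to an index in [0, n). */
    int indexFor(String groupId) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(groupId.hashCode(), executors.size());
    }

    /**
     * Runs the work on the executor owned by the group, then reports the outcome.
     * onDone receives null on success or the failure otherwise (assumed ACK/NACK hook).
     */
    Future<?> submit(String groupId, Runnable work, Consumer<Throwable> onDone) {
        return executors.get(indexFor(groupId)).submit(() -> {
            try {
                work.run();
                onDone.accept(null);
            } catch (RuntimeException e) {
                onDone.accept(e);
            }
        });
    }

    /** Stops accepting tasks and waits briefly for the queued ones to finish. */
    void shutdown() {
        for (ExecutorService e : executors) {
            e.shutdown();
        }
        try {
            for (ExecutorService e : executors) {
                e.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With `n` = 8, all messages of one group land on the same thread and therefore
keep their order, while different groups usually run on different threads.
Unrelated groups can still share a thread when their hashes collide modulo
`n`, which costs some parallelism but does not break the per-group ordering.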