Hi Peter,

Since you already have a header identifying the message group, you could 
look into the Aggregator EIP:

http://camel.apache.org/aggregator2.html

It lets you group the messages by that header into 'batches', one per group. 
Each batch can then be processed by a single consumer, while multiple 
consumers handle different groups in parallel. This suits periodic batch 
processing well, but it requires a finite batch size and/or a finite time 
limit so that each batch eventually completes.
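
To make the batching idea concrete, here is a minimal plain-Java sketch. It 
is not Camel's API: the class name GroupBatcher and its methods are made up 
for illustration. It only shows the behaviour described above, i.e. messages 
are collected per group header and a batch is released once it reaches a 
fixed size, with a flush for whatever is left when a time limit expires. In 
Camel the Aggregator's correlation expression and its completion size and 
timeout options play these roles.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, for illustration only (not part of Camel).
final class GroupBatcher {
    private final int batchSize;
    // Batches that are still being filled, keyed by group header value.
    private final Map<String, List<String>> open = new HashMap<>();

    GroupBatcher(int batchSize) {
        this.batchSize = batchSize;
    }

    // Adds one message body to its group's batch. Returns the completed
    // batch once the group has collected batchSize messages, else empty.
    Optional<List<String>> add(String group, String body) {
        List<String> batch = open.computeIfAbsent(group, g -> new ArrayList<>());
        batch.add(body);
        if (batch.size() >= batchSize) {
            open.remove(group);
            return Optional.of(batch);
        }
        return Optional.empty();
    }

    // Releases all partially filled batches, e.g. when a time limit expires.
    Map<String, List<String>> flush() {
        Map<String, List<String>> remaining = new HashMap<>(open);
        open.clear();
        return remaining;
    }
}
```

Each completed batch could then be handed to a single worker, so the messages 
of one group stay in order while different groups proceed independently.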

If you have a fixed set of group headers but the number of messages per 
group, or the time period over which they arrive, is not known in advance, 
then a Message Router might be more suitable:

http://camel.apache.org/message-router.html

You can check the header directly in the choice expression and route each 
group's messages to its own queue, where a single consumer processes them 
sequentially and continuously. The separate queues give you the parallel 
processing across groups that you need.
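
As a rough illustration of that routing idea, here is a plain-Java sketch. 
Again, this is not Camel's API: the class GroupRouter and its methods are 
hypothetical. It assumes a fixed, known set of groups, gives each group its 
own single-threaded worker (standing in for a per-group queue with one 
consumer), and rejects unknown groups, much as an "otherwise" branch would 
have to handle them.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, for illustration only (not part of Camel).
final class GroupRouter {
    // One single-threaded worker per group: tasks of the same group run
    // in submission order, tasks of different groups run in parallel.
    private final Map<String, ExecutorService> workers = new HashMap<>();

    GroupRouter(Collection<String> groups) {
        for (String group : groups) {
            workers.put(group, Executors.newSingleThreadExecutor());
        }
    }

    // Routes a task to the worker of its group, based on the header value.
    Future<?> route(String group, Runnable task) {
        ExecutorService worker = workers.get(group);
        if (worker == null) {
            throw new IllegalArgumentException("Unknown group: " + group);
        }
        return worker.submit(task);
    }

    // Stops accepting tasks and waits up to 10 seconds per worker.
    void shutdown() {
        for (ExecutorService worker : workers.values()) {
            worker.shutdown();
        }
        for (ExecutorService worker : workers.values()) {
            try {
                worker.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

For example, router.route("A", task) queues the task behind any earlier "A" 
tasks, while tasks routed to "B" are free to run at the same time.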

There is also the Dynamic Router. I have no personal experience with it, but 
if neither of the above suits your needs it might be worth looking into:

http://camel.apache.org/dynamic-router.html


Thanks,
Valdis

-----Original Message-----
From: Peter Billen [mailto:peter.bil...@gmail.com] 
Sent: 17 December 2018 11:08
To: users@camel.apache.org
Subject: How to read messages from a queue in parallel

Hi all,

I am reading from a RabbitMQ queue as follows:

from("rabbitmq://localhost/?queue=camel&autoAck=false&concurrentConsumers=1&threadPoolSize=1&prefetchEnabled=true&prefetchCount=50")

Some remarks about the configuration parameters:

- I set `autoAck` to false to be able to acknowledge manually in my 
implementation.
- I set `concurrentConsumers` and `threadPoolSize` to 1 to guarantee that I 
consume the messages in the same order as they were added to the queue.
- I set `prefetchCount` to 50 to have at most 50 inflight messages in memory.

Now, I want to process these 50 messages asynchronously and acknowledge each 
one manually when it is done. Each message has a `group identifier` header 
set. Messages from the same group must be processed sequentially, while 
messages from different groups may be processed concurrently.

I tried to start with the following:

from("rabbitmq://...")
.process(new AsyncProcessor() {
    @Override
    public boolean process(final Exchange exchange, final AsyncCallback callback) {
        System.out.println("ASYNC");
        // TODO: (1) read group identifier
        //       (2) submit task to executor responsible for that particular group
        //       (3) call callback.done() in the task once done
        return false;
    }

    @Override
    public void process(final Exchange exchange) {
        throw new UnsupportedOperationException();
    }
})

The problem here is that only the first message is passed to 
`process(exchange, callback)`. Is there a way to also receive the other 
inflight messages?

Note that I do *not* want to increase the number of RabbitMQ consumers, as this 
would disturb the message order. Messages from the same group must be 
processed sequentially, hence the need for a single RabbitMQ consumer.

Thanks!






