Hi all,

I am reading from a RabbitMQ queue as follows:

from("rabbitmq://localhost/?queue=camel&autoAck=false&concurrentConsumers=1&threadPoolSize=1&prefetchEnabled=true&prefetchCount=50")

Some remarks about the configuration parameters:

- I set `autoAck` to false to be able to acknowledge manually in my
implementation.
- I set `concurrentConsumers` and `threadPoolSize` to 1 to guarantee that I
consume the messages in the same order as they were added to the queue.
- I set `prefetchCount` to 50 to have at most 50 inflight messages in
memory.

Now, I want to process these 50 messages in an asynchronous fashion and
manually acknowledge when done. Each message has a `group identifier`
header set. Messages from the same group will be processed sequentially,
while messages from other groups will be processed concurrently.
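
To make the intended dispatching concrete, here is a minimal sketch in plain
Java, independent of Camel. The `GroupDispatcher` class, its lane count and
the `onDone` hook are hypothetical names of my own; `onDone` is where I would
call `callback.done(false)`. It assumes that hashing the group identifier onto
a fixed set of single-threaded executors is acceptable, which means two
different groups may occasionally share a lane.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Camel: routes every task of a group to the
// same single-threaded lane, so tasks within a group keep submission order.
final class GroupDispatcher {
    private final ExecutorService[] lanes;

    GroupDispatcher(int laneCount) {
        lanes = new ExecutorService[laneCount];
        for (int i = 0; i < laneCount; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Runs 'work' on the lane owned by 'groupId', then runs 'onDone'
    // (for example a wrapper around callback.done(false)), even if 'work' throws.
    void submit(String groupId, Runnable work, Runnable onDone) {
        int lane = Math.floorMod(groupId.hashCode(), lanes.length);
        lanes[lane].execute(() -> {
            try {
                work.run();
            } finally {
                onDone.run();
            }
        });
    }

    // Stops accepting tasks and waits for the queued ones to finish.
    // Returns false on timeout or if the waiting thread is interrupted.
    boolean shutdownAndWait(long timeout, TimeUnit unit) {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            boolean allDone = true;
            for (ExecutorService lane : lanes) {
                allDone &= lane.awaitTermination(timeout, unit);
            }
            return allDone;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because each lane is a single-threaded executor, tasks of the same group run
in the order they were submitted, while groups mapped to different lanes run
in parallel.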

I tried to start with the following:

from("rabbitmq://...")
.process(new AsyncProcessor() {
    @Override
    public boolean process(final Exchange exchange, final AsyncCallback callback) {
        System.out.println("ASYNC");
        // TODO: (1) read group identifier (2) submit task to executor
        // responsible for that particular group (3) call callback.done() in
        // the task once done
        return false;
    }

    @Override
    public void process(final Exchange exchange) {
        throw new UnsupportedOperationException();
    }
})

The problem here is that only the first message is passed to
`process(exchange, callback)`. Is there a way to also receive the other
inflight messages?

Note that I do *not* want to increase the number of RabbitMQ consumers, as
this would disturb the message order. It is important that messages from
the same group are processed sequentially, hence the need for a single
RabbitMQ consumer.

Thanks!
