I've created a dummy unit test which shows that, with a fixed shared reply queue and a CachingConnectionFactory, setting replyToCacheLevelName doesn't solve the problem of messages getting stuck in the reply queue.
In this test, I send 3 messages (at almost the same time) to a queue, using inOut with the replyTo parameter. After the reply is received, the message is sent to a mock endpoint (mock:result). In the end, this mock receives only 1 message, not 3, and in the log I can read:

16:20:13,169 INFO org.apache.camel.impl.DefaultShutdownStrategy| Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 10 seconds.
16:20:14,170 INFO org.apache.camel.impl.DefaultShutdownStrategy| Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 9 seconds.
...

This is the unit test:

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import org.springframework.jms.connection.CachingConnectionFactory;

public class FixedReplyQueueWithCachingConnectiongTest extends CamelTestSupport {

    @Override
    protected CamelContext createCamelContext() throws Exception {
        CamelContext camelContext = super.createCamelContext();

        ConnectionFactory connectionFactory = createConnectionFactory();
        ActiveMQComponent activemqComponent = new ActiveMQComponent();
        activemqComponent.setReplyToCacheLevelName("CACHE_NONE");
        activemqComponent.setConnectionFactory(connectionFactory);
        camelContext.addComponent("activemq", activemqComponent);

        return camelContext;
    }

    private ConnectionFactory createConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // optimize AMQ to be as fast as possible so unit testing is quicker
        connectionFactory.setCopyMessageOnSend(false);
        connectionFactory.setOptimizeAcknowledge(true);
        connectionFactory.setOptimizedMessageDispatch(true);

        // When using asyncSend, producers will not be guaranteed to send in the order we
        // have in the tests (which may be confusing for queues) so we need this set to false.
        // Another way of guaranteeing order is to use persistent messages or transactions.
        connectionFactory.setUseAsyncSend(false);
        connectionFactory.setAlwaysSessionAsync(false);

        CachingConnectionFactory cached = new CachingConnectionFactory(connectionFactory);
        cached.setSessionCacheSize(100);
        return cached;
    }

    @Test
    public void testStuckMessagesWithFixedReplyQueue() throws Exception {
        getMockEndpoint("mock:result").expectedMessageCount(1);

        // send three requests at almost the same time
        new MyThread().start();
        new MyThread().start();
        new MyThread().start();

        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start")
                    .inOut("activemq:queue:foo?replyTo=queue:bar")
                    .to("mock:result");

                from("activemq:queue:foo").transform(body().prepend("Bye "));
            }
        };
    }

    class MyThread extends Thread {
        @Override
        public void run() {
            template.requestBody("direct:start", "Camel" + getName(), String.class);
        }
    }
}

If I set cacheConsumers to false on the CachingConnectionFactory, the problem is solved:

    cached.setCacheConsumers(false);

But I would like to deactivate the cache only for the reply queues, not in general, by using replyToCacheLevelName.

Any idea why replyToCacheLevelName is not working?

--
View this message in context: http://camel.465427.n5.nabble.com/JMS-Shared-fixed-reply-queue-doesn-t-consume-all-the-messages-tp5721700p5722304.html
Sent from the Camel - Users mailing list archive at Nabble.com.
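
P.S. In case it helps anyone reproduce this: the test only starts the three threads "at almost the same time" and never checks each reply individually. Below is a minimal sketch of a helper that releases all requests at the same instant and counts how many replies come back within a timeout. It is plain Java (8 or later) and not part of Camel; the class name ConcurrentRequests, the method countReplies and the requester parameter are made up for illustration, and requester is only a stand-in for the real template.requestBody(...) call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical helper, not part of Camel or ActiveMQ.
final class ConcurrentRequests {

    // Sends `count` requests concurrently and returns how many produced a
    // non-null reply before `timeoutMillis` elapsed (one shared deadline).
    static int countReplies(int count, long timeoutMillis, Function<String, String> requester) {
        ExecutorService pool = Executors.newFixedThreadPool(count);
        CountDownLatch startGate = new CountDownLatch(1);
        List<Future<String>> futures = new ArrayList<>();

        for (int i = 0; i < count; i++) {
            final String body = "Camel" + i;
            futures.add(pool.submit(() -> {
                startGate.await();            // block until every worker is ready
                return requester.apply(body); // stand-in for the request/reply call
            }));
        }

        startGate.countDown();                // release all requests together
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);

        int replies = 0;
        try {
            for (Future<String> future : futures) {
                long remaining = Math.max(0L, deadline - System.nanoTime());
                try {
                    if (future.get(remaining, TimeUnit.NANOSECONDS) != null) {
                        replies++;
                    }
                } catch (ExecutionException | TimeoutException e) {
                    // request failed or is still stuck: not counted as a reply
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
        } finally {
            pool.shutdownNow();                 // interrupt any request that is still waiting
        }
        return replies;
    }
}
```

Inside the test above it could be called as ConcurrentRequests.countReplies(3, 10000, body -> template.requestBody("direct:start", body, String.class)). If no reply got stuck, I would expect it to return 3.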