I've created a dummy unit test that shows that, with fixed shared reply
queues and a CachingConnectionFactory, the replyToCacheLevelName parameter
doesn't solve the problem of messages getting stuck in the reply queue.

In this test, I send 3 messages (at almost the same time) to a queue through
an inOut endpoint that uses the replyTo parameter. After the reply is
received, the message is sent to a mock endpoint (mock:result). In the end,
this mock receives only 1 message, not 3, and in the log I can read:

16:20:13,169 INFO       org.apache.camel.impl.DefaultShutdownStrategy| Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 10 seconds.
16:20:14,170 INFO       org.apache.camel.impl.DefaultShutdownStrategy| Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 9 seconds.
...

This is the unit test:

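import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import org.springframework.jms.connection.CachingConnectionFactory;

public class FixedReplyQueueWithCachingConnectiongTest extends CamelTestSupport {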

    protected CamelContext createCamelContext() throws Exception {
        CamelContext camelContext = super.createCamelContext();

        ConnectionFactory connectionFactory = createConnectionFactory();
        ActiveMQComponent activemqComponent = new ActiveMQComponent();
        activemqComponent.setReplyToCacheLevelName("CACHE_NONE");
        activemqComponent.setConnectionFactory(connectionFactory);
        camelContext.addComponent("activemq", activemqComponent);

        return camelContext;
    }
    
    private ConnectionFactory createConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("tcp://localhost:61616");
        // optimize AMQ to be as fast as possible so unit testing is quicker
        connectionFactory.setCopyMessageOnSend(false);
        connectionFactory.setOptimizeAcknowledge(true);
        connectionFactory.setOptimizedMessageDispatch(true);

        // When using asyncSend, producers will not be guaranteed to send in
        // the order we have in the tests (which may be confusing for queues)
        // so we need this set to false.
        // Another way of guaranteeing order is to use persistent messages
        // or transactions.
        connectionFactory.setUseAsyncSend(false);

        connectionFactory.setAlwaysSessionAsync(false);

        CachingConnectionFactory cached =
                new CachingConnectionFactory(connectionFactory);
        cached.setSessionCacheSize(100);
        return cached;
    }
    
    @Test
    public void testStuckMessagesWithFixedReplyQueue() throws Exception {
        getMockEndpoint("mock:result").expectedMessageCount(1);
        
        new MyThread().start();
        new MyThread().start();
        new MyThread().start();

        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            public void configure() throws Exception {
                from("direct:start")
                        .inOut("activemq:queue:foo?replyTo=queue:bar")
                        .to("mock:result");

                from("activemq:queue:foo").transform(body().prepend("Bye "));
            }
        };
    }
    
    class MyThread extends Thread {
        
        public void run() {
            template.requestBody("direct:start", "Camel" + getName(), String.class);
        }
        
    }
}
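
The three threads above only start at roughly the same time. As a minimal
sketch of how the requests could be released together and the replies
counted, using only java.util.concurrent: the class ConcurrentRequestSketch,
the method fireConcurrently and its request parameter are hypothetical names
that stand in for the template.requestBody call. They are not part of Camel
or of the test above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;

// Hypothetical helper for illustration only; not part of Camel.
final class ConcurrentRequestSketch {

    // Submits `count` requests, releases them together and returns how many
    // replies arrived before the timeout. `request` stands in for a call such
    // as template.requestBody("direct:start", body, String.class).
    static int fireConcurrently(int count, UnaryOperator<String> request,
                                long timeoutSeconds) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(count);
        CountDownLatch startGate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(count);
        AtomicInteger replies = new AtomicInteger();

        for (int i = 0; i < count; i++) {
            final String body = "Camel" + i;
            pool.execute(() -> {
                try {
                    startGate.await();              // wait until the gate is opened
                    if (request.apply(body) != null) {
                        replies.incrementAndGet();  // count each reply that came back
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }

        startGate.countDown();                         // release all workers together
        done.await(timeoutSeconds, TimeUnit.SECONDS);  // give up on stuck replies
        pool.shutdownNow();
        return replies.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Stub request that always replies; a real test would call the template.
        int replies = fireConcurrently(3, body -> "Bye " + body, 10);
        System.out.println("replies received: " + replies);
    }
}
```

With a helper like this, the number of replies that actually came back can be
compared directly with the number of requests sent, instead of waiting for the
shutdown timeout to reveal the inflight exchanges.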

If I set cacheConsumers to false on the CachingConnectionFactory, the
problem is solved:
cached.setCacheConsumers(false);

But I would like to disable the cache only for the reply queues, not in
general, using replyToCacheLevelName.

Any idea why replyToCacheLevelName is not working? 



--
View this message in context: 
http://camel.465427.n5.nabble.com/JMS-Shared-fixed-reply-queue-doesn-t-consume-all-the-messages-tp5721700p5722304.html
Sent from the Camel - Users mailing list archive at Nabble.com.
