Hello.
In order to optimize my InOut throughput I added explicit replyTo  on my JMS
endpoint.
This doesn't seems to be well behaving with camel threads and spring JMS
caching.
The following test case shows the problem.
remove the explicit replyTo and everything works fine.
your comments are welcome.

================

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;
import javax.jms.TextMessage;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.JMSException;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.CamelContext;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.builder.RouteBuilder;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.ExchangePattern;
import org.apache.camel.test.CamelTestSupport;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import static
org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;

/**
 * Unit test using a fixed replyTo specified on the JMS endpoint
 * 
 * @version $Revision: 791824 $
 */
public class JmsJMSReplyToEndpointUsingInOutTest extends CamelTestSupport {

    private ActiveMQComponent amq;
    private static String MQURI = "failover:(tcp://localhost:61616)";
   // private static String MQURI =
"vm://localhost?broker.persistent=false&broker.useJmx=false";

    public class Replier implements Callable {

        @Override
                public Object call() throws Exception {
                log.info("replier started");
                JmsTemplate jms = new
JmsTemplate(amq.getConfiguration().getConnectionFactory());

                final TextMessage msg = (TextMessage)
jms.receive("nameRequestor");
                assertEquals("What's your name", msg.getText());

                // there should be a JMSReplyTo so we know where to send the
reply
                final Destination replyTo = msg.getJMSReplyTo();

                // send reply
               // Thread.sleep(10000);
                jms.send(replyTo, new MessageCreator() {
                    public Message createMessage(Session session) throws
JMSException {
                        TextMessage replyMsg = session.createTextMessage();
                        replyMsg.setText("My name is Arnio");
                       
replyMsg.setJMSCorrelationID(msg.getJMSCorrelationID());
                        return replyMsg;
                    }
                });
                return null;
            }
    };
    public void testCustomJMSReplyToInOut() throws Exception {

        MockEndpoint mock = getMockEndpoint("mock:result");
        mock.expectedBodiesReceived("My name is Arnio");
        mock.expectedMessageCount(10);
        // do not use Camel to send and receive to simulate a non Camel
client
        // use another thread to listen and send the reply
        ExecutorService newFixedThreadPool =
Executors.newFixedThreadPool(20);

        for (int i = 0 ; i < 10 ; i++) {
          newFixedThreadPool.submit(new Replier());
        }
        
        // now get started and send the first message that gets the ball
rolling
        JmsTemplate jms = new
JmsTemplate(amq.getConfiguration().getConnectionFactory());
        for (int i = 0 ; i < 10 ; i++) {
        jms.send("hello", new MessageCreator() {
            public Message createMessage(Session session) throws
JMSException {
                TextMessage msg = session.createTextMessage();
                msg.setText("Hello, I'm here");
                return msg;
            }

        });

        log.info("Hello sent");
        Thread.sleep(10);
        }

        Thread.sleep(5000);
        assertMockEndpointsSatisfied();
    }

    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {

            public void configure() throws Exception {
                from("activemq:queue:hello")
                   .threads()
                    .process(new Processor() {
                        public void process(Exchange exchange) throws
Exception {

                            exchange.getOut().setBody("What's your name");
                        }
                    })
                    // use in out to get a reply as well
                    
                    .to(ExchangePattern.InOut,
"activemq:queue:nameRequestor?replyTo=queue:namedReplyQueue) // Remove the
replyTo and eveything works well
                    .to("direct:replyprocessor");

                    from("direct:replyprocessor").process(new Processor() {
                            public void process(Exchange exchange) throws
Exception {
                                System.out.println("Here I am processing the
reply " + exchange.getIn().getBody());
                            }
                    }).to("mock:result");
            }
        };
    }

    protected CamelContext createCamelContext() throws Exception {
         CamelContext camelContext = super.createCamelContext();
        amq = activeMQComponent(MQURI);
        ActiveMQConnectionFactory targetFactory = new
ActiveMQConnectionFactory(MQURI);
         log.info("Using MQ CachingConnectionFactory");
            CachingConnectionFactory cachedAMQConnectionFactory = new
CachingConnectionFactory(targetFactory);

        camelContext.addComponent("activemq",
ActiveMQComponent.jmsComponent(cachedAMQConnectionFactory));

        return camelContext;
    }


}

===========
-- 
View this message in context: 
http://www.nabble.com/explicitly-setting-replyTo-doesn%27t-scale-with-.threads%28%29-and-JMS-cache.-tp25991334p25991334.html
Sent from the Camel - Users (activemq) mailing list archive at Nabble.com.

Reply via email to