Hello. In order to optimize my InOut throughput, I added an explicit replyTo on my JMS endpoint. This doesn't seem to behave well with Camel threads and Spring JMS caching. The following test case shows the problem. Remove the explicit replyTo and everything works fine. Your comments are welcome.
================
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;
import javax.jms.TextMessage;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.CamelContext;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.builder.RouteBuilder;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.ExchangePattern;
import org.apache.camel.test.CamelTestSupport;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;

/**
 * Unit test using a fixed replyTo specified on the JMS endpoint.
 * <p>
 * The route consumes from queue {@code hello}, hands off via {@code .threads()},
 * performs an InOut exchange on queue {@code nameRequestor} with an explicit
 * {@code replyTo=queue:namedReplyQueue}, and forwards the reply to
 * {@code mock:result}. Ten requests are sent and ten replies are expected.
 * The "activemq" component used by the route is backed by a Spring
 * {@link CachingConnectionFactory}.
 *
 * @version $Revision: 791824 $
 */
public class JmsJMSReplyToEndpointUsingInOutTest extends CamelTestSupport {

    /** Broker URI used both for the helper component and for the cached connection factory. */
    private static final String MQURI = "failover:(tcp://localhost:61616)";
    // private static String MQURI = "vm://localhost?broker.persistent=false&broker.useJmx=false";

    /** Helper component; only its connection factory is used, to build plain JmsTemplates. */
    private ActiveMQComponent amq;

    /**
     * Non-Camel responder: receives one request from queue {@code nameRequestor},
     * checks its text and sends the answer to the request's JMSReplyTo destination,
     * copying the request's JMSCorrelationID onto the reply.
     */
    public class Replier implements Callable<Object> {

        /**
         * Handles exactly one request/reply round trip.
         *
         * @return always {@code null}
         * @throws Exception if receiving, asserting or sending fails
         */
        @Override
        public Object call() throws Exception {
            log.info("replier started");
            JmsTemplate jms = new JmsTemplate(amq.getConfiguration().getConnectionFactory());
            final TextMessage msg = (TextMessage) jms.receive("nameRequestor");
            assertEquals("What's your name", msg.getText());

            // there should be a JMSReplyTo so we know where to send the reply
            final Destination replyTo = msg.getJMSReplyTo();

            // send reply
            // Thread.sleep(10000);
            jms.send(replyTo, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    TextMessage replyMsg = session.createTextMessage();
                    replyMsg.setText("My name is Arnio");
                    replyMsg.setJMSCorrelationID(msg.getJMSCorrelationID());
                    return replyMsg;
                }
            });
            return null;
        }
    }

    /**
     * Sends ten messages to queue {@code hello} while ten {@link Replier} tasks
     * answer the resulting requests, then verifies the mock endpoint expectations.
     *
     * @throws Exception if sending fails or the mock expectations are not met
     */
    public void testCustomJMSReplyToInOut() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:result");
        mock.expectedBodiesReceived("My name is Arnio");
        mock.expectedMessageCount(10);

        // do not use Camel to send and receive to simulate a non Camel client
        // use another thread to listen and send the reply
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(20);
        try {
            for (int i = 0; i < 10; i++) {
                newFixedThreadPool.submit(new Replier());
            }

            // now get started and send the first message that gets the ball rolling
            JmsTemplate jms = new JmsTemplate(amq.getConfiguration().getConnectionFactory());
            for (int i = 0; i < 10; i++) {
                jms.send("hello", new MessageCreator() {
                    public Message createMessage(Session session) throws JMSException {
                        TextMessage msg = session.createTextMessage();
                        msg.setText("Hello, I'm here");
                        return msg;
                    }
                });
                log.info("Hello sent");
                Thread.sleep(10);
            }

            Thread.sleep(5000);
            assertMockEndpointsSatisfied();
        } finally {
            // release the worker threads whether or not the assertions passed
            newFixedThreadPool.shutdownNow();
        }
    }

    /**
     * Builds the two routes under test: {@code activemq:queue:hello} to the InOut
     * request on {@code nameRequestor}, and {@code direct:replyprocessor} to
     * {@code mock:result}.
     */
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            public void configure() throws Exception {
                from("activemq:queue:hello")
                    .threads()
                    .process(new Processor() {
                        public void process(Exchange exchange) throws Exception {
                            exchange.getOut().setBody("What's your name");
                        }
                    })
                    // use in out to get a reply as well
                    .to(ExchangePattern.InOut, "activemq:queue:nameRequestor?replyTo=queue:namedReplyQueue")
                    // Remove the replyTo and everything works well
                    .to("direct:replyprocessor");

                from("direct:replyprocessor").process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("Here I am processing the reply " + exchange.getIn().getBody());
                    }
                }).to("mock:result");
            }
        };
    }

    /**
     * Creates the Camel context and registers the "activemq" component on top of a
     * {@link CachingConnectionFactory} wrapping an {@link ActiveMQConnectionFactory}.
     * Also initialises {@link #amq}, which is not added to the context.
     */
    protected CamelContext createCamelContext() throws Exception {
        CamelContext camelContext = super.createCamelContext();
        amq = activeMQComponent(MQURI);
        ActiveMQConnectionFactory targetFactory = new ActiveMQConnectionFactory(MQURI);
        log.info("Using MQ CachingConnectionFactory");
        CachingConnectionFactory cachedAMQConnectionFactory = new CachingConnectionFactory(targetFactory);
        camelContext.addComponent("activemq", ActiveMQComponent.jmsComponent(cachedAMQConnectionFactory));
        return camelContext;
    }
}
===========
--
View this message in context: http://www.nabble.com/explicitly-setting-replyTo-doesn%27t-scale-with-.threads%28%29-and-JMS-cache.-tp25991334p25991334.html
Sent from the Camel - Users (activemq) mailing list archive at Nabble.com.