Hi, Also, use the concurrentConsumers option on the JMS endpoint to support concurrency, instead of using .threads() in the route.
On Wed, Oct 21, 2009 at 2:58 PM, Claus Ibsen <[email protected]> wrote: > Hi > > Maybe this ticket can give some hints > https://issues.apache.org/activemq/browse/CAMEL-490 > > On Wed, Oct 21, 2009 at 2:15 PM, Eric Bouer <[email protected]> wrote: >> >> Hello. >> In order to optimize my InOut throughput I added explicit replyTo on my JMS >> endpoint. >> This doesn't seems to be well behaving with camel threads and spring JMS >> caching. >> The following test case shows the problem. >> remove the explicit replyTo and everything works fine. >> your comments are welcome. >> >> ================ >> >> import java.util.concurrent.ExecutorService; >> import java.util.concurrent.Executors; >> import java.util.concurrent.Callable; >> import javax.jms.TextMessage; >> import javax.jms.Destination; >> import javax.jms.Message; >> import javax.jms.Session; >> import javax.jms.JMSException; >> >> import org.apache.activemq.ActiveMQConnectionFactory; >> import org.apache.camel.Exchange; >> import org.apache.camel.Processor; >> import org.apache.camel.CamelContext; >> import org.apache.camel.component.mock.MockEndpoint; >> import org.apache.camel.builder.RouteBuilder; >> import org.apache.activemq.camel.component.ActiveMQComponent; >> import org.apache.camel.ExchangePattern; >> import org.apache.camel.test.CamelTestSupport; >> import org.springframework.jms.connection.CachingConnectionFactory; >> import org.springframework.jms.core.JmsTemplate; >> import org.springframework.jms.core.MessageCreator; >> import static >> org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent; >> >> /** >> * Unit test using a fixed replyTo specified on the JMS endpoint >> * >> * @version $Revision: 791824 $ >> */ >> public class JmsJMSReplyToEndpointUsingInOutTest extends CamelTestSupport { >> >> private ActiveMQComponent amq; >> private static String MQURI = "failover:(tcp://localhost:61616)"; >> // private static String MQURI = >> 
"vm://localhost?broker.persistent=false&broker.useJmx=false"; >> >> public class Replier implements Callable { >> >> �...@override >> public Object call() throws Exception { >> log.info("replier started"); >> JmsTemplate jms = new >> JmsTemplate(amq.getConfiguration().getConnectionFactory()); >> >> final TextMessage msg = (TextMessage) >> jms.receive("nameRequestor"); >> assertEquals("What's your name", msg.getText()); >> >> // there should be a JMSReplyTo so we know where to send the >> reply >> final Destination replyTo = msg.getJMSReplyTo(); >> >> // send reply >> // Thread.sleep(10000); >> jms.send(replyTo, new MessageCreator() { >> public Message createMessage(Session session) throws >> JMSException { >> TextMessage replyMsg = session.createTextMessage(); >> replyMsg.setText("My name is Arnio"); >> >> replyMsg.setJMSCorrelationID(msg.getJMSCorrelationID()); >> return replyMsg; >> } >> }); >> return null; >> } >> }; >> public void testCustomJMSReplyToInOut() throws Exception { >> >> MockEndpoint mock = getMockEndpoint("mock:result"); >> mock.expectedBodiesReceived("My name is Arnio"); >> mock.expectedMessageCount(10); >> // do not use Camel to send and receive to simulate a non Camel >> client >> // use another thread to listen and send the reply >> ExecutorService newFixedThreadPool = >> Executors.newFixedThreadPool(20); >> >> for (int i = 0 ; i < 10 ; i++) { >> newFixedThreadPool.submit(new Replier()); >> } >> >> // now get started and send the first message that gets the ball >> rolling >> JmsTemplate jms = new >> JmsTemplate(amq.getConfiguration().getConnectionFactory()); >> for (int i = 0 ; i < 10 ; i++) { >> jms.send("hello", new MessageCreator() { >> public Message createMessage(Session session) throws >> JMSException { >> TextMessage msg = session.createTextMessage(); >> msg.setText("Hello, I'm here"); >> return msg; >> } >> >> }); >> >> log.info("Hello sent"); >> Thread.sleep(10); >> } >> >> Thread.sleep(5000); >> assertMockEndpointsSatisfied(); >> } 
>> >> protected RouteBuilder createRouteBuilder() throws Exception { >> return new RouteBuilder() { >> >> public void configure() throws Exception { >> from("activemq:queue:hello") >> .threads() >> .process(new Processor() { >> public void process(Exchange exchange) throws >> Exception { >> >> exchange.getOut().setBody("What's your name"); >> } >> }) >> // use in out to get a reply as well >> >> .to(ExchangePattern.InOut, >> "activemq:queue:nameRequestor?replyTo=queue:namedReplyQueue) // Remove the >> replyTo and eveything works well >> .to("direct:replyprocessor"); >> >> from("direct:replyprocessor").process(new Processor() { >> public void process(Exchange exchange) throws >> Exception { >> System.out.println("Here I am processing the >> reply " + exchange.getIn().getBody()); >> } >> }).to("mock:result"); >> } >> }; >> } >> >> protected CamelContext createCamelContext() throws Exception { >> CamelContext camelContext = super.createCamelContext(); >> amq = activeMQComponent(MQURI); >> ActiveMQConnectionFactory targetFactory = new >> ActiveMQConnectionFactory(MQURI); >> log.info("Using MQ CachingConnectionFactory"); >> CachingConnectionFactory cachedAMQConnectionFactory = new >> CachingConnectionFactory(targetFactory); >> >> camelContext.addComponent("activemq", >> ActiveMQComponent.jmsComponent(cachedAMQConnectionFactory)); >> >> return camelContext; >> } >> >> >> } >> >> =========== >> -- >> View this message in context: >> http://www.nabble.com/explicitly-setting-replyTo-doesn%27t-scale-with-.threads%28%29-and-JMS-cache.-tp25991334p25991334.html >> Sent from the Camel - Users (activemq) mailing list archive at Nabble.com. 
>> >> > > > > -- > Claus Ibsen > Apache Camel Committer > > Open Source Integration: http://fusesource.com > Blog: http://davsclaus.blogspot.com/ > Twitter: http://twitter.com/davsclaus > -- Claus Ibsen Apache Camel Committer Author of Camel in Action: http://www.manning.com/ibsen/ Open Source Integration: http://fusesource.com Blog: http://davsclaus.blogspot.com/ Twitter: http://twitter.com/davsclaus
