Hi Jim, others I got some code for supporting exclusive replyTo queues. I just want to know if you run in a clustered environment. As the replyTo queue would be exclusive to each CamelContext (eg each node). So if you have 2 nodes running with Camel then each node would have to use an unique replyTo queue.
If you run in a single instance of CamelContext then there is of course no problem. Alternative you can always use shared queues in a cluster, as each CamelContext (each node) will only pickup intended reply message for it (using a JMS selector), which is also the case why its slower. You can tweak the performance with the receiveTimeout. That is default 1 sec. If you set it lower, then it will potential react faster to new replies coming back. On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham <jnews...@referentia.com> wrote: > > I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0. For some reason, when I > specify a custom replyTo destination on the endpoint url, the time it takes > for the producer to receive a reply increases drastically. The curious > thing is that the time to receive a reply is almost exactly 1 second. When > I remove the replyTo from the url, everything's fast again. > > I created a very simple, stand-alone test to demonstrate what I'm seeing. > There is a server class [4] which runs an embedded instance of ActiveMQ and > simply replies to messages as they arrive; and a client [3] class which > simply sends messages to the server, and prints the elapsed time. The > USE_REPLY_TO symbolic constant in the client determines whether a replyTo > value is added to the url or not. > > The client output when USE_REPLY_TO is false is shown as [1]. The client > output when USE_REPLY_TO is true is shown as [2]. The code is pretty > trivial. Am I doing something wrong, or is this a Camel and/or ActiveMQ > issue? > > Thanks! > Jim > > > [1] USE_REPLY_TO = false > > received reply in: 0.476 s > received reply in: 0.006 s > received reply in: 0.006 s > received reply in: 0.006 s > received reply in: 0.006 s > ... > > > [2] USE_REPLY_TO = true > > received reply in: 1.524 s > received reply in: 1.002 s > received reply in: 1.003 s > received reply in: 1.003 s > received reply in: 1.002 s > ... > > > [3] TestReplyToClient.java > > package test; > > import org.apache.activemq.ActiveMQConnectionFactory; > import org.apache.activemq.camel.component.ActiveMQComponent; > import org.apache.camel.CamelContext; > import org.apache.camel.ProducerTemplate; > import org.apache.camel.impl.DefaultCamelContext; > > public class TestReplyToClient { > > private static final boolean USE_REPLY_TO = false; > > public static void main(String... args) throws Exception { > // create camel context; configure activemq component for > tcp://localhost:7001 > CamelContext context = new DefaultCamelContext(); > ActiveMQComponent activemqComponent = > ActiveMQComponent.activeMQComponent(); > activemqComponent.setConnectionFactory(new ActiveMQConnectionFactory( > null, null, "tcp://localhost:7001")); > context.addComponent("activemq", activemqComponent); > context.start(); > > // define url to send requests to > String sendUrl = "activemq:queue:dest"; > if (USE_REPLY_TO) { > sendUrl += "?replyTo=replyQueue"; > } > System.err.println("sending to url: " + sendUrl); > > // repeatedly send requests; measure elapsed time > ProducerTemplate template = context.createProducerTemplate(); > while (true) { > long startNanos = System.nanoTime(); > template.requestBody(sendUrl, "abc"); > long elapsedNanos = System.nanoTime() - startNanos; > System.err.println(String.format("received reply in: %.3f s", > elapsedNanos / 1000000000.0)); > } > } > > } > > > [4] TestReplyToServer.java > > package test; > > import org.apache.activemq.broker.BrokerService; > import org.apache.activemq.camel.component.ActiveMQComponent; > import org.apache.camel.CamelContext; > import org.apache.camel.Exchange; > import org.apache.camel.Processor; > import org.apache.camel.builder.RouteBuilder; > import org.apache.camel.impl.DefaultCamelContext; > > public class TestReplyToServer { > > private static final String BROKER_NAME = "thebroker"; > > public static void main(String... args) throws Exception { > startBroker(); > startCamel(); > Thread.sleep(Long.MAX_VALUE); > } > > private static void startBroker() throws Exception { > BrokerService brokerService = new BrokerService(); > brokerService.setBrokerName(BROKER_NAME); > brokerService.setSchedulerSupport(false); > brokerService.setPersistent(false); > brokerService.addConnector("tcp://0.0.0.0:7001"); > brokerService.start(); > brokerService.waitUntilStarted(); > } > > > private static void startCamel() throws Exception { > CamelContext context = new DefaultCamelContext(); > > ActiveMQComponent activemqComponent = > ActiveMQComponent.activeMQComponent(); > activemqComponent.setBrokerURL(String.format("vm://%s?create=false", > BROKER_NAME)); > context.addComponent("activemq", activemqComponent); > > final String receiveUrl = "activemq:queue:dest"; > context.addRoutes(new RouteBuilder() { > @Override > public void configure() throws Exception { > from(receiveUrl).process(new Processor() { > @Override > public void process(Exchange exchange) throws Exception { > System.err.println("received request"); > exchange.getOut().setBody("reply"); > } > }); > } > }); > > context.start(); > System.err.println("listening on url: " + receiveUrl); > } > > } > > > > > -- Claus Ibsen ----------------- FuseSource Email: cib...@fusesource.com Web: http://fusesource.com Twitter: davsclaus, fusenews Blog: http://davsclaus.blogspot.com/ Author of Camel in Action: http://www.manning.com/ibsen/