Hi Jim, others

I got some code for supporting exclusive replyTo queues. I just want
to know if you run in a clustered environment.
As the replyTo queue would be exclusive to each CamelContext (eg each
node). So if you have 2 nodes running with Camel then each node would
have to use an unique replyTo queue.

If you run in a single instance of CamelContext then there is of
course no problem.

Alternative you can always use shared queues in a cluster, as each
CamelContext (each node) will only pickup intended reply message for
it (using a JMS selector), which is also the case why its slower. You
can tweak the performance with the receiveTimeout. That is default 1
sec. If you set it lower, then it will potential react faster to new
replies coming back.



On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham <jnews...@referentia.com> wrote:
>
> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when I
> specify a custom replyTo destination on the endpoint url, the time it takes
> for the producer to receive a reply increases drastically.  The curious
> thing is that the time to receive a reply is almost exactly 1 second.  When
> I remove the replyTo from the url, everything's fast again.
>
> I created a very simple, stand-alone test to demonstrate what I'm seeing.
>  There is a server class [4] which runs an embedded instance of ActiveMQ and
> simply replies to messages as they arrive; and a client [3] class which
> simply sends messages to the server, and prints the elapsed time.  The
> USE_REPLY_TO symbolic constant in the client determines whether a replyTo
> value is added to the url or not.
>
> The client output when USE_REPLY_TO is false is shown as [1].  The client
> output when USE_REPLY_TO is true is shown as [2].  The code is pretty
> trivial.  Am I doing something wrong, or is this a Camel and/or ActiveMQ
> issue?
>
> Thanks!
> Jim
>
>
> [1] USE_REPLY_TO = false
>
> received reply in: 0.476 s
> received reply in: 0.006 s
> received reply in: 0.006 s
> received reply in: 0.006 s
> received reply in: 0.006 s
> ...
>
>
> [2] USE_REPLY_TO = true
>
> received reply in: 1.524 s
> received reply in: 1.002 s
> received reply in: 1.003 s
> received reply in: 1.003 s
> received reply in: 1.002 s
> ...
>
>
> [3] TestReplyToClient.java
>
> package test;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.camel.component.ActiveMQComponent;
> import org.apache.camel.CamelContext;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.impl.DefaultCamelContext;
>
> public class TestReplyToClient {
>
>  private static final boolean USE_REPLY_TO = false;
>
>  public static void main(String... args) throws Exception {
>    // create camel context; configure activemq component for
> tcp://localhost:7001
>    CamelContext context = new DefaultCamelContext();
>    ActiveMQComponent activemqComponent =
> ActiveMQComponent.activeMQComponent();
>    activemqComponent.setConnectionFactory(new ActiveMQConnectionFactory(
>      null, null, "tcp://localhost:7001"));
>    context.addComponent("activemq", activemqComponent);
>    context.start();
>
>    // define url to send requests to
>    String sendUrl = "activemq:queue:dest";
>    if (USE_REPLY_TO) {
>      sendUrl += "?replyTo=replyQueue";
>    }
>    System.err.println("sending to url: " + sendUrl);
>
>    // repeatedly send requests; measure elapsed time
>    ProducerTemplate template = context.createProducerTemplate();
>    while (true) {
>      long startNanos = System.nanoTime();
>      template.requestBody(sendUrl, "abc");
>      long elapsedNanos = System.nanoTime() - startNanos;
>      System.err.println(String.format("received reply in: %.3f s",
> elapsedNanos / 1000000000.0));
>    }
>  }
>
> }
>
>
> [4] TestReplyToServer.java
>
> package test;
>
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.camel.component.ActiveMQComponent;
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
>
> public class TestReplyToServer {
>
>  private static final String BROKER_NAME = "thebroker";
>
>  public static void main(String... args) throws Exception {
>    startBroker();
>    startCamel();
>    Thread.sleep(Long.MAX_VALUE);
>  }
>
>  private static void startBroker() throws Exception {
>    BrokerService brokerService = new BrokerService();
>    brokerService.setBrokerName(BROKER_NAME);
>    brokerService.setSchedulerSupport(false);
>    brokerService.setPersistent(false);
>    brokerService.addConnector("tcp://0.0.0.0:7001");
>    brokerService.start();
>    brokerService.waitUntilStarted();
>  }
>
>
>  private static void startCamel() throws Exception {
>    CamelContext context = new DefaultCamelContext();
>
>    ActiveMQComponent activemqComponent =
> ActiveMQComponent.activeMQComponent();
>    activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
> BROKER_NAME));
>    context.addComponent("activemq", activemqComponent);
>
>    final String receiveUrl = "activemq:queue:dest";
>    context.addRoutes(new RouteBuilder() {
>      @Override
>      public void configure() throws Exception {
>        from(receiveUrl).process(new Processor() {
>          @Override
>          public void process(Exchange exchange) throws Exception {
>            System.err.println("received request");
>            exchange.getOut().setBody("reply");
>          }
>        });
>      }
>    });
>
>    context.start();
>    System.err.println("listening on url: " + receiveUrl);
>  }
>
> }
>
>
>
>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cib...@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Reply via email to