Hi

Also use option concurrentConsumers on the JMS endpoint to support
concurrency, instead of threads.


On Wed, Oct 21, 2009 at 2:58 PM, Claus Ibsen <[email protected]> wrote:
> Hi
>
> Maybe this ticket can give some hints
> https://issues.apache.org/activemq/browse/CAMEL-490
>
> On Wed, Oct 21, 2009 at 2:15 PM, Eric Bouer <[email protected]> wrote:
>>
>> Hello.
>> In order to optimize my InOut throughput I added explicit replyTo  on my JMS
>> endpoint.
>> This doesn't seems to be well behaving with camel threads and spring JMS
>> caching.
>> The following test case shows the problem.
>> remove the explicit replyTo and everything works fine.
>> your comments are welcome.
>>
>> ================
>>
>> import java.util.concurrent.ExecutorService;
>> import java.util.concurrent.Executors;
>> import java.util.concurrent.Callable;
>> import javax.jms.TextMessage;
>> import javax.jms.Destination;
>> import javax.jms.Message;
>> import javax.jms.Session;
>> import javax.jms.JMSException;
>>
>> import org.apache.activemq.ActiveMQConnectionFactory;
>> import org.apache.camel.Exchange;
>> import org.apache.camel.Processor;
>> import org.apache.camel.CamelContext;
>> import org.apache.camel.component.mock.MockEndpoint;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.activemq.camel.component.ActiveMQComponent;
>> import org.apache.camel.ExchangePattern;
>> import org.apache.camel.test.CamelTestSupport;
>> import org.springframework.jms.connection.CachingConnectionFactory;
>> import org.springframework.jms.core.JmsTemplate;
>> import org.springframework.jms.core.MessageCreator;
>> import static
>> org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
>>
>> /**
>>  * Unit test using a fixed replyTo specified on the JMS endpoint
>>  *
>>  * @version $Revision: 791824 $
>>  */
>> public class JmsJMSReplyToEndpointUsingInOutTest extends CamelTestSupport {
>>
>>    private ActiveMQComponent amq;
>>    private static String MQURI = "failover:(tcp://localhost:61616)";
>>   // private static String MQURI =
>> "vm://localhost?broker.persistent=false&broker.useJmx=false";
>>
>>    public class Replier implements Callable {
>>
>>       �...@override
>>                public Object call() throws Exception {
>>                log.info("replier started");
>>                JmsTemplate jms = new
>> JmsTemplate(amq.getConfiguration().getConnectionFactory());
>>
>>                final TextMessage msg = (TextMessage)
>> jms.receive("nameRequestor");
>>                assertEquals("What's your name", msg.getText());
>>
>>                // there should be a JMSReplyTo so we know where to send the
>> reply
>>                final Destination replyTo = msg.getJMSReplyTo();
>>
>>                // send reply
>>               // Thread.sleep(10000);
>>                jms.send(replyTo, new MessageCreator() {
>>                    public Message createMessage(Session session) throws
>> JMSException {
>>                        TextMessage replyMsg = session.createTextMessage();
>>                        replyMsg.setText("My name is Arnio");
>>
>> replyMsg.setJMSCorrelationID(msg.getJMSCorrelationID());
>>                        return replyMsg;
>>                    }
>>                });
>>                return null;
>>            }
>>    };
>>    public void testCustomJMSReplyToInOut() throws Exception {
>>
>>        MockEndpoint mock = getMockEndpoint("mock:result");
>>        mock.expectedBodiesReceived("My name is Arnio");
>>        mock.expectedMessageCount(10);
>>        // do not use Camel to send and receive to simulate a non Camel
>> client
>>        // use another thread to listen and send the reply
>>        ExecutorService newFixedThreadPool =
>> Executors.newFixedThreadPool(20);
>>
>>        for (int i = 0 ; i < 10 ; i++) {
>>          newFixedThreadPool.submit(new Replier());
>>        }
>>
>>        // now get started and send the first message that gets the ball
>> rolling
>>        JmsTemplate jms = new
>> JmsTemplate(amq.getConfiguration().getConnectionFactory());
>>        for (int i = 0 ; i < 10 ; i++) {
>>        jms.send("hello", new MessageCreator() {
>>            public Message createMessage(Session session) throws
>> JMSException {
>>                TextMessage msg = session.createTextMessage();
>>                msg.setText("Hello, I'm here");
>>                return msg;
>>            }
>>
>>        });
>>
>>        log.info("Hello sent");
>>        Thread.sleep(10);
>>        }
>>
>>        Thread.sleep(5000);
>>        assertMockEndpointsSatisfied();
>>    }
>>
>>    protected RouteBuilder createRouteBuilder() throws Exception {
>>        return new RouteBuilder() {
>>
>>            public void configure() throws Exception {
>>                from("activemq:queue:hello")
>>                   .threads()
>>                    .process(new Processor() {
>>                        public void process(Exchange exchange) throws
>> Exception {
>>
>>                            exchange.getOut().setBody("What's your name");
>>                        }
>>                    })
>>                    // use in out to get a reply as well
>>
>>                    .to(ExchangePattern.InOut,
>> "activemq:queue:nameRequestor?replyTo=queue:namedReplyQueue) // Remove the
>> replyTo and eveything works well
>>                    .to("direct:replyprocessor");
>>
>>                    from("direct:replyprocessor").process(new Processor() {
>>                            public void process(Exchange exchange) throws
>> Exception {
>>                                System.out.println("Here I am processing the
>> reply " + exchange.getIn().getBody());
>>                            }
>>                    }).to("mock:result");
>>            }
>>        };
>>    }
>>
>>    protected CamelContext createCamelContext() throws Exception {
>>         CamelContext camelContext = super.createCamelContext();
>>        amq = activeMQComponent(MQURI);
>>        ActiveMQConnectionFactory targetFactory = new
>> ActiveMQConnectionFactory(MQURI);
>>         log.info("Using MQ CachingConnectionFactory");
>>            CachingConnectionFactory cachedAMQConnectionFactory = new
>> CachingConnectionFactory(targetFactory);
>>
>>        camelContext.addComponent("activemq",
>> ActiveMQComponent.jmsComponent(cachedAMQConnectionFactory));
>>
>>        return camelContext;
>>    }
>>
>>
>> }
>>
>> ===========
>> --
>> View this message in context: 
>> http://www.nabble.com/explicitly-setting-replyTo-doesn%27t-scale-with-.threads%28%29-and-JMS-cache.-tp25991334p25991334.html
>> Sent from the Camel - Users (activemq) mailing list archive at Nabble.com.
>>
>>
>
>
>
> --
> Claus Ibsen
> Apache Camel Committer
>
> Open Source Integration: http://fusesource.com
> Blog: http://davsclaus.blogspot.com/
> Twitter: http://twitter.com/davsclaus
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Reply via email to