I tried to reproduce your issue with the following test - without look
(using Camel 2.10.2 and ActiveMQ 5.7.0):
- 10 parallel producer
- each will send 200 messages
- 5 consumers total

public class CamelJmsSharedFixReplyQueueTest extends CamelSpringTestSupport
{

    private int senderCount = 10;

    @Test
    public void test() throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(senderCount);

        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < senderCount; i++) {
            executor.execute(new Sender(context, latch));
        }

        assertTrue(latch.await(30, TimeUnit.SECONDS));
    }

    @Override
    protected AbstractApplicationContext createApplicationContext() {
        return new
ClassPathXmlApplicationContext("META-INF/spring/camel-jms-consumer.xml");
    }

    public static class Sender implements Runnable {

        private int messagePerSenderCount = 200;
        private CamelContext context;
        private CountDownLatch latch;

        public Sender(CamelContext context, CountDownLatch latch) {
            this.context = context;
            this.latch = latch;
        }

        @Override
        public void run() {
            ProducerTemplate template = context.createProducerTemplate();

template.setDefaultEndpointUri("activemq:ORDER_ENTRY?replyTo=ORDER_ENTRY.REPLY&receiveTimeout=200");

            for (int i = 0; i < messagePerSenderCount; i++) {
                Object response = template.requestBody("Camel");
                assertEquals("Hello Camel", response);
            }

            latch.countDown();
        }
    }
}

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans";
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
  xsi:schemaLocation="
       http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
       http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd";>

  <bean id="broker" class="org.apache.activemq.broker.BrokerService"
init-method="start" destroy-method="stop">
    <property name="persistent" value="false" />
    <property name="transportConnectorURIs">
      <list>
        <value>tcp://localhost:61616</value>
      </list>
    </property>
  </bean>

  <camelContext xmlns="http://camel.apache.org/schema/spring";>
    <jmxAgent id="agent" disabled="true"/>

    <route>
      <from uri="activemq:ORDER_ENTRY?concurrentConsumers=5" />
      <transform><simple>Hello ${body}</simple></transform>
    </route>
  </camelContext>

  <bean id="activemq"
class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="connectionFactory" ref="connectionFactory" />
  </bean>

  <bean id="connectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory"
depends-on="broker">
    <property name="maxConnections" value="10" />
    <property name="maximumActive" value="500" />
    <property name="connectionFactory">
      <bean id="jmsConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
      </bean>
    </property>
  </bean>
</beans>

Can you tell me where your set up is different?

Best,
Christian


On Fri, Nov 16, 2012 at 10:50 AM, anaCortes <[email protected]> wrote:

> PooledConnectionFactory seems not to cache MessageConsumers. Then, the next
> configuration would be the same idea?
>
> PooledConnectionFactory == CachingConnectionFactory+cacheConsumers=false
>
>
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/JMS-Shared-fixed-reply-queue-doesn-t-consume-all-the-messages-tp5721700p5722847.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>

Reply via email to