If you want to consume concurrently from ActiveMQ (or any JMS provider) with Camel, just set the concurrentConsumers option on the endpoint URI (or configure it on the component/endpoint).
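
As a rough sketch, the route from the configuration quoted below could set the option directly on the consumer endpoint URI. The endpoint names are taken from that configuration; the value 5 is only an illustrative assumption, not a tuned recommendation:

```xml
<camelContext id="camel"
    xmlns="http://activemq.apache.org/camel/schema/spring">
    <route>
        <!-- concurrentConsumers=5 is an assumed example value; pick one that suits your load -->
        <from uri="activemq:RIMQ?concurrentConsumers=5"/>
        <to uri="webspheremq:queue:DVST.TEST.QUEUE"/>
    </route>
</camelContext>
```

If more options are added to the URI later, remember that inside XML the separator has to be written as &amp;amp; rather than a bare ampersand.
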
See http://camel.apache.org/jms.html

Note that concurrent consumers break the order of messages on a queue. To fix this with ActiveMQ you can use Message Groups:
http://activemq.apache.org/message-groups.html

On 31 March 2010 16:03, sweenma <mark.swee...@verizonwireless.com> wrote:
>
> Hi,
> I am new to ActiveMQ and Q'ing in general, so I will do my best to explain
> the issue and ask for your patience. I followed the suggestions of many
> postings on how to get ActiveMQ to forward msg's to Websphere MQ using
> camel. I got it to work, however the amount of time it takes to forward the
> msg from ActiveMQ to Websphere MQ is very high, around 2 secs. I enabled the
> logging and I noticed that it appears that the communication between
> ActiveMQ and WebSphere MQ runs 1 thread at a time. For example, I have a
> simple multithreaded producer that puts msg's in to the ActiveMQ. ActiveMQ
> is configured to forward the msg's to WebSphere MQ and then I have a simple
> MDB listener to read the msg's from the WebSphere MQ. With logging enabled I
> can see a msg being processed by a thread and then another msg being
> processed by another thread after the first thread completes.
> Here is a sample of the log:
>
> 2010-03-31 08:32:44,743 [DefaultMessageListenerContainer-29]
> org.apache.activemq.ActiveMQMessageConsumer - ID:WNJ10002LDVSBAG
> -1559-1270038729745-2:0:1:1 received message: MessageDispatch {commandId =
> 0, responseRequired = false, consumerId =
> ID:WNJ10002LDVSBAG-1559-1270038729745-2:0:1:1,
>
> lots of other log entries with the same thread id
> DefaultMessageListenerContainer-29 followed by the next msg log entry:
> 2010-03-31 08:32:46,539 [DefaultMessageListenerContainer-30]
> org.apache.activemq.ActiveMQMessageConsumer - ID:WNJ10002LDVSBAG
> -1559-1270038729745-2:0:1:1 received message: MessageDispatch {commandId =
> 0, responseRequired = false, consumerId =
> ID:WNJ10002LDVSBAG-1559-1270038729745-2:0:1:1
>
> and so on for each msg that was put into the ActiveMQ queue. I noticed that
> the ID for each ActiveMQMessageConsumer is the same, however I am not sure
> if this is relevant or not.
> I am using activemq V5.2.0I with Java V1.5. I pasted a copy of the
> activemq.xml file below. I am really stuck here and would appreciate any
> help. TIA.
>
> <beans
>     xmlns="http://www.springframework.org/schema/beans"
>     xmlns:amq="http://activemq.apache.org/schema/core"
>     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>     xsi:schemaLocation="http://www.springframework.org/schema/beans
>     http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>     http://activemq.apache.org/schema/core
>     http://activemq.apache.org/schema/core/activemq-core.xsd
>     http://activemq.apache.org/camel/schema/spring
>     http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
>
>     <!-- Allows us to use system properties as variables in this configuration file -->
>     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>         <property name="locations">
>             <value>file:///${activemq.base}/conf/credentials.properties</value>
>         </property>
>     </bean>
>
>     <broker xmlns="http://activemq.apache.org/schema/core"
>         brokerName="mylaptop" persistent="false"
>         dataDirectory="${activemq.base}/data" useJmx="false">
>
>         <!-- Destination specific policies using destination names or wildcards -->
>         <destinationPolicy>
>             <policyMap>
>                 <policyEntries>
>                     <policyEntry queue=">" memoryLimit="5mb"/>
>                     <policyEntry topic=">" memoryLimit="5mb">
>                         <!-- you can add other policies too such as these
>                         <dispatchPolicy>
>                             <strictOrderDispatchPolicy/>
>                         </dispatchPolicy>
>                         <subscriptionRecoveryPolicy>
>                             <lastImageSubscriptionRecoveryPolicy/>
>                         </subscriptionRecoveryPolicy>
>                         -->
>                     </policyEntry>
>                 </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>
>         <destinations>
>             <queue name="testRIMQ" physicalName="RIMQ" />
>         </destinations>
>
>         <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
>         <managementContext>
>             <managementContext createConnector="false"/>
>         </managementContext>
>
>         <!-- The store and forward broker networks ActiveMQ will listen to -->
>         <networkConnectors>
>         </networkConnectors>
>
>         <persistenceAdapter>
>             <amqPersistenceAdapter syncOnWrite="false"
>                 directory="${activemq.base}/data" indexPageSize="16kb"
>                 maxFileLength="32mb"/>
>         </persistenceAdapter>
>
>         <sslContext>
>             <sslContext keyStore="file:${activemq.base}/conf/broker.ks"
>                 keyStorePassword="password"
>                 trustStore="file:${activemq.base}/conf/broker.ts"
>                 trustStorePassword="password"/>
>         </sslContext>
>
>         <!-- The maximum amount of space the broker will use before slowing down producers -->
>         <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage limit="20 mb"/>
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="1 gb" name="foo"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="100 mb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
>
>         <!-- The transport connectors ActiveMQ will listen to -->
>         <transportConnectors>
>             <transportConnector name="openwire" uri="tcp://localhost:61616"/>
>         </transportConnectors>
>
>     </broker>
>
>     <camelContext id="camel"
>         xmlns="http://activemq.apache.org/camel/schema/spring">
>         <route>
>             <from uri="activemq:RIMQ"/>
>             <to uri="webspheremq:queue:DVST.TEST.QUEUE"/>
>         </route>
>     </camelContext>
>
>     <bean id="remoteBroker"
>         class="org.apache.activemq.camel.component.ActiveMQComponent">
>         <property name="connectionFactory">
>             <bean class="org.apache.activemq.ActiveMQConnectionFactory">
>                 <property name="brokerURL"
>                     value="failover:(tcp://host:port)?maxReconnectAttempts=0" />
>             </bean>
>         </property>
>     </bean>
>
>     <bean id="webspheremq"
>         class="org.apache.camel.component.jms.JmsComponent">
>         <property name="configuration">
>             <bean class="org.apache.camel.component.jms.JmsConfiguration">
>                 <property name="connectionFactory">
>                     <bean class="com.ibm.mq.jms.MQQueueConnectionFactory">
>                         <property name="hostName" value="...."/>
>                         <property name="port" value="1...."/>
>                         <property name="queueManager" value="Q1T2"/>
>                         <property name="channel" value="DVST.COMMON.SVRTST"/>
>                         <property name="transportType" value="1"/>
>                     </bean>
>                 </property>
>             </bean>
>         </property>
>     </bean>
>
>     <!-- An embedded servlet engine for serving up the Admin console -->
>     <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
>         <connectors>
>             <nioConnector port="8161"/>
>         </connectors>
>
>         <handlers>
>             <webAppContext contextPath="/admin"
>                 resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true"/>
>             <webAppContext contextPath="/demo"
>                 resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true"/>
>             <webAppContext contextPath="/fileserver"
>                 resourceBase="${activemq.base}/webapps/fileserver" logUrlOnStart="true"/>
>         </handlers>
>     </jetty>
>
>     <!-- This xbean configuration file supports all the standard spring xml configuration options -->
>
> </beans>
>
> --
> View this message in context:
> http://old.nabble.com/ActiveMQ-to-Websphere-MQ-tp28097168p28097168.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>

--
James
-------
http://macstrac.blogspot.com/

Open Source Integration
http://fusesource.com/