If you want to concurrently consume from ActiveMQ (or any JMS) with
Camel, just set the concurrentConsumers property on the URI (or
configure the component/endpoint).

See
http://camel.apache.org/jms.html

Note that concurrent consumers break order of messages on a queue. To
fix this using ActiveMQ you can use Message Groups
http://activemq.apache.org/message-groups.html

On 31 March 2010 16:03, sweenma <mark.swee...@verizonwireless.com> wrote:
>
> Hi,
> I am new to ActiveMQ and Q'ing in general, so I will do my best to explain
> the issue and ask for your patience. I followed the suggestions of many
> postings on how to get ActiveMQ to forward msg's to Websphere MQ using
> camel. I got it to work, however the amount of time it takes to forward the
> msg from ActiveMQ to Websphere MQ is very high, around 2 secs. I enabled the
> logging and I noticed that it appears that the communication between
> ActiveMQ and WebSphere MQ runs 1 thread at a time. For example, I have a
> simple multithreaded producer that puts msg's in to the ActiveMQ. ActiveMQ
> is configured to forward the msg's to WebSphere MQ and then I have a simple
> MDB listener to read the msg's from the WebSphere MQ. With logging enabled I
> can see a msg being processed by a thread and then another msg being
> processed by another thread after the first thread completes. Here is a
> samle of the log:
>
> 2010-03-31 08:32:44,743 [DefaultMessageListenerContainer-29]
> org.apache.activemq.ActiveMQMessageConsumer - ID:WNJ10002LDVSBAG
> -1559-1270038729745-2:0:1:1 received message: MessageDispatch {commandId =
> 0, responseRequired = false, consumerId =
> ID:WNJ10002LDVSBAG-1559-1270038729745-2:0:1:1,
>
> lots of other log entries with the same thread id
> DefaultMessageListenerContainer-29 followed by the next msg log entry:
> 2010-03-31 08:32:46,539 [DefaultMessageListenerContainer-30]
> org.apache.activemq.ActiveMQMessageConsumer - ID:WNJ10002LDVSBAG
> -1559-1270038729745-2:0:1:1 received message: MessageDispatch {commandId =
> 0, responseRequired = false, consumerId =
> ID:WNJ10002LDVSBAG-1559-1270038729745-2:0:1:1
>
> and so on for each msg that was put into the ActiveMQ queue. I noticed that
> the ID for each ActiveMQMessageConsumer is the same, however I am not sure
> if this is relavant or not.
> I am using activemq V5.2.0I with Java V1.5. I pasted a copy of the
> activemq.xml file below. I am really stuck here and would appreciate any
> help. TIA.
>
> <beans
>  xmlns="http://www.springframework.org/schema/beans";
>  xmlns:amq="http://activemq.apache.org/schema/core";
>  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>  xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>  http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd
>  http://activemq.apache.org/camel/schema/spring
> http://activemq.apache.org/camel/schema/spring/camel-spring.xsd";>
>
>    <!-- Allows us to use system properties as variables in this
> configuration file -->
>    <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>         <property name="locations">
>
> <value>file:///${activemq.base}/conf/credentials.properties</value>
>         </property>
>    </bean>
>
>    <broker xmlns="http://activemq.apache.org/schema/core";
> brokerName="mylaptop" persistent="false"
> dataDirectory="${activemq.base}/data" useJmx="false">
>
>        <!-- Destination specific policies using destination names or
> wildcards -->
>        <destinationPolicy>
>            <policyMap>
>                <policyEntries>
>                    <policyEntry queue=">" memoryLimit="5mb"/>
>                    <policyEntry topic=">" memoryLimit="5mb">
>                      <!-- you can add other policies too such as these
>                        <dispatchPolicy>
>                            <strictOrderDispatchPolicy/>
>                        </dispatchPolicy>
>                        <subscriptionRecoveryPolicy>
>                            <lastImageSubscriptionRecoveryPolicy/>
>                        </subscriptionRecoveryPolicy>
>                      -->
>                    </policyEntry>
>                </policyEntries>
>            </policyMap>
>        </destinationPolicy>
>
>        <destinations>
>            <queue name="testRIMQ" physicalName="RIMQ" />
>        </destinations>
>
>        <!-- Use the following to configure how ActiveMQ is exposed in JMX
> -->
>        <managementContext>
>            <managementContext createConnector="false"/>
>        </managementContext>
>
>        <!-- The store and forward broker networks ActiveMQ will listen to
> -->
>        <networkConnectors>
>        </networkConnectors>
>
>        <persistenceAdapter>
>            <amqPersistenceAdapter syncOnWrite="false"
> directory="${activemq.base}/data" indexPageSize="16kb"
> maxFileLength="32mb"/>
>        </persistenceAdapter>
>
>
>        <sslContext>
>            <sslContext keyStore="file:${activemq.base}/conf/broker.ks"
> keyStorePassword="password"
> trustStore="file:${activemq.base}/conf/broker.ts"
> trustStorePassword="password"/>
>        </sslContext>
>
>        <!--  The maximum about of space the broker will use before slowing
> down producers -->
>        <systemUsage>
>            <systemUsage>
>                <memoryUsage>
>                    <memoryUsage limit="20 mb"/>
>                </memoryUsage>
>                <storeUsage>
>                    <storeUsage limit="1 gb" name="foo"/>
>                </storeUsage>
>                <tempUsage>
>                    <tempUsage limit="100 mb"/>
>                </tempUsage>
>            </systemUsage>
>        </systemUsage>
>
>
>        <!-- The transport connectors ActiveMQ will listen to -->
>        <transportConnectors>
>            <transportConnector name="openwire" uri="tcp://localhost:61616"
> />
>        </transportConnectors>
>
>    </broker>
>
>    <camelContext id="camel"
> xmlns="http://activemq.apache.org/camel/schema/spring";>
>        <route>
>            <from uri="activemq:RIMQ"/>
>            <to uri="webspheremq:queue:DVST.TEST.QUEUE"/>
>        </route>
>    </camelContext>
>
>
>
>    <bean id="remoteBroker"
> class="org.apache.activemq.camel.component.ActiveMQComponent" >
>        <property name="connectionFactory">
>          <bean class="org.apache.activemq.ActiveMQConnectionFactory">
>            <property name="brokerURL"
> value="failover:(tcp://host:port)?maxReconnectAttempts=0" />
>          </bean>
>        </property>
>    </bean>
>
>    <bean id="webspheremq"
> class="org.apache.camel.component.jms.JmsComponent">
>        <property name="configuration">
>            <bean class="org.apache.camel.component.jms.JmsConfiguration">
>                <property name="connectionFactory">
>                    <bean class="com.ibm.mq.jms.MQQueueConnectionFactory">
>                        <property name="hostName" value="...."/>
>                        <property name="port" value="1...."/>
>                        <property name="queueManager" value="Q1T2"/>
>                        <property name="channel"
> value="DVST.COMMON.SVRTST"/>
>                        <property name="transportType" value="1"/>
>                    </bean>
>                </property>
>            </bean>
>        </property>
>    </bean>
>
>    <!-- An embedded servlet engine for serving up the Admin console -->
>    <jetty xmlns="http://mortbay.com/schemas/jetty/1.0";>
>        <connectors>
>            <nioConnector port="8161"/>
>        </connectors>
>
>        <handlers>
>            <webAppContext contextPath="/admin"
> resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true"/>
>            <webAppContext contextPath="/demo"
> resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true"/>
>            <webAppContext contextPath="/fileserver"
> resourceBase="${activemq.base}/webapps/fileserver" logUrlOnStart="true"/>
>        </handlers>
>    </jetty>
>
>    <!--  This xbean configuration file supports all the standard spring xml
> configuration options -->
>
>
> </beans>
>
> --
> View this message in context: 
> http://old.nabble.com/ActiveMQ-to-Websphere-MQ-tp28097168p28097168.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>



-- 
James
-------
http://macstrac.blogspot.com/

Open Source Integration
http://fusesource.com/

Reply via email to