Hi All,
I need urgent help here.
I have an activemq queue from which my camel route receives the events. This
activemq queue is pre filled with almost 90K events. When I start the route
to receive the events I see that around 20K events are picked up ( through
jmx I see that around 6K in DEQUEUE COUNT and around 14 K events in
InFlightCount). When all the messages from InFlightCount get processed reach
0 and thereby DEQUEUE COUNT becomes same as DISPATCH COUNT, the route stops
there. There are 70K events still in the queue and they do not get picked up
after this processing. Why is this so ?

Following is my camel-context .xml:
------------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans";
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
xmlns:broker="http://activemq.apache.org/schema/core";
        xmlns:camel="http://camel.apache.org/schema/spring";
        xsi:schemaLocation="http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
        http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
        http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd";>
        <bean id="throttlePolicy"
class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
                <property name="maxInflightExchanges" value="1000" />
                <property name="resumePercentOfMax" value="50"/>
                <property name="loggingLevel" value="WARN"/>
        </bean>
        <camel:camelContext xmlns="http://camel.apache.org/schema/spring";
                id="camelContext">
                <camel:propertyPlaceholder id="camelApplicationProperties"
                        location="classpath:application.properties" />
                <camel:template id="template" />
                <camel:threadPoolProfile id="notification-thread-pool"
                        defaultProfile="true"
maxPoolSize="{{camel.threadpool.profile.maxpoolsize}}"
                        poolSize="{{camel.threadpool.profile.poolsize}}"
rejectedPolicy="CallerRuns" />
                
                <camel:endpoint id="myroute1"
                       
uri="jms:queue:queue1?concurrentConsumers=10&amp;asyncConsumer=true&amp;maxConcurrentConsumers=100&amp;consumer.prefetchSize=0&amp;cacheLevelName=CACHE_NONE"
/>
                <camel:route id="queueRoute"
routePolicyRef="throttlePolicy">
                        <camel:from ref="myroute1" />
                        <camel:log loggingLevel="DEBUG"
logName="com.services.routes"
                                message="In route" />
                        <camel:choice>…..</camel:choice>
</camel:route>
        <bean id="pooledAmqConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory"
                init-method="start" destroy-method="stop">
                <property name="maxConnections" value="500" />
                <property name="connectionFactory"
ref="amqJmsConnectionFactory" />
        </bean>

        <bean id="jms"
class="org.apache.activemq.camel.component.ActiveMQComponent">
                <property name="connectionFactory"
ref="pooledAmqConnectionFactory" />
        </bean>

        <broker:broker useJmx="true" persistent="true"
                brokerName="tca-mq-broker"
tmpDataDirectory="${correlation.engine.home}/tmp">
                <broker:persistenceAdapter>
                        <broker:amqPersistenceAdapter
directory="${correlation.engine.home}/data"
                                maxFileLength="32mb" cleanupInterval="60000"
/>
                </broker:persistenceAdapter>
                <broker:destinationPolicy>
                        <broker:policyMap>
                                <broker:policyEntries>
                                        <broker:policyEntry queue=">"
                                                producerFlowControl="false"
/>
                                </broker:policyEntries>
                        </broker:policyMap>
                </broker:destinationPolicy>
                                <broker:transportConnectors>
                        

                        <broker:transportConnector name="vm"
                                uri="vm://tca-mq-broker" />
                        <broker:transportConnector name="openwire"
                               
uri="tcp://0.0.0.0:${activemq.broker.tcp.port}" enableStatusMonitor="true"
/>
                </broker:transportConnectors>
        </broker:broker>
        <broker:connectionFactory id="amqJmsConnectionFactory"
brokerURL="vm://tca-mq-broker?create=false&amp;waitForStart=10000"/>
</beans>
----------------------------------------------------------------
                                                                                
                                                
What am I doing wrong here ? After looking at the posts I tried setting the
prefetch to 0 so that the consumer does polling for events, set
CONSUMER_CACHE to none, but still after picking up the first set of events
from the route the route just stops and does not pick up events anymore. 

The events get picked up again when I restart the java process (JVM). Again
the route starts and next set of around 20k events are getting picked up. I
don't understand how the number to pick up this event is also decided.

Following is my configuration:
OS: OEL 5.9
Activemq: persistent connection
Max number of consumer connections: 100 
camel is executed from java jvm.

Please help and provide me some insight and tip and on whats going wrong.

Thanks
Kavita


              




--
View this message in context: 
http://camel.465427.n5.nabble.com/Camel-route-does-not-resume-to-pick-the-events-from-the-pre-filled-activemq-queue-tp5746211.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to