I am using the ActiveMQ Camel component as a consumer of a durable topic on ActiveMQ 5.5.1, and I am hoping someone can point me in the right direction. I am still working on a test case that reproduces the issue.

My scenario is as follows:

1) Create a durable subscription in AMQ.
2) Publish 30k messages to the topic and disconnect.
3) Start up the durable subscriber using Camel and process the messages. Currently this is just a simple route:

   from("activemq://topic...clientId=....dura..").to("log:output")

What ends up happening is that Camel processes about 3k messages at a rate of about 300 msg/s, then starts to hang, processing only about 2 messages every 20 seconds. In the JMX console I can see that ActiveMQ eventually reports that it is waiting for acknowledgements for 50 messages (which is the prefetch size), and eventually the "SlowConsumer" attribute becomes "true". To me, it looks like the acknowledgements are being lost somehow, or flow control is blocking me (even though the producer is offline).

Things I have tried:

- Disabling flowControl and enabling fileDurableCursors, and cranking the memory to 3gb.
- Adjusting the prefetch size between 50 and 1000.
- Setting consumer.dispatchAsync=false.
- Restarting the Camel component, which does nothing. Only restarting ActiveMQ, which resets "MessageCountAwaitingAcknowledge" to 0, gets things working again (temporarily).
- Switching to clientAcknowledgeMode instead of automatic.

If there is no message backlog and the consumer is already listening while messages are being published, everything works fine.

Can anyone offer any other suggestions of things to try? Does anyone have any idea why ActiveMQ seems to be waiting for acks? Is there a way to completely disable "slowConsumers"?

My current activemq.xml is included at the end of this message, although I have tried many variations without success. Any help would be appreciated. Thanks.
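For anyone trying to reproduce the scenario, the consumer side could be sketched roughly as follows in Camel's Spring XML form. This is only an illustration, not the actual client configuration: the route URI in the message above is elided, so the topic name (EXAMPLE.TOPIC), client ID (exampleClient), subscription name (exampleSub), broker URL, and the prefetch value of 100 are all hypothetical placeholders.

```xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <!-- ActiveMQ Camel component. The broker URL is a placeholder; the
       jms.prefetchPolicy.durableTopicPrefetch option is one way to set the
       prefetch size for durable topic subscribers (100 is an assumed value). -->
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL"
              value="tcp://localhost:61616?jms.prefetchPolicy.durableTopicPrefetch=100"/>
  </bean>

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
      <!-- Durable subscriber: clientId plus durableSubscriptionName identify the
           subscription. All three names here are hypothetical. -->
      <from uri="activemq:topic:EXAMPLE.TOPIC?clientId=exampleClient&amp;durableSubscriptionName=exampleSub"/>
      <!-- Same sink as the route described above -->
      <to uri="log:output"/>
    </route>
  </camelContext>
</beans>
```

Starting a context like this after the 30k messages have been published should correspond to step 3 of the scenario, with the backlog being delivered to the log endpoint.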
##################################
# activemq.xml
##################################
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>
    </bean>

    <broker xmlns="http://activemq.apache.org/schema/core"
            brokerName="localhost"
            dataDirectory="${activemq.base}/data"
            useJmx="true"
            advisorySupport="true">

        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic=">" producerFlowControl="false" memoryLimit="4096mb">
                        <pendingDurableSubscriberPolicy>
                            <fileDurableSubscriberCursor/>
                        </pendingDurableSubscriberPolicy>
                    </policyEntry>
                    <policyEntry queue=">" producerFlowControl="false" memoryLimit="1024mb">
                        <pendingQueuePolicy>
                            <fileQueueCursor/>
                        </pendingQueuePolicy>
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"
                    enableJournalDiskSyncs="true"
                    indexWriteBatchSize="10000"
                    indexCacheSize="1000"
                    journalMaxFileLength="1024mb"/>
        </persistenceAdapter>

        <systemUsage>
            <systemUsage sendFailIfNoSpaceAfterTimeout="60000">
                <memoryUsage>
                    <memoryUsage limit="2048 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="20 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="5 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>
    </broker>

    <import resource="jetty.xml"/>
</beans>

--
View this message in context:
http://activemq.2283324.n4.nabble.com/Slow-hanging-subscriber-urgent-tp3948861p3948861.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.