I am hoping someone could point me in the right direction here.

I am using the ActiveMQ camel component as a consumer of a Durable topic in
activemq 5.5.1

I am currently working on producing a test case for my issue, but am hoping
someone might be able to point me in the right direction.  My scenario is as
follows:

1) Create durable subscription in AMQ
2) Publish 30k messages to the topic and disconnect
3) Startup durable subscriber using camel and process the messages.  
Currently just a simple route
from("activemq://topic...clientId=....dura..").to("log:output")

What ends up happening is that camel processes about 3k messages at a rate
of about 300msg/s, then starts to hang, only processing about
2msg/20seconds.   What I notice in the JMX console is that ActiveMQ
eventually sees that it is waiting for acknoledgments for 50 messages (which
is the value of the prefetch size).   Eventually the "SlowConsumer"
attribute = "true".

To me, it looks like the acknowledgements are being lost somehow or
FlowControl is blocking me.. (even though the producer is offline)

Things I have tried:
- I have tried disabling flowControl and enabling fileDurableCursors.  
Cranking the memory to 3gb.
- Adjusting prefetch size between 50-1000
- setting consumer.dispatchAsync=false
- Restarting the camel component does nothing, it's only when you restart
ActiveMQ which resets the "MessageContAwaitingAcknowledgement" to 0 does
stuff start working again (temporarily).
- Tried switching to clientAcknowledgeMode instead of automatic
- If there is no message backlog and the consumer is listening while
messages are being published, everything works fine..

Can anyone offer any other suggestions of things to try?  I would really
appreciate some help on this one.  Does anyone have any ideas why ActiveMQ
seems to be waiting for ack's?  Is there a way to completely disable
"slowConsumers"?

Below is the configuration I am currently using, although I have tried many
variations without success.

Any help would be appreciated.

Thanks.

##################################
# activemq.xml
##################################
<beans
  xmlns="http://www.springframework.org/schema/beans";
  xmlns:amq="http://activemq.apache.org/schema/core";
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
  xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd";>

    
    <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"
>
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>
    </bean>

    <broker xmlns="http://activemq.apache.org/schema/core";
brokerName="localhost" dataDirectory="${activemq.base}/data" useJmx="true"
advisorySupport="true">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="false"
memoryLimit="4096mb">
                  
                 <pendingDurableSubscriberPolicy>
                   <fileDurableSubscriberCursor/>
                 </pendingDurableSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="false"
memoryLimit="1024mb">
                  <pendingQueuePolicy>
                    <fileQueueCursor/>
                  </pendingQueuePolicy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"
enableJournalDiskSyncs="true" indexWriteBatchSize="10000"
indexCacheSize="1000" journalMaxFileLength="1024mb"/>
        </persistenceAdapter>

        <systemUsage>
            <systemUsage sendFailIfNoSpaceAfterTimeout="60000">
                <memoryUsage>
                    <memoryUsage limit="2048 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="20 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="5 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>

    </broker>

    <import resource="jetty.xml"/>

</beans>



--
View this message in context: 
http://activemq.2283324.n4.nabble.com/Slow-hanging-subscriber-urgent-tp3948861p3948861.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to