
I have two ActiveMQ (AMQ) brokers installed in cluster mode with KahaDB, in order to have an HA configuration. I have configured two queues (no topics):

 * one that exchanges 200,000 messages per day with message groups
   enabled; processing a message takes between 10 and 60 seconds, and
   it is consumed by 4 jobs with 15 threads each.
 * another that handles 10,000 messages per day without message groups;
   it is consumed by 4 jobs with 5 threads each.
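
A rough sanity check on the first queue's figures can be sketched as below. This is only a back-of-the-envelope illustration, not part of the application: the function names are hypothetical, and it assumes every thread is busy all day, that messages arrive evenly, and that message groups do not restrict parallelism (in practice they can).

```python
SECONDS_PER_DAY = 24 * 60 * 60


def daily_capacity(threads: int, seconds_per_message: float) -> float:
    """Upper bound on messages processed per day if every thread is always busy."""
    return threads * SECONDS_PER_DAY / seconds_per_message


def break_even_seconds(threads: int, messages_per_day: int) -> float:
    """Average processing time at which capacity exactly matches the daily load."""
    return threads * SECONDS_PER_DAY / messages_per_day


threads = 4 * 15          # 4 jobs with 15 threads each
daily_load = 200_000      # messages per day on the first queue

best_case = daily_capacity(threads, 10)            # every message takes 10 s
worst_case = daily_capacity(threads, 60)           # every message takes 60 s
limit = break_even_seconds(threads, daily_load)    # average time the load allows

print(f"capacity at 10 s/message: {best_case:.0f} messages/day")
print(f"capacity at 60 s/message: {worst_case:.0f} messages/day")
print(f"break-even average processing time: {limit:.2f} s")
```

Under these assumptions the 60 threads can only keep up with 200,000 messages per day while the average processing time stays below roughly 26 seconds, so periods of slow processing can build a backlog on their own.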

Consumers are multithreaded with PooledConnectionFactory.

Everything works well, but sometimes messages get stuck: my consumers are idle and do nothing. In the ActiveMQ monitoring GUI I can see that:

 * the number of pending messages is greater than zero
 * for every consumer, the dispatched queue size is zero

Messages can stay stuck for 1 or 2 hours and then get unblocked by themselves for no apparent reason. Restarting the brokers also unblocks them.

I checked the AMQ documentation and many parameters but haven't found a solution, which is why I'm posting a description of my issue here. Does anybody have an idea of something to change or something to monitor that could help me?

The AMQ configuration is as follows:

  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">

   <!-- Allows accessing the server log -->
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">

    <!-- The <broker> element is used to configure the ActiveMQ broker. -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="energysoft" dataDirectory="${activemq.data}" useJmx="true">

          <networkConnector uri="static:(tcp://myOtherBrokerIP:61616)"

                    <policyEntry queue=">" maxPageSize="10000" maxBrowsePageSize="0" enableAudit="false" >
                    <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
                <policyEntry topic=">" >
                    <constantPendingMessageLimitStrategy limit="1000"/>

                  <authenticationUser username="mylogin" password="mypassword" groups="users,admins"/>

            <managementContext createConnector="false"/>

            <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="32mb"/>

                    <memoryUsage percentOfJvmHeap="70" />
                    <storeUsage limit="100 gb"/>
                    <tempUsage limit="50 gb"/>

            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://;wireFormat.maxFrameSize=104857600" />
            <transportConnector name="amqp" uri="amqp://;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://;wireFormat.maxFrameSize=104857600"/>
            <!-- cluster port -->
            <!-- <transportConnector uri="tcp://localhost:62002"/> -->

        <!-- destroy the spring context on shutdown to stop jetty -->
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />

    <import resource="jetty.xml"/>

