Hi,

I've got two AMQ brokers installed in cluster mode with KahaDB, in order to have an HA configuration. I have configured 2 queues (no topic) :

 * one that exchange 200 000 messages by day, message group is enabled,
   time to process a message goes between 10 to 60 seconds, it's done
   by 4 job with 15 threads each.
 * one others deal with 10 000 messages by day and there no group, it's
   consumed by 4 job with 5 thread each.

Consumers are multithreaded with PooledConnectionFactory.

All is working well but some time there messages that stuck. My consumers are paused and are doing nothing. I can see that, on the ActiveMq monitoring GUI :

 * Number of pending message is higher than zero
 * All consumer, Dispatched queue is setted to zero

Message can stuck 1 or 2 hours and get unblocked by theirself without explicit reason, if I restart brockers : it unblocks messages.

I checked AMQ documentation and lot of parameters and haven't found a solution. That's why I'm posting here a description of my issue. Is anybody have an idea of a something to change or something to monitor, in order to help me ?

The AMQ configuration is the following :

<beans
  xmlns="http://www.springframework.org/schema/beans";
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd";>

    <!-- Allows us to use system properties as variables in this configuration file -->     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

   <!-- Allows accessing the server log -->
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core"; brokerName="energysoft" dataDirectory="${activemq.data}" useJmx="true">

        <networkConnectors>
          <networkConnector uri="static:(tcp://myOtherBrokerIP:61616)"
                            userName="mylogin"
                            password="mypassword"
                            prefetchSize="100"
                            duplex="true"
                            />
        </networkConnectors>

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                    <policyEntry queue=">" maxPageSize="10000" maxBrowsePageSize="0" enableAudit="false" >
                  <networkBridgeFilterFactory>
                    <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
                  </networkBridgeFilterFactory>
                  <messageGroupMapFactory>
                    <simpleMessageGroupMapFactory/>
                  </messageGroupMapFactory>
                </policyEntry>
                <policyEntry topic=">" >
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

        <plugins>
          <simpleAuthenticationPlugin>
              <users>
                  <authenticationUser username="mylogin" password="mypassword" groups="users,admins"/>
              </users>
          </simpleAuthenticationPlugin>
        </plugins>

        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>


        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="32mb"/>
        </persistenceAdapter>

          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->             <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />             <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>             <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>             <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>             <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <!-- cluster port -->
            <!-- <transportConnector uri="tcp://localhost:62002"/> -->
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans"; class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>
    </broker>

    <import resource="jetty.xml"/>

</beans>

Reply via email to