Hi Jérôme,

You are using a network of brokers.

Did you try to decreaseNetworkPriority on the network connector ?

By default, messages will be forwarded to another broker only if there's
at least an active consumer.

Don't you have slow consumers identified ?

No DLQ ?

Regards
JB

On 23/01/2020 11:19, Jérôme Barotin wrote:
> Hi,
> 
> I've got two AMQ brokers installed in cluster mode with KahaDB, in order
> to have an HA configuration. I have configured 2 queues (no topic) :
> 
>  * one that exchange 200 000 messages by day, message group is enabled,
>    time to process a message goes between 10 to 60 seconds, it's done
>    by 4 job with 15 threads each.
>  * one others deal with 10 000 messages by day and there no group, it's
>    consumed by 4 job with 5 thread each.
> 
> Consumers are multithreaded with PooledConnectionFactory.
> 
> All is working well but some time there messages that stuck. My
> consumers are paused and are doing nothing. I can see that, on the
> ActiveMq monitoring GUI :
> 
>  * Number of pending message is higher than zero
>  * All consumer, Dispatched queue is setted to zero
> 
> Message can stuck 1 or 2 hours and get unblocked by theirself without
> explicit reason, if I restart brockers : it unblocks messages.
> 
> I checked AMQ documentation and lot of parameters and haven't found a
> solution. That's why I'm posting here a description of my issue. Is
> anybody have an idea of a something to change or something to monitor,
> in order to help me ?
> 
> The AMQ configuration is the following :
> 
> <beans
>   xmlns="http://www.springframework.org/schema/beans";
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>   xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd
>   http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd";>
> 
>     <!-- Allows us to use system properties as variables in this
> configuration file -->
>     <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
> 
>         <property name="locations">
> <value>file:${activemq.conf}/credentials.properties</value>
>         </property>
>     </bean>
> 
>    <!-- Allows accessing the server log -->
>     <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
>           lazy-init="false" scope="singleton"
>           init-method="start" destroy-method="stop">
>     </bean>
> 
>     <!--
>         The <broker> element is used to configure the ActiveMQ broker.
>     -->
>     <broker xmlns="http://activemq.apache.org/schema/core";
> brokerName="energysoft" dataDirectory="${activemq.data}" useJmx="true">
> 
>         <networkConnectors>
>           <networkConnector uri="static:(tcp://myOtherBrokerIP:61616)"
>                             userName="mylogin"
>                             password="mypassword"
>                             prefetchSize="100"
>                             duplex="true"
>                             />
>         </networkConnectors>
> 
>         <destinationPolicy>
>             <policyMap>
>               <policyEntries>
>                     <policyEntry queue=">" maxPageSize="10000"
> maxBrowsePageSize="0" enableAudit="false" >
>                   <networkBridgeFilterFactory>
>                     <conditionalNetworkBridgeFilterFactory
> replayWhenNoConsumers="true"/>
>                   </networkBridgeFilterFactory>
>                   <messageGroupMapFactory>
>                     <simpleMessageGroupMapFactory/>
>                   </messageGroupMapFactory>
>                 </policyEntry>
>                 <policyEntry topic=">" >
>                   <pendingMessageLimitStrategy>
>                     <constantPendingMessageLimitStrategy limit="1000"/>
>                   </pendingMessageLimitStrategy>
>                 </policyEntry>
>               </policyEntries>
>             </policyMap>
>         </destinationPolicy>
> 
>         <plugins>
>           <simpleAuthenticationPlugin>
>               <users>
>                   <authenticationUser username="mylogin"
> password="mypassword" groups="users,admins"/>
>               </users>
>           </simpleAuthenticationPlugin>
>         </plugins>
> 
>         <managementContext>
>             <managementContext createConnector="false"/>
>         </managementContext>
> 
> 
>         <persistenceAdapter>
>             <kahaDB directory="${activemq.data}/kahadb"
> journalMaxFileLength="32mb"/>
>         </persistenceAdapter>
> 
>           <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage percentOfJvmHeap="70" />
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="100 gb"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="50 gb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
> 
>         <transportConnectors>
>             <!-- DOS protection, limit concurrent connections to 1000
> and frame size to 100MB -->
>             <transportConnector name="openwire"
> uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"
> />
>             <transportConnector name="amqp"
> uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
> 
>             <transportConnector name="stomp"
> uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
> 
>             <transportConnector name="mqtt"
> uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
> 
>             <transportConnector name="ws"
> uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
> 
>             <!-- cluster port -->
>             <!-- <transportConnector uri="tcp://localhost:62002"/> -->
>         </transportConnectors>
> 
>         <!-- destroy the spring context on shutdown to stop jetty -->
>         <shutdownHooks>
>             <bean xmlns="http://www.springframework.org/schema/beans";
> class="org.apache.activemq.hooks.SpringContextHook" />
>         </shutdownHooks>
>     </broker>
> 
>     <import resource="jetty.xml"/>
> 
> </beans>
> 
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Reply via email to