Hi Jérôme, You are using a network of brokers.
Did you try to decreaseNetworkPriority on the network connector ? By default, messages will be forwarded to another broker only if there's at least an active consumer. Don't you have slow consumers identified ? No DLQ ? Regards JB On 23/01/2020 11:19, Jérôme Barotin wrote: > Hi, > > I've got two AMQ brokers installed in cluster mode with KahaDB, in order > to have an HA configuration. I have configured 2 queues (no topic) : > > * one that exchange 200 000 messages by day, message group is enabled, > time to process a message goes between 10 to 60 seconds, it's done > by 4 job with 15 threads each. > * one others deal with 10 000 messages by day and there no group, it's > consumed by 4 job with 5 thread each. > > Consumers are multithreaded with PooledConnectionFactory. > > All is working well but some time there messages that stuck. My > consumers are paused and are doing nothing. I can see that, on the > ActiveMq monitoring GUI : > > * Number of pending message is higher than zero > * All consumer, Dispatched queue is setted to zero > > Message can stuck 1 or 2 hours and get unblocked by theirself without > explicit reason, if I restart brockers : it unblocks messages. > > I checked AMQ documentation and lot of parameters and haven't found a > solution. That's why I'm posting here a description of my issue. Is > anybody have an idea of a something to change or something to monitor, > in order to help me ? > > The AMQ configuration is the following : > > <beans > xmlns="http://www.springframework.org/schema/beans" > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation="http://www.springframework.org/schema/beans > http://www.springframework.org/schema/beans/spring-beans.xsd > http://activemq.apache.org/schema/core > http://activemq.apache.org/schema/core/activemq-core.xsd"> > > <!-- Allows us to use system properties as variables in this > configuration file --> > <bean > class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> > > <property name="locations"> > <value>file:${activemq.conf}/credentials.properties</value> > </property> > </bean> > > <!-- Allows accessing the server log --> > <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery" > lazy-init="false" scope="singleton" > init-method="start" destroy-method="stop"> > </bean> > > <!-- > The <broker> element is used to configure the ActiveMQ broker. > --> > <broker xmlns="http://activemq.apache.org/schema/core" > brokerName="energysoft" dataDirectory="${activemq.data}" useJmx="true"> > > <networkConnectors> > <networkConnector uri="static:(tcp://myOtherBrokerIP:61616)" > userName="mylogin" > password="mypassword" > prefetchSize="100" > duplex="true" > /> > </networkConnectors> > > <destinationPolicy> > <policyMap> > <policyEntries> > <policyEntry queue=">" maxPageSize="10000" > maxBrowsePageSize="0" enableAudit="false" > > <networkBridgeFilterFactory> > <conditionalNetworkBridgeFilterFactory > replayWhenNoConsumers="true"/> > </networkBridgeFilterFactory> > <messageGroupMapFactory> > <simpleMessageGroupMapFactory/> > </messageGroupMapFactory> > </policyEntry> > <policyEntry topic=">" > > <pendingMessageLimitStrategy> > <constantPendingMessageLimitStrategy limit="1000"/> > </pendingMessageLimitStrategy> > </policyEntry> > </policyEntries> > </policyMap> > </destinationPolicy> > > <plugins> > <simpleAuthenticationPlugin> > <users> > <authenticationUser username="mylogin" > password="mypassword" groups="users,admins"/> > </users> > </simpleAuthenticationPlugin> > </plugins> > > <managementContext> > <managementContext createConnector="false"/> > </managementContext> > > > <persistenceAdapter> > <kahaDB directory="${activemq.data}/kahadb" > journalMaxFileLength="32mb"/> > </persistenceAdapter> > > <systemUsage> > <systemUsage> > <memoryUsage> > <memoryUsage percentOfJvmHeap="70" /> > </memoryUsage> > <storeUsage> > <storeUsage limit="100 gb"/> > </storeUsage> > <tempUsage> > <tempUsage limit="50 gb"/> > </tempUsage> > </systemUsage> > </systemUsage> > > <transportConnectors> > <!-- DOS protection, limit concurrent connections to 1000 > and frame size to 100MB --> > <transportConnector name="openwire" > uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" > /> > <transportConnector name="amqp" > uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> > > <transportConnector name="stomp" > uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> > > <transportConnector name="mqtt" > uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> > > <transportConnector name="ws" > uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> > > <!-- cluster port --> > <!-- <transportConnector uri="tcp://localhost:62002"/> --> > </transportConnectors> > > <!-- destroy the spring context on shutdown to stop jetty --> > <shutdownHooks> > <bean xmlns="http://www.springframework.org/schema/beans" > class="org.apache.activemq.hooks.SpringContextHook" /> > </shutdownHooks> > </broker> > > <import resource="jetty.xml"/> > > </beans> > > -- Jean-Baptiste Onofré jbono...@apache.org http://blog.nanthrax.net Talend - http://www.talend.com