My question about the DLQ was whether you see some messages being moved to the DLQ (because they expired, for instance).

Anyway, I would first try:

1. decrease the network priority, so that messages are consumed locally and the bridge is avoided
2. increase the prefetch on the bridge (the network connector)

Regards
JB

On 23/01/2020 14:52, Jérôme Barotin wrote:
> Hi Jean Baptiste and Tim,
>
> My responses to your very interesting questions are below:
>
> @Jean Baptiste
>> You are using a network of brokers.
> Yes, exactly.
>
>> Did you try decreaseNetworkPriority on the network connector ?
> No, I'll try that immediately; we'll see in a few days if it helps.
>
>> By default, messages will be forwarded to another broker only if
>> there's at least an active consumer.
>
>> Don't you have slow consumers identified ?
> I don't know. Is JMX the only way to check that?
>
>> By the way, what's the broker URL in the connection factory ? failover ?
> Yes, I'm using a failover URL with the URLs of both brokers
> ( failover:(tcp://broker1:61616,tcp://broker2:61616) ).
>
>> Are you using rebalance client ?
> No, my topology is static, so I don't think I need that.
>
>> No DLQ ?
> What do you mean by this question? I have a DLQ, but I don't think
> it's the issue here.
>
>> What's the prefetch on client side configuration ?
> For the 200 000 messages a day queue, I set a prefetch size of 10.
> For the 10 000 messages a day queue, I set a prefetch size of 1.
>
> @Tim
>
>> Do messages get stuck in both queues, or only one of them?
>
> Messages get stuck in only one queue; the other one works very well.
>
> To complete the picture: on the second broker there is no
> networkConnectors section, because of duplex=true.
>
> On 23/01/2020 at 13:42, Jean-Baptiste Onofré wrote:
>>
>> Regards
>> JB
>>
>> On 23/01/2020 11:19, Jérôme Barotin wrote:
>>> Hi,
>>>
>>> I've got two AMQ brokers installed in cluster mode with KahaDB, in order
>>> to have an HA configuration.
>>> I have configured 2 queues (no topics):
>>>
>>> * one that exchanges 200 000 messages a day; message groups are enabled,
>>>   the time to process a message is between 10 and 60 seconds, and it is
>>>   consumed by 4 jobs with 15 threads each.
>>> * another one that handles 10 000 messages a day, with no message groups;
>>>   it is consumed by 4 jobs with 5 threads each.
>>>
>>> Consumers are multithreaded with PooledConnectionFactory.
>>>
>>> Everything works well, but sometimes messages get stuck. My
>>> consumers are paused and do nothing. I can see on the
>>> ActiveMQ monitoring GUI that:
>>>
>>> * the number of pending messages is higher than zero
>>> * for all consumers, the dispatched queue size is zero
>>>
>>> Messages can stay stuck for 1 or 2 hours and then get unblocked by
>>> themselves for no apparent reason. Restarting the brokers also unblocks
>>> them.
>>>
>>> I checked the AMQ documentation and a lot of parameters and haven't found a
>>> solution. That's why I'm posting a description of my issue here. Does
>>> anybody have an idea of something to change or something to monitor
>>> that could help me?
>>>
>>> The AMQ configuration is the following:
>>>
>>> <beans
>>>   xmlns="http://www.springframework.org/schema/beans"
>>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>   xsi:schemaLocation="http://www.springframework.org/schema/beans
>>>     http://www.springframework.org/schema/beans/spring-beans.xsd
>>>     http://activemq.apache.org/schema/core
>>>     http://activemq.apache.org/schema/core/activemq-core.xsd">
>>>
>>>   <!-- Allows us to use system properties as variables in this
>>>        configuration file -->
>>>   <bean
>>>     class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>>>     <property name="locations">
>>>       <value>file:${activemq.conf}/credentials.properties</value>
>>>     </property>
>>>   </bean>
>>>
>>>   <!-- Allows accessing the server log -->
>>>   <bean id="logQuery"
>>>         class="io.fabric8.insight.log.log4j.Log4jLogQuery"
>>>         lazy-init="false" scope="singleton"
>>>         init-method="start" destroy-method="stop">
>>>   </bean>
>>>
>>>   <!--
>>>     The <broker> element is used to configure the ActiveMQ broker.
>>>   -->
>>>   <broker xmlns="http://activemq.apache.org/schema/core"
>>>           brokerName="energysoft" dataDirectory="${activemq.data}" useJmx="true">
>>>
>>>     <networkConnectors>
>>>       <networkConnector uri="static:(tcp://myOtherBrokerIP:61616)"
>>>                         userName="mylogin"
>>>                         password="mypassword"
>>>                         prefetchSize="100"
>>>                         duplex="true"
>>>       />
>>>     </networkConnectors>
>>>
>>>     <destinationPolicy>
>>>       <policyMap>
>>>         <policyEntries>
>>>           <policyEntry queue=">" maxPageSize="10000"
>>>                        maxBrowsePageSize="0" enableAudit="false" >
>>>             <networkBridgeFilterFactory>
>>>               <conditionalNetworkBridgeFilterFactory
>>>                 replayWhenNoConsumers="true"/>
>>>             </networkBridgeFilterFactory>
>>>             <messageGroupMapFactory>
>>>               <simpleMessageGroupMapFactory/>
>>>             </messageGroupMapFactory>
>>>           </policyEntry>
>>>           <policyEntry topic=">" >
>>>             <pendingMessageLimitStrategy>
>>>               <constantPendingMessageLimitStrategy limit="1000"/>
>>>             </pendingMessageLimitStrategy>
>>>           </policyEntry>
>>>         </policyEntries>
>>>       </policyMap>
>>>     </destinationPolicy>
>>>
>>>     <plugins>
>>>       <simpleAuthenticationPlugin>
>>>         <users>
>>>           <authenticationUser username="mylogin"
>>>                               password="mypassword" groups="users,admins"/>
>>>         </users>
>>>       </simpleAuthenticationPlugin>
>>>     </plugins>
>>>
>>>     <managementContext>
>>>       <managementContext createConnector="false"/>
>>>     </managementContext>
>>>
>>>     <persistenceAdapter>
>>>       <kahaDB directory="${activemq.data}/kahadb"
>>>               journalMaxFileLength="32mb"/>
>>>     </persistenceAdapter>
>>>
>>>     <systemUsage>
>>>       <systemUsage>
>>>         <memoryUsage>
>>>           <memoryUsage percentOfJvmHeap="70" />
>>>         </memoryUsage>
>>>         <storeUsage>
>>>           <storeUsage limit="100 gb"/>
>>>         </storeUsage>
>>>         <tempUsage>
>>>           <tempUsage limit="50 gb"/>
>>>         </tempUsage>
>>>       </systemUsage>
>>>     </systemUsage>
>>>
>>>     <transportConnectors>
>>>       <!-- DOS protection, limit concurrent connections to 1000
>>>            and frame size to 100MB -->
>>>       <transportConnector name="openwire"
>>>         uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"
>>>       />
>>>       <transportConnector name="amqp"
>>>         uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>>>       <transportConnector name="stomp"
>>>         uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>>>       <transportConnector name="mqtt"
>>>         uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>>>       <transportConnector name="ws"
>>>         uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>>>
>>>       <!-- cluster port -->
>>>       <!-- <transportConnector uri="tcp://localhost:62002"/> -->
>>>     </transportConnectors>
>>>
>>>     <!-- destroy the spring context on shutdown to stop jetty -->
>>>     <shutdownHooks>
>>>       <bean xmlns="http://www.springframework.org/schema/beans"
>>>             class="org.apache.activemq.hooks.SpringContextHook" />
>>>     </shutdownHooks>
>>>   </broker>
>>>
>>>   <import resource="jetty.xml"/>
>>>
>>> </beans>
>>>
>

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
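
For reference, the two changes suggested at the top of this reply could look roughly like the fragment below when applied to the networkConnector from the quoted configuration. This is only a sketch under assumptions: the thread calls the first option "decreaseNetworkPriority", and the fragment assumes this refers to the decreaseNetworkConsumerPriority attribute of the network connector; the prefetchSize value of 1000 is purely illustrative (the thread only says to increase it from the current 100). The uri, userName and password values are copied unchanged from the quoted configuration.

```xml
<!-- Sketch only: the connector from the quoted configuration with the two suggested changes. -->
<networkConnectors>
  <!-- decreaseNetworkConsumerPriority: assumed to be the attribute meant by
       "decreaseNetworkPriority"; it makes the broker prefer local consumers over the bridge. -->
  <!-- prefetchSize: raised from 100; the value 1000 is illustrative, not a recommendation. -->
  <networkConnector uri="static:(tcp://myOtherBrokerIP:61616)"
                    userName="mylogin"
                    password="mypassword"
                    decreaseNetworkConsumerPriority="true"
                    prefetchSize="1000"
                    duplex="true"/>
</networkConnectors>
```

Since the connector is duplex, this would only need to be changed on the broker that declares the networkConnectors section, and that broker has to be restarted for the change to take effect.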