I am using *ActiveMQ 5.13.4* and have set up a *network of brokers* with two brokers, *A* and *B*, connected via a *networkConnector*. Each broker has one consumer connected to it (*A'* and *B'*, respectively). *A'* and *B'* consume messages from the same *distributed queue* (e.g. `requests-queue`). Each consumer is a Java application whose listeners are created by a Spring *DefaultJmsListenerContainerFactory* with the setting *concurrency: 5-10*. In other words, each consumer has at most 10 threads that can consume messages from the queue simultaneously.
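For illustration, the consumer side described above corresponds roughly to the Spring bean definitions below. This is only a sketch, assuming XML-based bean configuration; the bean ids and the host name `broker-a-host` are placeholders and are not taken from the real application.

```xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

  <!-- Connection to the local broker; the host name is a placeholder -->
  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://broker-a-host:61616"/>
  </bean>

  <!-- Listener container factory: between 5 and 10 concurrent consumer threads -->
  <bean id="jmsListenerContainerFactory"
        class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="concurrency" value="5-10"/>
  </bean>

</beans>
```

With this setup each listener thread opens its own consumer on the queue, so a prefetch of 1 means one in-flight message per thread.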
In the broker configuration (`activemq.xml`) I have set the prefetch size to 1, on the assumption that there are not many messages and each one needs a lot of processing time.

```
<policyEntry queue=">" producerFlowControl="true" optimizedDispatch="true" queuePrefetch="1" enableAudit="false">
```

The *desired behavior* is that when I send 100 messages to `A`, it starts processing 10 messages simultaneously (one for each consumer thread) and forwards another 10 to `B`. When `B` finishes processing its first 10 messages, it would get another 10. This would continue until all the messages are processed.

The *actual behavior* is that when I send 100 messages to `A`, it dispatches 10 messages, one per thread, to the local consumer `A'` and forwards the remaining 90 to `B`. The threads of `A'` finish processing their messages and then sit idle while the 90 messages are processed on broker `B` by its consumers.

*Question:* How can I configure the network so that messages are balanced based on consumer availability?

The full `activemq.xml` configuration for one of the brokers is the following. The other broker has the same configuration, with only a different broker name and a different IP in the networkConnector.
```
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
      <value>file:${activemq.conf}/credentials.properties</value>
    </property>
  </bean>

  <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
        lazy-init="false" scope="singleton" init-method="start" destroy-method="stop">
  </bean>

  <broker xmlns="http://activemq.apache.org/schema/core"
          brokerName="broker-jms-tux-01-qa-gnd"
          persistent="false"
          dataDirectory="${activemq.data}"
          cacheTempDestinations="false"
          advisorySupport="true"
          useJmx="true">

    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic=">" producerFlowControl="true">
            <pendingMessageLimitStrategy>
              <constantPendingMessageLimitStrategy limit="1000"/>
            </pendingMessageLimitStrategy>
            <pendingSubscriberPolicy>
              <vmCursor/>
            </pendingSubscriberPolicy>
          </policyEntry>
          <policyEntry queue=">" producerFlowControl="true" optimizedDispatch="true"
                       queuePrefetch="1" enableAudit="false">
            <networkBridgeFilterFactory>
              <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
            </networkBridgeFilterFactory>
            <pendingQueuePolicy>
              <vmQueueCursor/>
            </pendingQueuePolicy>
            <deadLetterStrategy>
              <sharedDeadLetterStrategy expiration="300000"/>
            </deadLetterStrategy>
          </policyEntry>
        </policyEntries>
      </policyMap>
    </destinationPolicy>

    <managementContext>
      <managementContext createConnector="true" connectorPort="1099"/>
    </managementContext>

    <networkConnectors>
      <networkConnector name="broker-jms-tux-01-qa-gnd->broker-jms-tux-02-qa-gnd"
                        uri="static:(tcp://10.83.16.22:61616)"
                        conduitSubscriptions="false"
                        dynamicOnly="true"
                        networkTTL="2"
                        alwaysSyncSend="true"
                        decreaseNetworkConsumerPriority="true"
                        duplex="false"/>
    </networkConnectors>

    <systemUsage>
      <systemUsage>
        <memoryUsage>
          <memoryUsage percentOfJvmHeap="80"/>
        </memoryUsage>
        <storeUsage>
          <storeUsage limit="10 gb"/>
        </storeUsage>
        <tempUsage>
          <tempUsage limit="4 gb"/>
        </tempUsage>
      </systemUsage>
    </systemUsage>

    <transportConnectors>
      <transportConnector name="openwire"
                          uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=3200000000"/>
    </transportConnectors>

    <shutdownHooks>
      <bean xmlns="http://www.springframework.org/schema/beans"
            class="org.apache.activemq.hooks.SpringContextHook"/>
    </shutdownHooks>
  </broker>

  <import resource="jetty.xml"/>
</beans>
```

Thank you in advance!

--
View this message in context: http://activemq.2283324.n4.nabble.com/ActiveMQ-network-of-brokers-messages-are-not-balanced-tp4718074.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.